Skip to main content

mz_sql/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL planning.
11//!
12//! SQL planning is the process of taking the abstract syntax tree of a
13//! [`Statement`] and turning it into a [`Plan`] that the dataflow layer can
14//! execute.
15//!
16//! Statements must be purified before they can be planned. See the
17//! [`pure`](crate::pure) module for details.
18
19// Internal module layout.
20//
21// The entry point for planning is `statement::handle_statement`. That function
22// dispatches to a more specific `handle` function for the particular statement
23// type. For most statements, this `handle` function is uninteresting and short,
24// but anything involving a `SELECT` statement gets complicated. `SELECT`
25// queries wind through the functions in the `query` module, starting with
26// `plan_root_query` and fanning out based on the contents of the `SELECT`
27// statement.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{CollectionPlan, ColumnOrder, MapFilterProject, MirScalarExpr, RowSetFinishing};
41use mz_ore::now::{self, NOW_ZERO};
42use mz_pgcopy::CopyFormatParams;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
44use mz_repr::explain::{ExplainConfig, ExplainFormat};
45use mz_repr::network_policy_id::NetworkPolicyId;
46use mz_repr::optimize::OptimizerFeatureOverrides;
47use mz_repr::refresh_schedule::RefreshSchedule;
48use mz_repr::role_id::RoleId;
49use mz_repr::{
50    CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, ReprColumnType, Row,
51    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
52};
53use mz_sql_parser::ast::{
54    AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
55    RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
56    Value, WithOptionValue,
57};
58use mz_ssh_util::keys::SshKeyPair;
59use mz_storage_types::connections::aws::AwsConnection;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::connections::{
62    AwsPrivatelinkConnection, CsrConnection, GlueSchemaRegistryConnection,
63    IcebergCatalogConnection, KafkaConnection, MySqlConnection, PostgresConnection,
64    SqlServerConnectionDetails, SshConnection,
65};
66use mz_storage_types::instances::StorageInstanceId;
67use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
68use mz_storage_types::sources::{
69    SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
70};
71use proptest_derive::Arbitrary;
72use serde::{Deserialize, Serialize};
73
74use crate::ast::{
75    ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
76    TransactionAccessMode,
77};
78use crate::catalog::{
79    CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
80    RoleAttributesRaw,
81};
82use crate::names::{
83    Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
84    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
85};
86
87pub(crate) mod error;
88pub(crate) mod explain;
89pub(crate) mod hir;
90pub(crate) mod literal;
91pub(crate) mod lowering;
92pub(crate) mod notice;
93pub(crate) mod plan_utils;
94pub(crate) mod query;
95pub(crate) mod scope;
96pub(crate) mod side_effecting_func;
97pub(crate) mod statement;
98pub(crate) mod transform_ast;
99pub(crate) mod transform_hir;
100pub(crate) mod typeconv;
101pub(crate) mod with_options;
102
103use crate::plan;
104use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
105use crate::plan::with_options::OptionalDuration;
106pub use error::PlanError;
107pub use explain::normalize_subqueries;
108pub use hir::{
109    AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
110    WindowExprType,
111};
112pub use lowering::Config as HirToMirConfig;
113pub use notice::PlanNotice;
114pub use query::{ExprContext, QueryContext, QueryLifetime};
115pub use scope::Scope;
116pub use side_effecting_func::SideEffectingFunc;
117pub use statement::ddl::{
118    AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
119    PlannedAlterRoleOption, PlannedRoleAttributes, PlannedRoleVariable,
120    SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123    StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124    resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
130/// Instructions for executing a SQL query.
131#[derive(Debug, EnumKind)]
132#[enum_kind(PlanKind)]
133pub enum Plan {
134    CreateConnection(CreateConnectionPlan),
135    CreateDatabase(CreateDatabasePlan),
136    CreateSchema(CreateSchemaPlan),
137    CreateRole(CreateRolePlan),
138    CreateCluster(CreateClusterPlan),
139    CreateClusterReplica(CreateClusterReplicaPlan),
140    CreateSource(CreateSourcePlan),
141    CreateSources(Vec<CreateSourcePlanBundle>),
142    CreateSecret(CreateSecretPlan),
143    CreateSink(CreateSinkPlan),
144    CreateTable(CreateTablePlan),
145    CreateView(CreateViewPlan),
146    CreateMaterializedView(CreateMaterializedViewPlan),
147    CreateNetworkPolicy(CreateNetworkPolicyPlan),
148    CreateIndex(CreateIndexPlan),
149    CreateType(CreateTypePlan),
150    Comment(CommentPlan),
151    DiscardTemp,
152    DiscardAll,
153    DropObjects(DropObjectsPlan),
154    DropOwned(DropOwnedPlan),
155    EmptyQuery,
156    ShowAllVariables,
157    ShowCreate(ShowCreatePlan),
158    ShowColumns(ShowColumnsPlan),
159    ShowVariable(ShowVariablePlan),
160    InspectShard(InspectShardPlan),
161    SetVariable(SetVariablePlan),
162    ResetVariable(ResetVariablePlan),
163    SetTransaction(SetTransactionPlan),
164    StartTransaction(StartTransactionPlan),
165    CommitTransaction(CommitTransactionPlan),
166    AbortTransaction(AbortTransactionPlan),
167    Select(SelectPlan),
168    Subscribe(SubscribePlan),
169    CopyFrom(CopyFromPlan),
170    CopyTo(CopyToPlan),
171    ExplainPlan(ExplainPlanPlan),
172    ExplainPushdown(ExplainPushdownPlan),
173    ExplainTimestamp(ExplainTimestampPlan),
174    ExplainSinkSchema(ExplainSinkSchemaPlan),
175    Insert(InsertPlan),
176    AlterCluster(AlterClusterPlan),
177    AlterClusterSwap(AlterClusterSwapPlan),
178    AlterNoop(AlterNoopPlan),
179    AlterSetCluster(AlterSetClusterPlan),
180    AlterConnection(AlterConnectionPlan),
181    AlterSource(AlterSourcePlan),
182    AlterClusterRename(AlterClusterRenamePlan),
183    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
184    AlterItemRename(AlterItemRenamePlan),
185    AlterSchemaRename(AlterSchemaRenamePlan),
186    AlterSchemaSwap(AlterSchemaSwapPlan),
187    AlterSecret(AlterSecretPlan),
188    AlterSink(AlterSinkPlan),
189    AlterSystemSet(AlterSystemSetPlan),
190    AlterSystemReset(AlterSystemResetPlan),
191    AlterSystemResetAll(AlterSystemResetAllPlan),
192    AlterRole(AlterRolePlan),
193    AlterOwner(AlterOwnerPlan),
194    AlterTableAddColumn(AlterTablePlan),
195    AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
196    AlterNetworkPolicy(AlterNetworkPolicyPlan),
197    Declare(DeclarePlan),
198    Fetch(FetchPlan),
199    Close(ClosePlan),
200    ReadThenWrite(ReadThenWritePlan),
201    Prepare(PreparePlan),
202    Execute(ExecutePlan),
203    Deallocate(DeallocatePlan),
204    Raise(RaisePlan),
205    GrantRole(GrantRolePlan),
206    RevokeRole(RevokeRolePlan),
207    GrantPrivileges(GrantPrivilegesPlan),
208    RevokePrivileges(RevokePrivilegesPlan),
209    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
210    ReassignOwned(ReassignOwnedPlan),
211    SideEffectingFunc(SideEffectingFunc),
212    ValidateConnection(ValidateConnectionPlan),
213    AlterRetainHistory(AlterRetainHistoryPlan),
214    AlterSourceTimestampInterval(AlterSourceTimestampIntervalPlan),
215}
216
217impl Plan {
218    /// Expresses which [`StatementKind`] can generate which set of
219    /// [`PlanKind`].
220    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
221        match stmt {
222            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
223            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
224            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
225            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
226            StatementKind::AlterObjectRename => &[
227                PlanKind::AlterClusterRename,
228                PlanKind::AlterClusterReplicaRename,
229                PlanKind::AlterItemRename,
230                PlanKind::AlterSchemaRename,
231                PlanKind::AlterNoop,
232            ],
233            StatementKind::AlterObjectSwap => &[
234                PlanKind::AlterClusterSwap,
235                PlanKind::AlterSchemaSwap,
236                PlanKind::AlterNoop,
237            ],
238            StatementKind::AlterRole => &[PlanKind::AlterRole],
239            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
240            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
241            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
242            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
243            StatementKind::AlterSource => &[
244                PlanKind::AlterNoop,
245                PlanKind::AlterSource,
246                PlanKind::AlterRetainHistory,
247                PlanKind::AlterSourceTimestampInterval,
248            ],
249            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
250            StatementKind::AlterSystemResetAll => {
251                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
252            }
253            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
254            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
255            StatementKind::AlterTableAddColumn => {
256                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
257            }
258            StatementKind::AlterMaterializedViewApplyReplacement => &[
259                PlanKind::AlterNoop,
260                PlanKind::AlterMaterializedViewApplyReplacement,
261            ],
262            StatementKind::Close => &[PlanKind::Close],
263            StatementKind::Comment => &[PlanKind::Comment],
264            StatementKind::Commit => &[PlanKind::CommitTransaction],
265            StatementKind::Copy => &[
266                PlanKind::CopyFrom,
267                PlanKind::Select,
268                PlanKind::Subscribe,
269                PlanKind::CopyTo,
270            ],
271            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
272            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
273            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
274            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
275            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
276            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
277            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
278            StatementKind::CreateRole => &[PlanKind::CreateRole],
279            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
280            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
281            StatementKind::CreateSink => &[PlanKind::CreateSink],
282            StatementKind::CreateSource | StatementKind::CreateSubsource => {
283                &[PlanKind::CreateSource]
284            }
285            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
286            StatementKind::CreateTable => &[PlanKind::CreateTable],
287            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
288            StatementKind::CreateType => &[PlanKind::CreateType],
289            StatementKind::CreateView => &[PlanKind::CreateView],
290            StatementKind::Deallocate => &[PlanKind::Deallocate],
291            StatementKind::Declare => &[PlanKind::Declare],
292            StatementKind::Delete => &[PlanKind::ReadThenWrite],
293            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
294            StatementKind::DropObjects => &[PlanKind::DropObjects],
295            StatementKind::DropOwned => &[PlanKind::DropOwned],
296            StatementKind::Execute => &[PlanKind::Execute],
297            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
298            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
299            StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
300            StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
301            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
302            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
303            StatementKind::Fetch => &[PlanKind::Fetch],
304            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
305            StatementKind::GrantRole => &[PlanKind::GrantRole],
306            StatementKind::Insert => &[PlanKind::Insert],
307            StatementKind::Prepare => &[PlanKind::Prepare],
308            StatementKind::Raise => &[PlanKind::Raise],
309            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
310            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
311            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
312            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
313            StatementKind::Rollback => &[PlanKind::AbortTransaction],
314            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
315            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
316            StatementKind::SetVariable => &[PlanKind::SetVariable],
317            StatementKind::Show => &[
318                PlanKind::Select,
319                PlanKind::ShowVariable,
320                PlanKind::ShowCreate,
321                PlanKind::ShowColumns,
322                PlanKind::ShowAllVariables,
323                PlanKind::InspectShard,
324            ],
325            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
326            StatementKind::Subscribe => &[PlanKind::Subscribe],
327            StatementKind::Update => &[PlanKind::ReadThenWrite],
328            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
329            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
330            StatementKind::ExecuteUnitTest => &[],
331        }
332    }
333
334    /// Returns a human readable name of the plan. Meant for use in messages sent back to a user.
335    pub fn name(&self) -> &str {
336        match self {
337            Plan::CreateConnection(_) => "create connection",
338            Plan::CreateDatabase(_) => "create database",
339            Plan::CreateSchema(_) => "create schema",
340            Plan::CreateRole(_) => "create role",
341            Plan::CreateCluster(_) => "create cluster",
342            Plan::CreateClusterReplica(_) => "create cluster replica",
343            Plan::CreateSource(_) => "create source",
344            Plan::CreateSources(_) => "create source",
345            Plan::CreateSecret(_) => "create secret",
346            Plan::CreateSink(_) => "create sink",
347            Plan::CreateTable(_) => "create table",
348            Plan::CreateView(_) => "create view",
349            Plan::CreateMaterializedView(_) => "create materialized view",
350            Plan::CreateIndex(_) => "create index",
351            Plan::CreateType(_) => "create type",
352            Plan::CreateNetworkPolicy(_) => "create network policy",
353            Plan::Comment(_) => "comment",
354            Plan::DiscardTemp => "discard temp",
355            Plan::DiscardAll => "discard all",
356            Plan::DropObjects(plan) => match plan.object_type {
357                ObjectType::Table => "drop table",
358                ObjectType::View => "drop view",
359                ObjectType::MaterializedView => "drop materialized view",
360                ObjectType::Source => "drop source",
361                ObjectType::Sink => "drop sink",
362                ObjectType::Index => "drop index",
363                ObjectType::Type => "drop type",
364                ObjectType::Role => "drop roles",
365                ObjectType::Cluster => "drop clusters",
366                ObjectType::ClusterReplica => "drop cluster replicas",
367                ObjectType::Secret => "drop secret",
368                ObjectType::Connection => "drop connection",
369                ObjectType::Database => "drop database",
370                ObjectType::Schema => "drop schema",
371                ObjectType::Func => "drop function",
372                ObjectType::NetworkPolicy => "drop network policy",
373            },
374            Plan::DropOwned(_) => "drop owned",
375            Plan::EmptyQuery => "do nothing",
376            Plan::ShowAllVariables => "show all variables",
377            Plan::ShowCreate(_) => "show create",
378            Plan::ShowColumns(_) => "show columns",
379            Plan::ShowVariable(_) => "show variable",
380            Plan::InspectShard(_) => "inspect shard",
381            Plan::SetVariable(_) => "set variable",
382            Plan::ResetVariable(_) => "reset variable",
383            Plan::SetTransaction(_) => "set transaction",
384            Plan::StartTransaction(_) => "start transaction",
385            Plan::CommitTransaction(_) => "commit",
386            Plan::AbortTransaction(_) => "abort",
387            Plan::Select(_) => "select",
388            Plan::Subscribe(_) => "subscribe",
389            Plan::CopyFrom(_) => "copy from",
390            Plan::CopyTo(_) => "copy to",
391            Plan::ExplainPlan(_) => "explain plan",
392            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
393            Plan::ExplainTimestamp(_) => "explain timestamp",
394            Plan::ExplainSinkSchema(_) => "explain schema",
395            Plan::Insert(_) => "insert",
396            Plan::AlterNoop(plan) => match plan.object_type {
397                ObjectType::Table => "alter table",
398                ObjectType::View => "alter view",
399                ObjectType::MaterializedView => "alter materialized view",
400                ObjectType::Source => "alter source",
401                ObjectType::Sink => "alter sink",
402                ObjectType::Index => "alter index",
403                ObjectType::Type => "alter type",
404                ObjectType::Role => "alter role",
405                ObjectType::Cluster => "alter cluster",
406                ObjectType::ClusterReplica => "alter cluster replica",
407                ObjectType::Secret => "alter secret",
408                ObjectType::Connection => "alter connection",
409                ObjectType::Database => "alter database",
410                ObjectType::Schema => "alter schema",
411                ObjectType::Func => "alter function",
412                ObjectType::NetworkPolicy => "alter network policy",
413            },
414            Plan::AlterCluster(_) => "alter cluster",
415            Plan::AlterClusterRename(_) => "alter cluster rename",
416            Plan::AlterClusterSwap(_) => "alter cluster swap",
417            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
418            Plan::AlterSetCluster(_) => "alter set cluster",
419            Plan::AlterConnection(_) => "alter connection",
420            Plan::AlterSource(_) => "alter source",
421            Plan::AlterItemRename(_) => "rename item",
422            Plan::AlterSchemaRename(_) => "alter rename schema",
423            Plan::AlterSchemaSwap(_) => "alter swap schema",
424            Plan::AlterSecret(_) => "alter secret",
425            Plan::AlterSink(_) => "alter sink",
426            Plan::AlterSystemSet(_) => "alter system",
427            Plan::AlterSystemReset(_) => "alter system",
428            Plan::AlterSystemResetAll(_) => "alter system",
429            Plan::AlterRole(_) => "alter role",
430            Plan::AlterNetworkPolicy(_) => "alter network policy",
431            Plan::AlterOwner(plan) => match plan.object_type {
432                ObjectType::Table => "alter table owner",
433                ObjectType::View => "alter view owner",
434                ObjectType::MaterializedView => "alter materialized view owner",
435                ObjectType::Source => "alter source owner",
436                ObjectType::Sink => "alter sink owner",
437                ObjectType::Index => "alter index owner",
438                ObjectType::Type => "alter type owner",
439                ObjectType::Role => "alter role owner",
440                ObjectType::Cluster => "alter cluster owner",
441                ObjectType::ClusterReplica => "alter cluster replica owner",
442                ObjectType::Secret => "alter secret owner",
443                ObjectType::Connection => "alter connection owner",
444                ObjectType::Database => "alter database owner",
445                ObjectType::Schema => "alter schema owner",
446                ObjectType::Func => "alter function owner",
447                ObjectType::NetworkPolicy => "alter network policy owner",
448            },
449            Plan::AlterTableAddColumn(_) => "alter table add column",
450            Plan::AlterMaterializedViewApplyReplacement(_) => {
451                "alter materialized view apply replacement"
452            }
453            Plan::Declare(_) => "declare",
454            Plan::Fetch(_) => "fetch",
455            Plan::Close(_) => "close",
456            Plan::ReadThenWrite(plan) => match plan.kind {
457                MutationKind::Insert => "insert into select",
458                MutationKind::Update => "update",
459                MutationKind::Delete => "delete",
460            },
461            Plan::Prepare(_) => "prepare",
462            Plan::Execute(_) => "execute",
463            Plan::Deallocate(_) => "deallocate",
464            Plan::Raise(_) => "raise",
465            Plan::GrantRole(_) => "grant role",
466            Plan::RevokeRole(_) => "revoke role",
467            Plan::GrantPrivileges(_) => "grant privilege",
468            Plan::RevokePrivileges(_) => "revoke privilege",
469            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
470            Plan::ReassignOwned(_) => "reassign owned",
471            Plan::SideEffectingFunc(_) => "side effecting func",
472            Plan::ValidateConnection(_) => "validate connection",
473            Plan::AlterRetainHistory(_) => "alter retain history",
474            Plan::AlterSourceTimestampInterval(_) => "alter source timestamp interval",
475        }
476    }
477
478    /// Returns `true` iff this `Plan` is allowed to be executed in read-only
479    /// mode.
480    ///
481    /// We use an explicit allow-list, to avoid future additions automatically
482    /// falling into the `true` category.
483    pub fn allowed_in_read_only(&self) -> bool {
484        match self {
485            // These two set non-durable session variables, so are okay in
486            // read-only mode.
487            Plan::SetVariable(_) => true,
488            Plan::ResetVariable(_) => true,
489            Plan::SetTransaction(_) => true,
490            Plan::StartTransaction(_) => true,
491            Plan::CommitTransaction(_) => true,
492            Plan::AbortTransaction(_) => true,
493            Plan::Select(_) => true,
494            Plan::EmptyQuery => true,
495            Plan::ShowAllVariables => true,
496            Plan::ShowCreate(_) => true,
497            Plan::ShowColumns(_) => true,
498            Plan::ShowVariable(_) => true,
499            Plan::InspectShard(_) => true,
500            Plan::Subscribe(_) => true,
501            Plan::CopyTo(_) => true,
502            Plan::ExplainPlan(_) => true,
503            Plan::ExplainPushdown(_) => true,
504            Plan::ExplainTimestamp(_) => true,
505            Plan::ExplainSinkSchema(_) => true,
506            Plan::ValidateConnection(_) => true,
507            _ => false,
508        }
509    }
510}
511
512#[derive(Debug)]
513pub struct StartTransactionPlan {
514    pub access: Option<TransactionAccessMode>,
515    pub isolation_level: Option<TransactionIsolationLevel>,
516}
517
518#[derive(Debug)]
519pub enum TransactionType {
520    Explicit,
521    Implicit,
522}
523
524impl TransactionType {
525    pub fn is_explicit(&self) -> bool {
526        matches!(self, TransactionType::Explicit)
527    }
528
529    pub fn is_implicit(&self) -> bool {
530        matches!(self, TransactionType::Implicit)
531    }
532}
533
534#[derive(Debug)]
535pub struct CommitTransactionPlan {
536    pub transaction_type: TransactionType,
537}
538
539#[derive(Debug)]
540pub struct AbortTransactionPlan {
541    pub transaction_type: TransactionType,
542}
543
544#[derive(Debug)]
545pub struct CreateDatabasePlan {
546    pub name: String,
547    pub if_not_exists: bool,
548}
549
550#[derive(Debug)]
551pub struct CreateSchemaPlan {
552    pub database_spec: ResolvedDatabaseSpecifier,
553    pub schema_name: String,
554    pub if_not_exists: bool,
555}
556
557#[derive(Debug)]
558pub struct CreateRolePlan {
559    pub name: String,
560    pub attributes: RoleAttributesRaw,
561}
562
563#[derive(Debug, PartialEq, Eq, Clone)]
564pub struct CreateClusterPlan {
565    pub name: String,
566    pub variant: CreateClusterVariant,
567    pub workload_class: Option<String>,
568}
569
570#[derive(Debug, PartialEq, Eq, Clone)]
571pub enum CreateClusterVariant {
572    Managed(CreateClusterManagedPlan),
573    Unmanaged(CreateClusterUnmanagedPlan),
574}
575
576#[derive(Debug, PartialEq, Eq, Clone)]
577pub struct CreateClusterUnmanagedPlan {
578    pub replicas: Vec<(String, ReplicaConfig)>,
579}
580
581#[derive(Debug, PartialEq, Eq, Clone)]
582pub struct CreateClusterManagedPlan {
583    pub replication_factor: u32,
584    pub size: String,
585    pub availability_zones: Vec<String>,
586    pub compute: ComputeReplicaConfig,
587    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
588    pub schedule: ClusterSchedule,
589}
590
591#[derive(Debug)]
592pub struct CreateClusterReplicaPlan {
593    pub cluster_id: ClusterId,
594    pub name: String,
595    pub config: ReplicaConfig,
596}
597
598/// Configuration of introspection for a cluster replica.
599#[derive(
600    Clone,
601    Copy,
602    Debug,
603    Serialize,
604    Deserialize,
605    PartialOrd,
606    Ord,
607    PartialEq,
608    Eq
609)]
610pub struct ComputeReplicaIntrospectionConfig {
611    /// Whether to introspect the introspection.
612    pub debugging: bool,
613    /// The interval at which to introspect.
614    pub interval: Duration,
615}
616
617#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
618pub struct ComputeReplicaConfig {
619    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
620}
621
622#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
623pub enum ReplicaConfig {
624    Unorchestrated {
625        storagectl_addrs: Vec<String>,
626        computectl_addrs: Vec<String>,
627        compute: ComputeReplicaConfig,
628    },
629    Orchestrated {
630        size: String,
631        availability_zone: Option<String>,
632        compute: ComputeReplicaConfig,
633        internal: bool,
634        billed_as: Option<String>,
635    },
636}
637
638#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
639pub enum ClusterSchedule {
640    /// The system won't automatically turn the cluster On or Off.
641    Manual,
642    /// The cluster will be On when a REFRESH materialized view on it needs to refresh.
643    /// `hydration_time_estimate` determines how much time before a refresh to turn the
644    /// cluster On, so that it can rehydrate already before the refresh time.
645    Refresh { hydration_time_estimate: Duration },
646}
647
648impl Default for ClusterSchedule {
649    fn default() -> Self {
650        // (Has to be consistent with `impl Default for ClusterScheduleOptionValue`.)
651        ClusterSchedule::Manual
652    }
653}
654
655#[derive(Debug)]
656pub struct CreateSourcePlan {
657    pub name: QualifiedItemName,
658    pub source: Source,
659    pub if_not_exists: bool,
660    pub timeline: Timeline,
661    // None for subsources, which run on the parent cluster.
662    pub in_cluster: Option<ClusterId>,
663}
664
665#[derive(Clone, Debug, PartialEq, Eq)]
666pub struct SourceReferences {
667    pub updated_at: u64,
668    pub references: Vec<SourceReference>,
669}
670
671/// An available external reference for a source and if possible to retrieve,
672/// any column names it contains.
673#[derive(Clone, Debug, PartialEq, Eq)]
674pub struct SourceReference {
675    pub name: String,
676    pub namespace: Option<String>,
677    pub columns: Vec<String>,
678}
679
680/// A [`CreateSourcePlan`] and the metadata necessary to sequence it.
681#[derive(Debug)]
682pub struct CreateSourcePlanBundle {
683    /// ID of this source in the Catalog.
684    pub item_id: CatalogItemId,
685    /// ID used to reference this source from outside the catalog, e.g. compute.
686    pub global_id: GlobalId,
687    /// Details of the source to create.
688    pub plan: CreateSourcePlan,
689    /// Other catalog objects that are referenced by this source, determined at name resolution.
690    pub resolved_ids: ResolvedIds,
691    /// All the available upstream references for this source.
692    /// Populated for top-level sources that can contain subsources/tables
693    /// and used during sequencing to populate the appropriate catalog fields.
694    pub available_source_references: Option<SourceReferences>,
695}
696
697#[derive(Debug)]
698pub struct CreateConnectionPlan {
699    pub name: QualifiedItemName,
700    pub if_not_exists: bool,
701    pub connection: Connection,
702    pub validate: bool,
703}
704
705#[derive(Debug)]
706pub struct ValidateConnectionPlan {
707    /// ID of the connection in the Catalog.
708    pub id: CatalogItemId,
709    /// The connection to validate.
710    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
711}
712
713#[derive(Debug)]
714pub struct CreateSecretPlan {
715    pub name: QualifiedItemName,
716    pub secret: Secret,
717    pub if_not_exists: bool,
718}
719
720#[derive(Debug)]
721pub struct CreateSinkPlan {
722    pub name: QualifiedItemName,
723    pub sink: Sink,
724    pub with_snapshot: bool,
725    pub if_not_exists: bool,
726    pub in_cluster: ClusterId,
727}
728
729#[derive(Debug)]
730pub struct CreateTablePlan {
731    pub name: QualifiedItemName,
732    pub table: Table,
733    pub if_not_exists: bool,
734}
735
736#[derive(Debug, Clone)]
737pub struct CreateViewPlan {
738    pub name: QualifiedItemName,
739    pub view: View,
740    /// The Catalog objects that this view is replacing, if any.
741    pub replace: Option<CatalogItemId>,
742    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
743    pub drop_ids: Vec<CatalogItemId>,
744    pub if_not_exists: bool,
745    /// True if the view contains an expression that can make the exact column list
746    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
747    pub ambiguous_columns: bool,
748}
749
750#[derive(Debug, Clone)]
751pub struct CreateMaterializedViewPlan {
752    pub name: QualifiedItemName,
753    pub materialized_view: MaterializedView,
754    /// The Catalog objects that this materialized view is replacing, if any.
755    pub replace: Option<CatalogItemId>,
756    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
757    pub drop_ids: Vec<CatalogItemId>,
758    pub if_not_exists: bool,
759    /// True if the materialized view contains an expression that can make the exact column list
760    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
761    pub ambiguous_columns: bool,
762}
763
764#[derive(Debug, Clone)]
765pub struct CreateNetworkPolicyPlan {
766    pub name: String,
767    pub rules: Vec<NetworkPolicyRule>,
768}
769
770#[derive(Debug, Clone)]
771pub struct AlterNetworkPolicyPlan {
772    pub id: NetworkPolicyId,
773    pub name: String,
774    pub rules: Vec<NetworkPolicyRule>,
775}
776
777#[derive(Debug, Clone)]
778pub struct CreateIndexPlan {
779    pub name: QualifiedItemName,
780    pub index: Index,
781    pub if_not_exists: bool,
782}
783
784#[derive(Debug)]
785pub struct CreateTypePlan {
786    pub name: QualifiedItemName,
787    pub typ: Type,
788}
789
790#[derive(Debug)]
791pub struct DropObjectsPlan {
792    /// The IDs of only the objects directly referenced in the `DROP` statement.
793    pub referenced_ids: Vec<ObjectId>,
794    /// All object IDs to drop. Includes `referenced_ids` and all descendants.
795    pub drop_ids: Vec<ObjectId>,
796    /// The type of object that was dropped explicitly in the DROP statement. `ids` may contain
797    /// objects of different types due to CASCADE.
798    pub object_type: ObjectType,
799}
800
801#[derive(Debug)]
802pub struct DropOwnedPlan {
803    /// The role IDs that own the objects.
804    pub role_ids: Vec<RoleId>,
805    /// All object IDs to drop.
806    pub drop_ids: Vec<ObjectId>,
807    /// The privileges to revoke.
808    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
809    /// The default privileges to revoke.
810    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
811}
812
813#[derive(Debug)]
814pub struct ShowVariablePlan {
815    pub name: String,
816}
817
818#[derive(Debug)]
819pub struct InspectShardPlan {
820    /// ID of the storage collection to inspect.
821    pub id: GlobalId,
822}
823
824#[derive(Debug)]
825pub struct SetVariablePlan {
826    pub name: String,
827    pub value: VariableValue,
828    pub local: bool,
829}
830
831#[derive(Debug)]
832pub enum VariableValue {
833    Default,
834    Values(Vec<String>),
835}
836
837#[derive(Debug)]
838pub struct ResetVariablePlan {
839    pub name: String,
840}
841
842#[derive(Debug)]
843pub struct SetTransactionPlan {
844    pub local: bool,
845    pub modes: Vec<TransactionMode>,
846}
847
848/// A plan for select statements.
849#[derive(Clone, Debug)]
850pub struct SelectPlan {
851    /// The `SELECT` statement itself. Used for explain/notices, but not otherwise
852    /// load-bearing. Boxed to save stack space.
853    pub select: Option<Box<SelectStatement<Aug>>>,
854    /// The plan as a HIR.
855    pub source: HirRelationExpr,
856    /// At what time should this select happen?
857    pub when: QueryWhen,
858    /// Instructions how to form the result set.
859    pub finishing: RowSetFinishing,
860    /// For `COPY TO STDOUT`, the format to use.
861    pub copy_to: Option<CopyFormat>,
862}
863
864impl SelectPlan {
865    pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
866        let arity = typ.arity();
867        SelectPlan {
868            select: None,
869            source: HirRelationExpr::Constant { rows, typ },
870            when: QueryWhen::Immediately,
871            finishing: RowSetFinishing::trivial(arity),
872            copy_to: None,
873        }
874    }
875}
876
877#[derive(Debug, Clone)]
878pub enum SubscribeOutput {
879    Diffs,
880    WithinTimestampOrderBy {
881        /// We pretend that mz_diff is prepended to the normal columns, making it index 0
882        order_by: Vec<ColumnOrder>,
883    },
884    EnvelopeUpsert {
885        /// Order by with just keys
886        order_by_keys: Vec<ColumnOrder>,
887    },
888    EnvelopeDebezium {
889        /// Order by with just keys
890        order_by_keys: Vec<ColumnOrder>,
891    },
892}
893
894impl SubscribeOutput {
895    pub fn row_order(&self) -> &[ColumnOrder] {
896        match self {
897            SubscribeOutput::Diffs => &[],
898            // This ordering prepends the diff, so its `order_by` field cannot be applied to rows.
899            SubscribeOutput::WithinTimestampOrderBy { .. } => &[],
900            SubscribeOutput::EnvelopeUpsert { order_by_keys } => order_by_keys,
901            SubscribeOutput::EnvelopeDebezium { order_by_keys } => order_by_keys,
902        }
903    }
904}
905
906#[derive(Debug, Clone)]
907pub struct SubscribePlan {
908    pub from: SubscribeFrom,
909    pub with_snapshot: bool,
910    pub when: QueryWhen,
911    pub up_to: Option<Timestamp>,
912    pub copy_to: Option<CopyFormat>,
913    pub emit_progress: bool,
914    pub output: SubscribeOutput,
915}
916
917#[derive(Debug, Clone)]
918pub enum SubscribeFrom {
919    /// ID of the collection to subscribe to.
920    Id(GlobalId),
921    /// Query to subscribe to.
922    Query {
923        expr: HirRelationExpr,
924        desc: RelationDesc,
925    },
926}
927
928impl SubscribeFrom {
929    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
930        match self {
931            SubscribeFrom::Id(id) => BTreeSet::from([*id]),
932            SubscribeFrom::Query { expr, .. } => expr.depends_on(),
933        }
934    }
935
936    pub fn contains_temporal(&self) -> bool {
937        match self {
938            SubscribeFrom::Id(_) => false,
939            SubscribeFrom::Query { expr, .. } => expr
940                .contains_temporal()
941                .expect("Unexpected error in `visit_scalars` call"),
942        }
943    }
944}
945
946#[derive(Debug)]
947pub struct ShowCreatePlan {
948    pub id: ObjectId,
949    pub row: Row,
950}
951
952#[derive(Debug)]
953pub struct ShowColumnsPlan {
954    pub id: CatalogItemId,
955    pub select_plan: SelectPlan,
956    pub new_resolved_ids: ResolvedIds,
957}
958
959#[derive(Debug)]
960pub struct CopyFromPlan {
961    /// Table we're copying into.
962    pub target_id: CatalogItemId,
963    /// Human-readable full name of the target table.
964    pub target_name: String,
965    /// Source we're copying data from.
966    pub source: CopyFromSource,
967    /// How input columns map to those on the destination table.
968    ///
969    /// TODO(cf2): Remove this field in favor of the mfp.
970    pub columns: Vec<ColumnIndex>,
971    /// [`RelationDesc`] describing the input data.
972    pub source_desc: RelationDesc,
973    /// Changes the shape of the input data to match the destination table.
974    pub mfp: MapFilterProject,
975    /// Format specific params for copying the input data.
976    pub params: CopyFormatParams<'static>,
977    /// Filter for the source files we're copying from, e.g. an S3 prefix.
978    pub filter: Option<CopyFromFilter>,
979}
980
981#[derive(Debug)]
982pub enum CopyFromSource {
983    /// Copying from a file local to the user, transmitted via pgwire.
984    Stdin,
985    /// A remote resource, e.g. HTTP file.
986    ///
987    /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource.
988    Url(HirScalarExpr),
989    /// A file in an S3 bucket.
990    AwsS3 {
991        /// Expression that evaluates to the file we want to copy.
992        uri: HirScalarExpr,
993        /// Details for how we connect to AWS S3.
994        connection: AwsConnection,
995        /// ID of the connection object.
996        connection_id: CatalogItemId,
997    },
998}
999
1000#[derive(Debug)]
1001pub enum CopyFromFilter {
1002    Files(Vec<String>),
1003    Pattern(String),
1004}
1005
1006/// `COPY TO S3`
1007///
1008/// (This is a completely different thing from `COPY TO STDOUT`. That is a `Plan::Select` with
1009/// `copy_to` set.)
1010#[derive(Debug, Clone)]
1011pub struct CopyToPlan {
1012    /// The select query plan whose data will be copied to destination uri.
1013    pub select_plan: SelectPlan,
1014    pub desc: RelationDesc,
1015    /// The scalar expression to be resolved to get the destination uri.
1016    pub to: HirScalarExpr,
1017    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
1018    /// The ID of the connection.
1019    pub connection_id: CatalogItemId,
1020    pub format: S3SinkFormat,
1021    pub max_file_size: u64,
1022}
1023
1024#[derive(Clone, Debug)]
1025pub struct ExplainPlanPlan {
1026    pub stage: ExplainStage,
1027    pub format: ExplainFormat,
1028    pub config: ExplainConfig,
1029    pub explainee: Explainee,
1030}
1031
1032/// The type of object to be explained
1033#[derive(Clone, Debug)]
1034pub enum Explainee {
1035    /// Lookup and explain a plan saved for an view.
1036    View(CatalogItemId),
1037    /// Lookup and explain a plan saved for an existing materialized view.
1038    MaterializedView(CatalogItemId),
1039    /// Lookup and explain a plan saved for an existing index.
1040    Index(CatalogItemId),
1041    /// Replan an existing view.
1042    ReplanView(CatalogItemId),
1043    /// Replan an existing materialized view.
1044    ReplanMaterializedView(CatalogItemId),
1045    /// Replan an existing index.
1046    ReplanIndex(CatalogItemId),
1047    /// A SQL statement.
1048    Statement(ExplaineeStatement),
1049}
1050
1051/// Explainee types that are statements.
1052#[derive(Clone, Debug, EnumKind)]
1053#[enum_kind(ExplaineeStatementKind)]
1054pub enum ExplaineeStatement {
1055    /// The object to be explained is a SELECT statement.
1056    Select {
1057        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1058        broken: bool,
1059        plan: plan::SelectPlan,
1060        desc: RelationDesc,
1061    },
1062    /// The object to be explained is a CREATE VIEW.
1063    CreateView {
1064        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1065        broken: bool,
1066        plan: plan::CreateViewPlan,
1067    },
1068    /// The object to be explained is a CREATE MATERIALIZED VIEW.
1069    CreateMaterializedView {
1070        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1071        broken: bool,
1072        plan: plan::CreateMaterializedViewPlan,
1073    },
1074    /// The object to be explained is a CREATE INDEX.
1075    CreateIndex {
1076        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1077        broken: bool,
1078        plan: plan::CreateIndexPlan,
1079    },
1080    /// The object to be explained is a SUBSCRIBE statement.
1081    Subscribe {
1082        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1083        broken: bool,
1084        plan: plan::SubscribePlan,
1085    },
1086}
1087
1088impl ExplaineeStatement {
1089    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1090        match self {
1091            Self::Select { plan, .. } => plan.source.depends_on(),
1092            Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1093            Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1094            Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1095            Self::Subscribe { plan, .. } => plan.from.depends_on(),
1096        }
1097    }
1098
1099    /// Statements that have their `broken` flag set are expected to cause a
1100    /// panic in the optimizer code. In this case:
1101    ///
1102    /// 1. The optimizer pipeline execution will stop, but the panic will be
1103    ///    intercepted and will not propagate to the caller. The partial
1104    ///    optimizer trace collected until this point will be available.
1105    /// 2. The optimizer trace tracing subscriber will delegate regular tracing
1106    ///    spans and events to the default subscriber.
1107    ///
1108    /// This is useful when debugging queries that cause panics.
1109    pub fn broken(&self) -> bool {
1110        match self {
1111            Self::Select { broken, .. } => *broken,
1112            Self::CreateView { broken, .. } => *broken,
1113            Self::CreateMaterializedView { broken, .. } => *broken,
1114            Self::CreateIndex { broken, .. } => *broken,
1115            Self::Subscribe { broken, .. } => *broken,
1116        }
1117    }
1118}
1119
1120impl ExplaineeStatementKind {
1121    pub fn supports(&self, stage: &ExplainStage) -> bool {
1122        use ExplainStage::*;
1123        match self {
1124            Self::Select => true,
1125            Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1126            Self::CreateMaterializedView => true,
1127            Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1128            // SUBSCRIBE doesn't support RAW, DECORRELATED, or LOCAL stages because
1129            // it takes MIR directly rather than going through HIR lowering.
1130            Self::Subscribe => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1131        }
1132    }
1133}
1134
1135impl std::fmt::Display for ExplaineeStatementKind {
1136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1137        match self {
1138            Self::Select => write!(f, "SELECT"),
1139            Self::CreateView => write!(f, "CREATE VIEW"),
1140            Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1141            Self::CreateIndex => write!(f, "CREATE INDEX"),
1142            Self::Subscribe => write!(f, "SUBSCRIBE"),
1143        }
1144    }
1145}
1146
1147#[derive(Clone, Debug)]
1148pub struct ExplainPushdownPlan {
1149    pub explainee: Explainee,
1150}
1151
1152#[derive(Clone, Debug)]
1153pub struct ExplainTimestampPlan {
1154    pub format: ExplainFormat,
1155    pub raw_plan: HirRelationExpr,
1156    pub when: QueryWhen,
1157}
1158
1159#[derive(Debug)]
1160pub struct ExplainSinkSchemaPlan {
1161    pub sink_from: GlobalId,
1162    pub json_schema: String,
1163}
1164
1165#[derive(Debug)]
1166pub struct SendDiffsPlan {
1167    pub id: CatalogItemId,
1168    pub updates: Vec<(Row, Diff)>,
1169    pub kind: MutationKind,
1170    pub returning: Vec<(Row, NonZeroUsize)>,
1171    pub max_result_size: u64,
1172}
1173
1174#[derive(Debug)]
1175pub struct InsertPlan {
1176    pub id: CatalogItemId,
1177    pub values: HirRelationExpr,
1178    pub returning: Vec<mz_expr::MirScalarExpr>,
1179}
1180
1181#[derive(Debug)]
1182pub struct ReadThenWritePlan {
1183    pub id: CatalogItemId,
1184    pub selection: HirRelationExpr,
1185    pub finishing: RowSetFinishing,
1186    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
1187    pub kind: MutationKind,
1188    pub returning: Vec<mz_expr::MirScalarExpr>,
1189}
1190
1191/// Generated by `ALTER ... IF EXISTS` if the named object did not exist.
1192#[derive(Debug)]
1193pub struct AlterNoopPlan {
1194    pub object_type: ObjectType,
1195}
1196
1197#[derive(Debug)]
1198pub struct AlterSetClusterPlan {
1199    pub id: CatalogItemId,
1200    pub set_cluster: ClusterId,
1201}
1202
1203#[derive(Debug)]
1204pub struct AlterRetainHistoryPlan {
1205    pub id: CatalogItemId,
1206    pub value: Option<Value>,
1207    pub window: CompactionWindow,
1208    pub object_type: ObjectType,
1209}
1210
1211#[derive(Debug)]
1212pub struct AlterSourceTimestampIntervalPlan {
1213    pub id: CatalogItemId,
1214    pub value: Option<Value>,
1215    pub interval: Duration,
1216}
1217
1218#[derive(Debug, Clone)]
1219
1220pub enum AlterOptionParameter<T = String> {
1221    Set(T),
1222    Reset,
1223    Unchanged,
1224}
1225
1226#[derive(Debug)]
1227pub enum AlterConnectionAction {
1228    RotateKeys,
1229    AlterOptions {
1230        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
1231        drop_options: BTreeSet<ConnectionOptionName>,
1232        validate: bool,
1233    },
1234}
1235
1236#[derive(Debug)]
1237pub struct AlterConnectionPlan {
1238    pub id: CatalogItemId,
1239    pub action: AlterConnectionAction,
1240}
1241
1242#[derive(Debug)]
1243pub enum AlterSourceAction {
1244    AddSubsourceExports {
1245        subsources: Vec<CreateSourcePlanBundle>,
1246        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1247    },
1248    RefreshReferences {
1249        references: SourceReferences,
1250    },
1251}
1252
1253#[derive(Debug)]
1254pub struct AlterSourcePlan {
1255    pub item_id: CatalogItemId,
1256    pub ingestion_id: GlobalId,
1257    pub action: AlterSourceAction,
1258}
1259
1260#[derive(Debug, Clone)]
1261pub struct AlterSinkPlan {
1262    pub item_id: CatalogItemId,
1263    pub global_id: GlobalId,
1264    pub sink: Sink,
1265    pub with_snapshot: bool,
1266    pub in_cluster: ClusterId,
1267}
1268
1269#[derive(Debug, Clone)]
1270pub struct AlterClusterPlan {
1271    pub id: ClusterId,
1272    pub name: String,
1273    pub options: PlanClusterOption,
1274    pub strategy: AlterClusterPlanStrategy,
1275}
1276
1277#[derive(Debug)]
1278pub struct AlterClusterRenamePlan {
1279    pub id: ClusterId,
1280    pub name: String,
1281    pub to_name: String,
1282}
1283
1284#[derive(Debug)]
1285pub struct AlterClusterReplicaRenamePlan {
1286    pub cluster_id: ClusterId,
1287    pub replica_id: ReplicaId,
1288    pub name: QualifiedReplica,
1289    pub to_name: String,
1290}
1291
1292#[derive(Debug)]
1293pub struct AlterItemRenamePlan {
1294    pub id: CatalogItemId,
1295    pub current_full_name: FullItemName,
1296    pub to_name: String,
1297    pub object_type: ObjectType,
1298}
1299
1300#[derive(Debug)]
1301pub struct AlterSchemaRenamePlan {
1302    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1303    pub new_schema_name: String,
1304}
1305
1306#[derive(Debug)]
1307pub struct AlterSchemaSwapPlan {
1308    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1309    pub schema_a_name: String,
1310    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1311    pub schema_b_name: String,
1312    pub name_temp: String,
1313}
1314
1315#[derive(Debug)]
1316pub struct AlterClusterSwapPlan {
1317    pub id_a: ClusterId,
1318    pub id_b: ClusterId,
1319    pub name_a: String,
1320    pub name_b: String,
1321    pub name_temp: String,
1322}
1323
1324#[derive(Debug)]
1325pub struct AlterSecretPlan {
1326    pub id: CatalogItemId,
1327    pub secret_as: MirScalarExpr,
1328}
1329
1330#[derive(Debug)]
1331pub struct AlterSystemSetPlan {
1332    pub name: String,
1333    pub value: VariableValue,
1334}
1335
1336#[derive(Debug)]
1337pub struct AlterSystemResetPlan {
1338    pub name: String,
1339}
1340
1341#[derive(Debug)]
1342pub struct AlterSystemResetAllPlan {}
1343
1344#[derive(Debug)]
1345pub struct AlterRolePlan {
1346    pub id: RoleId,
1347    pub name: String,
1348    pub option: PlannedAlterRoleOption,
1349}
1350
1351#[derive(Debug)]
1352pub struct AlterOwnerPlan {
1353    pub id: ObjectId,
1354    pub object_type: ObjectType,
1355    pub new_owner: RoleId,
1356}
1357
1358#[derive(Debug)]
1359pub struct AlterTablePlan {
1360    pub relation_id: CatalogItemId,
1361    pub column_name: ColumnName,
1362    pub column_type: SqlColumnType,
1363    pub raw_sql_type: RawDataType,
1364}
1365
1366#[derive(Debug, Clone)]
1367pub struct AlterMaterializedViewApplyReplacementPlan {
1368    pub id: CatalogItemId,
1369    pub replacement_id: CatalogItemId,
1370}
1371
1372#[derive(Debug)]
1373pub struct DeclarePlan {
1374    pub name: String,
1375    pub stmt: Statement<Raw>,
1376    pub sql: String,
1377    pub params: Params,
1378}
1379
1380#[derive(Debug)]
1381pub struct FetchPlan {
1382    pub name: String,
1383    pub count: Option<FetchDirection>,
1384    pub timeout: ExecuteTimeout,
1385}
1386
1387#[derive(Debug)]
1388pub struct ClosePlan {
1389    pub name: String,
1390}
1391
1392#[derive(Debug)]
1393pub struct PreparePlan {
1394    pub name: String,
1395    pub stmt: Statement<Raw>,
1396    pub sql: String,
1397    pub desc: StatementDesc,
1398}
1399
1400#[derive(Debug)]
1401pub struct ExecutePlan {
1402    pub name: String,
1403    pub params: Params,
1404}
1405
1406#[derive(Debug)]
1407pub struct DeallocatePlan {
1408    pub name: Option<String>,
1409}
1410
1411#[derive(Debug)]
1412pub struct RaisePlan {
1413    pub severity: NoticeSeverity,
1414}
1415
1416#[derive(Debug)]
1417pub struct GrantRolePlan {
1418    /// The roles that are gaining members.
1419    pub role_ids: Vec<RoleId>,
1420    /// The roles that will be added to `role_id`.
1421    pub member_ids: Vec<RoleId>,
1422    /// The role that granted the membership.
1423    pub grantor_id: RoleId,
1424}
1425
1426#[derive(Debug)]
1427pub struct RevokeRolePlan {
1428    /// The roles that are losing members.
1429    pub role_ids: Vec<RoleId>,
1430    /// The roles that will be removed from `role_id`.
1431    pub member_ids: Vec<RoleId>,
1432    /// The role that revoked the membership.
1433    pub grantor_id: RoleId,
1434}
1435
1436#[derive(Debug)]
1437pub struct UpdatePrivilege {
1438    /// The privileges being granted/revoked on an object.
1439    pub acl_mode: AclMode,
1440    /// The ID of the object receiving privileges.
1441    pub target_id: SystemObjectId,
1442    /// The role that is granting the privileges.
1443    pub grantor: RoleId,
1444}
1445
1446#[derive(Debug)]
1447pub struct GrantPrivilegesPlan {
1448    /// Description of each privilege being granted.
1449    pub update_privileges: Vec<UpdatePrivilege>,
1450    /// The roles that will granted the privileges.
1451    pub grantees: Vec<RoleId>,
1452}
1453
1454#[derive(Debug)]
1455pub struct RevokePrivilegesPlan {
1456    /// Description of each privilege being revoked.
1457    pub update_privileges: Vec<UpdatePrivilege>,
1458    /// The roles that will have privileges revoked.
1459    pub revokees: Vec<RoleId>,
1460}
1461#[derive(Debug)]
1462pub struct AlterDefaultPrivilegesPlan {
1463    /// Description of objects that match this default privilege.
1464    pub privilege_objects: Vec<DefaultPrivilegeObject>,
1465    /// The privilege to be granted/revoked from the matching objects.
1466    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
1467    /// Whether this is a grant or revoke.
1468    pub is_grant: bool,
1469}
1470
1471#[derive(Debug)]
1472pub struct ReassignOwnedPlan {
1473    /// The roles whose owned objects are being reassigned.
1474    pub old_roles: Vec<RoleId>,
1475    /// The new owner of the objects.
1476    pub new_role: RoleId,
1477    /// All object IDs to reassign.
1478    pub reassign_ids: Vec<ObjectId>,
1479}
1480
1481#[derive(Debug)]
1482pub struct CommentPlan {
1483    /// The object that this comment is associated with.
1484    pub object_id: CommentObjectId,
1485    /// A sub-component of the object that this comment is associated with, e.g. a column.
1486    ///
1487    /// TODO(parkmycar): <https://github.com/MaterializeInc/database-issues/issues/6711>.
1488    pub sub_component: Option<usize>,
1489    /// The comment itself. If `None` that indicates we should clear the existing comment.
1490    pub comment: Option<String>,
1491}
1492
1493#[derive(Clone, Debug)]
1494pub enum TableDataSource {
1495    /// The table owns data created via INSERT/UPDATE/DELETE statements.
1496    TableWrites { defaults: Vec<Expr<Aug>> },
1497
1498    /// The table receives its data from the identified `DataSourceDesc`.
1499    /// This table type does not support INSERT/UPDATE/DELETE statements.
1500    DataSource {
1501        desc: DataSourceDesc,
1502        timeline: Timeline,
1503    },
1504}
1505
1506#[derive(Clone, Debug)]
1507pub struct Table {
1508    pub create_sql: String,
1509    pub desc: VersionedRelationDesc,
1510    pub temporary: bool,
1511    pub compaction_window: Option<CompactionWindow>,
1512    pub data_source: TableDataSource,
1513}
1514
1515#[derive(Clone, Debug)]
1516pub struct Source {
1517    pub create_sql: String,
1518    pub data_source: DataSourceDesc,
1519    pub desc: RelationDesc,
1520    pub compaction_window: Option<CompactionWindow>,
1521}
1522
1523#[derive(Debug, Clone)]
1524pub enum DataSourceDesc {
1525    /// Receives data from an external system.
1526    Ingestion(SourceDesc<ReferencedConnection>),
1527    /// Receives data from an external system.
1528    OldSyntaxIngestion {
1529        desc: SourceDesc<ReferencedConnection>,
1530        // If we're dealing with an old syntax ingestion the progress id will be some other collection
1531        // and the ingestion itself will have the data from a default external reference
1532        progress_subsource: CatalogItemId,
1533        data_config: SourceExportDataConfig<ReferencedConnection>,
1534        details: SourceExportDetails,
1535    },
1536    /// This source receives its data from the identified ingestion,
1537    /// specifically the output identified by `external_reference`.
1538    IngestionExport {
1539        ingestion_id: CatalogItemId,
1540        external_reference: UnresolvedItemName,
1541        details: SourceExportDetails,
1542        data_config: SourceExportDataConfig<ReferencedConnection>,
1543    },
1544    /// Receives data from the source's reclocking/remapping operations.
1545    Progress,
1546    /// Receives data from HTTP post requests.
1547    Webhook {
1548        validate_using: Option<WebhookValidation>,
1549        body_format: WebhookBodyFormat,
1550        headers: WebhookHeaders,
1551        /// Only `Some` when created via `CREATE TABLE ... FROM WEBHOOK`.
1552        cluster_id: Option<StorageInstanceId>,
1553    },
1554}
1555
1556#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
1557pub struct WebhookValidation {
1558    /// The expression used to validate a request.
1559    pub expression: MirScalarExpr,
1560    /// Description of the source that will be created.
1561    pub relation_desc: RelationDesc,
1562    /// The column index to provide the request body and whether to provide it as bytes.
1563    pub bodies: Vec<(usize, bool)>,
1564    /// The column index to provide the request headers and whether to provide the values as bytes.
1565    pub headers: Vec<(usize, bool)>,
1566    /// Any secrets that are used in that validation.
1567    pub secrets: Vec<WebhookValidationSecret>,
1568}
1569
1570impl WebhookValidation {
1571    const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1572
1573    /// Attempt to reduce the internal [`MirScalarExpr`] into a simpler expression.
1574    ///
1575    /// The reduction happens on a separate thread, we also only wait for
1576    /// `WebhookValidation::MAX_REDUCE_TIME` before timing out and returning an error.
1577    pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1578        let WebhookValidation {
1579            expression,
1580            relation_desc,
1581            ..
1582        } = self;
1583
1584        // On a different thread, attempt to reduce the expression.
1585        let mut expression_ = expression.clone();
1586        let desc_ = relation_desc.clone();
1587        let reduce_task = mz_ore::task::spawn_blocking(
1588            || "webhook-validation-reduce",
1589            move || {
1590                let repr_col_types: Vec<ReprColumnType> = desc_
1591                    .typ()
1592                    .column_types
1593                    .iter()
1594                    .map(ReprColumnType::from)
1595                    .collect();
1596                expression_.reduce(&repr_col_types);
1597                expression_
1598            },
1599        );
1600
1601        match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1602            Ok(reduced_expr) => {
1603                *expression = reduced_expr;
1604                Ok(())
1605            }
1606            Err(_) => Err("timeout"),
1607        }
1608    }
1609}
1610
1611#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
1612pub struct WebhookHeaders {
1613    /// Optionally include a column named `headers` whose content is possibly filtered.
1614    pub header_column: Option<WebhookHeaderFilters>,
1615    /// The column index to provide the specific request header, and whether to provide it as bytes.
1616    pub mapped_headers: BTreeMap<usize, (String, bool)>,
1617}
1618
1619impl WebhookHeaders {
1620    /// Returns the number of columns needed to represent our headers.
1621    pub fn num_columns(&self) -> usize {
1622        let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1623        let mapped_headers = self.mapped_headers.len();
1624
1625        header_column + mapped_headers
1626    }
1627}
1628
1629#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
1630pub struct WebhookHeaderFilters {
1631    pub block: BTreeSet<String>,
1632    pub allow: BTreeSet<String>,
1633}
1634
1635#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Arbitrary)]
1636pub enum WebhookBodyFormat {
1637    Json { array: bool },
1638    Bytes,
1639    Text,
1640}
1641
1642impl From<WebhookBodyFormat> for SqlScalarType {
1643    fn from(value: WebhookBodyFormat) -> Self {
1644        match value {
1645            WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1646            WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1647            WebhookBodyFormat::Text => SqlScalarType::String,
1648        }
1649    }
1650}
1651
1652#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
1653pub struct WebhookValidationSecret {
1654    /// Identifies the secret by [`CatalogItemId`].
1655    pub id: CatalogItemId,
1656    /// Column index for the expression context that this secret was originally evaluated in.
1657    pub column_idx: usize,
1658    /// Whether or not this secret should be provided to the expression as Bytes or a String.
1659    pub use_bytes: bool,
1660}
1661
1662#[derive(Clone, Debug)]
1663pub struct Connection {
1664    pub create_sql: String,
1665    pub details: ConnectionDetails,
1666}
1667
1668#[derive(Clone, Debug, Serialize)]
1669pub enum ConnectionDetails {
1670    Kafka(KafkaConnection<ReferencedConnection>),
1671    Csr(CsrConnection<ReferencedConnection>),
1672    GlueSchemaRegistry(GlueSchemaRegistryConnection<ReferencedConnection>),
1673    Postgres(PostgresConnection<ReferencedConnection>),
1674    Ssh {
1675        connection: SshConnection,
1676        key_1: SshKey,
1677        key_2: SshKey,
1678    },
1679    Aws(AwsConnection),
1680    AwsPrivatelink(AwsPrivatelinkConnection),
1681    MySql(MySqlConnection<ReferencedConnection>),
1682    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
1683    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
1684}
1685
1686impl ConnectionDetails {
1687    pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1688        match self {
1689            ConnectionDetails::Kafka(c) => {
1690                mz_storage_types::connections::Connection::Kafka(c.clone())
1691            }
1692            ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1693            ConnectionDetails::GlueSchemaRegistry(c) => {
1694                mz_storage_types::connections::Connection::GlueSchemaRegistry(c.clone())
1695            }
1696            ConnectionDetails::Postgres(c) => {
1697                mz_storage_types::connections::Connection::Postgres(c.clone())
1698            }
1699            ConnectionDetails::Ssh { connection, .. } => {
1700                mz_storage_types::connections::Connection::Ssh(connection.clone())
1701            }
1702            ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1703            ConnectionDetails::AwsPrivatelink(c) => {
1704                mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1705            }
1706            ConnectionDetails::MySql(c) => {
1707                mz_storage_types::connections::Connection::MySql(c.clone())
1708            }
1709            ConnectionDetails::SqlServer(c) => {
1710                mz_storage_types::connections::Connection::SqlServer(c.clone())
1711            }
1712            ConnectionDetails::IcebergCatalog(c) => {
1713                mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1714            }
1715        }
1716    }
1717}
1718
1719#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1720pub struct NetworkPolicyRule {
1721    pub name: String,
1722    pub action: NetworkPolicyRuleAction,
1723    pub address: PolicyAddress,
1724    pub direction: NetworkPolicyRuleDirection,
1725}
1726
1727#[derive(
1728    Debug,
1729    Clone,
1730    Serialize,
1731    Deserialize,
1732    PartialEq,
1733    Eq,
1734    Ord,
1735    PartialOrd,
1736    Hash
1737)]
1738pub enum NetworkPolicyRuleAction {
1739    Allow,
1740}
1741
1742impl std::fmt::Display for NetworkPolicyRuleAction {
1743    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1744        match self {
1745            Self::Allow => write!(f, "allow"),
1746        }
1747    }
1748}
1749impl TryFrom<&str> for NetworkPolicyRuleAction {
1750    type Error = PlanError;
1751    fn try_from(value: &str) -> Result<Self, Self::Error> {
1752        match value.to_uppercase().as_str() {
1753            "ALLOW" => Ok(Self::Allow),
1754            _ => Err(PlanError::Unstructured(
1755                "Allow is the only valid option".into(),
1756            )),
1757        }
1758    }
1759}
1760
1761#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1762pub enum NetworkPolicyRuleDirection {
1763    Ingress,
1764}
1765impl std::fmt::Display for NetworkPolicyRuleDirection {
1766    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1767        match self {
1768            Self::Ingress => write!(f, "ingress"),
1769        }
1770    }
1771}
1772impl TryFrom<&str> for NetworkPolicyRuleDirection {
1773    type Error = PlanError;
1774    fn try_from(value: &str) -> Result<Self, Self::Error> {
1775        match value.to_uppercase().as_str() {
1776            "INGRESS" => Ok(Self::Ingress),
1777            _ => Err(PlanError::Unstructured(
1778                "Ingress is the only valid option".into(),
1779            )),
1780        }
1781    }
1782}
1783
1784#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1785pub struct PolicyAddress(pub IpNet);
1786impl std::fmt::Display for PolicyAddress {
1787    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1788        write!(f, "{}", &self.0.to_string())
1789    }
1790}
1791impl From<String> for PolicyAddress {
1792    fn from(value: String) -> Self {
1793        Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1794    }
1795}
1796impl TryFrom<&str> for PolicyAddress {
1797    type Error = PlanError;
1798    fn try_from(value: &str) -> Result<Self, Self::Error> {
1799        let net = IpNet::from_str(value)
1800            .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1801        Ok(Self(net))
1802    }
1803}
1804
1805impl Serialize for PolicyAddress {
1806    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1807    where
1808        S: serde::Serializer,
1809    {
1810        serializer.serialize_str(&format!("{}", &self.0))
1811    }
1812}
1813
1814#[derive(Clone, Debug, Serialize)]
1815pub enum SshKey {
1816    PublicOnly(String),
1817    Both(SshKeyPair),
1818}
1819
1820impl SshKey {
1821    pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1822        match self {
1823            SshKey::PublicOnly(_) => None,
1824            SshKey::Both(key_pair) => Some(key_pair),
1825        }
1826    }
1827
1828    pub fn public_key(&self) -> String {
1829        match self {
1830            SshKey::PublicOnly(s) => s.into(),
1831            SshKey::Both(p) => p.ssh_public_key(),
1832        }
1833    }
1834}
1835
1836#[derive(Clone, Debug)]
1837pub struct Secret {
1838    pub create_sql: String,
1839    pub secret_as: MirScalarExpr,
1840}
1841
1842#[derive(Clone, Debug)]
1843pub struct Sink {
1844    /// Parse-able SQL that is stored durably and defines this sink.
1845    pub create_sql: String,
1846    /// Collection we read into this sink.
1847    pub from: GlobalId,
1848    /// Type of connection to the external service we sink into.
1849    pub connection: StorageSinkConnection<ReferencedConnection>,
1850    // TODO(guswynn): this probably should just be in the `connection`.
1851    pub envelope: SinkEnvelope,
1852    pub version: u64,
1853    pub commit_interval: Option<Duration>,
1854}
1855
1856#[derive(Clone, Debug)]
1857pub struct View {
1858    /// Parse-able SQL that is stored durably and defines this view.
1859    pub create_sql: String,
1860    /// Unoptimized high-level expression from parsing the `create_sql`.
1861    pub expr: HirRelationExpr,
1862    /// All of the catalog objects that are referenced by this view, according to the `expr`.
1863    pub dependencies: DependencyIds,
1864    /// Columns of this view.
1865    pub column_names: Vec<ColumnName>,
1866    /// If this view is created in the temporary schema, e.g. `CREATE TEMPORARY ...`.
1867    pub temporary: bool,
1868}
1869
1870#[derive(Clone, Debug)]
1871pub struct MaterializedView {
1872    /// Parse-able SQL that is stored durably and defines this materialized view.
1873    pub create_sql: String,
1874    /// Unoptimized high-level expression from parsing the `create_sql`.
1875    pub expr: HirRelationExpr,
1876    /// All of the catalog objects that are referenced by this materialized view, according to the `expr`.
1877    pub dependencies: DependencyIds,
1878    /// Columns of this view.
1879    pub column_names: Vec<ColumnName>,
1880    pub replacement_target: Option<CatalogItemId>,
1881    /// Cluster this materialized view will get installed on.
1882    pub cluster_id: ClusterId,
1883    /// If set, only install this materialized view's dataflow on the specified replica.
1884    pub target_replica: Option<ReplicaId>,
1885    pub non_null_assertions: Vec<usize>,
1886    pub compaction_window: Option<CompactionWindow>,
1887    pub refresh_schedule: Option<RefreshSchedule>,
1888    pub as_of: Option<Timestamp>,
1889}
1890
1891#[derive(Clone, Debug)]
1892pub struct Index {
1893    /// Parse-able SQL that is stored durably and defines this index.
1894    pub create_sql: String,
1895    /// Collection this index is on top of.
1896    pub on: GlobalId,
1897    pub keys: Vec<mz_expr::MirScalarExpr>,
1898    pub compaction_window: Option<CompactionWindow>,
1899    pub cluster_id: ClusterId,
1900}
1901
1902#[derive(Clone, Debug)]
1903pub struct Type {
1904    pub create_sql: String,
1905    pub inner: CatalogType<IdReference>,
1906}
1907
1908/// Specifies when a `Peek` or `Subscribe` should occur.
1909#[derive(Deserialize, Clone, Debug, PartialEq)]
1910pub enum QueryWhen {
1911    /// The peek should occur at the latest possible timestamp that allows the
1912    /// peek to complete immediately.
1913    Immediately,
1914    /// The peek should occur at a timestamp that allows the peek to see all
1915    /// data written to tables within Materialize.
1916    FreshestTableWrite,
1917    /// The peek should occur at the timestamp described by the specified
1918    /// expression.
1919    ///
1920    /// The expression may have any type.
1921    AtTimestamp(Timestamp),
1922    /// Same as Immediately, but will also advance to at least the specified
1923    /// expression.
1924    AtLeastTimestamp(Timestamp),
1925}
1926
1927impl QueryWhen {
1928    /// Returns a timestamp to which the candidate must be advanced.
1929    pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1930        match self {
1931            QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1932            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1933        }
1934    }
1935    /// Returns whether the candidate's upper bound is constrained.
1936    /// This is only true for `AtTimestamp` since it is the only variant that
1937    /// specifies a timestamp.
1938    pub fn constrains_upper(&self) -> bool {
1939        match self {
1940            QueryWhen::AtTimestamp(_) => true,
1941            QueryWhen::AtLeastTimestamp(_)
1942            | QueryWhen::Immediately
1943            | QueryWhen::FreshestTableWrite => false,
1944        }
1945    }
1946    /// Returns whether the candidate must be advanced to the since.
1947    pub fn advance_to_since(&self) -> bool {
1948        match self {
1949            QueryWhen::Immediately
1950            | QueryWhen::AtLeastTimestamp(_)
1951            | QueryWhen::FreshestTableWrite => true,
1952            QueryWhen::AtTimestamp(_) => false,
1953        }
1954    }
1955    /// Returns whether the candidate can be advanced to the upper.
1956    pub fn can_advance_to_upper(&self) -> bool {
1957        match self {
1958            QueryWhen::Immediately => true,
1959            QueryWhen::FreshestTableWrite
1960            | QueryWhen::AtTimestamp(_)
1961            | QueryWhen::AtLeastTimestamp(_) => false,
1962        }
1963    }
1964
1965    /// Returns whether the candidate can be advanced to the timeline's timestamp.
1966    pub fn can_advance_to_timeline_ts(&self) -> bool {
1967        match self {
1968            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1969            QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1970        }
1971    }
1972    /// Returns whether the candidate must be advanced to the timeline's timestamp.
1973    pub fn must_advance_to_timeline_ts(&self) -> bool {
1974        match self {
1975            QueryWhen::FreshestTableWrite => true,
1976            QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1977                false
1978            }
1979        }
1980    }
1981    /// Returns whether the selected timestamp should be tracked within the current transaction.
1982    pub fn is_transactional(&self) -> bool {
1983        match self {
1984            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1985            QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1986        }
1987    }
1988}
1989
1990#[derive(Debug, Copy, Clone)]
1991pub enum MutationKind {
1992    Insert,
1993    Update,
1994    Delete,
1995}
1996
1997#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
1998pub enum CopyFormat {
1999    Text,
2000    Csv,
2001    Binary,
2002    Parquet,
2003}
2004
2005#[derive(Debug, Copy, Clone)]
2006pub enum ExecuteTimeout {
2007    None,
2008    Seconds(f64),
2009    WaitOnce,
2010}
2011
2012#[derive(Clone, Debug)]
2013pub enum IndexOption {
2014    /// Configures the logical compaction window for an index.
2015    RetainHistory(CompactionWindow),
2016}
2017
2018#[derive(Clone, Debug)]
2019pub enum TableOption {
2020    /// Configures the logical compaction window for a table.
2021    RetainHistory(CompactionWindow),
2022}
2023
2024#[derive(Clone, Debug)]
2025pub struct PlanClusterOption {
2026    pub availability_zones: AlterOptionParameter<Vec<String>>,
2027    pub introspection_debugging: AlterOptionParameter<bool>,
2028    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
2029    pub managed: AlterOptionParameter<bool>,
2030    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
2031    pub replication_factor: AlterOptionParameter<u32>,
2032    pub size: AlterOptionParameter,
2033    pub schedule: AlterOptionParameter<ClusterSchedule>,
2034    pub workload_class: AlterOptionParameter<Option<String>>,
2035}
2036
2037impl Default for PlanClusterOption {
2038    fn default() -> Self {
2039        Self {
2040            availability_zones: AlterOptionParameter::Unchanged,
2041            introspection_debugging: AlterOptionParameter::Unchanged,
2042            introspection_interval: AlterOptionParameter::Unchanged,
2043            managed: AlterOptionParameter::Unchanged,
2044            replicas: AlterOptionParameter::Unchanged,
2045            replication_factor: AlterOptionParameter::Unchanged,
2046            size: AlterOptionParameter::Unchanged,
2047            schedule: AlterOptionParameter::Unchanged,
2048            workload_class: AlterOptionParameter::Unchanged,
2049        }
2050    }
2051}
2052
2053#[derive(Clone, Debug, PartialEq, Eq)]
2054pub enum AlterClusterPlanStrategy {
2055    None,
2056    For(Duration),
2057    UntilReady {
2058        on_timeout: OnTimeoutAction,
2059        timeout: Duration,
2060    },
2061}
2062
2063#[derive(Clone, Debug, PartialEq, Eq)]
2064pub enum OnTimeoutAction {
2065    Commit,
2066    Rollback,
2067}
2068
2069impl Default for OnTimeoutAction {
2070    fn default() -> Self {
2071        Self::Commit
2072    }
2073}
2074
2075impl TryFrom<&str> for OnTimeoutAction {
2076    type Error = PlanError;
2077    fn try_from(value: &str) -> Result<Self, Self::Error> {
2078        match value.to_uppercase().as_str() {
2079            "COMMIT" => Ok(Self::Commit),
2080            "ROLLBACK" => Ok(Self::Rollback),
2081            _ => Err(PlanError::Unstructured(
2082                "Valid options are COMMIT, ROLLBACK".into(),
2083            )),
2084        }
2085    }
2086}
2087
2088impl AlterClusterPlanStrategy {
2089    pub fn is_none(&self) -> bool {
2090        matches!(self, Self::None)
2091    }
2092    pub fn is_some(&self) -> bool {
2093        !matches!(self, Self::None)
2094    }
2095}
2096
2097impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2098    type Error = PlanError;
2099
2100    fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2101        Ok(match value.wait {
2102            Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2103            Some(ClusterAlterOptionValue::UntilReady(options)) => {
2104                let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2105                Self::UntilReady {
2106                    timeout: match extracted.timeout {
2107                        Some(d) => d,
2108                        None => Err(PlanError::UntilReadyTimeoutRequired)?,
2109                    },
2110                    on_timeout: match extracted.on_timeout {
2111                        Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2112                            PlanError::InvalidOptionValue {
2113                                option_name: "ON TIMEOUT".into(),
2114                                err: Box::new(e),
2115                            }
2116                        })?,
2117                        None => OnTimeoutAction::default(),
2118                    },
2119                }
2120            }
2121            None => Self::None,
2122        })
2123    }
2124}
2125
2126/// A vector of values to which parameter references should be bound.
2127#[derive(Debug, Clone)]
2128pub struct Params {
2129    /// The datums that were provided in the EXECUTE statement.
2130    pub datums: Row,
2131    /// The types of the datums provided in the EXECUTE statement.
2132    pub execute_types: Vec<SqlScalarType>,
2133    /// The types that the prepared statement expects based on its definition.
2134    pub expected_types: Vec<SqlScalarType>,
2135}
2136
2137impl Params {
2138    /// Returns a `Params` with no parameters.
2139    pub fn empty() -> Params {
2140        Params {
2141            datums: Row::pack_slice(&[]),
2142            execute_types: vec![],
2143            expected_types: vec![],
2144        }
2145    }
2146}
2147
2148/// Controls planning of a SQL query.
2149#[derive(
2150    Ord,
2151    PartialOrd,
2152    Clone,
2153    Debug,
2154    Eq,
2155    PartialEq,
2156    Serialize,
2157    Deserialize,
2158    Hash,
2159    Copy
2160)]
2161pub struct PlanContext {
2162    pub wall_time: DateTime<Utc>,
2163    pub ignore_if_exists_errors: bool,
2164}
2165
2166impl PlanContext {
2167    pub fn new(wall_time: DateTime<Utc>) -> Self {
2168        Self {
2169            wall_time,
2170            ignore_if_exists_errors: false,
2171        }
2172    }
2173
2174    /// Return a PlanContext with zero values. This should only be used when
2175    /// planning is required but unused (like in `plan_create_table()`) or in
2176    /// tests.
2177    pub fn zero() -> Self {
2178        PlanContext {
2179            wall_time: now::to_datetime(NOW_ZERO()),
2180            ignore_if_exists_errors: false,
2181        }
2182    }
2183
2184    pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2185        self.ignore_if_exists_errors = value;
2186        self
2187    }
2188}