Skip to main content

mz_sql/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL planning.
11//!
12//! SQL planning is the process of taking the abstract syntax tree of a
13//! [`Statement`] and turning it into a [`Plan`] that the dataflow layer can
14//! execute.
15//!
16//! Statements must be purified before they can be planned. See the
17//! [`pure`](crate::pure) module for details.
18
19// Internal module layout.
20//
21// The entry point for planning is `statement::handle_statement`. That function
22// dispatches to a more specific `handle` function for the particular statement
23// type. For most statements, this `handle` function is uninteresting and short,
24// but anything involving a `SELECT` statement gets complicated. `SELECT`
25// queries wind through the functions in the `query` module, starting with
26// `plan_root_query` and fanning out based on the contents of the `SELECT`
27// statement.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{CollectionPlan, ColumnOrder, MapFilterProject, MirScalarExpr, RowSetFinishing};
41use mz_ore::now::{self, NOW_ZERO};
42use mz_pgcopy::CopyFormatParams;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
44use mz_repr::explain::{ExplainConfig, ExplainFormat};
45use mz_repr::network_policy_id::NetworkPolicyId;
46use mz_repr::optimize::OptimizerFeatureOverrides;
47use mz_repr::refresh_schedule::RefreshSchedule;
48use mz_repr::role_id::RoleId;
49use mz_repr::{
50    CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, ReprColumnType, Row,
51    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
52};
53use mz_sql_parser::ast::{
54    AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
55    RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
56    Value, WithOptionValue,
57};
58use mz_ssh_util::keys::SshKeyPair;
59use mz_storage_types::connections::aws::AwsConnection;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::connections::{
62    AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
63    MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
64};
65use mz_storage_types::instances::StorageInstanceId;
66use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
67use mz_storage_types::sources::{
68    SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
69};
70use proptest_derive::Arbitrary;
71use serde::{Deserialize, Serialize};
72
73use crate::ast::{
74    ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
75    TransactionAccessMode,
76};
77use crate::catalog::{
78    CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
79    RoleAttributesRaw,
80};
81use crate::names::{
82    Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
83    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
84};
85
86pub(crate) mod error;
87pub(crate) mod explain;
88pub(crate) mod hir;
89pub(crate) mod literal;
90pub(crate) mod lowering;
91pub(crate) mod notice;
92pub(crate) mod plan_utils;
93pub(crate) mod query;
94pub(crate) mod scope;
95pub(crate) mod side_effecting_func;
96pub(crate) mod statement;
97pub(crate) mod transform_ast;
98pub(crate) mod transform_hir;
99pub(crate) mod typeconv;
100pub(crate) mod with_options;
101
102use crate::plan;
103use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
104use crate::plan::with_options::OptionalDuration;
105pub use error::PlanError;
106pub use explain::normalize_subqueries;
107pub use hir::{
108    AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
109    WindowExprType,
110};
111pub use lowering::Config as HirToMirConfig;
112pub use notice::PlanNotice;
113pub use query::{ExprContext, QueryContext, QueryLifetime};
114pub use scope::Scope;
115pub use side_effecting_func::SideEffectingFunc;
116pub use statement::ddl::{
117    AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
118    PlannedAlterRoleOption, PlannedRoleAttributes, PlannedRoleVariable,
119    SqlServerConfigOptionExtracted,
120};
121pub use statement::{
122    StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
123    resolve_cluster_for_materialized_view,
124};
125
126use self::statement::ddl::ClusterAlterOptionExtracted;
127use self::with_options::TryFromValue;
128
/// Instructions for executing a SQL query.
///
/// Every variant names one kind of planned statement. Most variants carry a
/// dedicated payload struct; a handful (`DiscardTemp`, `DiscardAll`,
/// `EmptyQuery`, `ShowAllVariables`) carry no data at all.
///
/// The `EnumKind` derive together with `#[enum_kind(PlanKind)]` provides the
/// companion `PlanKind` enum whose variants share these names; it is what
/// [`Plan::generated_from`] returns.
#[derive(Debug, EnumKind)]
#[enum_kind(PlanKind)]
pub enum Plan {
    // Object creation.
    CreateConnection(CreateConnectionPlan),
    CreateDatabase(CreateDatabasePlan),
    CreateSchema(CreateSchemaPlan),
    CreateRole(CreateRolePlan),
    CreateCluster(CreateClusterPlan),
    CreateClusterReplica(CreateClusterReplicaPlan),
    CreateSource(CreateSourcePlan),
    // Several source plans at once, each bundled with the metadata needed to
    // sequence it (see `CreateSourcePlanBundle`).
    CreateSources(Vec<CreateSourcePlanBundle>),
    CreateSecret(CreateSecretPlan),
    CreateSink(CreateSinkPlan),
    CreateTable(CreateTablePlan),
    CreateView(CreateViewPlan),
    CreateMaterializedView(CreateMaterializedViewPlan),
    CreateContinualTask(CreateContinualTaskPlan),
    CreateNetworkPolicy(CreateNetworkPolicyPlan),
    CreateIndex(CreateIndexPlan),
    CreateType(CreateTypePlan),
    Comment(CommentPlan),
    // Discarding session state and dropping objects.
    DiscardTemp,
    DiscardAll,
    DropObjects(DropObjectsPlan),
    DropOwned(DropOwnedPlan),
    EmptyQuery,
    // Introspection (`SHOW ...`) and session variables.
    ShowAllVariables,
    ShowCreate(ShowCreatePlan),
    ShowColumns(ShowColumnsPlan),
    ShowVariable(ShowVariablePlan),
    InspectShard(InspectShardPlan),
    SetVariable(SetVariablePlan),
    ResetVariable(ResetVariablePlan),
    // Transaction control.
    SetTransaction(SetTransactionPlan),
    StartTransaction(StartTransactionPlan),
    CommitTransaction(CommitTransactionPlan),
    AbortTransaction(AbortTransactionPlan),
    // Reads, copies, explains, and writes.
    Select(SelectPlan),
    Subscribe(SubscribePlan),
    CopyFrom(CopyFromPlan),
    CopyTo(CopyToPlan),
    ExplainPlan(ExplainPlanPlan),
    ExplainPushdown(ExplainPushdownPlan),
    ExplainTimestamp(ExplainTimestampPlan),
    ExplainSinkSchema(ExplainSinkSchemaPlan),
    Insert(InsertPlan),
    // Alterations of existing objects and of system configuration.
    AlterCluster(AlterClusterPlan),
    AlterClusterSwap(AlterClusterSwapPlan),
    // Listed by `Plan::generated_from` as a possible outcome of most `ALTER`
    // statement kinds.
    AlterNoop(AlterNoopPlan),
    AlterSetCluster(AlterSetClusterPlan),
    AlterConnection(AlterConnectionPlan),
    AlterSource(AlterSourcePlan),
    AlterClusterRename(AlterClusterRenamePlan),
    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
    AlterItemRename(AlterItemRenamePlan),
    AlterSchemaRename(AlterSchemaRenamePlan),
    AlterSchemaSwap(AlterSchemaSwapPlan),
    AlterSecret(AlterSecretPlan),
    AlterSink(AlterSinkPlan),
    AlterSystemSet(AlterSystemSetPlan),
    AlterSystemReset(AlterSystemResetPlan),
    AlterSystemResetAll(AlterSystemResetAllPlan),
    AlterRole(AlterRolePlan),
    AlterOwner(AlterOwnerPlan),
    // Note: the payload type is `AlterTablePlan`, not `AlterTableAddColumnPlan`.
    AlterTableAddColumn(AlterTablePlan),
    AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
    AlterNetworkPolicy(AlterNetworkPolicyPlan),
    // Cursors.
    Declare(DeclarePlan),
    Fetch(FetchPlan),
    Close(ClosePlan),
    // Generated for `DELETE` and `UPDATE` statements (see
    // `Plan::generated_from`); `Plan::name` also handles `MutationKind::Insert`.
    ReadThenWrite(ReadThenWritePlan),
    // Prepared statements.
    Prepare(PreparePlan),
    Execute(ExecutePlan),
    Deallocate(DeallocatePlan),
    Raise(RaisePlan),
    // Roles, privileges, and ownership.
    GrantRole(GrantRolePlan),
    RevokeRole(RevokeRolePlan),
    GrantPrivileges(GrantPrivilegesPlan),
    RevokePrivileges(RevokePrivilegesPlan),
    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
    ReassignOwned(ReassignOwnedPlan),
    // The payload is the `SideEffectingFunc` type itself rather than a
    // `*Plan` wrapper.
    SideEffectingFunc(SideEffectingFunc),
    ValidateConnection(ValidateConnectionPlan),
    AlterRetainHistory(AlterRetainHistoryPlan),
    AlterSourceTimestampInterval(AlterSourceTimestampIntervalPlan),
}
216
217impl Plan {
218    /// Expresses which [`StatementKind`] can generate which set of
219    /// [`PlanKind`].
220    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
221        match stmt {
222            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
223            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
224            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
225            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
226            StatementKind::AlterObjectRename => &[
227                PlanKind::AlterClusterRename,
228                PlanKind::AlterClusterReplicaRename,
229                PlanKind::AlterItemRename,
230                PlanKind::AlterSchemaRename,
231                PlanKind::AlterNoop,
232            ],
233            StatementKind::AlterObjectSwap => &[
234                PlanKind::AlterClusterSwap,
235                PlanKind::AlterSchemaSwap,
236                PlanKind::AlterNoop,
237            ],
238            StatementKind::AlterRole => &[PlanKind::AlterRole],
239            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
240            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
241            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
242            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
243            StatementKind::AlterSource => &[
244                PlanKind::AlterNoop,
245                PlanKind::AlterSource,
246                PlanKind::AlterRetainHistory,
247                PlanKind::AlterSourceTimestampInterval,
248            ],
249            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
250            StatementKind::AlterSystemResetAll => {
251                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
252            }
253            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
254            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
255            StatementKind::AlterTableAddColumn => {
256                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
257            }
258            StatementKind::AlterMaterializedViewApplyReplacement => &[
259                PlanKind::AlterNoop,
260                PlanKind::AlterMaterializedViewApplyReplacement,
261            ],
262            StatementKind::Close => &[PlanKind::Close],
263            StatementKind::Comment => &[PlanKind::Comment],
264            StatementKind::Commit => &[PlanKind::CommitTransaction],
265            StatementKind::Copy => &[
266                PlanKind::CopyFrom,
267                PlanKind::Select,
268                PlanKind::Subscribe,
269                PlanKind::CopyTo,
270            ],
271            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
272            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
273            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
274            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
275            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
276            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
277            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
278            StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
279            StatementKind::CreateRole => &[PlanKind::CreateRole],
280            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
281            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
282            StatementKind::CreateSink => &[PlanKind::CreateSink],
283            StatementKind::CreateSource | StatementKind::CreateSubsource => {
284                &[PlanKind::CreateSource]
285            }
286            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
287            StatementKind::CreateTable => &[PlanKind::CreateTable],
288            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
289            StatementKind::CreateType => &[PlanKind::CreateType],
290            StatementKind::CreateView => &[PlanKind::CreateView],
291            StatementKind::Deallocate => &[PlanKind::Deallocate],
292            StatementKind::Declare => &[PlanKind::Declare],
293            StatementKind::Delete => &[PlanKind::ReadThenWrite],
294            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
295            StatementKind::DropObjects => &[PlanKind::DropObjects],
296            StatementKind::DropOwned => &[PlanKind::DropOwned],
297            StatementKind::Execute => &[PlanKind::Execute],
298            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
299            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
300            StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
301            StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
302            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
303            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
304            StatementKind::Fetch => &[PlanKind::Fetch],
305            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
306            StatementKind::GrantRole => &[PlanKind::GrantRole],
307            StatementKind::Insert => &[PlanKind::Insert],
308            StatementKind::Prepare => &[PlanKind::Prepare],
309            StatementKind::Raise => &[PlanKind::Raise],
310            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
311            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
312            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
313            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
314            StatementKind::Rollback => &[PlanKind::AbortTransaction],
315            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
316            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
317            StatementKind::SetVariable => &[PlanKind::SetVariable],
318            StatementKind::Show => &[
319                PlanKind::Select,
320                PlanKind::ShowVariable,
321                PlanKind::ShowCreate,
322                PlanKind::ShowColumns,
323                PlanKind::ShowAllVariables,
324                PlanKind::InspectShard,
325            ],
326            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
327            StatementKind::Subscribe => &[PlanKind::Subscribe],
328            StatementKind::Update => &[PlanKind::ReadThenWrite],
329            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
330            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
331        }
332    }
333
334    /// Returns a human readable name of the plan. Meant for use in messages sent back to a user.
335    pub fn name(&self) -> &str {
336        match self {
337            Plan::CreateConnection(_) => "create connection",
338            Plan::CreateDatabase(_) => "create database",
339            Plan::CreateSchema(_) => "create schema",
340            Plan::CreateRole(_) => "create role",
341            Plan::CreateCluster(_) => "create cluster",
342            Plan::CreateClusterReplica(_) => "create cluster replica",
343            Plan::CreateSource(_) => "create source",
344            Plan::CreateSources(_) => "create source",
345            Plan::CreateSecret(_) => "create secret",
346            Plan::CreateSink(_) => "create sink",
347            Plan::CreateTable(_) => "create table",
348            Plan::CreateView(_) => "create view",
349            Plan::CreateMaterializedView(_) => "create materialized view",
350            Plan::CreateContinualTask(_) => "create continual task",
351            Plan::CreateIndex(_) => "create index",
352            Plan::CreateType(_) => "create type",
353            Plan::CreateNetworkPolicy(_) => "create network policy",
354            Plan::Comment(_) => "comment",
355            Plan::DiscardTemp => "discard temp",
356            Plan::DiscardAll => "discard all",
357            Plan::DropObjects(plan) => match plan.object_type {
358                ObjectType::Table => "drop table",
359                ObjectType::View => "drop view",
360                ObjectType::MaterializedView => "drop materialized view",
361                ObjectType::Source => "drop source",
362                ObjectType::Sink => "drop sink",
363                ObjectType::Index => "drop index",
364                ObjectType::Type => "drop type",
365                ObjectType::Role => "drop roles",
366                ObjectType::Cluster => "drop clusters",
367                ObjectType::ClusterReplica => "drop cluster replicas",
368                ObjectType::Secret => "drop secret",
369                ObjectType::Connection => "drop connection",
370                ObjectType::Database => "drop database",
371                ObjectType::Schema => "drop schema",
372                ObjectType::Func => "drop function",
373                ObjectType::ContinualTask => "drop continual task",
374                ObjectType::NetworkPolicy => "drop network policy",
375            },
376            Plan::DropOwned(_) => "drop owned",
377            Plan::EmptyQuery => "do nothing",
378            Plan::ShowAllVariables => "show all variables",
379            Plan::ShowCreate(_) => "show create",
380            Plan::ShowColumns(_) => "show columns",
381            Plan::ShowVariable(_) => "show variable",
382            Plan::InspectShard(_) => "inspect shard",
383            Plan::SetVariable(_) => "set variable",
384            Plan::ResetVariable(_) => "reset variable",
385            Plan::SetTransaction(_) => "set transaction",
386            Plan::StartTransaction(_) => "start transaction",
387            Plan::CommitTransaction(_) => "commit",
388            Plan::AbortTransaction(_) => "abort",
389            Plan::Select(_) => "select",
390            Plan::Subscribe(_) => "subscribe",
391            Plan::CopyFrom(_) => "copy from",
392            Plan::CopyTo(_) => "copy to",
393            Plan::ExplainPlan(_) => "explain plan",
394            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
395            Plan::ExplainTimestamp(_) => "explain timestamp",
396            Plan::ExplainSinkSchema(_) => "explain schema",
397            Plan::Insert(_) => "insert",
398            Plan::AlterNoop(plan) => match plan.object_type {
399                ObjectType::Table => "alter table",
400                ObjectType::View => "alter view",
401                ObjectType::MaterializedView => "alter materialized view",
402                ObjectType::Source => "alter source",
403                ObjectType::Sink => "alter sink",
404                ObjectType::Index => "alter index",
405                ObjectType::Type => "alter type",
406                ObjectType::Role => "alter role",
407                ObjectType::Cluster => "alter cluster",
408                ObjectType::ClusterReplica => "alter cluster replica",
409                ObjectType::Secret => "alter secret",
410                ObjectType::Connection => "alter connection",
411                ObjectType::Database => "alter database",
412                ObjectType::Schema => "alter schema",
413                ObjectType::Func => "alter function",
414                ObjectType::ContinualTask => "alter continual task",
415                ObjectType::NetworkPolicy => "alter network policy",
416            },
417            Plan::AlterCluster(_) => "alter cluster",
418            Plan::AlterClusterRename(_) => "alter cluster rename",
419            Plan::AlterClusterSwap(_) => "alter cluster swap",
420            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
421            Plan::AlterSetCluster(_) => "alter set cluster",
422            Plan::AlterConnection(_) => "alter connection",
423            Plan::AlterSource(_) => "alter source",
424            Plan::AlterItemRename(_) => "rename item",
425            Plan::AlterSchemaRename(_) => "alter rename schema",
426            Plan::AlterSchemaSwap(_) => "alter swap schema",
427            Plan::AlterSecret(_) => "alter secret",
428            Plan::AlterSink(_) => "alter sink",
429            Plan::AlterSystemSet(_) => "alter system",
430            Plan::AlterSystemReset(_) => "alter system",
431            Plan::AlterSystemResetAll(_) => "alter system",
432            Plan::AlterRole(_) => "alter role",
433            Plan::AlterNetworkPolicy(_) => "alter network policy",
434            Plan::AlterOwner(plan) => match plan.object_type {
435                ObjectType::Table => "alter table owner",
436                ObjectType::View => "alter view owner",
437                ObjectType::MaterializedView => "alter materialized view owner",
438                ObjectType::Source => "alter source owner",
439                ObjectType::Sink => "alter sink owner",
440                ObjectType::Index => "alter index owner",
441                ObjectType::Type => "alter type owner",
442                ObjectType::Role => "alter role owner",
443                ObjectType::Cluster => "alter cluster owner",
444                ObjectType::ClusterReplica => "alter cluster replica owner",
445                ObjectType::Secret => "alter secret owner",
446                ObjectType::Connection => "alter connection owner",
447                ObjectType::Database => "alter database owner",
448                ObjectType::Schema => "alter schema owner",
449                ObjectType::Func => "alter function owner",
450                ObjectType::ContinualTask => "alter continual task owner",
451                ObjectType::NetworkPolicy => "alter network policy owner",
452            },
453            Plan::AlterTableAddColumn(_) => "alter table add column",
454            Plan::AlterMaterializedViewApplyReplacement(_) => {
455                "alter materialized view apply replacement"
456            }
457            Plan::Declare(_) => "declare",
458            Plan::Fetch(_) => "fetch",
459            Plan::Close(_) => "close",
460            Plan::ReadThenWrite(plan) => match plan.kind {
461                MutationKind::Insert => "insert into select",
462                MutationKind::Update => "update",
463                MutationKind::Delete => "delete",
464            },
465            Plan::Prepare(_) => "prepare",
466            Plan::Execute(_) => "execute",
467            Plan::Deallocate(_) => "deallocate",
468            Plan::Raise(_) => "raise",
469            Plan::GrantRole(_) => "grant role",
470            Plan::RevokeRole(_) => "revoke role",
471            Plan::GrantPrivileges(_) => "grant privilege",
472            Plan::RevokePrivileges(_) => "revoke privilege",
473            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
474            Plan::ReassignOwned(_) => "reassign owned",
475            Plan::SideEffectingFunc(_) => "side effecting func",
476            Plan::ValidateConnection(_) => "validate connection",
477            Plan::AlterRetainHistory(_) => "alter retain history",
478            Plan::AlterSourceTimestampInterval(_) => "alter source timestamp interval",
479        }
480    }
481
482    /// Returns `true` iff this `Plan` is allowed to be executed in read-only
483    /// mode.
484    ///
485    /// We use an explicit allow-list, to avoid future additions automatically
486    /// falling into the `true` category.
487    pub fn allowed_in_read_only(&self) -> bool {
488        match self {
489            // These two set non-durable session variables, so are okay in
490            // read-only mode.
491            Plan::SetVariable(_) => true,
492            Plan::ResetVariable(_) => true,
493            Plan::SetTransaction(_) => true,
494            Plan::StartTransaction(_) => true,
495            Plan::CommitTransaction(_) => true,
496            Plan::AbortTransaction(_) => true,
497            Plan::Select(_) => true,
498            Plan::EmptyQuery => true,
499            Plan::ShowAllVariables => true,
500            Plan::ShowCreate(_) => true,
501            Plan::ShowColumns(_) => true,
502            Plan::ShowVariable(_) => true,
503            Plan::InspectShard(_) => true,
504            Plan::Subscribe(_) => true,
505            Plan::CopyTo(_) => true,
506            Plan::ExplainPlan(_) => true,
507            Plan::ExplainPushdown(_) => true,
508            Plan::ExplainTimestamp(_) => true,
509            Plan::ExplainSinkSchema(_) => true,
510            Plan::ValidateConnection(_) => true,
511            _ => false,
512        }
513    }
514}
515
/// Payload of [`Plan::StartTransaction`].
#[derive(Debug)]
pub struct StartTransactionPlan {
    /// Requested access mode, if any.
    // NOTE(review): presumably `None` means the statement did not specify an
    // access mode — confirm against the planner.
    pub access: Option<TransactionAccessMode>,
    /// Requested isolation level, if any.
    pub isolation_level: Option<TransactionIsolationLevel>,
}
521
/// The kind of transaction a [`CommitTransactionPlan`] or
/// [`AbortTransactionPlan`] applies to.
#[derive(Debug)]
pub enum TransactionType {
    Explicit,
    Implicit,
}

impl TransactionType {
    /// Returns `true` iff `self` is [`TransactionType::Explicit`].
    pub fn is_explicit(&self) -> bool {
        match self {
            Self::Explicit => true,
            Self::Implicit => false,
        }
    }

    /// Returns `true` iff `self` is [`TransactionType::Implicit`].
    pub fn is_implicit(&self) -> bool {
        match self {
            Self::Implicit => true,
            Self::Explicit => false,
        }
    }
}
537
/// Payload of [`Plan::CommitTransaction`].
#[derive(Debug)]
pub struct CommitTransactionPlan {
    /// The kind of transaction being committed.
    pub transaction_type: TransactionType,
}
542
/// Payload of [`Plan::AbortTransaction`].
#[derive(Debug)]
pub struct AbortTransactionPlan {
    /// The kind of transaction being aborted.
    pub transaction_type: TransactionType,
}
547
/// Payload of [`Plan::CreateDatabase`].
#[derive(Debug)]
pub struct CreateDatabasePlan {
    /// Name for the database.
    pub name: String,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause on the
    // statement — confirm against the planner.
    pub if_not_exists: bool,
}
553
/// Payload of [`Plan::CreateSchema`].
#[derive(Debug)]
pub struct CreateSchemaPlan {
    /// Resolved specifier of the database associated with the schema.
    pub database_spec: ResolvedDatabaseSpecifier,
    /// Name for the schema.
    pub schema_name: String,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause on the
    // statement — confirm against the planner.
    pub if_not_exists: bool,
}
560
/// Payload of [`Plan::CreateRole`].
#[derive(Debug)]
pub struct CreateRolePlan {
    /// Name for the role.
    pub name: String,
    /// Attributes for the role, in their raw catalog form
    /// (`RoleAttributesRaw`).
    pub attributes: RoleAttributesRaw,
}
566
/// Payload of [`Plan::CreateCluster`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterPlan {
    /// Name for the cluster.
    pub name: String,
    /// Managed or unmanaged configuration; see [`CreateClusterVariant`].
    pub variant: CreateClusterVariant,
    /// Optional workload class.
    // NOTE(review): the meaning of the workload class string is not visible
    // in this part of the file.
    pub workload_class: Option<String>,
}
573
/// The two shapes a [`CreateClusterPlan`] can take.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum CreateClusterVariant {
    /// Configured through cluster-level settings (replication factor, size,
    /// ...); see [`CreateClusterManagedPlan`].
    Managed(CreateClusterManagedPlan),
    /// Configured through an explicit list of replicas; see
    /// [`CreateClusterUnmanagedPlan`].
    Unmanaged(CreateClusterUnmanagedPlan),
}
579
/// Cluster configuration carried by [`CreateClusterVariant::Unmanaged`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterUnmanagedPlan {
    /// The replicas, each paired with its [`ReplicaConfig`].
    // NOTE(review): the `String` is presumably the replica name — confirm.
    pub replicas: Vec<(String, ReplicaConfig)>,
}
584
/// Cluster configuration carried by [`CreateClusterVariant::Managed`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterManagedPlan {
    /// Replication factor of the cluster.
    // NOTE(review): presumably the number of replicas to maintain — confirm.
    pub replication_factor: u32,
    /// Size of the cluster, as a string.
    // NOTE(review): the set of valid size names is not visible here.
    pub size: String,
    /// Availability zones associated with the cluster.
    pub availability_zones: Vec<String>,
    /// Compute-specific replica configuration.
    pub compute: ComputeReplicaConfig,
    /// Optimizer feature overrides associated with the cluster.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Scheduling policy; see [`ClusterSchedule`].
    pub schedule: ClusterSchedule,
}
594
/// Payload of [`Plan::CreateClusterReplica`].
#[derive(Debug)]
pub struct CreateClusterReplicaPlan {
    /// ID of the cluster the replica is associated with.
    pub cluster_id: ClusterId,
    /// Name for the replica.
    pub name: String,
    /// Configuration of the replica.
    pub config: ReplicaConfig,
}
601
/// Configuration of introspection for a cluster replica.
///
/// Stored as an `Option` inside [`ComputeReplicaConfig`].
#[derive(
    Clone,
    Copy,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    Ord,
    PartialEq,
    Eq
)]
pub struct ComputeReplicaIntrospectionConfig {
    /// Whether to introspect the introspection.
    // NOTE(review): i.e. presumably whether the introspection machinery
    // itself is also instrumented — confirm.
    pub debugging: bool,
    /// The interval at which to introspect.
    pub interval: Duration,
}
620
/// Compute-specific configuration of a cluster replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComputeReplicaConfig {
    /// Introspection settings, if any.
    // NOTE(review): presumably `None` disables introspection — confirm.
    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
}
625
/// Configuration of a single cluster replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicaConfig {
    /// A replica described by explicit controller addresses.
    // NOTE(review): presumably a replica whose processes are managed outside
    // the system and reached at these addresses — confirm.
    Unorchestrated {
        /// Addresses for `storagectl`.
        storagectl_addrs: Vec<String>,
        /// Addresses for `computectl`.
        computectl_addrs: Vec<String>,
        /// Compute-specific configuration.
        compute: ComputeReplicaConfig,
    },
    /// A replica described by a size and optional placement.
    Orchestrated {
        /// Size of the replica, as a string.
        size: String,
        /// Availability zone for the replica, if one is pinned.
        availability_zone: Option<String>,
        /// Compute-specific configuration.
        compute: ComputeReplicaConfig,
        // NOTE(review): semantics of `internal` are not visible here.
        internal: bool,
        // NOTE(review): presumably an alternative size to bill the replica
        // as — confirm.
        billed_as: Option<String>,
    },
}
641
/// Policy for automatically turning a cluster On and Off.
///
/// The `Default` implementation yields [`ClusterSchedule::Manual`].
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterSchedule {
    /// The system won't automatically turn the cluster On or Off.
    Manual,
    /// The cluster will be On when a REFRESH materialized view on it needs to refresh.
    /// `hydration_time_estimate` determines how long before a refresh the cluster is
    /// turned On, so that it can finish rehydrating before the refresh time.
    Refresh { hydration_time_estimate: Duration },
}
651
652impl Default for ClusterSchedule {
653    fn default() -> Self {
654        // (Has to be consistent with `impl Default for ClusterScheduleOptionValue`.)
655        ClusterSchedule::Manual
656    }
657}
658
/// Payload of [`Plan::CreateSource`]; also embedded in
/// [`CreateSourcePlanBundle`].
#[derive(Debug)]
pub struct CreateSourcePlan {
    /// Qualified name for the source.
    pub name: QualifiedItemName,
    /// Description of the source itself.
    pub source: Source,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause on the
    // statement — confirm against the planner.
    pub if_not_exists: bool,
    /// Timeline associated with the source.
    pub timeline: Timeline,
    // None for subsources, which run on the parent cluster.
    pub in_cluster: Option<ClusterId>,
}
668
/// A set of [`SourceReference`]s together with the time it was last updated.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReferences {
    /// When the references were last updated.
    // NOTE(review): the unit/epoch of this `u64` is not visible here —
    // confirm before interpreting it.
    pub updated_at: u64,
    /// The available references.
    pub references: Vec<SourceReference>,
}
674
/// An available external reference for a source and, when they can be
/// retrieved, the names of the columns it contains.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReference {
    /// Name of the reference.
    pub name: String,
    /// Namespace of the reference, if it has one.
    pub namespace: Option<String>,
    /// Column names of the reference.
    // NOTE(review): presumably empty when the column names could not be
    // retrieved — confirm.
    pub columns: Vec<String>,
}
683
/// A [`CreateSourcePlan`] and the metadata necessary to sequence it.
///
/// [`Plan::CreateSources`] carries a `Vec` of these bundles.
#[derive(Debug)]
pub struct CreateSourcePlanBundle {
    /// ID of this source in the Catalog.
    pub item_id: CatalogItemId,
    /// ID used to reference this source from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// Details of the source to create.
    pub plan: CreateSourcePlan,
    /// Other catalog objects that are referenced by this source, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All the available upstream references for this source.
    /// Populated for top-level sources that can contain subsources/tables
    /// and used during sequencing to populate the appropriate catalog fields.
    pub available_source_references: Option<SourceReferences>,
}
700
/// Payload of [`Plan::CreateConnection`].
#[derive(Debug)]
pub struct CreateConnectionPlan {
    /// Qualified name for the connection.
    pub name: QualifiedItemName,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause on the
    // statement — confirm against the planner.
    pub if_not_exists: bool,
    /// Description of the connection itself.
    pub connection: Connection,
    // NOTE(review): presumably whether the connection should be validated as
    // part of creating it — confirm against the sequencer.
    pub validate: bool,
}
708
/// Payload of [`Plan::ValidateConnection`].
#[derive(Debug)]
pub struct ValidateConnectionPlan {
    /// ID of the connection in the Catalog.
    pub id: CatalogItemId,
    /// The connection to validate.
    ///
    /// This is the storage-layer connection type parameterized by
    /// `ReferencedConnection`, not this module's own `Connection` type.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
}
716
/// Payload of [`Plan::CreateSecret`].
#[derive(Debug)]
pub struct CreateSecretPlan {
    /// Qualified name for the secret.
    pub name: QualifiedItemName,
    /// Description of the secret itself.
    pub secret: Secret,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause on the
    // statement — confirm against the planner.
    pub if_not_exists: bool,
}
723
/// Plan for creating a sink.
#[derive(Debug)]
pub struct CreateSinkPlan {
    /// Qualified name of the sink.
    pub name: QualifiedItemName,
    /// The planned sink definition.
    pub sink: Sink,
    pub with_snapshot: bool,
    pub if_not_exists: bool,
    /// Cluster associated with the sink.
    pub in_cluster: ClusterId,
}
732
/// Plan for creating a table.
#[derive(Debug)]
pub struct CreateTablePlan {
    /// Qualified name of the table.
    pub name: QualifiedItemName,
    /// The planned table definition; see [`Table`].
    pub table: Table,
    pub if_not_exists: bool,
}
739
/// Plan for creating a view.
#[derive(Debug, Clone)]
pub struct CreateViewPlan {
    /// Qualified name of the view.
    pub name: QualifiedItemName,
    /// The planned view definition.
    pub view: View,
    /// The Catalog object that this view is replacing, if any.
    pub replace: Option<CatalogItemId>,
    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
    pub drop_ids: Vec<CatalogItemId>,
    pub if_not_exists: bool,
    /// True if the view contains an expression that can make the exact column list
    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
    pub ambiguous_columns: bool,
}
753
/// Plan for creating a materialized view.
#[derive(Debug, Clone)]
pub struct CreateMaterializedViewPlan {
    /// Qualified name of the materialized view.
    pub name: QualifiedItemName,
    /// The planned materialized view definition.
    pub materialized_view: MaterializedView,
    /// The Catalog object that this materialized view is replacing, if any.
    pub replace: Option<CatalogItemId>,
    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
    pub drop_ids: Vec<CatalogItemId>,
    pub if_not_exists: bool,
    /// True if the materialized view contains an expression that can make the exact column list
    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
    pub ambiguous_columns: bool,
}
767
/// Plan for creating a continual task (CT).
#[derive(Debug, Clone)]
pub struct CreateContinualTaskPlan {
    /// Qualified name of the continual task.
    pub name: QualifiedItemName,
    /// During initial creation, the `LocalId` placeholder for this CT in `continual_task.expr`.
    /// None on restart.
    pub placeholder_id: Option<mz_expr::LocalId>,
    /// Relation description carried with the plan.
    pub desc: RelationDesc,
    /// ID of the collection we read into this continual task.
    pub input_id: GlobalId,
    pub with_snapshot: bool,
    /// Definition for the continual task.
    pub continual_task: MaterializedView,
}
781
/// Plan for creating a network policy.
#[derive(Debug, Clone)]
pub struct CreateNetworkPolicyPlan {
    /// Name of the policy.
    pub name: String,
    /// Rules that make up the policy; see [`NetworkPolicyRule`].
    pub rules: Vec<NetworkPolicyRule>,
}
787
/// Plan for altering an existing network policy.
#[derive(Debug, Clone)]
pub struct AlterNetworkPolicyPlan {
    /// ID of the policy being altered.
    pub id: NetworkPolicyId,
    /// Name of the policy.
    pub name: String,
    /// Rules carried by the plan; see [`NetworkPolicyRule`].
    // NOTE(review): presumably the complete replacement rule set — confirm.
    pub rules: Vec<NetworkPolicyRule>,
}
794
/// Plan for creating an index.
#[derive(Debug, Clone)]
pub struct CreateIndexPlan {
    /// Qualified name of the index.
    pub name: QualifiedItemName,
    /// The planned index definition.
    pub index: Index,
    pub if_not_exists: bool,
}
801
/// Plan for creating a type.
#[derive(Debug)]
pub struct CreateTypePlan {
    /// Qualified name of the type.
    pub name: QualifiedItemName,
    /// The planned type definition.
    pub typ: Type,
}
807
/// Plan for dropping objects.
#[derive(Debug)]
pub struct DropObjectsPlan {
    /// The IDs of only the objects directly referenced in the `DROP` statement.
    pub referenced_ids: Vec<ObjectId>,
    /// All object IDs to drop. Includes `referenced_ids` and all descendants.
    pub drop_ids: Vec<ObjectId>,
    /// The type of object that was dropped explicitly in the `DROP` statement. `drop_ids` may
    /// contain objects of different types due to `CASCADE`.
    pub object_type: ObjectType,
}
818
/// Plan for dropping the objects owned by a set of roles and revoking their privileges.
#[derive(Debug)]
pub struct DropOwnedPlan {
    /// The role IDs that own the objects.
    pub role_ids: Vec<RoleId>,
    /// All object IDs to drop.
    pub drop_ids: Vec<ObjectId>,
    /// The privileges to revoke.
    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
    /// The default privileges to revoke.
    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
}
830
/// Plan for showing a single variable.
#[derive(Debug)]
pub struct ShowVariablePlan {
    /// Name of the variable.
    pub name: String,
}
835
/// Plan for inspecting the shard of a storage collection.
#[derive(Debug)]
pub struct InspectShardPlan {
    /// ID of the storage collection to inspect.
    pub id: GlobalId,
}
841
/// Plan for setting a variable.
#[derive(Debug)]
pub struct SetVariablePlan {
    /// Name of the variable.
    pub name: String,
    /// The value to assign; see [`VariableValue`].
    pub value: VariableValue,
    // NOTE(review): presumably corresponds to `SET LOCAL` — confirm against the planner.
    pub local: bool,
}
848
/// The value assigned to a variable by [`SetVariablePlan`] or [`AlterSystemSetPlan`].
#[derive(Debug)]
pub enum VariableValue {
    /// Use the variable's default.
    Default,
    /// Use the given explicit values.
    Values(Vec<String>),
}
854
/// Plan for resetting a variable.
#[derive(Debug)]
pub struct ResetVariablePlan {
    /// Name of the variable.
    pub name: String,
}
859
/// Plan for setting transaction modes.
#[derive(Debug)]
pub struct SetTransactionPlan {
    // NOTE(review): scope flag; exact semantics are not visible here — confirm against the planner.
    pub local: bool,
    /// The transaction modes carried by the plan.
    pub modes: Vec<TransactionMode>,
}
865
/// A plan for select statements.
///
/// See [`SelectPlan::immediate`] for building a plan over constant rows.
#[derive(Clone, Debug)]
pub struct SelectPlan {
    /// The `SELECT` statement itself. Used for explain/notices, but not otherwise
    /// load-bearing. Boxed to save stack space.
    pub select: Option<Box<SelectStatement<Aug>>>,
    /// The plan as a HIR.
    pub source: HirRelationExpr,
    /// At what time should this select happen?
    pub when: QueryWhen,
    /// Instructions for how to form the result set.
    pub finishing: RowSetFinishing,
    /// For `COPY TO STDOUT`, the format to use.
    pub copy_to: Option<CopyFormat>,
}
881
882impl SelectPlan {
883    pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
884        let arity = typ.arity();
885        SelectPlan {
886            select: None,
887            source: HirRelationExpr::Constant { rows, typ },
888            when: QueryWhen::Immediately,
889            finishing: RowSetFinishing::trivial(arity),
890            copy_to: None,
891        }
892    }
893}
894
/// The output mode of a [`SubscribePlan`].
#[derive(Debug, Clone)]
pub enum SubscribeOutput {
    /// No ordering is attached to the output.
    Diffs,
    WithinTimestampOrderBy {
        /// We pretend that mz_diff is prepended to the normal columns, making it index 0
        order_by: Vec<ColumnOrder>,
    },
    EnvelopeUpsert {
        /// Order by with just keys
        order_by_keys: Vec<ColumnOrder>,
    },
    EnvelopeDebezium {
        /// Order by with just keys
        order_by_keys: Vec<ColumnOrder>,
    },
}
911
912impl SubscribeOutput {
913    pub fn row_order(&self) -> &[ColumnOrder] {
914        match self {
915            SubscribeOutput::Diffs => &[],
916            // This ordering prepends the diff, so its `order_by` field cannot be applied to rows.
917            SubscribeOutput::WithinTimestampOrderBy { .. } => &[],
918            SubscribeOutput::EnvelopeUpsert { order_by_keys } => order_by_keys,
919            SubscribeOutput::EnvelopeDebezium { order_by_keys } => order_by_keys,
920        }
921    }
922}
923
/// Plan for a subscription.
#[derive(Debug, Clone)]
pub struct SubscribePlan {
    /// What to subscribe to; see [`SubscribeFrom`].
    pub from: SubscribeFrom,
    pub with_snapshot: bool,
    /// Timing specification for the subscription.
    pub when: QueryWhen,
    /// Optional timestamp bound.
    // NOTE(review): presumably the `UP TO` bound; inclusivity is not visible here — confirm.
    pub up_to: Option<Timestamp>,
    /// Optional `COPY` format for the output.
    pub copy_to: Option<CopyFormat>,
    pub emit_progress: bool,
    /// Output mode; see [`SubscribeOutput`].
    pub output: SubscribeOutput,
}
934
/// The target of a subscription: an existing collection or an ad-hoc query.
#[derive(Debug, Clone)]
pub enum SubscribeFrom {
    /// ID of the collection to subscribe to.
    Id(GlobalId),
    /// Query to subscribe to.
    Query {
        /// The query expression as HIR.
        expr: HirRelationExpr,
        /// Relation description accompanying `expr`.
        desc: RelationDesc,
    },
}
945
946impl SubscribeFrom {
947    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
948        match self {
949            SubscribeFrom::Id(id) => BTreeSet::from([*id]),
950            SubscribeFrom::Query { expr, .. } => expr.depends_on(),
951        }
952    }
953
954    pub fn contains_temporal(&self) -> bool {
955        match self {
956            SubscribeFrom::Id(_) => false,
957            SubscribeFrom::Query { expr, .. } => expr
958                .contains_temporal()
959                .expect("Unexpected error in `visit_scalars` call"),
960        }
961    }
962}
963
/// Plan for showing how an object was created.
#[derive(Debug)]
pub struct ShowCreatePlan {
    /// ID of the object.
    pub id: ObjectId,
    /// Row carried by the plan.
    // NOTE(review): presumably the precomputed result row — confirm.
    pub row: Row,
}
969
/// Plan for showing the columns of a catalog item, backed by a [`SelectPlan`].
#[derive(Debug)]
pub struct ShowColumnsPlan {
    /// ID of the catalog item.
    pub id: CatalogItemId,
    /// The select plan carried by this plan.
    pub select_plan: SelectPlan,
    /// Resolved IDs accompanying `select_plan`.
    pub new_resolved_ids: ResolvedIds,
}
976
/// Plan for copying data from a [`CopyFromSource`] into a table.
#[derive(Debug)]
pub struct CopyFromPlan {
    /// Table we're copying into.
    pub target_id: CatalogItemId,
    /// Human-readable full name of the target table.
    pub target_name: String,
    /// Source we're copying data from.
    pub source: CopyFromSource,
    /// How input columns map to those on the destination table.
    ///
    /// TODO(cf2): Remove this field in favor of the mfp.
    pub columns: Vec<ColumnIndex>,
    /// [`RelationDesc`] describing the input data.
    pub source_desc: RelationDesc,
    /// Changes the shape of the input data to match the destination table.
    pub mfp: MapFilterProject,
    /// Format-specific params for copying the input data.
    pub params: CopyFormatParams<'static>,
    /// Filter for the source files we're copying from, e.g. an S3 prefix.
    pub filter: Option<CopyFromFilter>,
}
998
/// Where a [`CopyFromPlan`] reads its data from.
#[derive(Debug)]
pub enum CopyFromSource {
    /// Copying from a file local to the user, transmitted via pgwire.
    Stdin,
    /// A remote resource, e.g. HTTP file.
    ///
    /// The contained [`HirScalarExpr`] evaluates to the URL of the remote resource.
    Url(HirScalarExpr),
    /// A file in an S3 bucket.
    AwsS3 {
        /// Expression that evaluates to the file we want to copy.
        uri: HirScalarExpr,
        /// Details for how we connect to AWS S3.
        connection: AwsConnection,
        /// ID of the connection object.
        connection_id: CatalogItemId,
    },
}
1017
/// Filter over the source files of a [`CopyFromPlan`].
#[derive(Debug)]
pub enum CopyFromFilter {
    /// An explicit list of files.
    Files(Vec<String>),
    /// A pattern.
    // NOTE(review): the pattern syntax (glob vs. regex) is not visible here — confirm.
    Pattern(String),
}
1023
/// `COPY TO S3`
///
/// (This is a completely different thing from `COPY TO STDOUT`. That is a `Plan::Select` with
/// `copy_to` set.)
#[derive(Debug, Clone)]
pub struct CopyToPlan {
    /// The select query plan whose data will be copied to the destination URI.
    pub select_plan: SelectPlan,
    /// Relation description carried with the plan.
    pub desc: RelationDesc,
    /// The scalar expression to be resolved to get the destination URI.
    pub to: HirScalarExpr,
    /// The connection carried by the plan.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
    /// The ID of the connection.
    pub connection_id: CatalogItemId,
    /// Output format.
    pub format: S3SinkFormat,
    /// Maximum file size.
    // NOTE(review): presumably in bytes — confirm.
    pub max_file_size: u64,
}
1041
/// Plan for explaining the plan of an [`Explainee`].
#[derive(Clone, Debug)]
pub struct ExplainPlanPlan {
    /// The stage to explain.
    pub stage: ExplainStage,
    /// Output format of the explanation.
    pub format: ExplainFormat,
    /// Configuration for the explanation.
    pub config: ExplainConfig,
    /// The object or statement being explained.
    pub explainee: Explainee,
}
1049
/// The type of object to be explained.
#[derive(Clone, Debug)]
pub enum Explainee {
    /// Lookup and explain a plan saved for an existing view.
    View(CatalogItemId),
    /// Lookup and explain a plan saved for an existing materialized view.
    MaterializedView(CatalogItemId),
    /// Lookup and explain a plan saved for an existing index.
    Index(CatalogItemId),
    /// Replan an existing view.
    ReplanView(CatalogItemId),
    /// Replan an existing materialized view.
    ReplanMaterializedView(CatalogItemId),
    /// Replan an existing index.
    ReplanIndex(CatalogItemId),
    /// A SQL statement.
    Statement(ExplaineeStatement),
}
1068
/// Explainee types that are statements.
///
/// The `EnumKind` derive also generates the payload-free companion enum
/// `ExplaineeStatementKind`.
#[derive(Clone, Debug, EnumKind)]
#[enum_kind(ExplaineeStatementKind)]
pub enum ExplaineeStatement {
    /// The object to be explained is a SELECT statement.
    Select {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The plan of the statement.
        plan: plan::SelectPlan,
        /// Relation description accompanying `plan`.
        desc: RelationDesc,
    },
    /// The object to be explained is a CREATE VIEW.
    CreateView {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The plan of the statement.
        plan: plan::CreateViewPlan,
    },
    /// The object to be explained is a CREATE MATERIALIZED VIEW.
    CreateMaterializedView {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The plan of the statement.
        plan: plan::CreateMaterializedViewPlan,
    },
    /// The object to be explained is a CREATE INDEX.
    CreateIndex {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The plan of the statement.
        plan: plan::CreateIndexPlan,
    },
    /// The object to be explained is a SUBSCRIBE statement.
    Subscribe {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The plan of the statement.
        plan: plan::SubscribePlan,
    },
}
1105
1106impl ExplaineeStatement {
1107    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1108        match self {
1109            Self::Select { plan, .. } => plan.source.depends_on(),
1110            Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1111            Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1112            Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1113            Self::Subscribe { plan, .. } => plan.from.depends_on(),
1114        }
1115    }
1116
1117    /// Statements that have their `broken` flag set are expected to cause a
1118    /// panic in the optimizer code. In this case:
1119    ///
1120    /// 1. The optimizer pipeline execution will stop, but the panic will be
1121    ///    intercepted and will not propagate to the caller. The partial
1122    ///    optimizer trace collected until this point will be available.
1123    /// 2. The optimizer trace tracing subscriber will delegate regular tracing
1124    ///    spans and events to the default subscriber.
1125    ///
1126    /// This is useful when debugging queries that cause panics.
1127    pub fn broken(&self) -> bool {
1128        match self {
1129            Self::Select { broken, .. } => *broken,
1130            Self::CreateView { broken, .. } => *broken,
1131            Self::CreateMaterializedView { broken, .. } => *broken,
1132            Self::CreateIndex { broken, .. } => *broken,
1133            Self::Subscribe { broken, .. } => *broken,
1134        }
1135    }
1136}
1137
1138impl ExplaineeStatementKind {
1139    pub fn supports(&self, stage: &ExplainStage) -> bool {
1140        use ExplainStage::*;
1141        match self {
1142            Self::Select => true,
1143            Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1144            Self::CreateMaterializedView => true,
1145            Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1146            // SUBSCRIBE doesn't support RAW, DECORRELATED, or LOCAL stages because
1147            // it takes MIR directly rather than going through HIR lowering.
1148            Self::Subscribe => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1149        }
1150    }
1151}
1152
1153impl std::fmt::Display for ExplaineeStatementKind {
1154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1155        match self {
1156            Self::Select => write!(f, "SELECT"),
1157            Self::CreateView => write!(f, "CREATE VIEW"),
1158            Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1159            Self::CreateIndex => write!(f, "CREATE INDEX"),
1160            Self::Subscribe => write!(f, "SUBSCRIBE"),
1161        }
1162    }
1163}
1164
/// Plan for explaining pushdown for an [`Explainee`].
#[derive(Clone, Debug)]
pub struct ExplainPushdownPlan {
    /// The object or statement being explained.
    pub explainee: Explainee,
}
1169
/// Plan for explaining timestamp selection for a query.
#[derive(Clone, Debug)]
pub struct ExplainTimestampPlan {
    /// Output format of the explanation.
    pub format: ExplainFormat,
    /// The query as HIR.
    pub raw_plan: HirRelationExpr,
    /// Timing specification for the query.
    pub when: QueryWhen,
}
1176
/// Plan for explaining the schema of a sink.
#[derive(Debug)]
pub struct ExplainSinkSchemaPlan {
    /// ID of the collection the sink reads from.
    pub sink_from: GlobalId,
    /// The schema rendered as a JSON string.
    pub json_schema: String,
}
1182
/// Plan for sending a batch of `(Row, Diff)` updates to a catalog item.
#[derive(Debug)]
pub struct SendDiffsPlan {
    /// ID of the catalog item receiving the updates.
    pub id: CatalogItemId,
    /// The updates, each a row paired with its diff.
    pub updates: Vec<(Row, Diff)>,
    /// The kind of mutation these updates belong to.
    pub kind: MutationKind,
    /// Rows to return, each paired with a non-zero count.
    pub returning: Vec<(Row, NonZeroUsize)>,
    /// Limit on the result size.
    // NOTE(review): presumably in bytes — confirm.
    pub max_result_size: u64,
}
1191
/// Plan for inserting into a catalog item.
#[derive(Debug)]
pub struct InsertPlan {
    /// ID of the catalog item being inserted into.
    pub id: CatalogItemId,
    /// Relation expression producing the values to insert.
    pub values: HirRelationExpr,
    /// Scalar expressions for the returned columns.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1198
/// Plan for a mutation that first reads a selection and then writes based on it.
#[derive(Debug)]
pub struct ReadThenWritePlan {
    /// ID of the catalog item being mutated.
    pub id: CatalogItemId,
    /// Relation expression selecting the rows to operate on.
    pub selection: HirRelationExpr,
    /// Finishing applied to the selection.
    pub finishing: RowSetFinishing,
    /// Assignments keyed by `usize`.
    // NOTE(review): the key is presumably the column index being assigned — confirm.
    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
    /// The kind of mutation.
    pub kind: MutationKind,
    /// Scalar expressions for the returned columns.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1208
/// Generated by `ALTER ... IF EXISTS` if the named object did not exist.
#[derive(Debug)]
pub struct AlterNoopPlan {
    /// The type of object named in the statement.
    pub object_type: ObjectType,
}
1214
/// Plan for changing the cluster of a catalog item.
#[derive(Debug)]
pub struct AlterSetClusterPlan {
    /// ID of the catalog item.
    pub id: CatalogItemId,
    /// The cluster to set.
    pub set_cluster: ClusterId,
}
1220
/// Plan for altering the retain-history setting of a catalog item.
#[derive(Debug)]
pub struct AlterRetainHistoryPlan {
    /// ID of the catalog item.
    pub id: CatalogItemId,
    /// The option value, if any.
    // NOTE(review): presumably `None` when the option is being reset — confirm.
    pub value: Option<Value>,
    /// The compaction window carried by the plan.
    pub window: CompactionWindow,
    /// The type of the object being altered.
    pub object_type: ObjectType,
}
1228
/// Plan for altering the timestamp interval of a source.
#[derive(Debug)]
pub struct AlterSourceTimestampIntervalPlan {
    /// ID of the catalog item.
    pub id: CatalogItemId,
    /// The option value, if any.
    // NOTE(review): presumably `None` when the option is being reset — confirm.
    pub value: Option<Value>,
    /// The interval carried by the plan.
    pub interval: Duration,
}
1235
/// A requested change to an option of type `T` (defaults to `String`).
#[derive(Debug, Clone)]
pub enum AlterOptionParameter<T = String> {
    /// Set the option to the given value.
    Set(T),
    /// Reset the option.
    Reset,
    /// Leave the option as it is.
    Unchanged,
}
1243
/// The action performed by an [`AlterConnectionPlan`].
#[derive(Debug)]
pub enum AlterConnectionAction {
    /// Rotate the connection's keys.
    RotateKeys,
    /// Change the connection's options.
    AlterOptions {
        /// Options to set, keyed by option name.
        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
        /// Names of the options to drop.
        drop_options: BTreeSet<ConnectionOptionName>,
        // NOTE(review): presumably requests validation of the altered connection — confirm.
        validate: bool,
    },
}
1253
/// Plan for altering a connection.
#[derive(Debug)]
pub struct AlterConnectionPlan {
    /// ID of the connection's catalog item.
    pub id: CatalogItemId,
    /// What to do to the connection.
    pub action: AlterConnectionAction,
}
1259
/// The action performed by an [`AlterSourcePlan`].
#[derive(Debug)]
pub enum AlterSourceAction {
    /// Add subsource exports to the source.
    AddSubsourceExports {
        /// Plans (with sequencing metadata) for the subsources to create.
        subsources: Vec<CreateSourcePlanBundle>,
        /// Options given with the action.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    },
    /// Refresh the source's available references.
    RefreshReferences {
        /// The references carried by the action.
        references: SourceReferences,
    },
}
1270
/// Plan for altering a source.
#[derive(Debug)]
pub struct AlterSourcePlan {
    /// Catalog item ID of the source.
    pub item_id: CatalogItemId,
    /// Global ID of the ingestion.
    pub ingestion_id: GlobalId,
    /// What to do to the source.
    pub action: AlterSourceAction,
}
1277
/// Plan for altering a sink.
#[derive(Debug, Clone)]
pub struct AlterSinkPlan {
    /// Catalog item ID of the sink.
    pub item_id: CatalogItemId,
    /// Global ID of the sink.
    pub global_id: GlobalId,
    /// The sink definition carried by the plan.
    pub sink: Sink,
    pub with_snapshot: bool,
    /// Cluster associated with the sink.
    pub in_cluster: ClusterId,
}
1286
/// Plan for altering a cluster.
#[derive(Debug, Clone)]
pub struct AlterClusterPlan {
    /// ID of the cluster.
    pub id: ClusterId,
    /// Name of the cluster.
    pub name: String,
    /// The cluster options carried by the plan.
    pub options: PlanClusterOption,
    /// The strategy to use for the alteration.
    pub strategy: AlterClusterPlanStrategy,
}
1294
/// Plan for renaming a cluster.
#[derive(Debug)]
pub struct AlterClusterRenamePlan {
    /// ID of the cluster.
    pub id: ClusterId,
    /// Current name of the cluster.
    pub name: String,
    /// New name of the cluster.
    pub to_name: String,
}
1301
/// Plan for renaming a cluster replica.
#[derive(Debug)]
pub struct AlterClusterReplicaRenamePlan {
    /// ID of the cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// ID of the replica.
    pub replica_id: ReplicaId,
    /// Current qualified name of the replica.
    pub name: QualifiedReplica,
    /// New name of the replica.
    pub to_name: String,
}
1309
/// Plan for renaming a catalog item.
#[derive(Debug)]
pub struct AlterItemRenamePlan {
    /// ID of the catalog item.
    pub id: CatalogItemId,
    /// Current full name of the item.
    pub current_full_name: FullItemName,
    /// New name of the item.
    pub to_name: String,
    /// The type of the item being renamed.
    pub object_type: ObjectType,
}
1317
/// Plan for renaming a schema.
#[derive(Debug)]
pub struct AlterSchemaRenamePlan {
    /// Database and schema specifiers identifying the current schema.
    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// New name of the schema.
    pub new_schema_name: String,
}
1323
/// Plan for swapping two schemas, referred to as `a` and `b`.
#[derive(Debug)]
pub struct AlterSchemaSwapPlan {
    /// Database and schema specifiers identifying schema `a`.
    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of schema `a`.
    pub schema_a_name: String,
    /// Database and schema specifiers identifying schema `b`.
    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of schema `b`.
    pub schema_b_name: String,
    /// A temporary name.
    // NOTE(review): presumably used as the intermediate name while swapping — confirm.
    pub name_temp: String,
}
1332
/// Plan for swapping two clusters, referred to as `a` and `b`.
#[derive(Debug)]
pub struct AlterClusterSwapPlan {
    /// ID of cluster `a`.
    pub id_a: ClusterId,
    /// ID of cluster `b`.
    pub id_b: ClusterId,
    /// Name of cluster `a`.
    pub name_a: String,
    /// Name of cluster `b`.
    pub name_b: String,
    /// A temporary name.
    // NOTE(review): presumably used as the intermediate name while swapping — confirm.
    pub name_temp: String,
}
1341
/// Plan for altering a secret.
#[derive(Debug)]
pub struct AlterSecretPlan {
    /// ID of the secret's catalog item.
    pub id: CatalogItemId,
    /// Scalar expression for the secret's new contents.
    pub secret_as: MirScalarExpr,
}
1347
/// Plan for setting a system variable.
#[derive(Debug)]
pub struct AlterSystemSetPlan {
    /// Name of the variable.
    pub name: String,
    /// The value to assign; see [`VariableValue`].
    pub value: VariableValue,
}
1353
/// Plan for resetting a system variable.
#[derive(Debug)]
pub struct AlterSystemResetPlan {
    /// Name of the variable.
    pub name: String,
}
1358
/// Plan for resetting all system variables. Carries no data.
#[derive(Debug)]
pub struct AlterSystemResetAllPlan {}
1361
/// Plan for altering a role.
#[derive(Debug)]
pub struct AlterRolePlan {
    /// ID of the role.
    pub id: RoleId,
    /// Name of the role.
    pub name: String,
    /// The planned alteration.
    pub option: PlannedAlterRoleOption,
}
1368
/// Plan for changing the owner of an object.
#[derive(Debug)]
pub struct AlterOwnerPlan {
    /// ID of the object.
    pub id: ObjectId,
    /// The type of the object.
    pub object_type: ObjectType,
    /// The role that becomes the owner.
    pub new_owner: RoleId,
}
1375
/// Plan for altering a table; carries a single column's name and type.
// NOTE(review): presumably the column being added — confirm against the planner.
#[derive(Debug)]
pub struct AlterTablePlan {
    /// ID of the relation being altered.
    pub relation_id: CatalogItemId,
    /// Name of the column.
    pub column_name: ColumnName,
    /// Type of the column.
    pub column_type: SqlColumnType,
    /// The column's data type as written in SQL, before resolution.
    pub raw_sql_type: RawDataType,
}
1383
/// Plan for applying a replacement to a materialized view.
#[derive(Debug, Clone)]
pub struct AlterMaterializedViewApplyReplacementPlan {
    /// ID of the materialized view.
    pub id: CatalogItemId,
    /// ID of the replacement.
    pub replacement_id: CatalogItemId,
}
1389
/// Plan for declaring a named statement (e.g. a cursor).
#[derive(Debug)]
pub struct DeclarePlan {
    /// Name being declared.
    pub name: String,
    /// The raw (unresolved) statement.
    pub stmt: Statement<Raw>,
    /// SQL text accompanying `stmt`.
    pub sql: String,
    /// Parameters accompanying the statement.
    pub params: Params,
}
1397
/// Plan for fetching from a named cursor.
#[derive(Debug)]
pub struct FetchPlan {
    /// Name to fetch from.
    pub name: String,
    /// Optional fetch direction/count.
    pub count: Option<FetchDirection>,
    /// Timeout for the fetch.
    pub timeout: ExecuteTimeout,
}
1404
/// Plan for closing a named cursor.
#[derive(Debug)]
pub struct ClosePlan {
    /// Name to close.
    pub name: String,
}
1409
/// Plan for preparing a named statement.
#[derive(Debug)]
pub struct PreparePlan {
    /// Name of the prepared statement.
    pub name: String,
    /// The raw (unresolved) statement.
    pub stmt: Statement<Raw>,
    /// SQL text accompanying `stmt`.
    pub sql: String,
    /// Description of the statement.
    pub desc: StatementDesc,
}
1417
/// Plan for executing a named prepared statement.
#[derive(Debug)]
pub struct ExecutePlan {
    /// Name of the statement to execute.
    pub name: String,
    /// Parameters to execute it with.
    pub params: Params,
}
1423
/// Plan for deallocating prepared statements.
#[derive(Debug)]
pub struct DeallocatePlan {
    /// Name of the statement to deallocate.
    // NOTE(review): `None` presumably means "deallocate all" — confirm.
    pub name: Option<String>,
}
1428
/// Plan for raising a notice.
#[derive(Debug)]
pub struct RaisePlan {
    /// Severity of the notice to raise.
    pub severity: NoticeSeverity,
}
1433
/// Plan for granting role membership.
#[derive(Debug)]
pub struct GrantRolePlan {
    /// The roles that are gaining members.
    pub role_ids: Vec<RoleId>,
    /// The roles that will be added to each role in `role_ids`.
    pub member_ids: Vec<RoleId>,
    /// The role that granted the membership.
    pub grantor_id: RoleId,
}
1443
/// Plan for revoking role membership.
#[derive(Debug)]
pub struct RevokeRolePlan {
    /// The roles that are losing members.
    pub role_ids: Vec<RoleId>,
    /// The roles that will be removed from each role in `role_ids`.
    pub member_ids: Vec<RoleId>,
    /// The role that revoked the membership.
    pub grantor_id: RoleId,
}
1453
/// A single privilege change, shared by [`GrantPrivilegesPlan`] and [`RevokePrivilegesPlan`].
#[derive(Debug)]
pub struct UpdatePrivilege {
    /// The privileges being granted/revoked on an object.
    pub acl_mode: AclMode,
    /// The ID of the object receiving privileges.
    pub target_id: SystemObjectId,
    /// The role that is granting the privileges.
    pub grantor: RoleId,
}
1463
/// Plan for granting privileges.
#[derive(Debug)]
pub struct GrantPrivilegesPlan {
    /// Description of each privilege being granted.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles that will be granted the privileges.
    pub grantees: Vec<RoleId>,
}
1471
/// Plan for revoking privileges.
#[derive(Debug)]
pub struct RevokePrivilegesPlan {
    /// Description of each privilege being revoked.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles that will have privileges revoked.
    pub revokees: Vec<RoleId>,
}
/// Plan for altering default privileges.
#[derive(Debug)]
pub struct AlterDefaultPrivilegesPlan {
    /// Description of objects that match this default privilege.
    pub privilege_objects: Vec<DefaultPrivilegeObject>,
    /// The privileges to be granted to/revoked from the matching objects.
    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
    /// Whether this is a grant or revoke.
    pub is_grant: bool,
}
1488
/// Plan for reassigning the objects owned by a set of roles to a new owner.
#[derive(Debug)]
pub struct ReassignOwnedPlan {
    /// The roles whose owned objects are being reassigned.
    pub old_roles: Vec<RoleId>,
    /// The new owner of the objects.
    pub new_role: RoleId,
    /// All object IDs to reassign.
    pub reassign_ids: Vec<ObjectId>,
}
1498
/// Plan for setting or clearing the comment on an object.
#[derive(Debug)]
pub struct CommentPlan {
    /// The object that this comment is associated with.
    pub object_id: CommentObjectId,
    /// A sub-component of the object that this comment is associated with, e.g. a column.
    ///
    /// TODO(parkmycar): <https://github.com/MaterializeInc/database-issues/issues/6711>.
    pub sub_component: Option<usize>,
    /// The comment itself. `None` indicates that the existing comment should be cleared.
    pub comment: Option<String>,
}
1510
/// Where the data of a [`Table`] comes from.
#[derive(Clone, Debug)]
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    TableWrites { defaults: Vec<Expr<Aug>> },

    /// The table receives its data from the identified `DataSourceDesc`.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    DataSource {
        /// Description of the data source feeding the table.
        desc: DataSourceDesc,
        /// Timeline associated with the data source.
        timeline: Timeline,
    },
}
1523
/// The planned definition of a table.
#[derive(Clone, Debug)]
pub struct Table {
    /// SQL text that creates the table.
    pub create_sql: String,
    /// Versioned relation description of the table.
    pub desc: VersionedRelationDesc,
    /// Whether the table is temporary.
    pub temporary: bool,
    /// Optional compaction window.
    pub compaction_window: Option<CompactionWindow>,
    /// Where the table's data comes from.
    pub data_source: TableDataSource,
}
1532
/// The planned definition of a source.
#[derive(Clone, Debug)]
pub struct Source {
    /// SQL text that creates the source.
    pub create_sql: String,
    /// Where the source's data comes from.
    pub data_source: DataSourceDesc,
    /// Relation description of the source.
    pub desc: RelationDesc,
    /// Optional compaction window.
    pub compaction_window: Option<CompactionWindow>,
}
1540
/// Describes where a [`Source`] or data-source-backed [`Table`] gets its data.
#[derive(Debug, Clone)]
pub enum DataSourceDesc {
    /// Receives data from an external system.
    Ingestion(SourceDesc<ReferencedConnection>),
    /// Receives data from an external system, for an ingestion created with the old syntax.
    OldSyntaxIngestion {
        desc: SourceDesc<ReferencedConnection>,
        // If we're dealing with an old syntax ingestion the progress id will be some other collection
        // and the ingestion itself will have the data from a default external reference
        progress_subsource: CatalogItemId,
        data_config: SourceExportDataConfig<ReferencedConnection>,
        details: SourceExportDetails,
    },
    /// This source receives its data from the identified ingestion,
    /// specifically the output identified by `external_reference`.
    IngestionExport {
        ingestion_id: CatalogItemId,
        external_reference: UnresolvedItemName,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Receives data from the source's reclocking/remapping operations.
    Progress,
    /// Receives data from HTTP post requests.
    Webhook {
        /// Optional validation applied to requests; see [`WebhookValidation`].
        validate_using: Option<WebhookValidation>,
        /// Format of the request body; see [`WebhookBodyFormat`].
        body_format: WebhookBodyFormat,
        /// Which request headers to expose as columns; see [`WebhookHeaders`].
        headers: WebhookHeaders,
        /// Only `Some` when created via `CREATE TABLE ... FROM WEBHOOK`.
        cluster_id: Option<StorageInstanceId>,
    },
}
1573
/// Validation applied to webhook requests (see [`DataSourceDesc::Webhook`]).
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct WebhookValidation {
    /// The expression used to validate a request.
    pub expression: MirScalarExpr,
    /// Description of the source that will be created.
    pub relation_desc: RelationDesc,
    /// The column index to provide the request body and whether to provide it as bytes.
    pub bodies: Vec<(usize, bool)>,
    /// The column index to provide the request headers and whether to provide the values as bytes.
    pub headers: Vec<(usize, bool)>,
    /// Any secrets that are used in that validation.
    pub secrets: Vec<WebhookValidationSecret>,
}
1587
1588impl WebhookValidation {
1589    const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1590
1591    /// Attempt to reduce the internal [`MirScalarExpr`] into a simpler expression.
1592    ///
1593    /// The reduction happens on a separate thread, we also only wait for
1594    /// `WebhookValidation::MAX_REDUCE_TIME` before timing out and returning an error.
1595    pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1596        let WebhookValidation {
1597            expression,
1598            relation_desc,
1599            ..
1600        } = self;
1601
1602        // On a different thread, attempt to reduce the expression.
1603        let mut expression_ = expression.clone();
1604        let desc_ = relation_desc.clone();
1605        let reduce_task = mz_ore::task::spawn_blocking(
1606            || "webhook-validation-reduce",
1607            move || {
1608                let repr_col_types: Vec<ReprColumnType> = desc_
1609                    .typ()
1610                    .column_types
1611                    .iter()
1612                    .map(ReprColumnType::from)
1613                    .collect();
1614                expression_.reduce(&repr_col_types);
1615                expression_
1616            },
1617        );
1618
1619        match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1620            Ok(reduced_expr) => {
1621                *expression = reduced_expr;
1622                Ok(())
1623            }
1624            Err(_) => Err("timeout"),
1625        }
1626    }
1627}
1628
/// Which webhook request headers are exposed as columns.
///
/// [`WebhookHeaders::num_columns`] reports how many columns this takes.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct WebhookHeaders {
    /// Optionally include a column named `headers` whose content is possibly filtered.
    pub header_column: Option<WebhookHeaderFilters>,
    /// The column index to provide the specific request header, and whether to provide it as bytes.
    pub mapped_headers: BTreeMap<usize, (String, bool)>,
}
1636
1637impl WebhookHeaders {
1638    /// Returns the number of columns needed to represent our headers.
1639    pub fn num_columns(&self) -> usize {
1640        let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1641        let mapped_headers = self.mapped_headers.len();
1642
1643        header_column + mapped_headers
1644    }
1645}
1646
/// Filters applied to the contents of the webhook `headers` column
/// (see [`WebhookHeaders::header_column`]).
// NOTE(review): how `block` and `allow` interact when both are non-empty is not
// visible here — confirm with the code that applies the filters.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct WebhookHeaderFilters {
    /// Header names on the block list.
    pub block: BTreeSet<String>,
    /// Header names on the allow list.
    pub allow: BTreeSet<String>,
}
1652
/// Format of a webhook request body.
///
/// Each format converts to a [`SqlScalarType`] via `From`: `Json` to `Jsonb`,
/// `Bytes` to `Bytes`, and `Text` to `String`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Arbitrary)]
pub enum WebhookBodyFormat {
    // NOTE(review): `array` presumably controls how a top-level JSON array is handled — confirm.
    Json { array: bool },
    Bytes,
    Text,
}
1659
1660impl From<WebhookBodyFormat> for SqlScalarType {
1661    fn from(value: WebhookBodyFormat) -> Self {
1662        match value {
1663            WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1664            WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1665            WebhookBodyFormat::Text => SqlScalarType::String,
1666        }
1667    }
1668}
1669
/// A secret used by a [`WebhookValidation`] expression.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct WebhookValidationSecret {
    /// Identifies the secret by [`CatalogItemId`].
    pub id: CatalogItemId,
    /// Column index for the expression context that this secret was originally evaluated in.
    pub column_idx: usize,
    /// Whether this secret should be provided to the expression as bytes (`true`) or as a
    /// string (`false`).
    pub use_bytes: bool,
}
1679
/// The planned definition of a connection.
#[derive(Clone, Debug)]
pub struct Connection {
    /// SQL text that creates the connection.
    pub create_sql: String,
    /// Kind-specific details of the connection.
    pub details: ConnectionDetails,
}
1685
/// Kind-specific details of a [`Connection`].
///
/// [`ConnectionDetails::to_connection`] converts these into the storage-layer
/// connection type.
#[derive(Clone, Debug, Serialize)]
pub enum ConnectionDetails {
    Kafka(KafkaConnection<ReferencedConnection>),
    Csr(CsrConnection<ReferencedConnection>),
    Postgres(PostgresConnection<ReferencedConnection>),
    /// An SSH connection together with two keys. Only `connection` is carried
    /// over by [`ConnectionDetails::to_connection`].
    Ssh {
        connection: SshConnection,
        key_1: SshKey,
        key_2: SshKey,
    },
    Aws(AwsConnection),
    AwsPrivatelink(AwsPrivatelinkConnection),
    MySql(MySqlConnection<ReferencedConnection>),
    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
}
1702
1703impl ConnectionDetails {
1704    pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1705        match self {
1706            ConnectionDetails::Kafka(c) => {
1707                mz_storage_types::connections::Connection::Kafka(c.clone())
1708            }
1709            ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1710            ConnectionDetails::Postgres(c) => {
1711                mz_storage_types::connections::Connection::Postgres(c.clone())
1712            }
1713            ConnectionDetails::Ssh { connection, .. } => {
1714                mz_storage_types::connections::Connection::Ssh(connection.clone())
1715            }
1716            ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1717            ConnectionDetails::AwsPrivatelink(c) => {
1718                mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1719            }
1720            ConnectionDetails::MySql(c) => {
1721                mz_storage_types::connections::Connection::MySql(c.clone())
1722            }
1723            ConnectionDetails::SqlServer(c) => {
1724                mz_storage_types::connections::Connection::SqlServer(c.clone())
1725            }
1726            ConnectionDetails::IcebergCatalog(c) => {
1727                mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1728            }
1729        }
1730    }
1731}
1732
/// A single named rule of a network policy.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct NetworkPolicyRule {
    /// Name of the rule.
    pub name: String,
    /// Action of the rule.
    pub action: NetworkPolicyRuleAction,
    /// Network (an `IpNet` wrapped in [`PolicyAddress`]) the rule refers to.
    pub address: PolicyAddress,
    /// Traffic direction of the rule.
    pub direction: NetworkPolicyRuleDirection,
}
1740
/// Action of a [`NetworkPolicyRule`].
///
/// `Allow` is the only variant. It displays as `allow` and is parsed from a string by the
/// `TryFrom<&str>` impl in this file, which upper-cases the input before comparing.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    Ord,
    PartialOrd,
    Hash
)]
pub enum NetworkPolicyRuleAction {
    Allow,
}
1755
1756impl std::fmt::Display for NetworkPolicyRuleAction {
1757    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1758        match self {
1759            Self::Allow => write!(f, "allow"),
1760        }
1761    }
1762}
1763impl TryFrom<&str> for NetworkPolicyRuleAction {
1764    type Error = PlanError;
1765    fn try_from(value: &str) -> Result<Self, Self::Error> {
1766        match value.to_uppercase().as_str() {
1767            "ALLOW" => Ok(Self::Allow),
1768            _ => Err(PlanError::Unstructured(
1769                "Allow is the only valid option".into(),
1770            )),
1771        }
1772    }
1773}
1774
/// Traffic direction of a [`NetworkPolicyRule`].
///
/// `Ingress` is the only variant. It displays as `ingress` and is parsed from a string by the
/// `TryFrom<&str>` impl in this file, which upper-cases the input before comparing.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleDirection {
    Ingress,
}
1779impl std::fmt::Display for NetworkPolicyRuleDirection {
1780    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1781        match self {
1782            Self::Ingress => write!(f, "ingress"),
1783        }
1784    }
1785}
1786impl TryFrom<&str> for NetworkPolicyRuleDirection {
1787    type Error = PlanError;
1788    fn try_from(value: &str) -> Result<Self, Self::Error> {
1789        match value.to_uppercase().as_str() {
1790            "INGRESS" => Ok(Self::Ingress),
1791            _ => Err(PlanError::Unstructured(
1792                "Ingress is the only valid option".into(),
1793            )),
1794        }
1795    }
1796}
1797
/// Newtype around an [`IpNet`] used as the address of a network policy rule.
///
/// It displays and serializes as the string form of the wrapped `IpNet`. It can be built from
/// text with `TryFrom<&str>` (fallible) or `From<String>` (panics on invalid input).
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct PolicyAddress(pub IpNet);
1800impl std::fmt::Display for PolicyAddress {
1801    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1802        write!(f, "{}", &self.0.to_string())
1803    }
1804}
1805impl From<String> for PolicyAddress {
1806    fn from(value: String) -> Self {
1807        Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1808    }
1809}
1810impl TryFrom<&str> for PolicyAddress {
1811    type Error = PlanError;
1812    fn try_from(value: &str) -> Result<Self, Self::Error> {
1813        let net = IpNet::from_str(value)
1814            .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1815        Ok(Self(net))
1816    }
1817}
1818
1819impl Serialize for PolicyAddress {
1820    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1821    where
1822        S: serde::Serializer,
1823    {
1824        serializer.serialize_str(&format!("{}", &self.0))
1825    }
1826}
1827
/// An SSH key for which either only the public part or the whole key pair is held.
#[derive(Clone, Debug, Serialize)]
pub enum SshKey {
    /// Only the public key, stored as a string.
    PublicOnly(String),
    /// The complete key pair.
    Both(SshKeyPair),
}
1833
1834impl SshKey {
1835    pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1836        match self {
1837            SshKey::PublicOnly(_) => None,
1838            SshKey::Both(key_pair) => Some(key_pair),
1839        }
1840    }
1841
1842    pub fn public_key(&self) -> String {
1843        match self {
1844            SshKey::PublicOnly(s) => s.into(),
1845            SshKey::Both(p) => p.ssh_public_key(),
1846        }
1847    }
1848}
1849
/// A secret: its SQL text and a scalar expression.
#[derive(Clone, Debug)]
pub struct Secret {
    /// SQL text for this secret.
    ///
    /// NOTE(review): presumably the parse-able, durably stored definition, as documented for
    /// `Sink::create_sql`; confirm.
    pub create_sql: String,
    /// Scalar expression attached to the secret; presumably it evaluates to the secret's
    /// value, verify against the consumer.
    pub secret_as: MirScalarExpr,
}
1855
/// Description of a sink.
#[derive(Clone, Debug)]
pub struct Sink {
    /// Parse-able SQL that is stored durably and defines this sink.
    pub create_sql: String,
    /// Collection we read into this sink.
    pub from: GlobalId,
    /// Type of connection to the external service we sink into.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    // TODO(guswynn): this probably should just be in the `connection`.
    /// Envelope of the sink.
    pub envelope: SinkEnvelope,
    /// Version number of the sink.
    ///
    /// NOTE(review): what bumps this value is not visible here; confirm.
    pub version: u64,
    /// Optional commit interval.
    ///
    /// NOTE(review): the meaning of `None` (a default vs. disabled) is not visible here;
    /// confirm.
    pub commit_interval: Option<Duration>,
}
1869
/// Description of a view: its SQL, its unoptimized expression, and related metadata.
#[derive(Clone, Debug)]
pub struct View {
    /// Parse-able SQL that is stored durably and defines this view.
    pub create_sql: String,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub expr: HirRelationExpr,
    /// All of the catalog objects that are referenced by this view, according to the `expr`.
    pub dependencies: DependencyIds,
    /// Columns of this view.
    pub column_names: Vec<ColumnName>,
    /// If this view is created in the temporary schema, e.g. `CREATE TEMPORARY ...`.
    pub temporary: bool,
}
1883
/// Description of a materialized view: its SQL, its unoptimized expression, and where and
/// how it is installed.
#[derive(Clone, Debug)]
pub struct MaterializedView {
    /// Parse-able SQL that is stored durably and defines this materialized view.
    pub create_sql: String,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub expr: HirRelationExpr,
    /// All of the catalog objects that are referenced by this materialized view, according to the `expr`.
    pub dependencies: DependencyIds,
    /// Columns of this view.
    pub column_names: Vec<ColumnName>,
    /// Optional catalog item this materialized view is a replacement for.
    ///
    /// NOTE(review): inferred from the field name; confirm the replacement semantics.
    pub replacement_target: Option<CatalogItemId>,
    /// Cluster this materialized view will get installed on.
    pub cluster_id: ClusterId,
    /// If set, only install this materialized view's dataflow on the specified replica.
    pub target_replica: Option<ReplicaId>,
    /// Indexes for non-null assertions; presumably column positions in `column_names`,
    /// verify against the consumer.
    pub non_null_assertions: Vec<usize>,
    /// Optional compaction window for this materialized view.
    pub compaction_window: Option<CompactionWindow>,
    /// Optional refresh schedule for this materialized view.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// Optional timestamp.
    ///
    /// NOTE(review): presumably the timestamp as of which the view's dataflow starts;
    /// confirm.
    pub as_of: Option<Timestamp>,
}
1904
/// Description of an index on a collection.
#[derive(Clone, Debug)]
pub struct Index {
    /// Parse-able SQL that is stored durably and defines this index.
    pub create_sql: String,
    /// Collection this index is on top of.
    pub on: GlobalId,
    /// Key expressions of the index; presumably evaluated against rows of `on`, confirm.
    pub keys: Vec<mz_expr::MirScalarExpr>,
    /// Optional compaction window for this index.
    pub compaction_window: Option<CompactionWindow>,
    /// Cluster associated with this index; presumably where it is installed, confirm.
    pub cluster_id: ClusterId,
}
1915
/// A type: its SQL text and its catalog type description.
#[derive(Clone, Debug)]
pub struct Type {
    /// SQL text for this type.
    ///
    /// NOTE(review): presumably the parse-able, durably stored definition, as documented for
    /// `Sink::create_sql`; confirm.
    pub create_sql: String,
    /// Catalog description of the type, with references expressed as `IdReference`.
    pub inner: CatalogType<IdReference>,
}
1921
/// Specifies when a `Peek` or `Subscribe` should occur.
///
/// The methods on this type translate each variant into constraints on the candidate
/// timestamp.
#[derive(Deserialize, Clone, Debug, PartialEq)]
pub enum QueryWhen {
    /// The peek should occur at the latest possible timestamp that allows the
    /// peek to complete immediately.
    Immediately,
    /// The peek should occur at a timestamp that allows the peek to see all
    /// data written to tables within Materialize.
    FreshestTableWrite,
    /// The peek should occur at the timestamp described by the specified
    /// expression.
    ///
    /// The expression may have any type.
    ///
    /// NOTE(review): the payload is a concrete `Timestamp`, so the "expression" wording
    /// above looks historical; confirm and reword.
    AtTimestamp(Timestamp),
    /// Same as Immediately, but will also advance to at least the specified
    /// expression.
    ///
    /// NOTE(review): as with `AtTimestamp`, the payload is a concrete `Timestamp`.
    AtLeastTimestamp(Timestamp),
}
1940
1941impl QueryWhen {
1942    /// Returns a timestamp to which the candidate must be advanced.
1943    pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1944        match self {
1945            QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1946            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1947        }
1948    }
1949    /// Returns whether the candidate's upper bound is constrained.
1950    /// This is only true for `AtTimestamp` since it is the only variant that
1951    /// specifies a timestamp.
1952    pub fn constrains_upper(&self) -> bool {
1953        match self {
1954            QueryWhen::AtTimestamp(_) => true,
1955            QueryWhen::AtLeastTimestamp(_)
1956            | QueryWhen::Immediately
1957            | QueryWhen::FreshestTableWrite => false,
1958        }
1959    }
1960    /// Returns whether the candidate must be advanced to the since.
1961    pub fn advance_to_since(&self) -> bool {
1962        match self {
1963            QueryWhen::Immediately
1964            | QueryWhen::AtLeastTimestamp(_)
1965            | QueryWhen::FreshestTableWrite => true,
1966            QueryWhen::AtTimestamp(_) => false,
1967        }
1968    }
1969    /// Returns whether the candidate can be advanced to the upper.
1970    pub fn can_advance_to_upper(&self) -> bool {
1971        match self {
1972            QueryWhen::Immediately => true,
1973            QueryWhen::FreshestTableWrite
1974            | QueryWhen::AtTimestamp(_)
1975            | QueryWhen::AtLeastTimestamp(_) => false,
1976        }
1977    }
1978
1979    /// Returns whether the candidate can be advanced to the timeline's timestamp.
1980    pub fn can_advance_to_timeline_ts(&self) -> bool {
1981        match self {
1982            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1983            QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1984        }
1985    }
1986    /// Returns whether the candidate must be advanced to the timeline's timestamp.
1987    pub fn must_advance_to_timeline_ts(&self) -> bool {
1988        match self {
1989            QueryWhen::FreshestTableWrite => true,
1990            QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1991                false
1992            }
1993        }
1994    }
1995    /// Returns whether the selected timestamp should be tracked within the current transaction.
1996    pub fn is_transactional(&self) -> bool {
1997        match self {
1998            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1999            QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
2000        }
2001    }
2002}
2003
/// The kind of a mutation: insert, update, or delete.
#[derive(Debug, Copy, Clone)]
pub enum MutationKind {
    Insert,
    Update,
    Delete,
}
2010
/// Data format of a copy operation.
///
/// The derived ordering follows declaration order (`Text` < `Csv` < `Binary` < `Parquet`),
/// so reordering the variants changes comparison results.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyFormat {
    Text,
    Csv,
    Binary,
    Parquet,
}
2018
/// Timeout setting for an execution.
///
/// NOTE(review): the exact meaning of each variant is decided by the consumer and is not
/// visible in this definition; the notes below are inferred from the names.
#[derive(Debug, Copy, Clone)]
pub enum ExecuteTimeout {
    /// No timeout; presumably wait indefinitely, confirm.
    None,
    /// Timeout expressed in (possibly fractional) seconds.
    Seconds(f64),
    /// Presumably a single wait without repetition; confirm against the consumer.
    WaitOnce,
}
2025
/// An option that can be set on an index.
#[derive(Clone, Debug)]
pub enum IndexOption {
    /// Configures the logical compaction window for an index.
    RetainHistory(CompactionWindow),
}
2031
/// An option that can be set on a table.
#[derive(Clone, Debug)]
pub enum TableOption {
    /// Configures the logical compaction window for a table.
    RetainHistory(CompactionWindow),
}
2037
/// The set of cluster options, each wrapped in an `AlterOptionParameter`.
///
/// The `Default` impl in this file sets every field to `AlterOptionParameter::Unchanged`.
/// `size` uses `AlterOptionParameter`'s default type parameter.
#[derive(Clone, Debug)]
pub struct PlanClusterOption {
    pub availability_zones: AlterOptionParameter<Vec<String>>,
    pub introspection_debugging: AlterOptionParameter<bool>,
    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
    pub managed: AlterOptionParameter<bool>,
    /// Replicas as `(name, config)` pairs; presumably the first element is the replica
    /// name, confirm.
    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
    pub replication_factor: AlterOptionParameter<u32>,
    pub size: AlterOptionParameter,
    pub schedule: AlterOptionParameter<ClusterSchedule>,
    pub workload_class: AlterOptionParameter<Option<String>>,
}
2050
2051impl Default for PlanClusterOption {
2052    fn default() -> Self {
2053        Self {
2054            availability_zones: AlterOptionParameter::Unchanged,
2055            introspection_debugging: AlterOptionParameter::Unchanged,
2056            introspection_interval: AlterOptionParameter::Unchanged,
2057            managed: AlterOptionParameter::Unchanged,
2058            replicas: AlterOptionParameter::Unchanged,
2059            replication_factor: AlterOptionParameter::Unchanged,
2060            size: AlterOptionParameter::Unchanged,
2061            schedule: AlterOptionParameter::Unchanged,
2062            workload_class: AlterOptionParameter::Unchanged,
2063        }
2064    }
2065}
2066
/// Strategy attached to a cluster alteration, derived from the `wait` option of a
/// `ClusterAlterOptionExtracted` (see the `TryFrom` impl in this file).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AlterClusterPlanStrategy {
    /// No `wait` option was given.
    None,
    /// Built from the `For` wait option with its duration.
    For(Duration),
    /// Built from the `UntilReady` wait option.
    UntilReady {
        /// Action selected by the `ON TIMEOUT` option; defaults to
        /// `OnTimeoutAction::default()` when the option is absent.
        on_timeout: OnTimeoutAction,
        /// Timeout of the wait; conversion fails if it was not provided.
        timeout: Duration,
    },
}
2076
/// Action selected by the `ON TIMEOUT` option of an `UntilReady` cluster alteration.
///
/// The default is [`OnTimeoutAction::Commit`]. `Default` is derived with `#[default]`
/// instead of a hand-written impl, so the default variant is marked where the variants are
/// declared.
///
/// NOTE(review): what committing or rolling back does on timeout is decided by the consumer
/// and is not visible here.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum OnTimeoutAction {
    /// Commit on timeout (the default).
    #[default]
    Commit,
    /// Roll back on timeout.
    Rollback,
}
2088
2089impl TryFrom<&str> for OnTimeoutAction {
2090    type Error = PlanError;
2091    fn try_from(value: &str) -> Result<Self, Self::Error> {
2092        match value.to_uppercase().as_str() {
2093            "COMMIT" => Ok(Self::Commit),
2094            "ROLLBACK" => Ok(Self::Rollback),
2095            _ => Err(PlanError::Unstructured(
2096                "Valid options are COMMIT, ROLLBACK".into(),
2097            )),
2098        }
2099    }
2100}
2101
2102impl AlterClusterPlanStrategy {
2103    pub fn is_none(&self) -> bool {
2104        matches!(self, Self::None)
2105    }
2106    pub fn is_some(&self) -> bool {
2107        !matches!(self, Self::None)
2108    }
2109}
2110
2111impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2112    type Error = PlanError;
2113
2114    fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2115        Ok(match value.wait {
2116            Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2117            Some(ClusterAlterOptionValue::UntilReady(options)) => {
2118                let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2119                Self::UntilReady {
2120                    timeout: match extracted.timeout {
2121                        Some(d) => d,
2122                        None => Err(PlanError::UntilReadyTimeoutRequired)?,
2123                    },
2124                    on_timeout: match extracted.on_timeout {
2125                        Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2126                            PlanError::InvalidOptionValue {
2127                                option_name: "ON TIMEOUT".into(),
2128                                err: Box::new(e),
2129                            }
2130                        })?,
2131                        None => OnTimeoutAction::default(),
2132                    },
2133                }
2134            }
2135            None => Self::None,
2136        })
2137    }
2138}
2139
/// A vector of values to which parameter references should be bound.
///
/// [`Params::empty`] builds the value used when there are no parameters.
#[derive(Debug, Clone)]
pub struct Params {
    /// The datums that were provided in the EXECUTE statement.
    pub datums: Row,
    /// The types of the datums provided in the EXECUTE statement.
    pub execute_types: Vec<SqlScalarType>,
    /// The types that the prepared statement expects based on its definition.
    pub expected_types: Vec<SqlScalarType>,
}
2150
2151impl Params {
2152    /// Returns a `Params` with no parameters.
2153    pub fn empty() -> Params {
2154        Params {
2155            datums: Row::pack_slice(&[]),
2156            execute_types: vec![],
2157            expected_types: vec![],
2158        }
2159    }
2160}
2161
/// Controls planning of a SQL query.
#[derive(
    Ord,
    PartialOrd,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    Hash,
    Copy
)]
pub struct PlanContext {
    /// Wall-clock time associated with this planning context.
    ///
    /// NOTE(review): presumably captured when planning starts; confirm with the callers of
    /// `PlanContext::new`.
    pub wall_time: DateTime<Utc>,
    /// Flag that is `false` from both constructors and is set through
    /// `with_ignore_if_exists_errors`.
    ///
    /// NOTE(review): presumably suppresses "already exists" errors during planning; confirm
    /// at the use sites.
    pub ignore_if_exists_errors: bool,
}
2179
2180impl PlanContext {
2181    pub fn new(wall_time: DateTime<Utc>) -> Self {
2182        Self {
2183            wall_time,
2184            ignore_if_exists_errors: false,
2185        }
2186    }
2187
2188    /// Return a PlanContext with zero values. This should only be used when
2189    /// planning is required but unused (like in `plan_create_table()`) or in
2190    /// tests.
2191    pub fn zero() -> Self {
2192        PlanContext {
2193            wall_time: now::to_datetime(NOW_ZERO()),
2194            ignore_if_exists_errors: false,
2195        }
2196    }
2197
2198    pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2199        self.ignore_if_exists_errors = value;
2200        self
2201    }
2202}