Skip to main content

mz_sql/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL planning.
11//!
12//! SQL planning is the process of taking the abstract syntax tree of a
13//! [`Statement`] and turning it into a [`Plan`] that the dataflow layer can
14//! execute.
15//!
16//! Statements must be purified before they can be planned. See the
17//! [`pure`](crate::pure) module for details.
18
19// Internal module layout.
20//
21// The entry point for planning is `statement::handle_statement`. That function
22// dispatches to a more specific `handle` function for the particular statement
23// type. For most statements, this `handle` function is uninteresting and short,
24// but anything involving a `SELECT` statement gets complicated. `SELECT`
25// queries wind through the functions in the `query` module, starting with
26// `plan_root_query` and fanning out based on the contents of the `SELECT`
27// statement.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{CollectionPlan, ColumnOrder, MapFilterProject, MirScalarExpr, RowSetFinishing};
41use mz_ore::now::{self, NOW_ZERO};
42use mz_pgcopy::CopyFormatParams;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
44use mz_repr::explain::{ExplainConfig, ExplainFormat};
45use mz_repr::network_policy_id::NetworkPolicyId;
46use mz_repr::optimize::OptimizerFeatureOverrides;
47use mz_repr::refresh_schedule::RefreshSchedule;
48use mz_repr::role_id::RoleId;
49use mz_repr::{
50    CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, ReprColumnType, Row,
51    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
52};
53use mz_sql_parser::ast::{
54    AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
55    RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
56    Value, WithOptionValue,
57};
58use mz_ssh_util::keys::SshKeyPair;
59use mz_storage_types::connections::aws::AwsConnection;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::connections::{
62    AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
63    MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
64};
65use mz_storage_types::instances::StorageInstanceId;
66use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
67use mz_storage_types::sources::{
68    SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
69};
70use proptest_derive::Arbitrary;
71use serde::{Deserialize, Serialize};
72
73use crate::ast::{
74    ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
75    TransactionAccessMode,
76};
77use crate::catalog::{
78    CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
79    RoleAttributesRaw,
80};
81use crate::names::{
82    Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
83    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
84};
85
86pub(crate) mod error;
87pub(crate) mod explain;
88pub(crate) mod hir;
89pub(crate) mod literal;
90pub(crate) mod lowering;
91pub(crate) mod notice;
92pub(crate) mod plan_utils;
93pub(crate) mod query;
94pub(crate) mod scope;
95pub(crate) mod side_effecting_func;
96pub(crate) mod statement;
97pub(crate) mod transform_ast;
98pub(crate) mod transform_hir;
99pub(crate) mod typeconv;
100pub(crate) mod with_options;
101
102use crate::plan;
103use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
104use crate::plan::with_options::OptionalDuration;
105pub use error::PlanError;
106pub use explain::normalize_subqueries;
107pub use hir::{
108    AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
109    WindowExprType,
110};
111pub use lowering::Config as HirToMirConfig;
112pub use notice::PlanNotice;
113pub use query::{ExprContext, QueryContext, QueryLifetime};
114pub use scope::Scope;
115pub use side_effecting_func::SideEffectingFunc;
116pub use statement::ddl::{
117    AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
118    PlannedAlterRoleOption, PlannedRoleAttributes, PlannedRoleVariable,
119    SqlServerConfigOptionExtracted,
120};
121pub use statement::{
122    StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
123    resolve_cluster_for_materialized_view,
124};
125
126use self::statement::ddl::ClusterAlterOptionExtracted;
127use self::with_options::TryFromValue;
128
129/// Instructions for executing a SQL query.
130#[derive(Debug, EnumKind)]
131#[enum_kind(PlanKind)]
132pub enum Plan {
133    CreateConnection(CreateConnectionPlan),
134    CreateDatabase(CreateDatabasePlan),
135    CreateSchema(CreateSchemaPlan),
136    CreateRole(CreateRolePlan),
137    CreateCluster(CreateClusterPlan),
138    CreateClusterReplica(CreateClusterReplicaPlan),
139    CreateSource(CreateSourcePlan),
140    CreateSources(Vec<CreateSourcePlanBundle>),
141    CreateSecret(CreateSecretPlan),
142    CreateSink(CreateSinkPlan),
143    CreateTable(CreateTablePlan),
144    CreateView(CreateViewPlan),
145    CreateMaterializedView(CreateMaterializedViewPlan),
146    CreateNetworkPolicy(CreateNetworkPolicyPlan),
147    CreateIndex(CreateIndexPlan),
148    CreateType(CreateTypePlan),
149    Comment(CommentPlan),
150    DiscardTemp,
151    DiscardAll,
152    DropObjects(DropObjectsPlan),
153    DropOwned(DropOwnedPlan),
154    EmptyQuery,
155    ShowAllVariables,
156    ShowCreate(ShowCreatePlan),
157    ShowColumns(ShowColumnsPlan),
158    ShowVariable(ShowVariablePlan),
159    InspectShard(InspectShardPlan),
160    SetVariable(SetVariablePlan),
161    ResetVariable(ResetVariablePlan),
162    SetTransaction(SetTransactionPlan),
163    StartTransaction(StartTransactionPlan),
164    CommitTransaction(CommitTransactionPlan),
165    AbortTransaction(AbortTransactionPlan),
166    Select(SelectPlan),
167    Subscribe(SubscribePlan),
168    CopyFrom(CopyFromPlan),
169    CopyTo(CopyToPlan),
170    ExplainPlan(ExplainPlanPlan),
171    ExplainPushdown(ExplainPushdownPlan),
172    ExplainTimestamp(ExplainTimestampPlan),
173    ExplainSinkSchema(ExplainSinkSchemaPlan),
174    Insert(InsertPlan),
175    AlterCluster(AlterClusterPlan),
176    AlterClusterSwap(AlterClusterSwapPlan),
177    AlterNoop(AlterNoopPlan),
178    AlterSetCluster(AlterSetClusterPlan),
179    AlterConnection(AlterConnectionPlan),
180    AlterSource(AlterSourcePlan),
181    AlterClusterRename(AlterClusterRenamePlan),
182    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
183    AlterItemRename(AlterItemRenamePlan),
184    AlterSchemaRename(AlterSchemaRenamePlan),
185    AlterSchemaSwap(AlterSchemaSwapPlan),
186    AlterSecret(AlterSecretPlan),
187    AlterSink(AlterSinkPlan),
188    AlterSystemSet(AlterSystemSetPlan),
189    AlterSystemReset(AlterSystemResetPlan),
190    AlterSystemResetAll(AlterSystemResetAllPlan),
191    AlterRole(AlterRolePlan),
192    AlterOwner(AlterOwnerPlan),
193    AlterTableAddColumn(AlterTablePlan),
194    AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
195    AlterNetworkPolicy(AlterNetworkPolicyPlan),
196    Declare(DeclarePlan),
197    Fetch(FetchPlan),
198    Close(ClosePlan),
199    ReadThenWrite(ReadThenWritePlan),
200    Prepare(PreparePlan),
201    Execute(ExecutePlan),
202    Deallocate(DeallocatePlan),
203    Raise(RaisePlan),
204    GrantRole(GrantRolePlan),
205    RevokeRole(RevokeRolePlan),
206    GrantPrivileges(GrantPrivilegesPlan),
207    RevokePrivileges(RevokePrivilegesPlan),
208    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
209    ReassignOwned(ReassignOwnedPlan),
210    SideEffectingFunc(SideEffectingFunc),
211    ValidateConnection(ValidateConnectionPlan),
212    AlterRetainHistory(AlterRetainHistoryPlan),
213    AlterSourceTimestampInterval(AlterSourceTimestampIntervalPlan),
214}
215
216impl Plan {
217    /// Expresses which [`StatementKind`] can generate which set of
218    /// [`PlanKind`].
219    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
220        match stmt {
221            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
222            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
223            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
224            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
225            StatementKind::AlterObjectRename => &[
226                PlanKind::AlterClusterRename,
227                PlanKind::AlterClusterReplicaRename,
228                PlanKind::AlterItemRename,
229                PlanKind::AlterSchemaRename,
230                PlanKind::AlterNoop,
231            ],
232            StatementKind::AlterObjectSwap => &[
233                PlanKind::AlterClusterSwap,
234                PlanKind::AlterSchemaSwap,
235                PlanKind::AlterNoop,
236            ],
237            StatementKind::AlterRole => &[PlanKind::AlterRole],
238            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
239            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
240            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
241            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
242            StatementKind::AlterSource => &[
243                PlanKind::AlterNoop,
244                PlanKind::AlterSource,
245                PlanKind::AlterRetainHistory,
246                PlanKind::AlterSourceTimestampInterval,
247            ],
248            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
249            StatementKind::AlterSystemResetAll => {
250                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
251            }
252            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
253            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
254            StatementKind::AlterTableAddColumn => {
255                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
256            }
257            StatementKind::AlterMaterializedViewApplyReplacement => &[
258                PlanKind::AlterNoop,
259                PlanKind::AlterMaterializedViewApplyReplacement,
260            ],
261            StatementKind::Close => &[PlanKind::Close],
262            StatementKind::Comment => &[PlanKind::Comment],
263            StatementKind::Commit => &[PlanKind::CommitTransaction],
264            StatementKind::Copy => &[
265                PlanKind::CopyFrom,
266                PlanKind::Select,
267                PlanKind::Subscribe,
268                PlanKind::CopyTo,
269            ],
270            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
271            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
272            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
273            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
274            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
275            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
276            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
277            StatementKind::CreateRole => &[PlanKind::CreateRole],
278            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
279            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
280            StatementKind::CreateSink => &[PlanKind::CreateSink],
281            StatementKind::CreateSource | StatementKind::CreateSubsource => {
282                &[PlanKind::CreateSource]
283            }
284            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
285            StatementKind::CreateTable => &[PlanKind::CreateTable],
286            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
287            StatementKind::CreateType => &[PlanKind::CreateType],
288            StatementKind::CreateView => &[PlanKind::CreateView],
289            StatementKind::Deallocate => &[PlanKind::Deallocate],
290            StatementKind::Declare => &[PlanKind::Declare],
291            StatementKind::Delete => &[PlanKind::ReadThenWrite],
292            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
293            StatementKind::DropObjects => &[PlanKind::DropObjects],
294            StatementKind::DropOwned => &[PlanKind::DropOwned],
295            StatementKind::Execute => &[PlanKind::Execute],
296            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
297            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
298            StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
299            StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
300            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
301            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
302            StatementKind::Fetch => &[PlanKind::Fetch],
303            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
304            StatementKind::GrantRole => &[PlanKind::GrantRole],
305            StatementKind::Insert => &[PlanKind::Insert],
306            StatementKind::Prepare => &[PlanKind::Prepare],
307            StatementKind::Raise => &[PlanKind::Raise],
308            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
309            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
310            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
311            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
312            StatementKind::Rollback => &[PlanKind::AbortTransaction],
313            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
314            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
315            StatementKind::SetVariable => &[PlanKind::SetVariable],
316            StatementKind::Show => &[
317                PlanKind::Select,
318                PlanKind::ShowVariable,
319                PlanKind::ShowCreate,
320                PlanKind::ShowColumns,
321                PlanKind::ShowAllVariables,
322                PlanKind::InspectShard,
323            ],
324            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
325            StatementKind::Subscribe => &[PlanKind::Subscribe],
326            StatementKind::Update => &[PlanKind::ReadThenWrite],
327            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
328            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
329        }
330    }
331
332    /// Returns a human readable name of the plan. Meant for use in messages sent back to a user.
333    pub fn name(&self) -> &str {
334        match self {
335            Plan::CreateConnection(_) => "create connection",
336            Plan::CreateDatabase(_) => "create database",
337            Plan::CreateSchema(_) => "create schema",
338            Plan::CreateRole(_) => "create role",
339            Plan::CreateCluster(_) => "create cluster",
340            Plan::CreateClusterReplica(_) => "create cluster replica",
341            Plan::CreateSource(_) => "create source",
342            Plan::CreateSources(_) => "create source",
343            Plan::CreateSecret(_) => "create secret",
344            Plan::CreateSink(_) => "create sink",
345            Plan::CreateTable(_) => "create table",
346            Plan::CreateView(_) => "create view",
347            Plan::CreateMaterializedView(_) => "create materialized view",
348            Plan::CreateIndex(_) => "create index",
349            Plan::CreateType(_) => "create type",
350            Plan::CreateNetworkPolicy(_) => "create network policy",
351            Plan::Comment(_) => "comment",
352            Plan::DiscardTemp => "discard temp",
353            Plan::DiscardAll => "discard all",
354            Plan::DropObjects(plan) => match plan.object_type {
355                ObjectType::Table => "drop table",
356                ObjectType::View => "drop view",
357                ObjectType::MaterializedView => "drop materialized view",
358                ObjectType::Source => "drop source",
359                ObjectType::Sink => "drop sink",
360                ObjectType::Index => "drop index",
361                ObjectType::Type => "drop type",
362                ObjectType::Role => "drop roles",
363                ObjectType::Cluster => "drop clusters",
364                ObjectType::ClusterReplica => "drop cluster replicas",
365                ObjectType::Secret => "drop secret",
366                ObjectType::Connection => "drop connection",
367                ObjectType::Database => "drop database",
368                ObjectType::Schema => "drop schema",
369                ObjectType::Func => "drop function",
370                ObjectType::NetworkPolicy => "drop network policy",
371            },
372            Plan::DropOwned(_) => "drop owned",
373            Plan::EmptyQuery => "do nothing",
374            Plan::ShowAllVariables => "show all variables",
375            Plan::ShowCreate(_) => "show create",
376            Plan::ShowColumns(_) => "show columns",
377            Plan::ShowVariable(_) => "show variable",
378            Plan::InspectShard(_) => "inspect shard",
379            Plan::SetVariable(_) => "set variable",
380            Plan::ResetVariable(_) => "reset variable",
381            Plan::SetTransaction(_) => "set transaction",
382            Plan::StartTransaction(_) => "start transaction",
383            Plan::CommitTransaction(_) => "commit",
384            Plan::AbortTransaction(_) => "abort",
385            Plan::Select(_) => "select",
386            Plan::Subscribe(_) => "subscribe",
387            Plan::CopyFrom(_) => "copy from",
388            Plan::CopyTo(_) => "copy to",
389            Plan::ExplainPlan(_) => "explain plan",
390            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
391            Plan::ExplainTimestamp(_) => "explain timestamp",
392            Plan::ExplainSinkSchema(_) => "explain schema",
393            Plan::Insert(_) => "insert",
394            Plan::AlterNoop(plan) => match plan.object_type {
395                ObjectType::Table => "alter table",
396                ObjectType::View => "alter view",
397                ObjectType::MaterializedView => "alter materialized view",
398                ObjectType::Source => "alter source",
399                ObjectType::Sink => "alter sink",
400                ObjectType::Index => "alter index",
401                ObjectType::Type => "alter type",
402                ObjectType::Role => "alter role",
403                ObjectType::Cluster => "alter cluster",
404                ObjectType::ClusterReplica => "alter cluster replica",
405                ObjectType::Secret => "alter secret",
406                ObjectType::Connection => "alter connection",
407                ObjectType::Database => "alter database",
408                ObjectType::Schema => "alter schema",
409                ObjectType::Func => "alter function",
410                ObjectType::NetworkPolicy => "alter network policy",
411            },
412            Plan::AlterCluster(_) => "alter cluster",
413            Plan::AlterClusterRename(_) => "alter cluster rename",
414            Plan::AlterClusterSwap(_) => "alter cluster swap",
415            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
416            Plan::AlterSetCluster(_) => "alter set cluster",
417            Plan::AlterConnection(_) => "alter connection",
418            Plan::AlterSource(_) => "alter source",
419            Plan::AlterItemRename(_) => "rename item",
420            Plan::AlterSchemaRename(_) => "alter rename schema",
421            Plan::AlterSchemaSwap(_) => "alter swap schema",
422            Plan::AlterSecret(_) => "alter secret",
423            Plan::AlterSink(_) => "alter sink",
424            Plan::AlterSystemSet(_) => "alter system",
425            Plan::AlterSystemReset(_) => "alter system",
426            Plan::AlterSystemResetAll(_) => "alter system",
427            Plan::AlterRole(_) => "alter role",
428            Plan::AlterNetworkPolicy(_) => "alter network policy",
429            Plan::AlterOwner(plan) => match plan.object_type {
430                ObjectType::Table => "alter table owner",
431                ObjectType::View => "alter view owner",
432                ObjectType::MaterializedView => "alter materialized view owner",
433                ObjectType::Source => "alter source owner",
434                ObjectType::Sink => "alter sink owner",
435                ObjectType::Index => "alter index owner",
436                ObjectType::Type => "alter type owner",
437                ObjectType::Role => "alter role owner",
438                ObjectType::Cluster => "alter cluster owner",
439                ObjectType::ClusterReplica => "alter cluster replica owner",
440                ObjectType::Secret => "alter secret owner",
441                ObjectType::Connection => "alter connection owner",
442                ObjectType::Database => "alter database owner",
443                ObjectType::Schema => "alter schema owner",
444                ObjectType::Func => "alter function owner",
445                ObjectType::NetworkPolicy => "alter network policy owner",
446            },
447            Plan::AlterTableAddColumn(_) => "alter table add column",
448            Plan::AlterMaterializedViewApplyReplacement(_) => {
449                "alter materialized view apply replacement"
450            }
451            Plan::Declare(_) => "declare",
452            Plan::Fetch(_) => "fetch",
453            Plan::Close(_) => "close",
454            Plan::ReadThenWrite(plan) => match plan.kind {
455                MutationKind::Insert => "insert into select",
456                MutationKind::Update => "update",
457                MutationKind::Delete => "delete",
458            },
459            Plan::Prepare(_) => "prepare",
460            Plan::Execute(_) => "execute",
461            Plan::Deallocate(_) => "deallocate",
462            Plan::Raise(_) => "raise",
463            Plan::GrantRole(_) => "grant role",
464            Plan::RevokeRole(_) => "revoke role",
465            Plan::GrantPrivileges(_) => "grant privilege",
466            Plan::RevokePrivileges(_) => "revoke privilege",
467            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
468            Plan::ReassignOwned(_) => "reassign owned",
469            Plan::SideEffectingFunc(_) => "side effecting func",
470            Plan::ValidateConnection(_) => "validate connection",
471            Plan::AlterRetainHistory(_) => "alter retain history",
472            Plan::AlterSourceTimestampInterval(_) => "alter source timestamp interval",
473        }
474    }
475
476    /// Returns `true` iff this `Plan` is allowed to be executed in read-only
477    /// mode.
478    ///
479    /// We use an explicit allow-list, to avoid future additions automatically
480    /// falling into the `true` category.
481    pub fn allowed_in_read_only(&self) -> bool {
482        match self {
483            // These two set non-durable session variables, so are okay in
484            // read-only mode.
485            Plan::SetVariable(_) => true,
486            Plan::ResetVariable(_) => true,
487            Plan::SetTransaction(_) => true,
488            Plan::StartTransaction(_) => true,
489            Plan::CommitTransaction(_) => true,
490            Plan::AbortTransaction(_) => true,
491            Plan::Select(_) => true,
492            Plan::EmptyQuery => true,
493            Plan::ShowAllVariables => true,
494            Plan::ShowCreate(_) => true,
495            Plan::ShowColumns(_) => true,
496            Plan::ShowVariable(_) => true,
497            Plan::InspectShard(_) => true,
498            Plan::Subscribe(_) => true,
499            Plan::CopyTo(_) => true,
500            Plan::ExplainPlan(_) => true,
501            Plan::ExplainPushdown(_) => true,
502            Plan::ExplainTimestamp(_) => true,
503            Plan::ExplainSinkSchema(_) => true,
504            Plan::ValidateConnection(_) => true,
505            _ => false,
506        }
507    }
508}
509
510#[derive(Debug)]
511pub struct StartTransactionPlan {
512    pub access: Option<TransactionAccessMode>,
513    pub isolation_level: Option<TransactionIsolationLevel>,
514}
515
516#[derive(Debug)]
517pub enum TransactionType {
518    Explicit,
519    Implicit,
520}
521
522impl TransactionType {
523    pub fn is_explicit(&self) -> bool {
524        matches!(self, TransactionType::Explicit)
525    }
526
527    pub fn is_implicit(&self) -> bool {
528        matches!(self, TransactionType::Implicit)
529    }
530}
531
532#[derive(Debug)]
533pub struct CommitTransactionPlan {
534    pub transaction_type: TransactionType,
535}
536
537#[derive(Debug)]
538pub struct AbortTransactionPlan {
539    pub transaction_type: TransactionType,
540}
541
542#[derive(Debug)]
543pub struct CreateDatabasePlan {
544    pub name: String,
545    pub if_not_exists: bool,
546}
547
548#[derive(Debug)]
549pub struct CreateSchemaPlan {
550    pub database_spec: ResolvedDatabaseSpecifier,
551    pub schema_name: String,
552    pub if_not_exists: bool,
553}
554
555#[derive(Debug)]
556pub struct CreateRolePlan {
557    pub name: String,
558    pub attributes: RoleAttributesRaw,
559}
560
561#[derive(Debug, PartialEq, Eq, Clone)]
562pub struct CreateClusterPlan {
563    pub name: String,
564    pub variant: CreateClusterVariant,
565    pub workload_class: Option<String>,
566}
567
568#[derive(Debug, PartialEq, Eq, Clone)]
569pub enum CreateClusterVariant {
570    Managed(CreateClusterManagedPlan),
571    Unmanaged(CreateClusterUnmanagedPlan),
572}
573
574#[derive(Debug, PartialEq, Eq, Clone)]
575pub struct CreateClusterUnmanagedPlan {
576    pub replicas: Vec<(String, ReplicaConfig)>,
577}
578
579#[derive(Debug, PartialEq, Eq, Clone)]
580pub struct CreateClusterManagedPlan {
581    pub replication_factor: u32,
582    pub size: String,
583    pub availability_zones: Vec<String>,
584    pub compute: ComputeReplicaConfig,
585    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
586    pub schedule: ClusterSchedule,
587}
588
589#[derive(Debug)]
590pub struct CreateClusterReplicaPlan {
591    pub cluster_id: ClusterId,
592    pub name: String,
593    pub config: ReplicaConfig,
594}
595
596/// Configuration of introspection for a cluster replica.
597#[derive(
598    Clone,
599    Copy,
600    Debug,
601    Serialize,
602    Deserialize,
603    PartialOrd,
604    Ord,
605    PartialEq,
606    Eq
607)]
608pub struct ComputeReplicaIntrospectionConfig {
609    /// Whether to introspect the introspection.
610    pub debugging: bool,
611    /// The interval at which to introspect.
612    pub interval: Duration,
613}
614
615#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
616pub struct ComputeReplicaConfig {
617    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
618}
619
620#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
621pub enum ReplicaConfig {
622    Unorchestrated {
623        storagectl_addrs: Vec<String>,
624        computectl_addrs: Vec<String>,
625        compute: ComputeReplicaConfig,
626    },
627    Orchestrated {
628        size: String,
629        availability_zone: Option<String>,
630        compute: ComputeReplicaConfig,
631        internal: bool,
632        billed_as: Option<String>,
633    },
634}
635
636#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
637pub enum ClusterSchedule {
638    /// The system won't automatically turn the cluster On or Off.
639    Manual,
640    /// The cluster will be On when a REFRESH materialized view on it needs to refresh.
641    /// `hydration_time_estimate` determines how much time before a refresh to turn the
642    /// cluster On, so that it can rehydrate already before the refresh time.
643    Refresh { hydration_time_estimate: Duration },
644}
645
646impl Default for ClusterSchedule {
647    fn default() -> Self {
648        // (Has to be consistent with `impl Default for ClusterScheduleOptionValue`.)
649        ClusterSchedule::Manual
650    }
651}
652
653#[derive(Debug)]
654pub struct CreateSourcePlan {
655    pub name: QualifiedItemName,
656    pub source: Source,
657    pub if_not_exists: bool,
658    pub timeline: Timeline,
659    // None for subsources, which run on the parent cluster.
660    pub in_cluster: Option<ClusterId>,
661}
662
663#[derive(Clone, Debug, PartialEq, Eq)]
664pub struct SourceReferences {
665    pub updated_at: u64,
666    pub references: Vec<SourceReference>,
667}
668
669/// An available external reference for a source and if possible to retrieve,
670/// any column names it contains.
671#[derive(Clone, Debug, PartialEq, Eq)]
672pub struct SourceReference {
673    pub name: String,
674    pub namespace: Option<String>,
675    pub columns: Vec<String>,
676}
677
678/// A [`CreateSourcePlan`] and the metadata necessary to sequence it.
679#[derive(Debug)]
680pub struct CreateSourcePlanBundle {
681    /// ID of this source in the Catalog.
682    pub item_id: CatalogItemId,
683    /// ID used to reference this source from outside the catalog, e.g. compute.
684    pub global_id: GlobalId,
685    /// Details of the source to create.
686    pub plan: CreateSourcePlan,
687    /// Other catalog objects that are referenced by this source, determined at name resolution.
688    pub resolved_ids: ResolvedIds,
689    /// All the available upstream references for this source.
690    /// Populated for top-level sources that can contain subsources/tables
691    /// and used during sequencing to populate the appropriate catalog fields.
692    pub available_source_references: Option<SourceReferences>,
693}
694
695#[derive(Debug)]
696pub struct CreateConnectionPlan {
697    pub name: QualifiedItemName,
698    pub if_not_exists: bool,
699    pub connection: Connection,
700    pub validate: bool,
701}
702
703#[derive(Debug)]
704pub struct ValidateConnectionPlan {
705    /// ID of the connection in the Catalog.
706    pub id: CatalogItemId,
707    /// The connection to validate.
708    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
709}
710
711#[derive(Debug)]
712pub struct CreateSecretPlan {
713    pub name: QualifiedItemName,
714    pub secret: Secret,
715    pub if_not_exists: bool,
716}
717
718#[derive(Debug)]
719pub struct CreateSinkPlan {
720    pub name: QualifiedItemName,
721    pub sink: Sink,
722    pub with_snapshot: bool,
723    pub if_not_exists: bool,
724    pub in_cluster: ClusterId,
725}
726
727#[derive(Debug)]
728pub struct CreateTablePlan {
729    pub name: QualifiedItemName,
730    pub table: Table,
731    pub if_not_exists: bool,
732}
733
734#[derive(Debug, Clone)]
735pub struct CreateViewPlan {
736    pub name: QualifiedItemName,
737    pub view: View,
738    /// The Catalog objects that this view is replacing, if any.
739    pub replace: Option<CatalogItemId>,
740    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
741    pub drop_ids: Vec<CatalogItemId>,
742    pub if_not_exists: bool,
743    /// True if the view contains an expression that can make the exact column list
744    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
745    pub ambiguous_columns: bool,
746}
747
748#[derive(Debug, Clone)]
749pub struct CreateMaterializedViewPlan {
750    pub name: QualifiedItemName,
751    pub materialized_view: MaterializedView,
752    /// The Catalog objects that this materialized view is replacing, if any.
753    pub replace: Option<CatalogItemId>,
754    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
755    pub drop_ids: Vec<CatalogItemId>,
756    pub if_not_exists: bool,
757    /// True if the materialized view contains an expression that can make the exact column list
758    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
759    pub ambiguous_columns: bool,
760}
761
762#[derive(Debug, Clone)]
763pub struct CreateNetworkPolicyPlan {
764    pub name: String,
765    pub rules: Vec<NetworkPolicyRule>,
766}
767
768#[derive(Debug, Clone)]
769pub struct AlterNetworkPolicyPlan {
770    pub id: NetworkPolicyId,
771    pub name: String,
772    pub rules: Vec<NetworkPolicyRule>,
773}
774
775#[derive(Debug, Clone)]
776pub struct CreateIndexPlan {
777    pub name: QualifiedItemName,
778    pub index: Index,
779    pub if_not_exists: bool,
780}
781
782#[derive(Debug)]
783pub struct CreateTypePlan {
784    pub name: QualifiedItemName,
785    pub typ: Type,
786}
787
788#[derive(Debug)]
789pub struct DropObjectsPlan {
790    /// The IDs of only the objects directly referenced in the `DROP` statement.
791    pub referenced_ids: Vec<ObjectId>,
792    /// All object IDs to drop. Includes `referenced_ids` and all descendants.
793    pub drop_ids: Vec<ObjectId>,
794    /// The type of object that was dropped explicitly in the DROP statement. `ids` may contain
795    /// objects of different types due to CASCADE.
796    pub object_type: ObjectType,
797}
798
799#[derive(Debug)]
800pub struct DropOwnedPlan {
801    /// The role IDs that own the objects.
802    pub role_ids: Vec<RoleId>,
803    /// All object IDs to drop.
804    pub drop_ids: Vec<ObjectId>,
805    /// The privileges to revoke.
806    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
807    /// The default privileges to revoke.
808    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
809}
810
811#[derive(Debug)]
812pub struct ShowVariablePlan {
813    pub name: String,
814}
815
816#[derive(Debug)]
817pub struct InspectShardPlan {
818    /// ID of the storage collection to inspect.
819    pub id: GlobalId,
820}
821
822#[derive(Debug)]
823pub struct SetVariablePlan {
824    pub name: String,
825    pub value: VariableValue,
826    pub local: bool,
827}
828
829#[derive(Debug)]
830pub enum VariableValue {
831    Default,
832    Values(Vec<String>),
833}
834
835#[derive(Debug)]
836pub struct ResetVariablePlan {
837    pub name: String,
838}
839
840#[derive(Debug)]
841pub struct SetTransactionPlan {
842    pub local: bool,
843    pub modes: Vec<TransactionMode>,
844}
845
846/// A plan for select statements.
847#[derive(Clone, Debug)]
848pub struct SelectPlan {
849    /// The `SELECT` statement itself. Used for explain/notices, but not otherwise
850    /// load-bearing. Boxed to save stack space.
851    pub select: Option<Box<SelectStatement<Aug>>>,
852    /// The plan as a HIR.
853    pub source: HirRelationExpr,
854    /// At what time should this select happen?
855    pub when: QueryWhen,
856    /// Instructions how to form the result set.
857    pub finishing: RowSetFinishing,
858    /// For `COPY TO STDOUT`, the format to use.
859    pub copy_to: Option<CopyFormat>,
860}
861
862impl SelectPlan {
863    pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
864        let arity = typ.arity();
865        SelectPlan {
866            select: None,
867            source: HirRelationExpr::Constant { rows, typ },
868            when: QueryWhen::Immediately,
869            finishing: RowSetFinishing::trivial(arity),
870            copy_to: None,
871        }
872    }
873}
874
875#[derive(Debug, Clone)]
876pub enum SubscribeOutput {
877    Diffs,
878    WithinTimestampOrderBy {
879        /// We pretend that mz_diff is prepended to the normal columns, making it index 0
880        order_by: Vec<ColumnOrder>,
881    },
882    EnvelopeUpsert {
883        /// Order by with just keys
884        order_by_keys: Vec<ColumnOrder>,
885    },
886    EnvelopeDebezium {
887        /// Order by with just keys
888        order_by_keys: Vec<ColumnOrder>,
889    },
890}
891
892impl SubscribeOutput {
893    pub fn row_order(&self) -> &[ColumnOrder] {
894        match self {
895            SubscribeOutput::Diffs => &[],
896            // This ordering prepends the diff, so its `order_by` field cannot be applied to rows.
897            SubscribeOutput::WithinTimestampOrderBy { .. } => &[],
898            SubscribeOutput::EnvelopeUpsert { order_by_keys } => order_by_keys,
899            SubscribeOutput::EnvelopeDebezium { order_by_keys } => order_by_keys,
900        }
901    }
902}
903
904#[derive(Debug, Clone)]
905pub struct SubscribePlan {
906    pub from: SubscribeFrom,
907    pub with_snapshot: bool,
908    pub when: QueryWhen,
909    pub up_to: Option<Timestamp>,
910    pub copy_to: Option<CopyFormat>,
911    pub emit_progress: bool,
912    pub output: SubscribeOutput,
913}
914
915#[derive(Debug, Clone)]
916pub enum SubscribeFrom {
917    /// ID of the collection to subscribe to.
918    Id(GlobalId),
919    /// Query to subscribe to.
920    Query {
921        expr: HirRelationExpr,
922        desc: RelationDesc,
923    },
924}
925
926impl SubscribeFrom {
927    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
928        match self {
929            SubscribeFrom::Id(id) => BTreeSet::from([*id]),
930            SubscribeFrom::Query { expr, .. } => expr.depends_on(),
931        }
932    }
933
934    pub fn contains_temporal(&self) -> bool {
935        match self {
936            SubscribeFrom::Id(_) => false,
937            SubscribeFrom::Query { expr, .. } => expr
938                .contains_temporal()
939                .expect("Unexpected error in `visit_scalars` call"),
940        }
941    }
942}
943
944#[derive(Debug)]
945pub struct ShowCreatePlan {
946    pub id: ObjectId,
947    pub row: Row,
948}
949
950#[derive(Debug)]
951pub struct ShowColumnsPlan {
952    pub id: CatalogItemId,
953    pub select_plan: SelectPlan,
954    pub new_resolved_ids: ResolvedIds,
955}
956
957#[derive(Debug)]
958pub struct CopyFromPlan {
959    /// Table we're copying into.
960    pub target_id: CatalogItemId,
961    /// Human-readable full name of the target table.
962    pub target_name: String,
963    /// Source we're copying data from.
964    pub source: CopyFromSource,
965    /// How input columns map to those on the destination table.
966    ///
967    /// TODO(cf2): Remove this field in favor of the mfp.
968    pub columns: Vec<ColumnIndex>,
969    /// [`RelationDesc`] describing the input data.
970    pub source_desc: RelationDesc,
971    /// Changes the shape of the input data to match the destination table.
972    pub mfp: MapFilterProject,
973    /// Format specific params for copying the input data.
974    pub params: CopyFormatParams<'static>,
975    /// Filter for the source files we're copying from, e.g. an S3 prefix.
976    pub filter: Option<CopyFromFilter>,
977}
978
979#[derive(Debug)]
980pub enum CopyFromSource {
981    /// Copying from a file local to the user, transmitted via pgwire.
982    Stdin,
983    /// A remote resource, e.g. HTTP file.
984    ///
985    /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource.
986    Url(HirScalarExpr),
987    /// A file in an S3 bucket.
988    AwsS3 {
989        /// Expression that evaluates to the file we want to copy.
990        uri: HirScalarExpr,
991        /// Details for how we connect to AWS S3.
992        connection: AwsConnection,
993        /// ID of the connection object.
994        connection_id: CatalogItemId,
995    },
996}
997
998#[derive(Debug)]
999pub enum CopyFromFilter {
1000    Files(Vec<String>),
1001    Pattern(String),
1002}
1003
1004/// `COPY TO S3`
1005///
1006/// (This is a completely different thing from `COPY TO STDOUT`. That is a `Plan::Select` with
1007/// `copy_to` set.)
1008#[derive(Debug, Clone)]
1009pub struct CopyToPlan {
1010    /// The select query plan whose data will be copied to destination uri.
1011    pub select_plan: SelectPlan,
1012    pub desc: RelationDesc,
1013    /// The scalar expression to be resolved to get the destination uri.
1014    pub to: HirScalarExpr,
1015    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
1016    /// The ID of the connection.
1017    pub connection_id: CatalogItemId,
1018    pub format: S3SinkFormat,
1019    pub max_file_size: u64,
1020}
1021
1022#[derive(Clone, Debug)]
1023pub struct ExplainPlanPlan {
1024    pub stage: ExplainStage,
1025    pub format: ExplainFormat,
1026    pub config: ExplainConfig,
1027    pub explainee: Explainee,
1028}
1029
1030/// The type of object to be explained
1031#[derive(Clone, Debug)]
1032pub enum Explainee {
1033    /// Lookup and explain a plan saved for an view.
1034    View(CatalogItemId),
1035    /// Lookup and explain a plan saved for an existing materialized view.
1036    MaterializedView(CatalogItemId),
1037    /// Lookup and explain a plan saved for an existing index.
1038    Index(CatalogItemId),
1039    /// Replan an existing view.
1040    ReplanView(CatalogItemId),
1041    /// Replan an existing materialized view.
1042    ReplanMaterializedView(CatalogItemId),
1043    /// Replan an existing index.
1044    ReplanIndex(CatalogItemId),
1045    /// A SQL statement.
1046    Statement(ExplaineeStatement),
1047}
1048
1049/// Explainee types that are statements.
1050#[derive(Clone, Debug, EnumKind)]
1051#[enum_kind(ExplaineeStatementKind)]
1052pub enum ExplaineeStatement {
1053    /// The object to be explained is a SELECT statement.
1054    Select {
1055        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1056        broken: bool,
1057        plan: plan::SelectPlan,
1058        desc: RelationDesc,
1059    },
1060    /// The object to be explained is a CREATE VIEW.
1061    CreateView {
1062        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1063        broken: bool,
1064        plan: plan::CreateViewPlan,
1065    },
1066    /// The object to be explained is a CREATE MATERIALIZED VIEW.
1067    CreateMaterializedView {
1068        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1069        broken: bool,
1070        plan: plan::CreateMaterializedViewPlan,
1071    },
1072    /// The object to be explained is a CREATE INDEX.
1073    CreateIndex {
1074        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1075        broken: bool,
1076        plan: plan::CreateIndexPlan,
1077    },
1078    /// The object to be explained is a SUBSCRIBE statement.
1079    Subscribe {
1080        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1081        broken: bool,
1082        plan: plan::SubscribePlan,
1083    },
1084}
1085
1086impl ExplaineeStatement {
1087    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1088        match self {
1089            Self::Select { plan, .. } => plan.source.depends_on(),
1090            Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1091            Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1092            Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1093            Self::Subscribe { plan, .. } => plan.from.depends_on(),
1094        }
1095    }
1096
1097    /// Statements that have their `broken` flag set are expected to cause a
1098    /// panic in the optimizer code. In this case:
1099    ///
1100    /// 1. The optimizer pipeline execution will stop, but the panic will be
1101    ///    intercepted and will not propagate to the caller. The partial
1102    ///    optimizer trace collected until this point will be available.
1103    /// 2. The optimizer trace tracing subscriber will delegate regular tracing
1104    ///    spans and events to the default subscriber.
1105    ///
1106    /// This is useful when debugging queries that cause panics.
1107    pub fn broken(&self) -> bool {
1108        match self {
1109            Self::Select { broken, .. } => *broken,
1110            Self::CreateView { broken, .. } => *broken,
1111            Self::CreateMaterializedView { broken, .. } => *broken,
1112            Self::CreateIndex { broken, .. } => *broken,
1113            Self::Subscribe { broken, .. } => *broken,
1114        }
1115    }
1116}
1117
1118impl ExplaineeStatementKind {
1119    pub fn supports(&self, stage: &ExplainStage) -> bool {
1120        use ExplainStage::*;
1121        match self {
1122            Self::Select => true,
1123            Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1124            Self::CreateMaterializedView => true,
1125            Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1126            // SUBSCRIBE doesn't support RAW, DECORRELATED, or LOCAL stages because
1127            // it takes MIR directly rather than going through HIR lowering.
1128            Self::Subscribe => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1129        }
1130    }
1131}
1132
1133impl std::fmt::Display for ExplaineeStatementKind {
1134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1135        match self {
1136            Self::Select => write!(f, "SELECT"),
1137            Self::CreateView => write!(f, "CREATE VIEW"),
1138            Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1139            Self::CreateIndex => write!(f, "CREATE INDEX"),
1140            Self::Subscribe => write!(f, "SUBSCRIBE"),
1141        }
1142    }
1143}
1144
1145#[derive(Clone, Debug)]
1146pub struct ExplainPushdownPlan {
1147    pub explainee: Explainee,
1148}
1149
1150#[derive(Clone, Debug)]
1151pub struct ExplainTimestampPlan {
1152    pub format: ExplainFormat,
1153    pub raw_plan: HirRelationExpr,
1154    pub when: QueryWhen,
1155}
1156
1157#[derive(Debug)]
1158pub struct ExplainSinkSchemaPlan {
1159    pub sink_from: GlobalId,
1160    pub json_schema: String,
1161}
1162
1163#[derive(Debug)]
1164pub struct SendDiffsPlan {
1165    pub id: CatalogItemId,
1166    pub updates: Vec<(Row, Diff)>,
1167    pub kind: MutationKind,
1168    pub returning: Vec<(Row, NonZeroUsize)>,
1169    pub max_result_size: u64,
1170}
1171
1172#[derive(Debug)]
1173pub struct InsertPlan {
1174    pub id: CatalogItemId,
1175    pub values: HirRelationExpr,
1176    pub returning: Vec<mz_expr::MirScalarExpr>,
1177}
1178
1179#[derive(Debug)]
1180pub struct ReadThenWritePlan {
1181    pub id: CatalogItemId,
1182    pub selection: HirRelationExpr,
1183    pub finishing: RowSetFinishing,
1184    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
1185    pub kind: MutationKind,
1186    pub returning: Vec<mz_expr::MirScalarExpr>,
1187}
1188
1189/// Generated by `ALTER ... IF EXISTS` if the named object did not exist.
1190#[derive(Debug)]
1191pub struct AlterNoopPlan {
1192    pub object_type: ObjectType,
1193}
1194
1195#[derive(Debug)]
1196pub struct AlterSetClusterPlan {
1197    pub id: CatalogItemId,
1198    pub set_cluster: ClusterId,
1199}
1200
1201#[derive(Debug)]
1202pub struct AlterRetainHistoryPlan {
1203    pub id: CatalogItemId,
1204    pub value: Option<Value>,
1205    pub window: CompactionWindow,
1206    pub object_type: ObjectType,
1207}
1208
1209#[derive(Debug)]
1210pub struct AlterSourceTimestampIntervalPlan {
1211    pub id: CatalogItemId,
1212    pub value: Option<Value>,
1213    pub interval: Duration,
1214}
1215
1216#[derive(Debug, Clone)]
1217
1218pub enum AlterOptionParameter<T = String> {
1219    Set(T),
1220    Reset,
1221    Unchanged,
1222}
1223
1224#[derive(Debug)]
1225pub enum AlterConnectionAction {
1226    RotateKeys,
1227    AlterOptions {
1228        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
1229        drop_options: BTreeSet<ConnectionOptionName>,
1230        validate: bool,
1231    },
1232}
1233
1234#[derive(Debug)]
1235pub struct AlterConnectionPlan {
1236    pub id: CatalogItemId,
1237    pub action: AlterConnectionAction,
1238}
1239
1240#[derive(Debug)]
1241pub enum AlterSourceAction {
1242    AddSubsourceExports {
1243        subsources: Vec<CreateSourcePlanBundle>,
1244        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1245    },
1246    RefreshReferences {
1247        references: SourceReferences,
1248    },
1249}
1250
1251#[derive(Debug)]
1252pub struct AlterSourcePlan {
1253    pub item_id: CatalogItemId,
1254    pub ingestion_id: GlobalId,
1255    pub action: AlterSourceAction,
1256}
1257
1258#[derive(Debug, Clone)]
1259pub struct AlterSinkPlan {
1260    pub item_id: CatalogItemId,
1261    pub global_id: GlobalId,
1262    pub sink: Sink,
1263    pub with_snapshot: bool,
1264    pub in_cluster: ClusterId,
1265}
1266
1267#[derive(Debug, Clone)]
1268pub struct AlterClusterPlan {
1269    pub id: ClusterId,
1270    pub name: String,
1271    pub options: PlanClusterOption,
1272    pub strategy: AlterClusterPlanStrategy,
1273}
1274
1275#[derive(Debug)]
1276pub struct AlterClusterRenamePlan {
1277    pub id: ClusterId,
1278    pub name: String,
1279    pub to_name: String,
1280}
1281
1282#[derive(Debug)]
1283pub struct AlterClusterReplicaRenamePlan {
1284    pub cluster_id: ClusterId,
1285    pub replica_id: ReplicaId,
1286    pub name: QualifiedReplica,
1287    pub to_name: String,
1288}
1289
1290#[derive(Debug)]
1291pub struct AlterItemRenamePlan {
1292    pub id: CatalogItemId,
1293    pub current_full_name: FullItemName,
1294    pub to_name: String,
1295    pub object_type: ObjectType,
1296}
1297
1298#[derive(Debug)]
1299pub struct AlterSchemaRenamePlan {
1300    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1301    pub new_schema_name: String,
1302}
1303
1304#[derive(Debug)]
1305pub struct AlterSchemaSwapPlan {
1306    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1307    pub schema_a_name: String,
1308    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1309    pub schema_b_name: String,
1310    pub name_temp: String,
1311}
1312
1313#[derive(Debug)]
1314pub struct AlterClusterSwapPlan {
1315    pub id_a: ClusterId,
1316    pub id_b: ClusterId,
1317    pub name_a: String,
1318    pub name_b: String,
1319    pub name_temp: String,
1320}
1321
1322#[derive(Debug)]
1323pub struct AlterSecretPlan {
1324    pub id: CatalogItemId,
1325    pub secret_as: MirScalarExpr,
1326}
1327
1328#[derive(Debug)]
1329pub struct AlterSystemSetPlan {
1330    pub name: String,
1331    pub value: VariableValue,
1332}
1333
1334#[derive(Debug)]
1335pub struct AlterSystemResetPlan {
1336    pub name: String,
1337}
1338
1339#[derive(Debug)]
1340pub struct AlterSystemResetAllPlan {}
1341
1342#[derive(Debug)]
1343pub struct AlterRolePlan {
1344    pub id: RoleId,
1345    pub name: String,
1346    pub option: PlannedAlterRoleOption,
1347}
1348
1349#[derive(Debug)]
1350pub struct AlterOwnerPlan {
1351    pub id: ObjectId,
1352    pub object_type: ObjectType,
1353    pub new_owner: RoleId,
1354}
1355
1356#[derive(Debug)]
1357pub struct AlterTablePlan {
1358    pub relation_id: CatalogItemId,
1359    pub column_name: ColumnName,
1360    pub column_type: SqlColumnType,
1361    pub raw_sql_type: RawDataType,
1362}
1363
1364#[derive(Debug, Clone)]
1365pub struct AlterMaterializedViewApplyReplacementPlan {
1366    pub id: CatalogItemId,
1367    pub replacement_id: CatalogItemId,
1368}
1369
1370#[derive(Debug)]
1371pub struct DeclarePlan {
1372    pub name: String,
1373    pub stmt: Statement<Raw>,
1374    pub sql: String,
1375    pub params: Params,
1376}
1377
1378#[derive(Debug)]
1379pub struct FetchPlan {
1380    pub name: String,
1381    pub count: Option<FetchDirection>,
1382    pub timeout: ExecuteTimeout,
1383}
1384
1385#[derive(Debug)]
1386pub struct ClosePlan {
1387    pub name: String,
1388}
1389
1390#[derive(Debug)]
1391pub struct PreparePlan {
1392    pub name: String,
1393    pub stmt: Statement<Raw>,
1394    pub sql: String,
1395    pub desc: StatementDesc,
1396}
1397
1398#[derive(Debug)]
1399pub struct ExecutePlan {
1400    pub name: String,
1401    pub params: Params,
1402}
1403
1404#[derive(Debug)]
1405pub struct DeallocatePlan {
1406    pub name: Option<String>,
1407}
1408
1409#[derive(Debug)]
1410pub struct RaisePlan {
1411    pub severity: NoticeSeverity,
1412}
1413
1414#[derive(Debug)]
1415pub struct GrantRolePlan {
1416    /// The roles that are gaining members.
1417    pub role_ids: Vec<RoleId>,
1418    /// The roles that will be added to `role_id`.
1419    pub member_ids: Vec<RoleId>,
1420    /// The role that granted the membership.
1421    pub grantor_id: RoleId,
1422}
1423
1424#[derive(Debug)]
1425pub struct RevokeRolePlan {
1426    /// The roles that are losing members.
1427    pub role_ids: Vec<RoleId>,
1428    /// The roles that will be removed from `role_id`.
1429    pub member_ids: Vec<RoleId>,
1430    /// The role that revoked the membership.
1431    pub grantor_id: RoleId,
1432}
1433
1434#[derive(Debug)]
1435pub struct UpdatePrivilege {
1436    /// The privileges being granted/revoked on an object.
1437    pub acl_mode: AclMode,
1438    /// The ID of the object receiving privileges.
1439    pub target_id: SystemObjectId,
1440    /// The role that is granting the privileges.
1441    pub grantor: RoleId,
1442}
1443
1444#[derive(Debug)]
1445pub struct GrantPrivilegesPlan {
1446    /// Description of each privilege being granted.
1447    pub update_privileges: Vec<UpdatePrivilege>,
1448    /// The roles that will granted the privileges.
1449    pub grantees: Vec<RoleId>,
1450}
1451
1452#[derive(Debug)]
1453pub struct RevokePrivilegesPlan {
1454    /// Description of each privilege being revoked.
1455    pub update_privileges: Vec<UpdatePrivilege>,
1456    /// The roles that will have privileges revoked.
1457    pub revokees: Vec<RoleId>,
1458}
1459#[derive(Debug)]
1460pub struct AlterDefaultPrivilegesPlan {
1461    /// Description of objects that match this default privilege.
1462    pub privilege_objects: Vec<DefaultPrivilegeObject>,
1463    /// The privilege to be granted/revoked from the matching objects.
1464    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
1465    /// Whether this is a grant or revoke.
1466    pub is_grant: bool,
1467}
1468
1469#[derive(Debug)]
1470pub struct ReassignOwnedPlan {
1471    /// The roles whose owned objects are being reassigned.
1472    pub old_roles: Vec<RoleId>,
1473    /// The new owner of the objects.
1474    pub new_role: RoleId,
1475    /// All object IDs to reassign.
1476    pub reassign_ids: Vec<ObjectId>,
1477}
1478
1479#[derive(Debug)]
1480pub struct CommentPlan {
1481    /// The object that this comment is associated with.
1482    pub object_id: CommentObjectId,
1483    /// A sub-component of the object that this comment is associated with, e.g. a column.
1484    ///
1485    /// TODO(parkmycar): <https://github.com/MaterializeInc/database-issues/issues/6711>.
1486    pub sub_component: Option<usize>,
1487    /// The comment itself. If `None` that indicates we should clear the existing comment.
1488    pub comment: Option<String>,
1489}
1490
1491#[derive(Clone, Debug)]
1492pub enum TableDataSource {
1493    /// The table owns data created via INSERT/UPDATE/DELETE statements.
1494    TableWrites { defaults: Vec<Expr<Aug>> },
1495
1496    /// The table receives its data from the identified `DataSourceDesc`.
1497    /// This table type does not support INSERT/UPDATE/DELETE statements.
1498    DataSource {
1499        desc: DataSourceDesc,
1500        timeline: Timeline,
1501    },
1502}
1503
1504#[derive(Clone, Debug)]
1505pub struct Table {
1506    pub create_sql: String,
1507    pub desc: VersionedRelationDesc,
1508    pub temporary: bool,
1509    pub compaction_window: Option<CompactionWindow>,
1510    pub data_source: TableDataSource,
1511}
1512
1513#[derive(Clone, Debug)]
1514pub struct Source {
1515    pub create_sql: String,
1516    pub data_source: DataSourceDesc,
1517    pub desc: RelationDesc,
1518    pub compaction_window: Option<CompactionWindow>,
1519}
1520
1521#[derive(Debug, Clone)]
1522pub enum DataSourceDesc {
1523    /// Receives data from an external system.
1524    Ingestion(SourceDesc<ReferencedConnection>),
1525    /// Receives data from an external system.
1526    OldSyntaxIngestion {
1527        desc: SourceDesc<ReferencedConnection>,
1528        // If we're dealing with an old syntax ingestion the progress id will be some other collection
1529        // and the ingestion itself will have the data from a default external reference
1530        progress_subsource: CatalogItemId,
1531        data_config: SourceExportDataConfig<ReferencedConnection>,
1532        details: SourceExportDetails,
1533    },
1534    /// This source receives its data from the identified ingestion,
1535    /// specifically the output identified by `external_reference`.
1536    IngestionExport {
1537        ingestion_id: CatalogItemId,
1538        external_reference: UnresolvedItemName,
1539        details: SourceExportDetails,
1540        data_config: SourceExportDataConfig<ReferencedConnection>,
1541    },
1542    /// Receives data from the source's reclocking/remapping operations.
1543    Progress,
1544    /// Receives data from HTTP post requests.
1545    Webhook {
1546        validate_using: Option<WebhookValidation>,
1547        body_format: WebhookBodyFormat,
1548        headers: WebhookHeaders,
1549        /// Only `Some` when created via `CREATE TABLE ... FROM WEBHOOK`.
1550        cluster_id: Option<StorageInstanceId>,
1551    },
1552}
1553
1554#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
1555pub struct WebhookValidation {
1556    /// The expression used to validate a request.
1557    pub expression: MirScalarExpr,
1558    /// Description of the source that will be created.
1559    pub relation_desc: RelationDesc,
1560    /// The column index to provide the request body and whether to provide it as bytes.
1561    pub bodies: Vec<(usize, bool)>,
1562    /// The column index to provide the request headers and whether to provide the values as bytes.
1563    pub headers: Vec<(usize, bool)>,
1564    /// Any secrets that are used in that validation.
1565    pub secrets: Vec<WebhookValidationSecret>,
1566}
1567
1568impl WebhookValidation {
1569    const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1570
1571    /// Attempt to reduce the internal [`MirScalarExpr`] into a simpler expression.
1572    ///
1573    /// The reduction happens on a separate thread, we also only wait for
1574    /// `WebhookValidation::MAX_REDUCE_TIME` before timing out and returning an error.
1575    pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1576        let WebhookValidation {
1577            expression,
1578            relation_desc,
1579            ..
1580        } = self;
1581
1582        // On a different thread, attempt to reduce the expression.
1583        let mut expression_ = expression.clone();
1584        let desc_ = relation_desc.clone();
1585        let reduce_task = mz_ore::task::spawn_blocking(
1586            || "webhook-validation-reduce",
1587            move || {
1588                let repr_col_types: Vec<ReprColumnType> = desc_
1589                    .typ()
1590                    .column_types
1591                    .iter()
1592                    .map(ReprColumnType::from)
1593                    .collect();
1594                expression_.reduce(&repr_col_types);
1595                expression_
1596            },
1597        );
1598
1599        match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1600            Ok(reduced_expr) => {
1601                *expression = reduced_expr;
1602                Ok(())
1603            }
1604            Err(_) => Err("timeout"),
1605        }
1606    }
1607}
1608
1609#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
1610pub struct WebhookHeaders {
1611    /// Optionally include a column named `headers` whose content is possibly filtered.
1612    pub header_column: Option<WebhookHeaderFilters>,
1613    /// The column index to provide the specific request header, and whether to provide it as bytes.
1614    pub mapped_headers: BTreeMap<usize, (String, bool)>,
1615}
1616
1617impl WebhookHeaders {
1618    /// Returns the number of columns needed to represent our headers.
1619    pub fn num_columns(&self) -> usize {
1620        let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1621        let mapped_headers = self.mapped_headers.len();
1622
1623        header_column + mapped_headers
1624    }
1625}
1626
1627#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
1628pub struct WebhookHeaderFilters {
1629    pub block: BTreeSet<String>,
1630    pub allow: BTreeSet<String>,
1631}
1632
1633#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Arbitrary)]
1634pub enum WebhookBodyFormat {
1635    Json { array: bool },
1636    Bytes,
1637    Text,
1638}
1639
1640impl From<WebhookBodyFormat> for SqlScalarType {
1641    fn from(value: WebhookBodyFormat) -> Self {
1642        match value {
1643            WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1644            WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1645            WebhookBodyFormat::Text => SqlScalarType::String,
1646        }
1647    }
1648}
1649
1650#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
1651pub struct WebhookValidationSecret {
1652    /// Identifies the secret by [`CatalogItemId`].
1653    pub id: CatalogItemId,
1654    /// Column index for the expression context that this secret was originally evaluated in.
1655    pub column_idx: usize,
1656    /// Whether or not this secret should be provided to the expression as Bytes or a String.
1657    pub use_bytes: bool,
1658}
1659
1660#[derive(Clone, Debug)]
1661pub struct Connection {
1662    pub create_sql: String,
1663    pub details: ConnectionDetails,
1664}
1665
1666#[derive(Clone, Debug, Serialize)]
1667pub enum ConnectionDetails {
1668    Kafka(KafkaConnection<ReferencedConnection>),
1669    Csr(CsrConnection<ReferencedConnection>),
1670    Postgres(PostgresConnection<ReferencedConnection>),
1671    Ssh {
1672        connection: SshConnection,
1673        key_1: SshKey,
1674        key_2: SshKey,
1675    },
1676    Aws(AwsConnection),
1677    AwsPrivatelink(AwsPrivatelinkConnection),
1678    MySql(MySqlConnection<ReferencedConnection>),
1679    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
1680    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
1681}
1682
1683impl ConnectionDetails {
1684    pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1685        match self {
1686            ConnectionDetails::Kafka(c) => {
1687                mz_storage_types::connections::Connection::Kafka(c.clone())
1688            }
1689            ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1690            ConnectionDetails::Postgres(c) => {
1691                mz_storage_types::connections::Connection::Postgres(c.clone())
1692            }
1693            ConnectionDetails::Ssh { connection, .. } => {
1694                mz_storage_types::connections::Connection::Ssh(connection.clone())
1695            }
1696            ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1697            ConnectionDetails::AwsPrivatelink(c) => {
1698                mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1699            }
1700            ConnectionDetails::MySql(c) => {
1701                mz_storage_types::connections::Connection::MySql(c.clone())
1702            }
1703            ConnectionDetails::SqlServer(c) => {
1704                mz_storage_types::connections::Connection::SqlServer(c.clone())
1705            }
1706            ConnectionDetails::IcebergCatalog(c) => {
1707                mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1708            }
1709        }
1710    }
1711}
1712
1713#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1714pub struct NetworkPolicyRule {
1715    pub name: String,
1716    pub action: NetworkPolicyRuleAction,
1717    pub address: PolicyAddress,
1718    pub direction: NetworkPolicyRuleDirection,
1719}
1720
1721#[derive(
1722    Debug,
1723    Clone,
1724    Serialize,
1725    Deserialize,
1726    PartialEq,
1727    Eq,
1728    Ord,
1729    PartialOrd,
1730    Hash
1731)]
1732pub enum NetworkPolicyRuleAction {
1733    Allow,
1734}
1735
1736impl std::fmt::Display for NetworkPolicyRuleAction {
1737    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1738        match self {
1739            Self::Allow => write!(f, "allow"),
1740        }
1741    }
1742}
1743impl TryFrom<&str> for NetworkPolicyRuleAction {
1744    type Error = PlanError;
1745    fn try_from(value: &str) -> Result<Self, Self::Error> {
1746        match value.to_uppercase().as_str() {
1747            "ALLOW" => Ok(Self::Allow),
1748            _ => Err(PlanError::Unstructured(
1749                "Allow is the only valid option".into(),
1750            )),
1751        }
1752    }
1753}
1754
1755#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1756pub enum NetworkPolicyRuleDirection {
1757    Ingress,
1758}
1759impl std::fmt::Display for NetworkPolicyRuleDirection {
1760    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1761        match self {
1762            Self::Ingress => write!(f, "ingress"),
1763        }
1764    }
1765}
1766impl TryFrom<&str> for NetworkPolicyRuleDirection {
1767    type Error = PlanError;
1768    fn try_from(value: &str) -> Result<Self, Self::Error> {
1769        match value.to_uppercase().as_str() {
1770            "INGRESS" => Ok(Self::Ingress),
1771            _ => Err(PlanError::Unstructured(
1772                "Ingress is the only valid option".into(),
1773            )),
1774        }
1775    }
1776}
1777
1778#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1779pub struct PolicyAddress(pub IpNet);
1780impl std::fmt::Display for PolicyAddress {
1781    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1782        write!(f, "{}", &self.0.to_string())
1783    }
1784}
1785impl From<String> for PolicyAddress {
1786    fn from(value: String) -> Self {
1787        Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1788    }
1789}
1790impl TryFrom<&str> for PolicyAddress {
1791    type Error = PlanError;
1792    fn try_from(value: &str) -> Result<Self, Self::Error> {
1793        let net = IpNet::from_str(value)
1794            .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1795        Ok(Self(net))
1796    }
1797}
1798
1799impl Serialize for PolicyAddress {
1800    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1801    where
1802        S: serde::Serializer,
1803    {
1804        serializer.serialize_str(&format!("{}", &self.0))
1805    }
1806}
1807
1808#[derive(Clone, Debug, Serialize)]
1809pub enum SshKey {
1810    PublicOnly(String),
1811    Both(SshKeyPair),
1812}
1813
1814impl SshKey {
1815    pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1816        match self {
1817            SshKey::PublicOnly(_) => None,
1818            SshKey::Both(key_pair) => Some(key_pair),
1819        }
1820    }
1821
1822    pub fn public_key(&self) -> String {
1823        match self {
1824            SshKey::PublicOnly(s) => s.into(),
1825            SshKey::Both(p) => p.ssh_public_key(),
1826        }
1827    }
1828}
1829
1830#[derive(Clone, Debug)]
1831pub struct Secret {
1832    pub create_sql: String,
1833    pub secret_as: MirScalarExpr,
1834}
1835
1836#[derive(Clone, Debug)]
1837pub struct Sink {
1838    /// Parse-able SQL that is stored durably and defines this sink.
1839    pub create_sql: String,
1840    /// Collection we read into this sink.
1841    pub from: GlobalId,
1842    /// Type of connection to the external service we sink into.
1843    pub connection: StorageSinkConnection<ReferencedConnection>,
1844    // TODO(guswynn): this probably should just be in the `connection`.
1845    pub envelope: SinkEnvelope,
1846    pub version: u64,
1847    pub commit_interval: Option<Duration>,
1848}
1849
1850#[derive(Clone, Debug)]
1851pub struct View {
1852    /// Parse-able SQL that is stored durably and defines this view.
1853    pub create_sql: String,
1854    /// Unoptimized high-level expression from parsing the `create_sql`.
1855    pub expr: HirRelationExpr,
1856    /// All of the catalog objects that are referenced by this view, according to the `expr`.
1857    pub dependencies: DependencyIds,
1858    /// Columns of this view.
1859    pub column_names: Vec<ColumnName>,
1860    /// If this view is created in the temporary schema, e.g. `CREATE TEMPORARY ...`.
1861    pub temporary: bool,
1862}
1863
1864#[derive(Clone, Debug)]
1865pub struct MaterializedView {
1866    /// Parse-able SQL that is stored durably and defines this materialized view.
1867    pub create_sql: String,
1868    /// Unoptimized high-level expression from parsing the `create_sql`.
1869    pub expr: HirRelationExpr,
1870    /// All of the catalog objects that are referenced by this materialized view, according to the `expr`.
1871    pub dependencies: DependencyIds,
1872    /// Columns of this view.
1873    pub column_names: Vec<ColumnName>,
1874    pub replacement_target: Option<CatalogItemId>,
1875    /// Cluster this materialized view will get installed on.
1876    pub cluster_id: ClusterId,
1877    /// If set, only install this materialized view's dataflow on the specified replica.
1878    pub target_replica: Option<ReplicaId>,
1879    pub non_null_assertions: Vec<usize>,
1880    pub compaction_window: Option<CompactionWindow>,
1881    pub refresh_schedule: Option<RefreshSchedule>,
1882    pub as_of: Option<Timestamp>,
1883}
1884
1885#[derive(Clone, Debug)]
1886pub struct Index {
1887    /// Parse-able SQL that is stored durably and defines this index.
1888    pub create_sql: String,
1889    /// Collection this index is on top of.
1890    pub on: GlobalId,
1891    pub keys: Vec<mz_expr::MirScalarExpr>,
1892    pub compaction_window: Option<CompactionWindow>,
1893    pub cluster_id: ClusterId,
1894}
1895
1896#[derive(Clone, Debug)]
1897pub struct Type {
1898    pub create_sql: String,
1899    pub inner: CatalogType<IdReference>,
1900}
1901
1902/// Specifies when a `Peek` or `Subscribe` should occur.
1903#[derive(Deserialize, Clone, Debug, PartialEq)]
1904pub enum QueryWhen {
1905    /// The peek should occur at the latest possible timestamp that allows the
1906    /// peek to complete immediately.
1907    Immediately,
1908    /// The peek should occur at a timestamp that allows the peek to see all
1909    /// data written to tables within Materialize.
1910    FreshestTableWrite,
1911    /// The peek should occur at the timestamp described by the specified
1912    /// expression.
1913    ///
1914    /// The expression may have any type.
1915    AtTimestamp(Timestamp),
1916    /// Same as Immediately, but will also advance to at least the specified
1917    /// expression.
1918    AtLeastTimestamp(Timestamp),
1919}
1920
1921impl QueryWhen {
1922    /// Returns a timestamp to which the candidate must be advanced.
1923    pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1924        match self {
1925            QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1926            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1927        }
1928    }
1929    /// Returns whether the candidate's upper bound is constrained.
1930    /// This is only true for `AtTimestamp` since it is the only variant that
1931    /// specifies a timestamp.
1932    pub fn constrains_upper(&self) -> bool {
1933        match self {
1934            QueryWhen::AtTimestamp(_) => true,
1935            QueryWhen::AtLeastTimestamp(_)
1936            | QueryWhen::Immediately
1937            | QueryWhen::FreshestTableWrite => false,
1938        }
1939    }
1940    /// Returns whether the candidate must be advanced to the since.
1941    pub fn advance_to_since(&self) -> bool {
1942        match self {
1943            QueryWhen::Immediately
1944            | QueryWhen::AtLeastTimestamp(_)
1945            | QueryWhen::FreshestTableWrite => true,
1946            QueryWhen::AtTimestamp(_) => false,
1947        }
1948    }
1949    /// Returns whether the candidate can be advanced to the upper.
1950    pub fn can_advance_to_upper(&self) -> bool {
1951        match self {
1952            QueryWhen::Immediately => true,
1953            QueryWhen::FreshestTableWrite
1954            | QueryWhen::AtTimestamp(_)
1955            | QueryWhen::AtLeastTimestamp(_) => false,
1956        }
1957    }
1958
1959    /// Returns whether the candidate can be advanced to the timeline's timestamp.
1960    pub fn can_advance_to_timeline_ts(&self) -> bool {
1961        match self {
1962            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1963            QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1964        }
1965    }
1966    /// Returns whether the candidate must be advanced to the timeline's timestamp.
1967    pub fn must_advance_to_timeline_ts(&self) -> bool {
1968        match self {
1969            QueryWhen::FreshestTableWrite => true,
1970            QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1971                false
1972            }
1973        }
1974    }
1975    /// Returns whether the selected timestamp should be tracked within the current transaction.
1976    pub fn is_transactional(&self) -> bool {
1977        match self {
1978            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1979            QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1980        }
1981    }
1982}
1983
1984#[derive(Debug, Copy, Clone)]
1985pub enum MutationKind {
1986    Insert,
1987    Update,
1988    Delete,
1989}
1990
1991#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
1992pub enum CopyFormat {
1993    Text,
1994    Csv,
1995    Binary,
1996    Parquet,
1997}
1998
1999#[derive(Debug, Copy, Clone)]
2000pub enum ExecuteTimeout {
2001    None,
2002    Seconds(f64),
2003    WaitOnce,
2004}
2005
2006#[derive(Clone, Debug)]
2007pub enum IndexOption {
2008    /// Configures the logical compaction window for an index.
2009    RetainHistory(CompactionWindow),
2010}
2011
2012#[derive(Clone, Debug)]
2013pub enum TableOption {
2014    /// Configures the logical compaction window for a table.
2015    RetainHistory(CompactionWindow),
2016}
2017
2018#[derive(Clone, Debug)]
2019pub struct PlanClusterOption {
2020    pub availability_zones: AlterOptionParameter<Vec<String>>,
2021    pub introspection_debugging: AlterOptionParameter<bool>,
2022    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
2023    pub managed: AlterOptionParameter<bool>,
2024    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
2025    pub replication_factor: AlterOptionParameter<u32>,
2026    pub size: AlterOptionParameter,
2027    pub schedule: AlterOptionParameter<ClusterSchedule>,
2028    pub workload_class: AlterOptionParameter<Option<String>>,
2029}
2030
2031impl Default for PlanClusterOption {
2032    fn default() -> Self {
2033        Self {
2034            availability_zones: AlterOptionParameter::Unchanged,
2035            introspection_debugging: AlterOptionParameter::Unchanged,
2036            introspection_interval: AlterOptionParameter::Unchanged,
2037            managed: AlterOptionParameter::Unchanged,
2038            replicas: AlterOptionParameter::Unchanged,
2039            replication_factor: AlterOptionParameter::Unchanged,
2040            size: AlterOptionParameter::Unchanged,
2041            schedule: AlterOptionParameter::Unchanged,
2042            workload_class: AlterOptionParameter::Unchanged,
2043        }
2044    }
2045}
2046
2047#[derive(Clone, Debug, PartialEq, Eq)]
2048pub enum AlterClusterPlanStrategy {
2049    None,
2050    For(Duration),
2051    UntilReady {
2052        on_timeout: OnTimeoutAction,
2053        timeout: Duration,
2054    },
2055}
2056
2057#[derive(Clone, Debug, PartialEq, Eq)]
2058pub enum OnTimeoutAction {
2059    Commit,
2060    Rollback,
2061}
2062
2063impl Default for OnTimeoutAction {
2064    fn default() -> Self {
2065        Self::Commit
2066    }
2067}
2068
2069impl TryFrom<&str> for OnTimeoutAction {
2070    type Error = PlanError;
2071    fn try_from(value: &str) -> Result<Self, Self::Error> {
2072        match value.to_uppercase().as_str() {
2073            "COMMIT" => Ok(Self::Commit),
2074            "ROLLBACK" => Ok(Self::Rollback),
2075            _ => Err(PlanError::Unstructured(
2076                "Valid options are COMMIT, ROLLBACK".into(),
2077            )),
2078        }
2079    }
2080}
2081
2082impl AlterClusterPlanStrategy {
2083    pub fn is_none(&self) -> bool {
2084        matches!(self, Self::None)
2085    }
2086    pub fn is_some(&self) -> bool {
2087        !matches!(self, Self::None)
2088    }
2089}
2090
2091impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2092    type Error = PlanError;
2093
2094    fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2095        Ok(match value.wait {
2096            Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2097            Some(ClusterAlterOptionValue::UntilReady(options)) => {
2098                let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2099                Self::UntilReady {
2100                    timeout: match extracted.timeout {
2101                        Some(d) => d,
2102                        None => Err(PlanError::UntilReadyTimeoutRequired)?,
2103                    },
2104                    on_timeout: match extracted.on_timeout {
2105                        Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2106                            PlanError::InvalidOptionValue {
2107                                option_name: "ON TIMEOUT".into(),
2108                                err: Box::new(e),
2109                            }
2110                        })?,
2111                        None => OnTimeoutAction::default(),
2112                    },
2113                }
2114            }
2115            None => Self::None,
2116        })
2117    }
2118}
2119
2120/// A vector of values to which parameter references should be bound.
2121#[derive(Debug, Clone)]
2122pub struct Params {
2123    /// The datums that were provided in the EXECUTE statement.
2124    pub datums: Row,
2125    /// The types of the datums provided in the EXECUTE statement.
2126    pub execute_types: Vec<SqlScalarType>,
2127    /// The types that the prepared statement expects based on its definition.
2128    pub expected_types: Vec<SqlScalarType>,
2129}
2130
2131impl Params {
2132    /// Returns a `Params` with no parameters.
2133    pub fn empty() -> Params {
2134        Params {
2135            datums: Row::pack_slice(&[]),
2136            execute_types: vec![],
2137            expected_types: vec![],
2138        }
2139    }
2140}
2141
2142/// Controls planning of a SQL query.
2143#[derive(
2144    Ord,
2145    PartialOrd,
2146    Clone,
2147    Debug,
2148    Eq,
2149    PartialEq,
2150    Serialize,
2151    Deserialize,
2152    Hash,
2153    Copy
2154)]
2155pub struct PlanContext {
2156    pub wall_time: DateTime<Utc>,
2157    pub ignore_if_exists_errors: bool,
2158}
2159
2160impl PlanContext {
2161    pub fn new(wall_time: DateTime<Utc>) -> Self {
2162        Self {
2163            wall_time,
2164            ignore_if_exists_errors: false,
2165        }
2166    }
2167
2168    /// Return a PlanContext with zero values. This should only be used when
2169    /// planning is required but unused (like in `plan_create_table()`) or in
2170    /// tests.
2171    pub fn zero() -> Self {
2172        PlanContext {
2173            wall_time: now::to_datetime(NOW_ZERO()),
2174            ignore_if_exists_errors: false,
2175        }
2176    }
2177
2178    pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2179        self.ignore_if_exists_errors = value;
2180        self
2181    }
2182}