Skip to main content

mz_sql/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL planning.
11//!
12//! SQL planning is the process of taking the abstract syntax tree of a
13//! [`Statement`] and turning it into a [`Plan`] that the dataflow layer can
14//! execute.
15//!
16//! Statements must be purified before they can be planned. See the
17//! [`pure`](crate::pure) module for details.
18
19// Internal module layout.
20//
21// The entry point for planning is `statement::handle_statement`. That function
22// dispatches to a more specific `handle` function for the particular statement
23// type. For most statements, this `handle` function is uninteresting and short,
24// but anything involving a `SELECT` statement gets complicated. `SELECT`
25// queries wind through the functions in the `query` module, starting with
26// `plan_root_query` and fanning out based on the contents of the `SELECT`
27// statement.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{
41    CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing,
42};
43use mz_ore::now::{self, NOW_ZERO};
44use mz_pgcopy::CopyFormatParams;
45use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
46use mz_repr::explain::{ExplainConfig, ExplainFormat};
47use mz_repr::network_policy_id::NetworkPolicyId;
48use mz_repr::optimize::OptimizerFeatureOverrides;
49use mz_repr::refresh_schedule::RefreshSchedule;
50use mz_repr::role_id::RoleId;
51use mz_repr::{
52    CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, ReprColumnType, Row,
53    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
54};
55use mz_sql_parser::ast::{
56    AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
57    RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
58    Value, WithOptionValue,
59};
60use mz_ssh_util::keys::SshKeyPair;
61use mz_storage_types::connections::aws::AwsConnection;
62use mz_storage_types::connections::inline::ReferencedConnection;
63use mz_storage_types::connections::{
64    AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
65    MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70    SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76    ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77    TransactionAccessMode,
78};
79use crate::catalog::{
80    CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81    RoleAttributesRaw,
82};
83use crate::names::{
84    Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110    AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111    WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119    AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120    PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123    StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124    resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
/// Instructions for executing a SQL query.
///
/// Each variant is one kind of plan. Most variants wrap a dedicated payload
/// type (usually named `<Variant>Plan`) carrying the data for that plan, while
/// unit variants such as [`Plan::EmptyQuery`] carry none.
///
/// The `EnumKind` derive together with `#[enum_kind(PlanKind)]` produces the
/// companion `PlanKind` enum, which is what [`Plan::generated_from`] returns.
#[derive(Debug, EnumKind)]
#[enum_kind(PlanKind)]
pub enum Plan {
    CreateConnection(CreateConnectionPlan),
    CreateDatabase(CreateDatabasePlan),
    CreateSchema(CreateSchemaPlan),
    CreateRole(CreateRolePlan),
    CreateCluster(CreateClusterPlan),
    CreateClusterReplica(CreateClusterReplicaPlan),
    CreateSource(CreateSourcePlan),
    CreateSources(Vec<CreateSourcePlanBundle>),
    CreateSecret(CreateSecretPlan),
    CreateSink(CreateSinkPlan),
    CreateTable(CreateTablePlan),
    CreateView(CreateViewPlan),
    CreateMaterializedView(CreateMaterializedViewPlan),
    CreateContinualTask(CreateContinualTaskPlan),
    CreateNetworkPolicy(CreateNetworkPolicyPlan),
    CreateIndex(CreateIndexPlan),
    CreateType(CreateTypePlan),
    Comment(CommentPlan),
    DiscardTemp,
    DiscardAll,
    DropObjects(DropObjectsPlan),
    DropOwned(DropOwnedPlan),
    EmptyQuery,
    ShowAllVariables,
    ShowCreate(ShowCreatePlan),
    ShowColumns(ShowColumnsPlan),
    ShowVariable(ShowVariablePlan),
    InspectShard(InspectShardPlan),
    SetVariable(SetVariablePlan),
    ResetVariable(ResetVariablePlan),
    SetTransaction(SetTransactionPlan),
    StartTransaction(StartTransactionPlan),
    CommitTransaction(CommitTransactionPlan),
    AbortTransaction(AbortTransactionPlan),
    Select(SelectPlan),
    Subscribe(SubscribePlan),
    CopyFrom(CopyFromPlan),
    CopyTo(CopyToPlan),
    ExplainPlan(ExplainPlanPlan),
    ExplainPushdown(ExplainPushdownPlan),
    ExplainTimestamp(ExplainTimestampPlan),
    ExplainSinkSchema(ExplainSinkSchemaPlan),
    Insert(InsertPlan),
    AlterCluster(AlterClusterPlan),
    AlterClusterSwap(AlterClusterSwapPlan),
    AlterNoop(AlterNoopPlan),
    AlterSetCluster(AlterSetClusterPlan),
    AlterConnection(AlterConnectionPlan),
    AlterSource(AlterSourcePlan),
    AlterClusterRename(AlterClusterRenamePlan),
    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
    AlterItemRename(AlterItemRenamePlan),
    AlterSchemaRename(AlterSchemaRenamePlan),
    AlterSchemaSwap(AlterSchemaSwapPlan),
    AlterSecret(AlterSecretPlan),
    AlterSink(AlterSinkPlan),
    AlterSystemSet(AlterSystemSetPlan),
    AlterSystemReset(AlterSystemResetPlan),
    AlterSystemResetAll(AlterSystemResetAllPlan),
    AlterRole(AlterRolePlan),
    AlterOwner(AlterOwnerPlan),
    AlterTableAddColumn(AlterTablePlan),
    AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
    AlterNetworkPolicy(AlterNetworkPolicyPlan),
    Declare(DeclarePlan),
    Fetch(FetchPlan),
    Close(ClosePlan),
    ReadThenWrite(ReadThenWritePlan),
    Prepare(PreparePlan),
    Execute(ExecutePlan),
    Deallocate(DeallocatePlan),
    Raise(RaisePlan),
    GrantRole(GrantRolePlan),
    RevokeRole(RevokeRolePlan),
    GrantPrivileges(GrantPrivilegesPlan),
    RevokePrivileges(RevokePrivilegesPlan),
    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
    ReassignOwned(ReassignOwnedPlan),
    SideEffectingFunc(SideEffectingFunc),
    ValidateConnection(ValidateConnectionPlan),
    AlterRetainHistory(AlterRetainHistoryPlan),
    AlterSourceTimestampInterval(AlterSourceTimestampIntervalPlan),
}
217
218impl Plan {
219    /// Expresses which [`StatementKind`] can generate which set of
220    /// [`PlanKind`].
221    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
222        match stmt {
223            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
224            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
225            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
226            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
227            StatementKind::AlterObjectRename => &[
228                PlanKind::AlterClusterRename,
229                PlanKind::AlterClusterReplicaRename,
230                PlanKind::AlterItemRename,
231                PlanKind::AlterSchemaRename,
232                PlanKind::AlterNoop,
233            ],
234            StatementKind::AlterObjectSwap => &[
235                PlanKind::AlterClusterSwap,
236                PlanKind::AlterSchemaSwap,
237                PlanKind::AlterNoop,
238            ],
239            StatementKind::AlterRole => &[PlanKind::AlterRole],
240            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
241            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
242            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
243            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
244            StatementKind::AlterSource => &[
245                PlanKind::AlterNoop,
246                PlanKind::AlterSource,
247                PlanKind::AlterRetainHistory,
248                PlanKind::AlterSourceTimestampInterval,
249            ],
250            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
251            StatementKind::AlterSystemResetAll => {
252                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
253            }
254            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
255            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
256            StatementKind::AlterTableAddColumn => {
257                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
258            }
259            StatementKind::AlterMaterializedViewApplyReplacement => &[
260                PlanKind::AlterNoop,
261                PlanKind::AlterMaterializedViewApplyReplacement,
262            ],
263            StatementKind::Close => &[PlanKind::Close],
264            StatementKind::Comment => &[PlanKind::Comment],
265            StatementKind::Commit => &[PlanKind::CommitTransaction],
266            StatementKind::Copy => &[
267                PlanKind::CopyFrom,
268                PlanKind::Select,
269                PlanKind::Subscribe,
270                PlanKind::CopyTo,
271            ],
272            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
273            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
274            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
275            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
276            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
277            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
278            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
279            StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
280            StatementKind::CreateRole => &[PlanKind::CreateRole],
281            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
282            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
283            StatementKind::CreateSink => &[PlanKind::CreateSink],
284            StatementKind::CreateSource | StatementKind::CreateSubsource => {
285                &[PlanKind::CreateSource]
286            }
287            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
288            StatementKind::CreateTable => &[PlanKind::CreateTable],
289            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
290            StatementKind::CreateType => &[PlanKind::CreateType],
291            StatementKind::CreateView => &[PlanKind::CreateView],
292            StatementKind::Deallocate => &[PlanKind::Deallocate],
293            StatementKind::Declare => &[PlanKind::Declare],
294            StatementKind::Delete => &[PlanKind::ReadThenWrite],
295            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
296            StatementKind::DropObjects => &[PlanKind::DropObjects],
297            StatementKind::DropOwned => &[PlanKind::DropOwned],
298            StatementKind::Execute => &[PlanKind::Execute],
299            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
300            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
301            StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
302            StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
303            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
304            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
305            StatementKind::Fetch => &[PlanKind::Fetch],
306            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
307            StatementKind::GrantRole => &[PlanKind::GrantRole],
308            StatementKind::Insert => &[PlanKind::Insert],
309            StatementKind::Prepare => &[PlanKind::Prepare],
310            StatementKind::Raise => &[PlanKind::Raise],
311            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
312            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
313            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
314            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
315            StatementKind::Rollback => &[PlanKind::AbortTransaction],
316            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
317            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
318            StatementKind::SetVariable => &[PlanKind::SetVariable],
319            StatementKind::Show => &[
320                PlanKind::Select,
321                PlanKind::ShowVariable,
322                PlanKind::ShowCreate,
323                PlanKind::ShowColumns,
324                PlanKind::ShowAllVariables,
325                PlanKind::InspectShard,
326            ],
327            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
328            StatementKind::Subscribe => &[PlanKind::Subscribe],
329            StatementKind::Update => &[PlanKind::ReadThenWrite],
330            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
331            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
332        }
333    }
334
335    /// Returns a human readable name of the plan. Meant for use in messages sent back to a user.
336    pub fn name(&self) -> &str {
337        match self {
338            Plan::CreateConnection(_) => "create connection",
339            Plan::CreateDatabase(_) => "create database",
340            Plan::CreateSchema(_) => "create schema",
341            Plan::CreateRole(_) => "create role",
342            Plan::CreateCluster(_) => "create cluster",
343            Plan::CreateClusterReplica(_) => "create cluster replica",
344            Plan::CreateSource(_) => "create source",
345            Plan::CreateSources(_) => "create source",
346            Plan::CreateSecret(_) => "create secret",
347            Plan::CreateSink(_) => "create sink",
348            Plan::CreateTable(_) => "create table",
349            Plan::CreateView(_) => "create view",
350            Plan::CreateMaterializedView(_) => "create materialized view",
351            Plan::CreateContinualTask(_) => "create continual task",
352            Plan::CreateIndex(_) => "create index",
353            Plan::CreateType(_) => "create type",
354            Plan::CreateNetworkPolicy(_) => "create network policy",
355            Plan::Comment(_) => "comment",
356            Plan::DiscardTemp => "discard temp",
357            Plan::DiscardAll => "discard all",
358            Plan::DropObjects(plan) => match plan.object_type {
359                ObjectType::Table => "drop table",
360                ObjectType::View => "drop view",
361                ObjectType::MaterializedView => "drop materialized view",
362                ObjectType::Source => "drop source",
363                ObjectType::Sink => "drop sink",
364                ObjectType::Index => "drop index",
365                ObjectType::Type => "drop type",
366                ObjectType::Role => "drop roles",
367                ObjectType::Cluster => "drop clusters",
368                ObjectType::ClusterReplica => "drop cluster replicas",
369                ObjectType::Secret => "drop secret",
370                ObjectType::Connection => "drop connection",
371                ObjectType::Database => "drop database",
372                ObjectType::Schema => "drop schema",
373                ObjectType::Func => "drop function",
374                ObjectType::ContinualTask => "drop continual task",
375                ObjectType::NetworkPolicy => "drop network policy",
376            },
377            Plan::DropOwned(_) => "drop owned",
378            Plan::EmptyQuery => "do nothing",
379            Plan::ShowAllVariables => "show all variables",
380            Plan::ShowCreate(_) => "show create",
381            Plan::ShowColumns(_) => "show columns",
382            Plan::ShowVariable(_) => "show variable",
383            Plan::InspectShard(_) => "inspect shard",
384            Plan::SetVariable(_) => "set variable",
385            Plan::ResetVariable(_) => "reset variable",
386            Plan::SetTransaction(_) => "set transaction",
387            Plan::StartTransaction(_) => "start transaction",
388            Plan::CommitTransaction(_) => "commit",
389            Plan::AbortTransaction(_) => "abort",
390            Plan::Select(_) => "select",
391            Plan::Subscribe(_) => "subscribe",
392            Plan::CopyFrom(_) => "copy from",
393            Plan::CopyTo(_) => "copy to",
394            Plan::ExplainPlan(_) => "explain plan",
395            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
396            Plan::ExplainTimestamp(_) => "explain timestamp",
397            Plan::ExplainSinkSchema(_) => "explain schema",
398            Plan::Insert(_) => "insert",
399            Plan::AlterNoop(plan) => match plan.object_type {
400                ObjectType::Table => "alter table",
401                ObjectType::View => "alter view",
402                ObjectType::MaterializedView => "alter materialized view",
403                ObjectType::Source => "alter source",
404                ObjectType::Sink => "alter sink",
405                ObjectType::Index => "alter index",
406                ObjectType::Type => "alter type",
407                ObjectType::Role => "alter role",
408                ObjectType::Cluster => "alter cluster",
409                ObjectType::ClusterReplica => "alter cluster replica",
410                ObjectType::Secret => "alter secret",
411                ObjectType::Connection => "alter connection",
412                ObjectType::Database => "alter database",
413                ObjectType::Schema => "alter schema",
414                ObjectType::Func => "alter function",
415                ObjectType::ContinualTask => "alter continual task",
416                ObjectType::NetworkPolicy => "alter network policy",
417            },
418            Plan::AlterCluster(_) => "alter cluster",
419            Plan::AlterClusterRename(_) => "alter cluster rename",
420            Plan::AlterClusterSwap(_) => "alter cluster swap",
421            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
422            Plan::AlterSetCluster(_) => "alter set cluster",
423            Plan::AlterConnection(_) => "alter connection",
424            Plan::AlterSource(_) => "alter source",
425            Plan::AlterItemRename(_) => "rename item",
426            Plan::AlterSchemaRename(_) => "alter rename schema",
427            Plan::AlterSchemaSwap(_) => "alter swap schema",
428            Plan::AlterSecret(_) => "alter secret",
429            Plan::AlterSink(_) => "alter sink",
430            Plan::AlterSystemSet(_) => "alter system",
431            Plan::AlterSystemReset(_) => "alter system",
432            Plan::AlterSystemResetAll(_) => "alter system",
433            Plan::AlterRole(_) => "alter role",
434            Plan::AlterNetworkPolicy(_) => "alter network policy",
435            Plan::AlterOwner(plan) => match plan.object_type {
436                ObjectType::Table => "alter table owner",
437                ObjectType::View => "alter view owner",
438                ObjectType::MaterializedView => "alter materialized view owner",
439                ObjectType::Source => "alter source owner",
440                ObjectType::Sink => "alter sink owner",
441                ObjectType::Index => "alter index owner",
442                ObjectType::Type => "alter type owner",
443                ObjectType::Role => "alter role owner",
444                ObjectType::Cluster => "alter cluster owner",
445                ObjectType::ClusterReplica => "alter cluster replica owner",
446                ObjectType::Secret => "alter secret owner",
447                ObjectType::Connection => "alter connection owner",
448                ObjectType::Database => "alter database owner",
449                ObjectType::Schema => "alter schema owner",
450                ObjectType::Func => "alter function owner",
451                ObjectType::ContinualTask => "alter continual task owner",
452                ObjectType::NetworkPolicy => "alter network policy owner",
453            },
454            Plan::AlterTableAddColumn(_) => "alter table add column",
455            Plan::AlterMaterializedViewApplyReplacement(_) => {
456                "alter materialized view apply replacement"
457            }
458            Plan::Declare(_) => "declare",
459            Plan::Fetch(_) => "fetch",
460            Plan::Close(_) => "close",
461            Plan::ReadThenWrite(plan) => match plan.kind {
462                MutationKind::Insert => "insert into select",
463                MutationKind::Update => "update",
464                MutationKind::Delete => "delete",
465            },
466            Plan::Prepare(_) => "prepare",
467            Plan::Execute(_) => "execute",
468            Plan::Deallocate(_) => "deallocate",
469            Plan::Raise(_) => "raise",
470            Plan::GrantRole(_) => "grant role",
471            Plan::RevokeRole(_) => "revoke role",
472            Plan::GrantPrivileges(_) => "grant privilege",
473            Plan::RevokePrivileges(_) => "revoke privilege",
474            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
475            Plan::ReassignOwned(_) => "reassign owned",
476            Plan::SideEffectingFunc(_) => "side effecting func",
477            Plan::ValidateConnection(_) => "validate connection",
478            Plan::AlterRetainHistory(_) => "alter retain history",
479            Plan::AlterSourceTimestampInterval(_) => "alter source timestamp interval",
480        }
481    }
482
483    /// Returns `true` iff this `Plan` is allowed to be executed in read-only
484    /// mode.
485    ///
486    /// We use an explicit allow-list, to avoid future additions automatically
487    /// falling into the `true` category.
488    pub fn allowed_in_read_only(&self) -> bool {
489        match self {
490            // These two set non-durable session variables, so are okay in
491            // read-only mode.
492            Plan::SetVariable(_) => true,
493            Plan::ResetVariable(_) => true,
494            Plan::SetTransaction(_) => true,
495            Plan::StartTransaction(_) => true,
496            Plan::CommitTransaction(_) => true,
497            Plan::AbortTransaction(_) => true,
498            Plan::Select(_) => true,
499            Plan::EmptyQuery => true,
500            Plan::ShowAllVariables => true,
501            Plan::ShowCreate(_) => true,
502            Plan::ShowColumns(_) => true,
503            Plan::ShowVariable(_) => true,
504            Plan::InspectShard(_) => true,
505            Plan::Subscribe(_) => true,
506            Plan::CopyTo(_) => true,
507            Plan::ExplainPlan(_) => true,
508            Plan::ExplainPushdown(_) => true,
509            Plan::ExplainTimestamp(_) => true,
510            Plan::ExplainSinkSchema(_) => true,
511            Plan::ValidateConnection(_) => true,
512            _ => false,
513        }
514    }
515}
516
/// Payload of [`Plan::StartTransaction`].
#[derive(Debug)]
pub struct StartTransactionPlan {
    /// Optional transaction access mode.
    pub access: Option<TransactionAccessMode>,
    /// Optional transaction isolation level.
    pub isolation_level: Option<TransactionIsolationLevel>,
}
522
/// Whether a transaction is explicit or implicit.
///
/// Carried by [`CommitTransactionPlan`] and [`AbortTransactionPlan`].
#[derive(Debug)]
pub enum TransactionType {
    Explicit,
    Implicit,
}

impl TransactionType {
    /// Returns `true` iff this is [`TransactionType::Explicit`].
    pub fn is_explicit(&self) -> bool {
        match self {
            Self::Explicit => true,
            Self::Implicit => false,
        }
    }

    /// Returns `true` iff this is [`TransactionType::Implicit`].
    pub fn is_implicit(&self) -> bool {
        match self {
            Self::Implicit => true,
            Self::Explicit => false,
        }
    }
}
538
/// Payload of [`Plan::CommitTransaction`].
#[derive(Debug)]
pub struct CommitTransactionPlan {
    /// Whether the transaction being committed is explicit or implicit.
    pub transaction_type: TransactionType,
}
543
/// Payload of [`Plan::AbortTransaction`].
#[derive(Debug)]
pub struct AbortTransactionPlan {
    /// Whether the transaction being aborted is explicit or implicit.
    pub transaction_type: TransactionType,
}
548
/// Payload of [`Plan::CreateDatabase`].
#[derive(Debug)]
pub struct CreateDatabasePlan {
    /// Name of the database to create.
    pub name: String,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm
    // against the planner that fills this in.
    pub if_not_exists: bool,
}
554
/// Payload of [`Plan::CreateSchema`].
#[derive(Debug)]
pub struct CreateSchemaPlan {
    /// Resolved specifier of the database for the new schema.
    pub database_spec: ResolvedDatabaseSpecifier,
    /// Name of the schema to create.
    pub schema_name: String,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm
    // against the planner that fills this in.
    pub if_not_exists: bool,
}
561
/// Payload of [`Plan::CreateRole`].
#[derive(Debug)]
pub struct CreateRolePlan {
    /// Name of the role to create.
    pub name: String,
    /// Attributes for the new role, in their raw form.
    pub attributes: RoleAttributesRaw,
}
567
/// Payload of [`Plan::CreateCluster`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterPlan {
    /// Name of the cluster to create.
    pub name: String,
    /// Whether the cluster is managed or unmanaged, plus the matching details.
    pub variant: CreateClusterVariant,
    /// Optional workload class for the cluster.
    pub workload_class: Option<String>,
}
574
/// The two shapes a [`CreateClusterPlan`] can take.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum CreateClusterVariant {
    /// A managed cluster, described by a [`CreateClusterManagedPlan`].
    Managed(CreateClusterManagedPlan),
    /// An unmanaged cluster, described by a [`CreateClusterUnmanagedPlan`].
    Unmanaged(CreateClusterUnmanagedPlan),
}
580
/// Details of [`CreateClusterVariant::Unmanaged`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterUnmanagedPlan {
    /// Explicitly listed replicas as `(name, config)` pairs.
    // NOTE(review): the first tuple element is presumably the replica name —
    // confirm against the code that constructs this plan.
    pub replicas: Vec<(String, ReplicaConfig)>,
}
585
/// Details of [`CreateClusterVariant::Managed`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterManagedPlan {
    /// Replication factor of the cluster.
    pub replication_factor: u32,
    /// Size of the cluster, as a string.
    pub size: String,
    /// Availability zones for the cluster.
    pub availability_zones: Vec<String>,
    /// Compute-specific replica configuration.
    pub compute: ComputeReplicaConfig,
    /// Overrides for optimizer features.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Schedule of the cluster; see [`ClusterSchedule`].
    pub schedule: ClusterSchedule,
}
595
/// Payload of [`Plan::CreateClusterReplica`].
#[derive(Debug)]
pub struct CreateClusterReplicaPlan {
    /// ID of the cluster the replica is created for.
    pub cluster_id: ClusterId,
    /// Name of the replica to create.
    pub name: String,
    /// Configuration of the replica.
    pub config: ReplicaConfig,
}
602
/// Configuration of introspection for a cluster replica.
///
/// `PartialOrd`/`Ord` are derived, so values compare field by field in
/// declaration order (`debugging` first, then `interval`); reordering the
/// fields would change that ordering.
#[derive(
    Clone,
    Copy,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    Ord,
    PartialEq,
    Eq
)]
pub struct ComputeReplicaIntrospectionConfig {
    /// Whether to introspect the introspection.
    pub debugging: bool,
    /// The interval at which to introspect.
    pub interval: Duration,
}
621
/// Compute-specific configuration of a cluster replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComputeReplicaConfig {
    /// Introspection settings, if any.
    // NOTE(review): `None` presumably means introspection is disabled —
    // confirm against the consumers of this field.
    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
}
626
/// Configuration of a cluster replica.
///
/// Used by [`CreateClusterReplicaPlan`] and, per replica, by
/// [`CreateClusterUnmanagedPlan`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicaConfig {
    /// A replica described by explicit controller addresses.
    Unorchestrated {
        /// Addresses for the storage controller connection.
        storagectl_addrs: Vec<String>,
        /// Addresses for the compute controller connection.
        computectl_addrs: Vec<String>,
        /// Compute-specific replica configuration.
        compute: ComputeReplicaConfig,
    },
    /// A replica described by a size rather than by addresses.
    Orchestrated {
        /// Size of the replica, as a string.
        size: String,
        /// Optional availability zone for the replica.
        availability_zone: Option<String>,
        /// Compute-specific replica configuration.
        compute: ComputeReplicaConfig,
        // NOTE(review): presumably marks system-internal replicas — confirm.
        internal: bool,
        // NOTE(review): presumably an alternative size used for billing —
        // confirm against the code that reads this field.
        billed_as: Option<String>,
    },
}
642
/// When a cluster is turned On or Off.
///
/// `PartialOrd`/`Ord` are derived, so variants compare in declaration order
/// (`Manual` before `Refresh`); reordering them would change that ordering.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterSchedule {
    /// The system won't automatically turn the cluster On or Off.
    Manual,
    /// The cluster will be On when a REFRESH materialized view on it needs to refresh.
    /// `hydration_time_estimate` determines how long before a refresh the cluster is
    /// turned On, so that it can rehydrate before the refresh time.
    Refresh { hydration_time_estimate: Duration },
}
652
653impl Default for ClusterSchedule {
654    fn default() -> Self {
655        // (Has to be consistent with `impl Default for ClusterScheduleOptionValue`.)
656        ClusterSchedule::Manual
657    }
658}
659
/// Payload of [`Plan::CreateSource`]; also wrapped by
/// [`CreateSourcePlanBundle`] for [`Plan::CreateSources`].
#[derive(Debug)]
pub struct CreateSourcePlan {
    /// Qualified name of the source to create.
    pub name: QualifiedItemName,
    /// Definition of the source.
    pub source: Source,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm
    // against the planner that fills this in.
    pub if_not_exists: bool,
    /// Timeline of the source.
    pub timeline: Timeline,
    // None for subsources, which run on the parent cluster.
    pub in_cluster: Option<ClusterId>,
}
669
/// A set of available external references for a source, together with the
/// time at which the set was last updated.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReferences {
    // NOTE(review): unit and epoch of this timestamp are not visible here
    // (presumably milliseconds since the Unix epoch) — confirm.
    pub updated_at: u64,
    /// The individual references.
    pub references: Vec<SourceReference>,
}
675
/// An available external reference for a source and if possible to retrieve,
/// any column names it contains.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReference {
    /// Name of the reference.
    pub name: String,
    /// Optional namespace of the reference.
    pub namespace: Option<String>,
    /// Column names of the reference.
    // NOTE(review): presumably empty when the columns could not be
    // retrieved — confirm against the code that populates this.
    pub columns: Vec<String>,
}
684
/// A [`CreateSourcePlan`] and the metadata necessary to sequence it.
///
/// [`Plan::CreateSources`] carries a `Vec` of these bundles.
#[derive(Debug)]
pub struct CreateSourcePlanBundle {
    /// ID of this source in the Catalog.
    pub item_id: CatalogItemId,
    /// ID used to reference this source from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// Details of the source to create.
    pub plan: CreateSourcePlan,
    /// Other catalog objects that are referenced by this source, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All the available upstream references for this source.
    /// Populated for top-level sources that can contain subsources/tables
    /// and used during sequencing to populate the appropriate catalog fields.
    pub available_source_references: Option<SourceReferences>,
}
701
/// Payload of [`Plan::CreateConnection`].
#[derive(Debug)]
pub struct CreateConnectionPlan {
    /// Qualified name of the connection to create.
    pub name: QualifiedItemName,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm
    // against the planner that fills this in.
    pub if_not_exists: bool,
    /// Definition of the connection.
    pub connection: Connection,
    // NOTE(review): presumably requests validating the connection as part of
    // creating it — confirm against the sequencing code.
    pub validate: bool,
}
709
/// Payload of [`Plan::ValidateConnection`].
#[derive(Debug)]
pub struct ValidateConnectionPlan {
    /// ID of the connection in the Catalog.
    pub id: CatalogItemId,
    /// The connection to validate.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
}
717
/// Payload of [`Plan::CreateSecret`].
#[derive(Debug)]
pub struct CreateSecretPlan {
    /// Qualified name of the secret to create.
    pub name: QualifiedItemName,
    /// Definition of the secret.
    pub secret: Secret,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm
    // against the planner that fills this in.
    pub if_not_exists: bool,
}
724
/// Plan describing a sink to create: its name, its [`Sink`] definition, and
/// the cluster it is associated with.
#[derive(Debug)]
pub struct CreateSinkPlan {
    pub name: QualifiedItemName,
    pub sink: Sink,
    // NOTE(review): presumably whether the sink emits an initial snapshot — confirm.
    pub with_snapshot: bool,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm with the planner.
    pub if_not_exists: bool,
    /// The cluster named for this sink.
    pub in_cluster: ClusterId,
}
733
/// Plan describing a table to create: its name and its [`Table`] definition.
#[derive(Debug)]
pub struct CreateTablePlan {
    pub name: QualifiedItemName,
    pub table: Table,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm with the planner.
    pub if_not_exists: bool,
}
740
/// Plan describing a view to create, including any catalog object it replaces
/// and the objects that must be dropped as a consequence.
#[derive(Debug, Clone)]
pub struct CreateViewPlan {
    pub name: QualifiedItemName,
    pub view: View,
    /// The Catalog object that this view is replacing, if any.
    pub replace: Option<CatalogItemId>,
    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
    pub drop_ids: Vec<CatalogItemId>,
    pub if_not_exists: bool,
    /// True if the view contains an expression that can make the exact column list
    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
    pub ambiguous_columns: bool,
}
754
/// Plan describing a materialized view to create, including any catalog object
/// it replaces and the objects that must be dropped as a consequence.
#[derive(Debug, Clone)]
pub struct CreateMaterializedViewPlan {
    pub name: QualifiedItemName,
    pub materialized_view: MaterializedView,
    /// The Catalog object that this materialized view is replacing, if any.
    pub replace: Option<CatalogItemId>,
    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
    pub drop_ids: Vec<CatalogItemId>,
    pub if_not_exists: bool,
    /// True if the materialized view contains an expression that can make the exact column list
    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
    pub ambiguous_columns: bool,
}
768
/// Plan describing a continual task (CT) to create.
#[derive(Debug, Clone)]
pub struct CreateContinualTaskPlan {
    pub name: QualifiedItemName,
    /// During initial creation, the `LocalId` placeholder for this CT in `continual_task.expr`.
    /// None on restart.
    pub placeholder_id: Option<mz_expr::LocalId>,
    /// Relation description carried with the plan.
    // NOTE(review): presumably the output shape of the continual task — confirm.
    pub desc: RelationDesc,
    /// ID of the collection we read into this continual task.
    pub input_id: GlobalId,
    pub with_snapshot: bool,
    /// Definition for the continual task.
    pub continual_task: MaterializedView,
}
782
/// Plan describing a network policy to create: its name and its rules.
#[derive(Debug, Clone)]
pub struct CreateNetworkPolicyPlan {
    pub name: String,
    /// The rules making up the policy (see [`NetworkPolicyRule`]).
    pub rules: Vec<NetworkPolicyRule>,
}
788
/// Plan describing an alteration of the network policy identified by `id`.
#[derive(Debug, Clone)]
pub struct AlterNetworkPolicyPlan {
    pub id: NetworkPolicyId,
    pub name: String,
    /// The rules carried by the plan (see [`NetworkPolicyRule`]).
    // NOTE(review): presumably these replace the policy's existing rules wholesale — confirm.
    pub rules: Vec<NetworkPolicyRule>,
}
795
/// Plan describing an index to create: its name and its [`Index`] definition.
#[derive(Debug, Clone)]
pub struct CreateIndexPlan {
    pub name: QualifiedItemName,
    pub index: Index,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm with the planner.
    pub if_not_exists: bool,
}
802
/// Plan describing a type to create: its name and its [`Type`] definition.
#[derive(Debug)]
pub struct CreateTypePlan {
    pub name: QualifiedItemName,
    pub typ: Type,
}
808
/// Plan describing a set of objects to drop.
#[derive(Debug)]
pub struct DropObjectsPlan {
    /// The IDs of only the objects directly referenced in the `DROP` statement.
    pub referenced_ids: Vec<ObjectId>,
    /// All object IDs to drop. Includes `referenced_ids` and all descendants.
    pub drop_ids: Vec<ObjectId>,
    /// The type of object that was dropped explicitly in the `DROP` statement. `drop_ids` may
    /// contain objects of different types due to `CASCADE`.
    pub object_type: ObjectType,
}
819
/// Plan describing the objects to drop and the privileges to revoke for a set
/// of owning roles.
#[derive(Debug)]
pub struct DropOwnedPlan {
    /// The role IDs that own the objects.
    pub role_ids: Vec<RoleId>,
    /// All object IDs to drop.
    pub drop_ids: Vec<ObjectId>,
    /// The privileges to revoke.
    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
    /// The default privileges to revoke.
    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
}
831
/// Plan naming a single variable to show.
#[derive(Debug)]
pub struct ShowVariablePlan {
    /// Name of the variable.
    pub name: String,
}
836
/// Plan identifying a storage collection whose shard should be inspected.
#[derive(Debug)]
pub struct InspectShardPlan {
    /// ID of the storage collection to inspect.
    pub id: GlobalId,
}
842
/// Plan describing an assignment to a variable.
#[derive(Debug)]
pub struct SetVariablePlan {
    /// Name of the variable.
    pub name: String,
    /// The value to assign, or a request for the default (see [`VariableValue`]).
    pub value: VariableValue,
    // NOTE(review): presumably distinguishes `SET LOCAL` from a plain `SET` — confirm.
    pub local: bool,
}
849
/// The value side of a variable assignment: either the default or an explicit
/// list of string values.
#[derive(Debug)]
pub enum VariableValue {
    /// Use the variable's default.
    Default,
    /// Use the given values, kept as unparsed strings.
    Values(Vec<String>),
}
855
/// Plan naming a single variable to reset.
#[derive(Debug)]
pub struct ResetVariablePlan {
    /// Name of the variable.
    pub name: String,
}
860
/// Plan carrying the transaction modes to apply.
#[derive(Debug)]
pub struct SetTransactionPlan {
    // NOTE(review): presumably `true` limits the change to the current transaction — confirm.
    pub local: bool,
    /// The requested transaction modes.
    pub modes: Vec<TransactionMode>,
}
866
/// A plan for `SELECT` statements.
#[derive(Clone, Debug)]
pub struct SelectPlan {
    /// The `SELECT` statement itself. Used for explain/notices, but not otherwise
    /// load-bearing. Boxed to save stack space.
    pub select: Option<Box<SelectStatement<Aug>>>,
    /// The plan as a HIR.
    pub source: HirRelationExpr,
    /// At what time should this select happen?
    pub when: QueryWhen,
    /// Instructions how to form the result set.
    pub finishing: RowSetFinishing,
    /// For `COPY TO STDOUT`, the format to use.
    pub copy_to: Option<CopyFormat>,
}
882
883impl SelectPlan {
884    pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
885        let arity = typ.arity();
886        SelectPlan {
887            select: None,
888            source: HirRelationExpr::Constant { rows, typ },
889            when: QueryWhen::Immediately,
890            finishing: RowSetFinishing::trivial(arity),
891            copy_to: None,
892        }
893    }
894}
895
/// The output mode of a [`SubscribePlan`] (see its `output` field).
#[derive(Debug, Clone)]
pub enum SubscribeOutput {
    /// No additional ordering information is attached.
    Diffs,
    WithinTimestampOrderBy {
        /// We pretend that mz_diff is prepended to the normal columns, making it index 0
        order_by: Vec<ColumnOrder>,
    },
    EnvelopeUpsert {
        /// Order by with just keys
        order_by_keys: Vec<ColumnOrder>,
    },
    EnvelopeDebezium {
        /// Order by with just keys
        order_by_keys: Vec<ColumnOrder>,
    },
}
912
/// Plan describing a subscription: what to subscribe to, over which time
/// range, and how to shape the output.
#[derive(Debug, Clone)]
pub struct SubscribePlan {
    /// The collection or query being subscribed to.
    pub from: SubscribeFrom,
    pub with_snapshot: bool,
    pub when: QueryWhen,
    /// Optional upper timestamp for the subscription.
    // NOTE(review): whether this bound is inclusive or exclusive is not visible here — confirm.
    pub up_to: Option<Timestamp>,
    pub copy_to: Option<CopyFormat>,
    pub emit_progress: bool,
    /// The output mode (see [`SubscribeOutput`]).
    pub output: SubscribeOutput,
}
923
/// What a subscription reads from: an existing collection or an ad-hoc query.
#[derive(Debug, Clone)]
pub enum SubscribeFrom {
    /// ID of the collection to subscribe to.
    Id(GlobalId),
    /// Query to subscribe to.
    Query {
        /// The query, as a MIR expression.
        expr: MirRelationExpr,
        /// Relation description accompanying `expr`.
        desc: RelationDesc,
    },
}
934
935impl SubscribeFrom {
936    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
937        match self {
938            SubscribeFrom::Id(id) => BTreeSet::from([*id]),
939            SubscribeFrom::Query { expr, .. } => expr.depends_on(),
940        }
941    }
942
943    pub fn contains_temporal(&self) -> bool {
944        match self {
945            SubscribeFrom::Id(_) => false,
946            SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
947        }
948    }
949}
950
/// Plan carrying the object a `SHOW CREATE`-style request refers to and a
/// [`Row`] associated with it.
#[derive(Debug)]
pub struct ShowCreatePlan {
    pub id: ObjectId,
    // NOTE(review): presumably the already-computed result row — confirm with the sequencer.
    pub row: Row,
}
956
/// Plan for showing the columns of a catalog item, expressed as a wrapped
/// [`SelectPlan`].
#[derive(Debug)]
pub struct ShowColumnsPlan {
    /// The catalog item whose columns are shown.
    pub id: CatalogItemId,
    /// The select plan carried by this plan.
    pub select_plan: SelectPlan,
    // NOTE(review): presumably the IDs resolved while planning `select_plan` — confirm.
    pub new_resolved_ids: ResolvedIds,
}
963
/// Plan describing a `COPY ... FROM` into a table.
#[derive(Debug)]
pub struct CopyFromPlan {
    /// Table we're copying into.
    pub target_id: CatalogItemId,
    /// Human-readable full name of the target table.
    pub target_name: String,
    /// Source we're copying data from.
    pub source: CopyFromSource,
    /// How input columns map to those on the destination table.
    ///
    /// TODO(cf2): Remove this field in favor of the mfp.
    pub columns: Vec<ColumnIndex>,
    /// [`RelationDesc`] describing the input data.
    pub source_desc: RelationDesc,
    /// Changes the shape of the input data to match the destination table.
    pub mfp: MapFilterProject,
    /// Format specific params for copying the input data.
    pub params: CopyFormatParams<'static>,
    /// Filter for the source files we're copying from, e.g. an S3 prefix.
    pub filter: Option<CopyFromFilter>,
}
985
/// Where the data for a [`CopyFromPlan`] comes from.
#[derive(Debug)]
pub enum CopyFromSource {
    /// Copying from a file local to the user, transmitted via pgwire.
    Stdin,
    /// A remote resource, e.g. HTTP file.
    ///
    /// The contained [`HirScalarExpr`] evaluates to the URL for the remote resource.
    Url(HirScalarExpr),
    /// A file in an S3 bucket.
    AwsS3 {
        /// Expression that evaluates to the file we want to copy.
        uri: HirScalarExpr,
        /// Details for how we connect to AWS S3.
        connection: AwsConnection,
        /// ID of the connection object.
        connection_id: CatalogItemId,
    },
}
1004
/// Filter restricting which source files a [`CopyFromPlan`] reads.
#[derive(Debug)]
pub enum CopyFromFilter {
    /// An explicit list of files.
    Files(Vec<String>),
    /// A single pattern.
    // NOTE(review): the pattern syntax (glob vs. regex) is not visible here — confirm.
    Pattern(String),
}
1010
/// `COPY TO S3`
///
/// (This is a completely different thing from `COPY TO STDOUT`. That is a `Plan::Select` with
/// `copy_to` set.)
#[derive(Debug, Clone)]
pub struct CopyToPlan {
    /// The select query plan whose data will be copied to the destination URI.
    pub select_plan: SelectPlan,
    /// Relation description carried with the plan.
    // NOTE(review): presumably describes the rows produced by `select_plan` — confirm.
    pub desc: RelationDesc,
    /// The scalar expression to be resolved to get the destination URI.
    pub to: HirScalarExpr,
    /// The connection used for the copy.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
    /// The ID of the connection.
    pub connection_id: CatalogItemId,
    /// Output file format.
    pub format: S3SinkFormat,
    /// Maximum file size.
    // NOTE(review): unit presumably bytes — confirm against the option parser.
    pub max_file_size: u64,
}
1028
/// Plan describing an `EXPLAIN` of a plan: which stage to explain, in which
/// format and configuration, and for which object or statement.
#[derive(Clone, Debug)]
pub struct ExplainPlanPlan {
    pub stage: ExplainStage,
    pub format: ExplainFormat,
    pub config: ExplainConfig,
    /// The object or statement being explained.
    pub explainee: Explainee,
}
1036
/// The type of object to be explained.
#[derive(Clone, Debug)]
pub enum Explainee {
    /// Lookup and explain a plan saved for an existing view.
    View(CatalogItemId),
    /// Lookup and explain a plan saved for an existing materialized view.
    MaterializedView(CatalogItemId),
    /// Lookup and explain a plan saved for an existing index.
    Index(CatalogItemId),
    /// Replan an existing view.
    ReplanView(CatalogItemId),
    /// Replan an existing materialized view.
    ReplanMaterializedView(CatalogItemId),
    /// Replan an existing index.
    ReplanIndex(CatalogItemId),
    /// A SQL statement.
    Statement(ExplaineeStatement),
}
1055
/// Explainee types that are statements.
///
/// The `EnumKind` derive additionally generates the `ExplaineeStatementKind`
/// enum named in the `enum_kind` attribute below.
#[derive(Clone, Debug, EnumKind)]
#[enum_kind(ExplaineeStatementKind)]
pub enum ExplaineeStatement {
    /// The object to be explained is a SELECT statement.
    Select {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The planned statement.
        plan: plan::SelectPlan,
        /// Relation description accompanying `plan`.
        desc: RelationDesc,
    },
    /// The object to be explained is a CREATE VIEW.
    CreateView {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The planned statement.
        plan: plan::CreateViewPlan,
    },
    /// The object to be explained is a CREATE MATERIALIZED VIEW.
    CreateMaterializedView {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The planned statement.
        plan: plan::CreateMaterializedViewPlan,
    },
    /// The object to be explained is a CREATE INDEX.
    CreateIndex {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The planned statement.
        plan: plan::CreateIndexPlan,
    },
    /// The object to be explained is a SUBSCRIBE statement.
    Subscribe {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The planned statement.
        plan: plan::SubscribePlan,
    },
}
1092
1093impl ExplaineeStatement {
1094    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1095        match self {
1096            Self::Select { plan, .. } => plan.source.depends_on(),
1097            Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1098            Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1099            Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1100            Self::Subscribe { plan, .. } => plan.from.depends_on(),
1101        }
1102    }
1103
1104    /// Statements that have their `broken` flag set are expected to cause a
1105    /// panic in the optimizer code. In this case:
1106    ///
1107    /// 1. The optimizer pipeline execution will stop, but the panic will be
1108    ///    intercepted and will not propagate to the caller. The partial
1109    ///    optimizer trace collected until this point will be available.
1110    /// 2. The optimizer trace tracing subscriber will delegate regular tracing
1111    ///    spans and events to the default subscriber.
1112    ///
1113    /// This is useful when debugging queries that cause panics.
1114    pub fn broken(&self) -> bool {
1115        match self {
1116            Self::Select { broken, .. } => *broken,
1117            Self::CreateView { broken, .. } => *broken,
1118            Self::CreateMaterializedView { broken, .. } => *broken,
1119            Self::CreateIndex { broken, .. } => *broken,
1120            Self::Subscribe { broken, .. } => *broken,
1121        }
1122    }
1123}
1124
1125impl ExplaineeStatementKind {
1126    pub fn supports(&self, stage: &ExplainStage) -> bool {
1127        use ExplainStage::*;
1128        match self {
1129            Self::Select => true,
1130            Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1131            Self::CreateMaterializedView => true,
1132            Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1133            // SUBSCRIBE doesn't support RAW, DECORRELATED, or LOCAL stages because
1134            // it takes MIR directly rather than going through HIR lowering.
1135            Self::Subscribe => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1136        }
1137    }
1138}
1139
1140impl std::fmt::Display for ExplaineeStatementKind {
1141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1142        match self {
1143            Self::Select => write!(f, "SELECT"),
1144            Self::CreateView => write!(f, "CREATE VIEW"),
1145            Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1146            Self::CreateIndex => write!(f, "CREATE INDEX"),
1147            Self::Subscribe => write!(f, "SUBSCRIBE"),
1148        }
1149    }
1150}
1151
/// Plan naming the object or statement for a pushdown explanation.
#[derive(Clone, Debug)]
pub struct ExplainPushdownPlan {
    /// The object or statement being explained.
    pub explainee: Explainee,
}
1156
/// Plan for explaining the timestamp of a query, given as an HIR expression.
#[derive(Clone, Debug)]
pub struct ExplainTimestampPlan {
    pub format: ExplainFormat,
    /// The query, not yet lowered from HIR.
    pub raw_plan: HirRelationExpr,
    pub when: QueryWhen,
}
1163
/// Plan carrying a sink's schema, rendered as a JSON string, together with the
/// collection the sink reads from.
#[derive(Debug)]
pub struct ExplainSinkSchemaPlan {
    pub sink_from: GlobalId,
    /// The schema as JSON text.
    pub json_schema: String,
}
1169
/// Plan carrying a batch of row updates for the catalog item `id`.
#[derive(Debug)]
pub struct SendDiffsPlan {
    pub id: CatalogItemId,
    /// The updates, each a row paired with its [`Diff`].
    pub updates: Vec<(Row, Diff)>,
    pub kind: MutationKind,
    /// Rows paired with a non-zero count.
    // NOTE(review): presumably the `RETURNING` rows and their multiplicities — confirm.
    pub returning: Vec<(Row, NonZeroUsize)>,
    // NOTE(review): unit presumably bytes — confirm against the session variable.
    pub max_result_size: u64,
}
1178
/// Plan describing an insert into the catalog item `id`.
#[derive(Debug)]
pub struct InsertPlan {
    pub id: CatalogItemId,
    /// The relation producing the values to insert.
    pub values: HirRelationExpr,
    /// Scalar expressions carried for the `returning` part of the plan.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1185
/// Plan describing a mutation of the catalog item `id` that first reads a
/// selection and then writes based on it.
#[derive(Debug)]
pub struct ReadThenWritePlan {
    pub id: CatalogItemId,
    /// The relation selecting the rows to operate on.
    pub selection: HirRelationExpr,
    pub finishing: RowSetFinishing,
    /// Scalar expressions keyed by a `usize`.
    // NOTE(review): the key is presumably the index of the assigned column — confirm.
    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
    pub kind: MutationKind,
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1195
/// Generated by `ALTER ... IF EXISTS` if the named object did not exist.
#[derive(Debug)]
pub struct AlterNoopPlan {
    /// The type of object the statement referred to.
    pub object_type: ObjectType,
}
1201
/// Plan describing a change of cluster for the catalog item `id`.
#[derive(Debug)]
pub struct AlterSetClusterPlan {
    pub id: CatalogItemId,
    /// The cluster to set.
    pub set_cluster: ClusterId,
}
1207
/// Plan describing a change to the history retention of the catalog item `id`,
/// carrying both an optional [`Value`] and a [`CompactionWindow`].
#[derive(Debug)]
pub struct AlterRetainHistoryPlan {
    pub id: CatalogItemId,
    // NOTE(review): presumably the option value as written, `None` on reset — confirm.
    pub value: Option<Value>,
    pub window: CompactionWindow,
    pub object_type: ObjectType,
}
1215
/// Plan describing a change to the timestamp interval of the source `id`,
/// carrying both an optional [`Value`] and a [`Duration`].
#[derive(Debug)]
pub struct AlterSourceTimestampIntervalPlan {
    pub id: CatalogItemId,
    // NOTE(review): presumably the option value as written, `None` on reset — confirm.
    pub value: Option<Value>,
    pub interval: Duration,
}
1222
/// A requested change to a single option: set it to a value of type `T`
/// (`String` by default), reset it, or leave it unchanged.
#[derive(Debug, Clone)]
pub enum AlterOptionParameter<T = String> {
    Set(T),
    Reset,
    Unchanged,
}
1230
/// The action an [`AlterConnectionPlan`] performs.
#[derive(Debug)]
pub enum AlterConnectionAction {
    /// Rotate the connection's keys.
    RotateKeys,
    /// Change the connection's options.
    AlterOptions {
        /// Options to set, keyed by option name; the value is optional.
        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
        /// Names of options to drop.
        drop_options: BTreeSet<ConnectionOptionName>,
        // NOTE(review): presumably requests validation of the altered connection — confirm.
        validate: bool,
    },
}
1240
/// Plan describing an alteration of the connection `id`.
#[derive(Debug)]
pub struct AlterConnectionPlan {
    pub id: CatalogItemId,
    /// What to do to the connection.
    pub action: AlterConnectionAction,
}
1246
/// The action an [`AlterSourcePlan`] performs.
#[derive(Debug)]
pub enum AlterSourceAction {
    /// Add subsource exports to the source.
    AddSubsourceExports {
        /// Plans (with sequencing metadata) for the subsources to create.
        subsources: Vec<CreateSourcePlanBundle>,
        /// Options given alongside the added subsources.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    },
    /// Refresh the source's available references.
    RefreshReferences {
        /// The references carried by this action.
        references: SourceReferences,
    },
}
1257
/// Plan describing an alteration of a source, identified both by catalog item
/// ID and by the global ID of its ingestion.
#[derive(Debug)]
pub struct AlterSourcePlan {
    pub item_id: CatalogItemId,
    pub ingestion_id: GlobalId,
    /// What to do to the source.
    pub action: AlterSourceAction,
}
1264
/// Plan describing an alteration of a sink, carrying the sink's IDs, a
/// [`Sink`] definition, and the cluster it is associated with.
#[derive(Debug, Clone)]
pub struct AlterSinkPlan {
    pub item_id: CatalogItemId,
    pub global_id: GlobalId,
    // NOTE(review): presumably the full new definition rather than a delta — confirm.
    pub sink: Sink,
    pub with_snapshot: bool,
    pub in_cluster: ClusterId,
}
1273
/// Plan describing an alteration of the cluster `id`: the options to apply
/// and the strategy to apply them with.
#[derive(Debug, Clone)]
pub struct AlterClusterPlan {
    pub id: ClusterId,
    pub name: String,
    pub options: PlanClusterOption,
    pub strategy: AlterClusterPlanStrategy,
}
1281
/// Plan describing a rename of the cluster `id` from `name` to `to_name`.
#[derive(Debug)]
pub struct AlterClusterRenamePlan {
    pub id: ClusterId,
    pub name: String,
    pub to_name: String,
}
1288
/// Plan describing a rename of the replica `replica_id` of cluster
/// `cluster_id` from `name` to `to_name`.
#[derive(Debug)]
pub struct AlterClusterReplicaRenamePlan {
    pub cluster_id: ClusterId,
    pub replica_id: ReplicaId,
    pub name: QualifiedReplica,
    pub to_name: String,
}
1296
/// Plan describing a rename of the catalog item `id` from `current_full_name`
/// to `to_name`.
#[derive(Debug)]
pub struct AlterItemRenamePlan {
    pub id: CatalogItemId,
    pub current_full_name: FullItemName,
    pub to_name: String,
    pub object_type: ObjectType,
}
1304
/// Plan describing a rename of a schema to `new_schema_name`.
#[derive(Debug)]
pub struct AlterSchemaRenamePlan {
    /// The schema to rename, as a (database specifier, schema specifier) pair.
    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    pub new_schema_name: String,
}
1310
/// Plan describing a swap of two schemas, `a` and `b`, each given by its
/// (database specifier, schema specifier) pair and its name.
#[derive(Debug)]
pub struct AlterSchemaSwapPlan {
    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    pub schema_a_name: String,
    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    pub schema_b_name: String,
    // NOTE(review): presumably an intermediate name used while swapping — confirm.
    pub name_temp: String,
}
1319
/// Plan describing a swap of two clusters, `a` and `b`, each given by its ID
/// and its name.
#[derive(Debug)]
pub struct AlterClusterSwapPlan {
    pub id_a: ClusterId,
    pub id_b: ClusterId,
    pub name_a: String,
    pub name_b: String,
    // NOTE(review): presumably an intermediate name used while swapping — confirm.
    pub name_temp: String,
}
1328
/// Plan describing an alteration of the secret `id`.
#[derive(Debug)]
pub struct AlterSecretPlan {
    pub id: CatalogItemId,
    /// Scalar expression for the secret's new contents.
    pub secret_as: MirScalarExpr,
}
1334
/// Plan describing an assignment to a system variable.
#[derive(Debug)]
pub struct AlterSystemSetPlan {
    /// Name of the variable.
    pub name: String,
    /// The value to assign, or a request for the default (see [`VariableValue`]).
    pub value: VariableValue,
}
1340
/// Plan naming a single system variable to reset.
#[derive(Debug)]
pub struct AlterSystemResetPlan {
    /// Name of the variable.
    pub name: String,
}
1345
/// Plan for resetting all system variables; it carries no data.
#[derive(Debug)]
pub struct AlterSystemResetAllPlan {}
1348
/// Plan describing an alteration of the role `id`.
#[derive(Debug)]
pub struct AlterRolePlan {
    pub id: RoleId,
    pub name: String,
    /// The planned change to apply to the role.
    pub option: PlannedAlterRoleOption,
}
1355
/// Plan describing a change of owner of the object `id` to `new_owner`.
#[derive(Debug)]
pub struct AlterOwnerPlan {
    pub id: ObjectId,
    pub object_type: ObjectType,
    pub new_owner: RoleId,
}
1362
/// Plan describing a table alteration that involves a single column, given by
/// name and type.
// NOTE(review): presumably `ALTER TABLE ... ADD COLUMN` — confirm with the planner.
#[derive(Debug)]
pub struct AlterTablePlan {
    /// The relation being altered.
    pub relation_id: CatalogItemId,
    pub column_name: ColumnName,
    /// The column's planned SQL type.
    pub column_type: SqlColumnType,
    /// The column's type as an unresolved AST data type.
    pub raw_sql_type: RawDataType,
}
1370
/// Plan describing the application of the replacement `replacement_id` to the
/// materialized view `id`.
#[derive(Debug, Clone)]
pub struct AlterMaterializedViewApplyReplacementPlan {
    pub id: CatalogItemId,
    pub replacement_id: CatalogItemId,
}
1376
/// Plan describing a declaration named `name` over a not-yet-resolved
/// statement, its SQL text, and its parameters.
// NOTE(review): presumably corresponds to `DECLARE <cursor>` — confirm with the planner.
#[derive(Debug)]
pub struct DeclarePlan {
    pub name: String,
    /// The statement, still in raw (unresolved) AST form.
    pub stmt: Statement<Raw>,
    /// SQL text accompanying `stmt`.
    pub sql: String,
    pub params: Params,
}
1384
/// Plan describing a fetch from the object named `name`.
#[derive(Debug)]
pub struct FetchPlan {
    pub name: String,
    /// Optional fetch direction/count.
    pub count: Option<FetchDirection>,
    pub timeout: ExecuteTimeout,
}
1391
/// Plan naming the object to close.
#[derive(Debug)]
pub struct ClosePlan {
    pub name: String,
}
1396
/// Plan describing a statement to prepare under `name`.
#[derive(Debug)]
pub struct PreparePlan {
    pub name: String,
    /// The statement, still in raw (unresolved) AST form.
    pub stmt: Statement<Raw>,
    /// SQL text accompanying `stmt`.
    pub sql: String,
    /// Description of the statement.
    pub desc: StatementDesc,
}
1404
/// Plan describing the execution of the object named `name` with `params`.
#[derive(Debug)]
pub struct ExecutePlan {
    pub name: String,
    pub params: Params,
}
1410
/// Plan describing a deallocation, optionally restricted to one name.
#[derive(Debug)]
pub struct DeallocatePlan {
    // NOTE(review): `None` presumably means "deallocate all" — confirm with the sequencer.
    pub name: Option<String>,
}
1415
/// Plan carrying the severity of a notice to raise.
#[derive(Debug)]
pub struct RaisePlan {
    pub severity: NoticeSeverity,
}
1420
/// Plan describing role memberships to grant.
#[derive(Debug)]
pub struct GrantRolePlan {
    /// The roles that are gaining members.
    pub role_ids: Vec<RoleId>,
    /// The roles that will be added to `role_ids`.
    pub member_ids: Vec<RoleId>,
    /// The role that granted the membership.
    pub grantor_id: RoleId,
}
1430
/// Plan describing role memberships to revoke.
#[derive(Debug)]
pub struct RevokeRolePlan {
    /// The roles that are losing members.
    pub role_ids: Vec<RoleId>,
    /// The roles that will be removed from `role_ids`.
    pub member_ids: Vec<RoleId>,
    /// The role that revoked the membership.
    pub grantor_id: RoleId,
}
1440
/// A single privilege change, shared by [`GrantPrivilegesPlan`] and
/// [`RevokePrivilegesPlan`].
#[derive(Debug)]
pub struct UpdatePrivilege {
    /// The privileges being granted/revoked on an object.
    pub acl_mode: AclMode,
    /// The ID of the object receiving privileges.
    pub target_id: SystemObjectId,
    /// The role that is granting the privileges.
    pub grantor: RoleId,
}
1450
/// Plan describing privileges to grant to a set of roles.
#[derive(Debug)]
pub struct GrantPrivilegesPlan {
    /// Description of each privilege being granted.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles that will be granted the privileges.
    pub grantees: Vec<RoleId>,
}
1458
/// Plan describing privileges to revoke from a set of roles.
#[derive(Debug)]
pub struct RevokePrivilegesPlan {
    /// Description of each privilege being revoked.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles that will have privileges revoked.
    pub revokees: Vec<RoleId>,
}
/// Plan describing a grant or revoke of default privileges.
#[derive(Debug)]
pub struct AlterDefaultPrivilegesPlan {
    /// Description of objects that match this default privilege.
    pub privilege_objects: Vec<DefaultPrivilegeObject>,
    /// The privilege to be granted/revoked from the matching objects.
    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
    /// Whether this is a grant or revoke.
    pub is_grant: bool,
}
1475
/// Plan describing the reassignment of objects owned by a set of roles to a
/// single new owner.
#[derive(Debug)]
pub struct ReassignOwnedPlan {
    /// The roles whose owned objects are being reassigned.
    pub old_roles: Vec<RoleId>,
    /// The new owner of the objects.
    pub new_role: RoleId,
    /// All object IDs to reassign.
    pub reassign_ids: Vec<ObjectId>,
}
1485
/// Plan describing a comment to set or clear on an object.
#[derive(Debug)]
pub struct CommentPlan {
    /// The object that this comment is associated with.
    pub object_id: CommentObjectId,
    /// A sub-component of the object that this comment is associated with, e.g. a column.
    ///
    /// TODO(parkmycar): <https://github.com/MaterializeInc/database-issues/issues/6711>.
    pub sub_component: Option<usize>,
    /// The comment itself. `None` indicates that the existing comment should be cleared.
    pub comment: Option<String>,
}
1497
/// Where a [`Table`] gets its data from.
#[derive(Clone, Debug)]
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    TableWrites {
        /// Default-value expressions carried with the table.
        // NOTE(review): presumably one entry per column, in column order — confirm.
        defaults: Vec<Expr<Aug>>,
    },

    /// The table receives its data from the identified `DataSourceDesc`.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    DataSource {
        desc: DataSourceDesc,
        timeline: Timeline,
    },
}
1510
/// The planned definition of a table.
#[derive(Clone, Debug)]
pub struct Table {
    /// SQL text that creates the table.
    pub create_sql: String,
    /// Versioned description of the table's relation.
    pub desc: VersionedRelationDesc,
    pub temporary: bool,
    /// Optional compaction window for the table.
    pub compaction_window: Option<CompactionWindow>,
    /// Where the table's data comes from.
    pub data_source: TableDataSource,
}
1519
/// The planned definition of a source.
#[derive(Clone, Debug)]
pub struct Source {
    /// SQL text that creates the source.
    pub create_sql: String,
    /// Where the source's data comes from.
    pub data_source: DataSourceDesc,
    /// Description of the source's relation.
    pub desc: RelationDesc,
    /// Optional compaction window for the source.
    pub compaction_window: Option<CompactionWindow>,
}
1527
/// Describes where a [`Source`] (or a table backed by a data source) receives
/// its data from.
#[derive(Debug, Clone)]
pub enum DataSourceDesc {
    /// Receives data from an external system.
    Ingestion(SourceDesc<ReferencedConnection>),
    /// Receives data from an external system (old syntax).
    OldSyntaxIngestion {
        desc: SourceDesc<ReferencedConnection>,
        // If we're dealing with an old syntax ingestion, the progress ID will be some other
        // collection and the ingestion itself will have the data from a default external
        // reference.
        progress_subsource: CatalogItemId,
        data_config: SourceExportDataConfig<ReferencedConnection>,
        details: SourceExportDetails,
    },
    /// This source receives its data from the identified ingestion,
    /// specifically the output identified by `external_reference`.
    IngestionExport {
        ingestion_id: CatalogItemId,
        external_reference: UnresolvedItemName,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Receives data from the source's reclocking/remapping operations.
    Progress,
    /// Receives data from HTTP post requests.
    Webhook {
        /// Optional validation to run against incoming requests.
        validate_using: Option<WebhookValidation>,
        /// How the request body is represented.
        body_format: WebhookBodyFormat,
        /// Which request headers are exposed, and how.
        headers: WebhookHeaders,
        /// Only `Some` when created via `CREATE TABLE ... FROM WEBHOOK`.
        cluster_id: Option<StorageInstanceId>,
    },
}
1560
/// An expression used to validate webhook requests, together with the context
/// (columns and secrets) it is evaluated in.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct WebhookValidation {
    /// The expression used to validate a request.
    pub expression: MirScalarExpr,
    /// Description of the source that will be created.
    pub relation_desc: RelationDesc,
    /// The column index to provide the request body and whether to provide it as bytes.
    pub bodies: Vec<(usize, bool)>,
    /// The column index to provide the request headers and whether to provide the values as bytes.
    pub headers: Vec<(usize, bool)>,
    /// Any secrets that are used in that validation.
    pub secrets: Vec<WebhookValidationSecret>,
}
1574
1575impl WebhookValidation {
1576    const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1577
1578    /// Attempt to reduce the internal [`MirScalarExpr`] into a simpler expression.
1579    ///
1580    /// The reduction happens on a separate thread, we also only wait for
1581    /// `WebhookValidation::MAX_REDUCE_TIME` before timing out and returning an error.
1582    pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1583        let WebhookValidation {
1584            expression,
1585            relation_desc,
1586            ..
1587        } = self;
1588
1589        // On a different thread, attempt to reduce the expression.
1590        let mut expression_ = expression.clone();
1591        let desc_ = relation_desc.clone();
1592        let reduce_task = mz_ore::task::spawn_blocking(
1593            || "webhook-validation-reduce",
1594            move || {
1595                let repr_col_types: Vec<ReprColumnType> = desc_
1596                    .typ()
1597                    .column_types
1598                    .iter()
1599                    .map(ReprColumnType::from)
1600                    .collect();
1601                expression_.reduce(&repr_col_types);
1602                expression_
1603            },
1604        );
1605
1606        match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1607            Ok(reduced_expr) => {
1608                *expression = reduced_expr;
1609                Ok(())
1610            }
1611            Err(_) => Err("timeout"),
1612        }
1613    }
1614}
1615
/// Describes which request headers a webhook source exposes as columns.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct WebhookHeaders {
    /// Optionally include a column named `headers` whose content is possibly filtered.
    pub header_column: Option<WebhookHeaderFilters>,
    /// The column index to provide the specific request header, and whether to provide it as bytes.
    pub mapped_headers: BTreeMap<usize, (String, bool)>,
}
1623
1624impl WebhookHeaders {
1625    /// Returns the number of columns needed to represent our headers.
1626    pub fn num_columns(&self) -> usize {
1627        let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1628        let mapped_headers = self.mapped_headers.len();
1629
1630        header_column + mapped_headers
1631    }
1632}
1633
/// Header-name filters applied to the `headers` column of a webhook source
/// (see [`WebhookHeaders::header_column`]).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct WebhookHeaderFilters {
    /// Header names to block.
    pub block: BTreeSet<String>,
    /// Header names to allow.
    // NOTE(review): precedence between `block` and `allow` is not visible here — confirm.
    pub allow: BTreeSet<String>,
}
1639
/// The format of a webhook request body. Converts into the `SqlScalarType`
/// of the body column via `From`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Arbitrary)]
pub enum WebhookBodyFormat {
    /// JSON body (maps to `SqlScalarType::Jsonb`).
    // NOTE(review): `array` presumably requests unpacking of a top-level JSON array — confirm.
    Json { array: bool },
    /// Raw bytes (maps to `SqlScalarType::Bytes`).
    Bytes,
    /// Text (maps to `SqlScalarType::String`).
    Text,
}
1646
1647impl From<WebhookBodyFormat> for SqlScalarType {
1648    fn from(value: WebhookBodyFormat) -> Self {
1649        match value {
1650            WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1651            WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1652            WebhookBodyFormat::Text => SqlScalarType::String,
1653        }
1654    }
1655}
1656
/// A secret referenced by a [`WebhookValidation`] expression.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct WebhookValidationSecret {
    /// Identifies the secret by [`CatalogItemId`].
    pub id: CatalogItemId,
    /// Column index for the expression context that this secret was originally evaluated in.
    pub column_idx: usize,
    /// Whether or not this secret should be provided to the expression as Bytes or a String.
    pub use_bytes: bool,
}
1666
/// The planned definition of a connection.
#[derive(Clone, Debug)]
pub struct Connection {
    /// SQL text that creates the connection.
    pub create_sql: String,
    /// The kind-specific details of the connection.
    pub details: ConnectionDetails,
}
1672
/// The kind-specific details of a [`Connection`].
///
/// `ConnectionDetails::to_connection` maps each variant onto the variant of
/// the same name in `mz_storage_types::connections::Connection`.
#[derive(Clone, Debug, Serialize)]
pub enum ConnectionDetails {
    Kafka(KafkaConnection<ReferencedConnection>),
    Csr(CsrConnection<ReferencedConnection>),
    Postgres(PostgresConnection<ReferencedConnection>),
    Ssh {
        connection: SshConnection,
        // Two SSH keys are kept alongside the connection; `to_connection`
        // does not include them in its result.
        key_1: SshKey,
        key_2: SshKey,
    },
    Aws(AwsConnection),
    AwsPrivatelink(AwsPrivatelinkConnection),
    MySql(MySqlConnection<ReferencedConnection>),
    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
}
1689
1690impl ConnectionDetails {
1691    pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1692        match self {
1693            ConnectionDetails::Kafka(c) => {
1694                mz_storage_types::connections::Connection::Kafka(c.clone())
1695            }
1696            ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1697            ConnectionDetails::Postgres(c) => {
1698                mz_storage_types::connections::Connection::Postgres(c.clone())
1699            }
1700            ConnectionDetails::Ssh { connection, .. } => {
1701                mz_storage_types::connections::Connection::Ssh(connection.clone())
1702            }
1703            ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1704            ConnectionDetails::AwsPrivatelink(c) => {
1705                mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1706            }
1707            ConnectionDetails::MySql(c) => {
1708                mz_storage_types::connections::Connection::MySql(c.clone())
1709            }
1710            ConnectionDetails::SqlServer(c) => {
1711                mz_storage_types::connections::Connection::SqlServer(c.clone())
1712            }
1713            ConnectionDetails::IcebergCatalog(c) => {
1714                mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1715            }
1716        }
1717    }
1718}
1719
/// A single named rule of a network policy.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct NetworkPolicyRule {
    /// Name of the rule.
    pub name: String,
    /// Action of the rule; see [`NetworkPolicyRuleAction`].
    pub action: NetworkPolicyRuleAction,
    /// Network the rule refers to; see [`PolicyAddress`].
    pub address: PolicyAddress,
    /// Traffic direction of the rule; see [`NetworkPolicyRuleDirection`].
    pub direction: NetworkPolicyRuleDirection,
}
1727
/// The action of a [`NetworkPolicyRule`].
///
/// `Allow` is the only action. It displays as `allow` and is parsed from the
/// string `ALLOW`, compared after upper-casing the input.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    Ord,
    PartialOrd,
    Hash
)]
pub enum NetworkPolicyRuleAction {
    Allow,
}
1742
1743impl std::fmt::Display for NetworkPolicyRuleAction {
1744    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1745        match self {
1746            Self::Allow => write!(f, "allow"),
1747        }
1748    }
1749}
1750impl TryFrom<&str> for NetworkPolicyRuleAction {
1751    type Error = PlanError;
1752    fn try_from(value: &str) -> Result<Self, Self::Error> {
1753        match value.to_uppercase().as_str() {
1754            "ALLOW" => Ok(Self::Allow),
1755            _ => Err(PlanError::Unstructured(
1756                "Allow is the only valid option".into(),
1757            )),
1758        }
1759    }
1760}
1761
/// The traffic direction of a [`NetworkPolicyRule`].
///
/// `Ingress` is the only direction. It displays as `ingress` and is parsed
/// from the string `INGRESS`, compared after upper-casing the input.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleDirection {
    Ingress,
}
1766impl std::fmt::Display for NetworkPolicyRuleDirection {
1767    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1768        match self {
1769            Self::Ingress => write!(f, "ingress"),
1770        }
1771    }
1772}
1773impl TryFrom<&str> for NetworkPolicyRuleDirection {
1774    type Error = PlanError;
1775    fn try_from(value: &str) -> Result<Self, Self::Error> {
1776        match value.to_uppercase().as_str() {
1777            "INGRESS" => Ok(Self::Ingress),
1778            _ => Err(PlanError::Unstructured(
1779                "Ingress is the only valid option".into(),
1780            )),
1781        }
1782    }
1783}
1784
/// The network address of a [`NetworkPolicyRule`], stored as an [`IpNet`].
///
/// It displays and serializes as the string form of the wrapped [`IpNet`].
/// Prefer `TryFrom<&str>` for fallible parsing; `From<String>` panics on
/// input that does not parse.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct PolicyAddress(pub IpNet);
1787impl std::fmt::Display for PolicyAddress {
1788    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1789        write!(f, "{}", &self.0.to_string())
1790    }
1791}
1792impl From<String> for PolicyAddress {
1793    fn from(value: String) -> Self {
1794        Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1795    }
1796}
1797impl TryFrom<&str> for PolicyAddress {
1798    type Error = PlanError;
1799    fn try_from(value: &str) -> Result<Self, Self::Error> {
1800        let net = IpNet::from_str(value)
1801            .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1802        Ok(Self(net))
1803    }
1804}
1805
1806impl Serialize for PolicyAddress {
1807    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1808    where
1809        S: serde::Serializer,
1810    {
1811        serializer.serialize_str(&format!("{}", &self.0))
1812    }
1813}
1814
/// An SSH key for which either only the public part or the full key pair is
/// available.
#[derive(Clone, Debug, Serialize)]
pub enum SshKey {
    /// Only the public key, held as a string.
    PublicOnly(String),
    /// The complete key pair.
    Both(SshKeyPair),
}
1820
1821impl SshKey {
1822    pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1823        match self {
1824            SshKey::PublicOnly(_) => None,
1825            SshKey::Both(key_pair) => Some(key_pair),
1826        }
1827    }
1828
1829    pub fn public_key(&self) -> String {
1830        match self {
1831            SshKey::PublicOnly(s) => s.into(),
1832            SshKey::Both(p) => p.ssh_public_key(),
1833        }
1834    }
1835}
1836
/// A secret: its SQL text paired with a scalar expression.
#[derive(Clone, Debug)]
pub struct Secret {
    /// SQL text associated with this secret.
    // NOTE(review): presumably the durable, parse-able definition, as documented for
    // `create_sql` on `Sink` and `View` — confirm.
    pub create_sql: String,
    /// Scalar expression attached to the secret.
    // NOTE(review): presumably evaluates to the secret's contents — confirm.
    pub secret_as: MirScalarExpr,
}
1842
/// A sink: where data is read from, where it is written, and how.
#[derive(Clone, Debug)]
pub struct Sink {
    /// Parse-able SQL that is stored durably and defines this sink.
    pub create_sql: String,
    /// Collection we read into this sink.
    pub from: GlobalId,
    /// Type of connection to the external service we sink into.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    // TODO(guswynn): this probably should just be in the `connection`.
    /// Envelope used by the sink.
    pub envelope: SinkEnvelope,
    /// Version number of the sink.
    // NOTE(review): the versioning scheme is not visible here — confirm what bumps it.
    pub version: u64,
    /// Optional commit interval.
    // NOTE(review): the meaning of `None` is not visible here — confirm.
    pub commit_interval: Option<Duration>,
}
1856
/// A view: its defining SQL, the expression parsed from it, and metadata
/// about that expression.
#[derive(Clone, Debug)]
pub struct View {
    /// Parse-able SQL that is stored durably and defines this view.
    pub create_sql: String,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub expr: HirRelationExpr,
    /// All of the catalog objects that are referenced by this view, according to the `expr`.
    pub dependencies: DependencyIds,
    /// Columns of this view.
    pub column_names: Vec<ColumnName>,
    /// If this view is created in the temporary schema, e.g. `CREATE TEMPORARY ...`.
    pub temporary: bool,
}
1870
/// A materialized view: its defining SQL, the expression parsed from it, and
/// the options that control where and how it is installed.
#[derive(Clone, Debug)]
pub struct MaterializedView {
    /// Parse-able SQL that is stored durably and defines this materialized view.
    pub create_sql: String,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub expr: HirRelationExpr,
    /// All of the catalog objects that are referenced by this materialized view, according to the `expr`.
    pub dependencies: DependencyIds,
    /// Columns of this view.
    pub column_names: Vec<ColumnName>,
    /// Optional catalog item targeted for replacement.
    // NOTE(review): presumably the item this materialized view is meant to replace — confirm.
    pub replacement_target: Option<CatalogItemId>,
    /// Cluster this materialized view will get installed on.
    pub cluster_id: ClusterId,
    /// If set, only install this materialized view's dataflow on the specified replica.
    pub target_replica: Option<ReplicaId>,
    /// Non-null assertions, as a list of indexes.
    // NOTE(review): presumably indexes into `column_names` — confirm.
    pub non_null_assertions: Vec<usize>,
    /// Optional compaction window.
    pub compaction_window: Option<CompactionWindow>,
    /// Optional refresh schedule.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// Optional `as_of` timestamp.
    pub as_of: Option<Timestamp>,
}
1891
/// An index: its defining SQL, the collection it is built on, and its keys.
#[derive(Clone, Debug)]
pub struct Index {
    /// Parse-able SQL that is stored durably and defines this index.
    pub create_sql: String,
    /// Collection this index is on top of.
    pub on: GlobalId,
    /// Key expressions of the index.
    pub keys: Vec<mz_expr::MirScalarExpr>,
    /// Optional compaction window.
    pub compaction_window: Option<CompactionWindow>,
    /// Cluster associated with this index.
    // NOTE(review): presumably the cluster the index is installed on — confirm.
    pub cluster_id: ClusterId,
}
1902
/// A type: its SQL text paired with the catalog-level type description.
#[derive(Clone, Debug)]
pub struct Type {
    /// SQL text associated with this type.
    // NOTE(review): presumably the durable, parse-able definition, as documented for
    // `create_sql` on `Sink` and `View` — confirm.
    pub create_sql: String,
    /// The catalog type description, with references expressed as [`IdReference`].
    pub inner: CatalogType<IdReference>,
}
1908
/// Specifies when a `Peek` or `Subscribe` should occur.
///
/// The methods on this type expose, per variant, the constraints it places on
/// the timestamp candidate.
#[derive(Deserialize, Clone, Debug, PartialEq)]
pub enum QueryWhen {
    /// The peek should occur at the latest possible timestamp that allows the
    /// peek to complete immediately.
    Immediately,
    /// The peek should occur at a timestamp that allows the peek to see all
    /// data written to tables within Materialize.
    FreshestTableWrite,
    /// The peek should occur at the specified timestamp.
    ///
    /// The payload is a concrete [`Timestamp`] rather than an expression.
    AtTimestamp(Timestamp),
    /// Same as Immediately, but will also advance to at least the specified
    /// timestamp.
    AtLeastTimestamp(Timestamp),
}
1927
1928impl QueryWhen {
1929    /// Returns a timestamp to which the candidate must be advanced.
1930    pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1931        match self {
1932            QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1933            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1934        }
1935    }
1936    /// Returns whether the candidate's upper bound is constrained.
1937    /// This is only true for `AtTimestamp` since it is the only variant that
1938    /// specifies a timestamp.
1939    pub fn constrains_upper(&self) -> bool {
1940        match self {
1941            QueryWhen::AtTimestamp(_) => true,
1942            QueryWhen::AtLeastTimestamp(_)
1943            | QueryWhen::Immediately
1944            | QueryWhen::FreshestTableWrite => false,
1945        }
1946    }
1947    /// Returns whether the candidate must be advanced to the since.
1948    pub fn advance_to_since(&self) -> bool {
1949        match self {
1950            QueryWhen::Immediately
1951            | QueryWhen::AtLeastTimestamp(_)
1952            | QueryWhen::FreshestTableWrite => true,
1953            QueryWhen::AtTimestamp(_) => false,
1954        }
1955    }
1956    /// Returns whether the candidate can be advanced to the upper.
1957    pub fn can_advance_to_upper(&self) -> bool {
1958        match self {
1959            QueryWhen::Immediately => true,
1960            QueryWhen::FreshestTableWrite
1961            | QueryWhen::AtTimestamp(_)
1962            | QueryWhen::AtLeastTimestamp(_) => false,
1963        }
1964    }
1965
1966    /// Returns whether the candidate can be advanced to the timeline's timestamp.
1967    pub fn can_advance_to_timeline_ts(&self) -> bool {
1968        match self {
1969            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1970            QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1971        }
1972    }
1973    /// Returns whether the candidate must be advanced to the timeline's timestamp.
1974    pub fn must_advance_to_timeline_ts(&self) -> bool {
1975        match self {
1976            QueryWhen::FreshestTableWrite => true,
1977            QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1978                false
1979            }
1980        }
1981    }
1982    /// Returns whether the selected timestamp should be tracked within the current transaction.
1983    pub fn is_transactional(&self) -> bool {
1984        match self {
1985            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1986            QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1987        }
1988    }
1989}
1990
/// The kind of a data mutation.
// NOTE(review): presumably mirrors the SQL `INSERT`/`UPDATE`/`DELETE` statements — confirm.
#[derive(Debug, Copy, Clone)]
pub enum MutationKind {
    Insert,
    Update,
    Delete,
}
1997
/// The data format of a copy operation.
// NOTE(review): presumably the `FORMAT` option of `COPY` — confirm which directions
// support which formats.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyFormat {
    Text,
    Csv,
    Binary,
    Parquet,
}
2005
/// A timeout setting for execution.
#[derive(Debug, Copy, Clone)]
pub enum ExecuteTimeout {
    /// No timeout value.
    None,
    /// A timeout expressed in (possibly fractional) seconds.
    Seconds(f64),
    // NOTE(review): semantics are not visible here; presumably "wait for a single
    // result" — confirm.
    WaitOnce,
}
2012
/// An option that can be set on an index.
#[derive(Clone, Debug)]
pub enum IndexOption {
    /// Configures the logical compaction window for an index.
    RetainHistory(CompactionWindow),
}
2018
/// An option that can be set on a table.
#[derive(Clone, Debug)]
pub enum TableOption {
    /// Configures the logical compaction window for a table.
    RetainHistory(CompactionWindow),
}
2024
/// The set of cluster options, each wrapped in an [`AlterOptionParameter`].
///
/// The `Default` implementation sets every field to
/// `AlterOptionParameter::Unchanged`.
#[derive(Clone, Debug)]
pub struct PlanClusterOption {
    pub availability_zones: AlterOptionParameter<Vec<String>>,
    pub introspection_debugging: AlterOptionParameter<bool>,
    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
    pub managed: AlterOptionParameter<bool>,
    /// Replicas as `(name, config)` pairs.
    // NOTE(review): the `String` is presumably the replica name — confirm.
    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
    pub replication_factor: AlterOptionParameter<u32>,
    /// Uses the default type parameter of [`AlterOptionParameter`].
    pub size: AlterOptionParameter,
    pub schedule: AlterOptionParameter<ClusterSchedule>,
    pub workload_class: AlterOptionParameter<Option<String>>,
}
2037
2038impl Default for PlanClusterOption {
2039    fn default() -> Self {
2040        Self {
2041            availability_zones: AlterOptionParameter::Unchanged,
2042            introspection_debugging: AlterOptionParameter::Unchanged,
2043            introspection_interval: AlterOptionParameter::Unchanged,
2044            managed: AlterOptionParameter::Unchanged,
2045            replicas: AlterOptionParameter::Unchanged,
2046            replication_factor: AlterOptionParameter::Unchanged,
2047            size: AlterOptionParameter::Unchanged,
2048            schedule: AlterOptionParameter::Unchanged,
2049            workload_class: AlterOptionParameter::Unchanged,
2050        }
2051    }
2052}
2053
/// The strategy selected by the `wait` option of a cluster alteration.
///
/// Built from a `ClusterAlterOptionExtracted` through `TryFrom`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AlterClusterPlanStrategy {
    /// No `wait` option was given.
    None,
    /// A `For` wait with the given duration.
    For(Duration),
    /// An `UntilReady` wait.
    UntilReady {
        /// What to do on timeout; `OnTimeoutAction::default()` is used when the
        /// option is not specified.
        on_timeout: OnTimeoutAction,
        /// The timeout; it must be specified for this variant to be built.
        timeout: Duration,
    },
}
2063
/// The action to take on timeout.
///
/// The default is `Commit`. Values are parsed from the strings `COMMIT` and
/// `ROLLBACK`, compared after upper-casing the input.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OnTimeoutAction {
    Commit,
    Rollback,
}
2069
2070impl Default for OnTimeoutAction {
2071    fn default() -> Self {
2072        Self::Commit
2073    }
2074}
2075
2076impl TryFrom<&str> for OnTimeoutAction {
2077    type Error = PlanError;
2078    fn try_from(value: &str) -> Result<Self, Self::Error> {
2079        match value.to_uppercase().as_str() {
2080            "COMMIT" => Ok(Self::Commit),
2081            "ROLLBACK" => Ok(Self::Rollback),
2082            _ => Err(PlanError::Unstructured(
2083                "Valid options are COMMIT, ROLLBACK".into(),
2084            )),
2085        }
2086    }
2087}
2088
2089impl AlterClusterPlanStrategy {
2090    pub fn is_none(&self) -> bool {
2091        matches!(self, Self::None)
2092    }
2093    pub fn is_some(&self) -> bool {
2094        !matches!(self, Self::None)
2095    }
2096}
2097
2098impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2099    type Error = PlanError;
2100
2101    fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2102        Ok(match value.wait {
2103            Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2104            Some(ClusterAlterOptionValue::UntilReady(options)) => {
2105                let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2106                Self::UntilReady {
2107                    timeout: match extracted.timeout {
2108                        Some(d) => d,
2109                        None => Err(PlanError::UntilReadyTimeoutRequired)?,
2110                    },
2111                    on_timeout: match extracted.on_timeout {
2112                        Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2113                            PlanError::InvalidOptionValue {
2114                                option_name: "ON TIMEOUT".into(),
2115                                err: Box::new(e),
2116                            }
2117                        })?,
2118                        None => OnTimeoutAction::default(),
2119                    },
2120                }
2121            }
2122            None => Self::None,
2123        })
2124    }
2125}
2126
/// A vector of values to which parameter references should be bound.
///
/// Use [`Params::empty`] when there are no parameters to bind.
#[derive(Debug, Clone)]
pub struct Params {
    /// The datums that were provided in the EXECUTE statement.
    pub datums: Row,
    /// The types of the datums provided in the EXECUTE statement.
    pub execute_types: Vec<SqlScalarType>,
    /// The types that the prepared statement expects based on its definition.
    pub expected_types: Vec<SqlScalarType>,
}
2137
2138impl Params {
2139    /// Returns a `Params` with no parameters.
2140    pub fn empty() -> Params {
2141        Params {
2142            datums: Row::pack_slice(&[]),
2143            execute_types: vec![],
2144            expected_types: vec![],
2145        }
2146    }
2147}
2148
/// Controls planning of a SQL query.
#[derive(
    Ord,
    PartialOrd,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    Hash,
    Copy
)]
pub struct PlanContext {
    /// The wall-clock time associated with this planning context.
    pub wall_time: DateTime<Utc>,
    /// Flag for ignoring "if exists" errors; `false` unless set through
    /// `with_ignore_if_exists_errors`.
    // NOTE(review): exactly which errors this suppresses is not visible here — confirm.
    pub ignore_if_exists_errors: bool,
}
2166
2167impl PlanContext {
2168    pub fn new(wall_time: DateTime<Utc>) -> Self {
2169        Self {
2170            wall_time,
2171            ignore_if_exists_errors: false,
2172        }
2173    }
2174
2175    /// Return a PlanContext with zero values. This should only be used when
2176    /// planning is required but unused (like in `plan_create_table()`) or in
2177    /// tests.
2178    pub fn zero() -> Self {
2179        PlanContext {
2180            wall_time: now::to_datetime(NOW_ZERO()),
2181            ignore_if_exists_errors: false,
2182        }
2183    }
2184
2185    pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2186        self.ignore_if_exists_errors = value;
2187        self
2188    }
2189}