Skip to main content

mz_sql/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL planning.
11//!
12//! SQL planning is the process of taking the abstract syntax tree of a
13//! [`Statement`] and turning it into a [`Plan`] that the dataflow layer can
14//! execute.
15//!
16//! Statements must be purified before they can be planned. See the
17//! [`pure`](crate::pure) module for details.
18
19// Internal module layout.
20//
21// The entry point for planning is `statement::handle_statement`. That function
22// dispatches to a more specific `handle` function for the particular statement
23// type. For most statements, this `handle` function is uninteresting and short,
24// but anything involving a `SELECT` statement gets complicated. `SELECT`
25// queries wind through the functions in the `query` module, starting with
26// `plan_root_query` and fanning out based on the contents of the `SELECT`
27// statement.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{CollectionPlan, ColumnOrder, MapFilterProject, MirScalarExpr, RowSetFinishing};
41use mz_ore::now::{self, NOW_ZERO};
42use mz_pgcopy::CopyFormatParams;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
44use mz_repr::explain::{ExplainConfig, ExplainFormat};
45use mz_repr::network_policy_id::NetworkPolicyId;
46use mz_repr::optimize::OptimizerFeatureOverrides;
47use mz_repr::refresh_schedule::RefreshSchedule;
48use mz_repr::role_id::RoleId;
49use mz_repr::{
50    CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, ReprColumnType, Row,
51    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
52};
53use mz_sql_parser::ast::{
54    AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
55    RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
56    Value, WithOptionValue,
57};
58use mz_ssh_util::keys::SshKeyPair;
59use mz_storage_types::connections::aws::AwsConnection;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::connections::{
62    AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
63    MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
64};
65use mz_storage_types::instances::StorageInstanceId;
66use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
67use mz_storage_types::sources::{
68    SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
69};
70use proptest_derive::Arbitrary;
71use serde::{Deserialize, Serialize};
72
73use crate::ast::{
74    ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
75    TransactionAccessMode,
76};
77use crate::catalog::{
78    CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
79    RoleAttributesRaw,
80};
81use crate::names::{
82    Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
83    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
84};
85
86pub(crate) mod error;
87pub(crate) mod explain;
88pub(crate) mod hir;
89pub(crate) mod literal;
90pub(crate) mod lowering;
91pub(crate) mod notice;
92pub(crate) mod plan_utils;
93pub(crate) mod query;
94pub(crate) mod scope;
95pub(crate) mod side_effecting_func;
96pub(crate) mod statement;
97pub(crate) mod transform_ast;
98pub(crate) mod transform_hir;
99pub(crate) mod typeconv;
100pub(crate) mod with_options;
101
102use crate::plan;
103use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
104use crate::plan::with_options::OptionalDuration;
105pub use error::PlanError;
106pub use explain::normalize_subqueries;
107pub use hir::{
108    AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
109    WindowExprType,
110};
111pub use lowering::Config as HirToMirConfig;
112pub use notice::PlanNotice;
113pub use query::{ExprContext, QueryContext, QueryLifetime};
114pub use scope::Scope;
115pub use side_effecting_func::SideEffectingFunc;
116pub use statement::ddl::{
117    AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
118    PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
119};
120pub use statement::{
121    StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
122    resolve_cluster_for_materialized_view,
123};
124
125use self::statement::ddl::ClusterAlterOptionExtracted;
126use self::with_options::TryFromValue;
127
128/// Instructions for executing a SQL query.
129#[derive(Debug, EnumKind)]
130#[enum_kind(PlanKind)]
131pub enum Plan {
132    CreateConnection(CreateConnectionPlan),
133    CreateDatabase(CreateDatabasePlan),
134    CreateSchema(CreateSchemaPlan),
135    CreateRole(CreateRolePlan),
136    CreateCluster(CreateClusterPlan),
137    CreateClusterReplica(CreateClusterReplicaPlan),
138    CreateSource(CreateSourcePlan),
139    CreateSources(Vec<CreateSourcePlanBundle>),
140    CreateSecret(CreateSecretPlan),
141    CreateSink(CreateSinkPlan),
142    CreateTable(CreateTablePlan),
143    CreateView(CreateViewPlan),
144    CreateMaterializedView(CreateMaterializedViewPlan),
145    CreateContinualTask(CreateContinualTaskPlan),
146    CreateNetworkPolicy(CreateNetworkPolicyPlan),
147    CreateIndex(CreateIndexPlan),
148    CreateType(CreateTypePlan),
149    Comment(CommentPlan),
150    DiscardTemp,
151    DiscardAll,
152    DropObjects(DropObjectsPlan),
153    DropOwned(DropOwnedPlan),
154    EmptyQuery,
155    ShowAllVariables,
156    ShowCreate(ShowCreatePlan),
157    ShowColumns(ShowColumnsPlan),
158    ShowVariable(ShowVariablePlan),
159    InspectShard(InspectShardPlan),
160    SetVariable(SetVariablePlan),
161    ResetVariable(ResetVariablePlan),
162    SetTransaction(SetTransactionPlan),
163    StartTransaction(StartTransactionPlan),
164    CommitTransaction(CommitTransactionPlan),
165    AbortTransaction(AbortTransactionPlan),
166    Select(SelectPlan),
167    Subscribe(SubscribePlan),
168    CopyFrom(CopyFromPlan),
169    CopyTo(CopyToPlan),
170    ExplainPlan(ExplainPlanPlan),
171    ExplainPushdown(ExplainPushdownPlan),
172    ExplainTimestamp(ExplainTimestampPlan),
173    ExplainSinkSchema(ExplainSinkSchemaPlan),
174    Insert(InsertPlan),
175    AlterCluster(AlterClusterPlan),
176    AlterClusterSwap(AlterClusterSwapPlan),
177    AlterNoop(AlterNoopPlan),
178    AlterSetCluster(AlterSetClusterPlan),
179    AlterConnection(AlterConnectionPlan),
180    AlterSource(AlterSourcePlan),
181    AlterClusterRename(AlterClusterRenamePlan),
182    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
183    AlterItemRename(AlterItemRenamePlan),
184    AlterSchemaRename(AlterSchemaRenamePlan),
185    AlterSchemaSwap(AlterSchemaSwapPlan),
186    AlterSecret(AlterSecretPlan),
187    AlterSink(AlterSinkPlan),
188    AlterSystemSet(AlterSystemSetPlan),
189    AlterSystemReset(AlterSystemResetPlan),
190    AlterSystemResetAll(AlterSystemResetAllPlan),
191    AlterRole(AlterRolePlan),
192    AlterOwner(AlterOwnerPlan),
193    AlterTableAddColumn(AlterTablePlan),
194    AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
195    AlterNetworkPolicy(AlterNetworkPolicyPlan),
196    Declare(DeclarePlan),
197    Fetch(FetchPlan),
198    Close(ClosePlan),
199    ReadThenWrite(ReadThenWritePlan),
200    Prepare(PreparePlan),
201    Execute(ExecutePlan),
202    Deallocate(DeallocatePlan),
203    Raise(RaisePlan),
204    GrantRole(GrantRolePlan),
205    RevokeRole(RevokeRolePlan),
206    GrantPrivileges(GrantPrivilegesPlan),
207    RevokePrivileges(RevokePrivilegesPlan),
208    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
209    ReassignOwned(ReassignOwnedPlan),
210    SideEffectingFunc(SideEffectingFunc),
211    ValidateConnection(ValidateConnectionPlan),
212    AlterRetainHistory(AlterRetainHistoryPlan),
213    AlterSourceTimestampInterval(AlterSourceTimestampIntervalPlan),
214}
215
216impl Plan {
217    /// Expresses which [`StatementKind`] can generate which set of
218    /// [`PlanKind`].
219    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
220        match stmt {
221            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
222            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
223            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
224            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
225            StatementKind::AlterObjectRename => &[
226                PlanKind::AlterClusterRename,
227                PlanKind::AlterClusterReplicaRename,
228                PlanKind::AlterItemRename,
229                PlanKind::AlterSchemaRename,
230                PlanKind::AlterNoop,
231            ],
232            StatementKind::AlterObjectSwap => &[
233                PlanKind::AlterClusterSwap,
234                PlanKind::AlterSchemaSwap,
235                PlanKind::AlterNoop,
236            ],
237            StatementKind::AlterRole => &[PlanKind::AlterRole],
238            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
239            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
240            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
241            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
242            StatementKind::AlterSource => &[
243                PlanKind::AlterNoop,
244                PlanKind::AlterSource,
245                PlanKind::AlterRetainHistory,
246                PlanKind::AlterSourceTimestampInterval,
247            ],
248            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
249            StatementKind::AlterSystemResetAll => {
250                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
251            }
252            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
253            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
254            StatementKind::AlterTableAddColumn => {
255                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
256            }
257            StatementKind::AlterMaterializedViewApplyReplacement => &[
258                PlanKind::AlterNoop,
259                PlanKind::AlterMaterializedViewApplyReplacement,
260            ],
261            StatementKind::Close => &[PlanKind::Close],
262            StatementKind::Comment => &[PlanKind::Comment],
263            StatementKind::Commit => &[PlanKind::CommitTransaction],
264            StatementKind::Copy => &[
265                PlanKind::CopyFrom,
266                PlanKind::Select,
267                PlanKind::Subscribe,
268                PlanKind::CopyTo,
269            ],
270            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
271            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
272            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
273            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
274            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
275            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
276            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
277            StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
278            StatementKind::CreateRole => &[PlanKind::CreateRole],
279            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
280            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
281            StatementKind::CreateSink => &[PlanKind::CreateSink],
282            StatementKind::CreateSource | StatementKind::CreateSubsource => {
283                &[PlanKind::CreateSource]
284            }
285            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
286            StatementKind::CreateTable => &[PlanKind::CreateTable],
287            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
288            StatementKind::CreateType => &[PlanKind::CreateType],
289            StatementKind::CreateView => &[PlanKind::CreateView],
290            StatementKind::Deallocate => &[PlanKind::Deallocate],
291            StatementKind::Declare => &[PlanKind::Declare],
292            StatementKind::Delete => &[PlanKind::ReadThenWrite],
293            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
294            StatementKind::DropObjects => &[PlanKind::DropObjects],
295            StatementKind::DropOwned => &[PlanKind::DropOwned],
296            StatementKind::Execute => &[PlanKind::Execute],
297            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
298            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
299            StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
300            StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
301            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
302            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
303            StatementKind::Fetch => &[PlanKind::Fetch],
304            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
305            StatementKind::GrantRole => &[PlanKind::GrantRole],
306            StatementKind::Insert => &[PlanKind::Insert],
307            StatementKind::Prepare => &[PlanKind::Prepare],
308            StatementKind::Raise => &[PlanKind::Raise],
309            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
310            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
311            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
312            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
313            StatementKind::Rollback => &[PlanKind::AbortTransaction],
314            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
315            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
316            StatementKind::SetVariable => &[PlanKind::SetVariable],
317            StatementKind::Show => &[
318                PlanKind::Select,
319                PlanKind::ShowVariable,
320                PlanKind::ShowCreate,
321                PlanKind::ShowColumns,
322                PlanKind::ShowAllVariables,
323                PlanKind::InspectShard,
324            ],
325            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
326            StatementKind::Subscribe => &[PlanKind::Subscribe],
327            StatementKind::Update => &[PlanKind::ReadThenWrite],
328            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
329            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
330        }
331    }
332
333    /// Returns a human readable name of the plan. Meant for use in messages sent back to a user.
334    pub fn name(&self) -> &str {
335        match self {
336            Plan::CreateConnection(_) => "create connection",
337            Plan::CreateDatabase(_) => "create database",
338            Plan::CreateSchema(_) => "create schema",
339            Plan::CreateRole(_) => "create role",
340            Plan::CreateCluster(_) => "create cluster",
341            Plan::CreateClusterReplica(_) => "create cluster replica",
342            Plan::CreateSource(_) => "create source",
343            Plan::CreateSources(_) => "create source",
344            Plan::CreateSecret(_) => "create secret",
345            Plan::CreateSink(_) => "create sink",
346            Plan::CreateTable(_) => "create table",
347            Plan::CreateView(_) => "create view",
348            Plan::CreateMaterializedView(_) => "create materialized view",
349            Plan::CreateContinualTask(_) => "create continual task",
350            Plan::CreateIndex(_) => "create index",
351            Plan::CreateType(_) => "create type",
352            Plan::CreateNetworkPolicy(_) => "create network policy",
353            Plan::Comment(_) => "comment",
354            Plan::DiscardTemp => "discard temp",
355            Plan::DiscardAll => "discard all",
356            Plan::DropObjects(plan) => match plan.object_type {
357                ObjectType::Table => "drop table",
358                ObjectType::View => "drop view",
359                ObjectType::MaterializedView => "drop materialized view",
360                ObjectType::Source => "drop source",
361                ObjectType::Sink => "drop sink",
362                ObjectType::Index => "drop index",
363                ObjectType::Type => "drop type",
364                ObjectType::Role => "drop roles",
365                ObjectType::Cluster => "drop clusters",
366                ObjectType::ClusterReplica => "drop cluster replicas",
367                ObjectType::Secret => "drop secret",
368                ObjectType::Connection => "drop connection",
369                ObjectType::Database => "drop database",
370                ObjectType::Schema => "drop schema",
371                ObjectType::Func => "drop function",
372                ObjectType::ContinualTask => "drop continual task",
373                ObjectType::NetworkPolicy => "drop network policy",
374            },
375            Plan::DropOwned(_) => "drop owned",
376            Plan::EmptyQuery => "do nothing",
377            Plan::ShowAllVariables => "show all variables",
378            Plan::ShowCreate(_) => "show create",
379            Plan::ShowColumns(_) => "show columns",
380            Plan::ShowVariable(_) => "show variable",
381            Plan::InspectShard(_) => "inspect shard",
382            Plan::SetVariable(_) => "set variable",
383            Plan::ResetVariable(_) => "reset variable",
384            Plan::SetTransaction(_) => "set transaction",
385            Plan::StartTransaction(_) => "start transaction",
386            Plan::CommitTransaction(_) => "commit",
387            Plan::AbortTransaction(_) => "abort",
388            Plan::Select(_) => "select",
389            Plan::Subscribe(_) => "subscribe",
390            Plan::CopyFrom(_) => "copy from",
391            Plan::CopyTo(_) => "copy to",
392            Plan::ExplainPlan(_) => "explain plan",
393            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
394            Plan::ExplainTimestamp(_) => "explain timestamp",
395            Plan::ExplainSinkSchema(_) => "explain schema",
396            Plan::Insert(_) => "insert",
397            Plan::AlterNoop(plan) => match plan.object_type {
398                ObjectType::Table => "alter table",
399                ObjectType::View => "alter view",
400                ObjectType::MaterializedView => "alter materialized view",
401                ObjectType::Source => "alter source",
402                ObjectType::Sink => "alter sink",
403                ObjectType::Index => "alter index",
404                ObjectType::Type => "alter type",
405                ObjectType::Role => "alter role",
406                ObjectType::Cluster => "alter cluster",
407                ObjectType::ClusterReplica => "alter cluster replica",
408                ObjectType::Secret => "alter secret",
409                ObjectType::Connection => "alter connection",
410                ObjectType::Database => "alter database",
411                ObjectType::Schema => "alter schema",
412                ObjectType::Func => "alter function",
413                ObjectType::ContinualTask => "alter continual task",
414                ObjectType::NetworkPolicy => "alter network policy",
415            },
416            Plan::AlterCluster(_) => "alter cluster",
417            Plan::AlterClusterRename(_) => "alter cluster rename",
418            Plan::AlterClusterSwap(_) => "alter cluster swap",
419            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
420            Plan::AlterSetCluster(_) => "alter set cluster",
421            Plan::AlterConnection(_) => "alter connection",
422            Plan::AlterSource(_) => "alter source",
423            Plan::AlterItemRename(_) => "rename item",
424            Plan::AlterSchemaRename(_) => "alter rename schema",
425            Plan::AlterSchemaSwap(_) => "alter swap schema",
426            Plan::AlterSecret(_) => "alter secret",
427            Plan::AlterSink(_) => "alter sink",
428            Plan::AlterSystemSet(_) => "alter system",
429            Plan::AlterSystemReset(_) => "alter system",
430            Plan::AlterSystemResetAll(_) => "alter system",
431            Plan::AlterRole(_) => "alter role",
432            Plan::AlterNetworkPolicy(_) => "alter network policy",
433            Plan::AlterOwner(plan) => match plan.object_type {
434                ObjectType::Table => "alter table owner",
435                ObjectType::View => "alter view owner",
436                ObjectType::MaterializedView => "alter materialized view owner",
437                ObjectType::Source => "alter source owner",
438                ObjectType::Sink => "alter sink owner",
439                ObjectType::Index => "alter index owner",
440                ObjectType::Type => "alter type owner",
441                ObjectType::Role => "alter role owner",
442                ObjectType::Cluster => "alter cluster owner",
443                ObjectType::ClusterReplica => "alter cluster replica owner",
444                ObjectType::Secret => "alter secret owner",
445                ObjectType::Connection => "alter connection owner",
446                ObjectType::Database => "alter database owner",
447                ObjectType::Schema => "alter schema owner",
448                ObjectType::Func => "alter function owner",
449                ObjectType::ContinualTask => "alter continual task owner",
450                ObjectType::NetworkPolicy => "alter network policy owner",
451            },
452            Plan::AlterTableAddColumn(_) => "alter table add column",
453            Plan::AlterMaterializedViewApplyReplacement(_) => {
454                "alter materialized view apply replacement"
455            }
456            Plan::Declare(_) => "declare",
457            Plan::Fetch(_) => "fetch",
458            Plan::Close(_) => "close",
459            Plan::ReadThenWrite(plan) => match plan.kind {
460                MutationKind::Insert => "insert into select",
461                MutationKind::Update => "update",
462                MutationKind::Delete => "delete",
463            },
464            Plan::Prepare(_) => "prepare",
465            Plan::Execute(_) => "execute",
466            Plan::Deallocate(_) => "deallocate",
467            Plan::Raise(_) => "raise",
468            Plan::GrantRole(_) => "grant role",
469            Plan::RevokeRole(_) => "revoke role",
470            Plan::GrantPrivileges(_) => "grant privilege",
471            Plan::RevokePrivileges(_) => "revoke privilege",
472            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
473            Plan::ReassignOwned(_) => "reassign owned",
474            Plan::SideEffectingFunc(_) => "side effecting func",
475            Plan::ValidateConnection(_) => "validate connection",
476            Plan::AlterRetainHistory(_) => "alter retain history",
477            Plan::AlterSourceTimestampInterval(_) => "alter source timestamp interval",
478        }
479    }
480
481    /// Returns `true` iff this `Plan` is allowed to be executed in read-only
482    /// mode.
483    ///
484    /// We use an explicit allow-list, to avoid future additions automatically
485    /// falling into the `true` category.
486    pub fn allowed_in_read_only(&self) -> bool {
487        match self {
488            // These two set non-durable session variables, so are okay in
489            // read-only mode.
490            Plan::SetVariable(_) => true,
491            Plan::ResetVariable(_) => true,
492            Plan::SetTransaction(_) => true,
493            Plan::StartTransaction(_) => true,
494            Plan::CommitTransaction(_) => true,
495            Plan::AbortTransaction(_) => true,
496            Plan::Select(_) => true,
497            Plan::EmptyQuery => true,
498            Plan::ShowAllVariables => true,
499            Plan::ShowCreate(_) => true,
500            Plan::ShowColumns(_) => true,
501            Plan::ShowVariable(_) => true,
502            Plan::InspectShard(_) => true,
503            Plan::Subscribe(_) => true,
504            Plan::CopyTo(_) => true,
505            Plan::ExplainPlan(_) => true,
506            Plan::ExplainPushdown(_) => true,
507            Plan::ExplainTimestamp(_) => true,
508            Plan::ExplainSinkSchema(_) => true,
509            Plan::ValidateConnection(_) => true,
510            _ => false,
511        }
512    }
513}
514
515#[derive(Debug)]
516pub struct StartTransactionPlan {
517    pub access: Option<TransactionAccessMode>,
518    pub isolation_level: Option<TransactionIsolationLevel>,
519}
520
521#[derive(Debug)]
522pub enum TransactionType {
523    Explicit,
524    Implicit,
525}
526
527impl TransactionType {
528    pub fn is_explicit(&self) -> bool {
529        matches!(self, TransactionType::Explicit)
530    }
531
532    pub fn is_implicit(&self) -> bool {
533        matches!(self, TransactionType::Implicit)
534    }
535}
536
537#[derive(Debug)]
538pub struct CommitTransactionPlan {
539    pub transaction_type: TransactionType,
540}
541
542#[derive(Debug)]
543pub struct AbortTransactionPlan {
544    pub transaction_type: TransactionType,
545}
546
547#[derive(Debug)]
548pub struct CreateDatabasePlan {
549    pub name: String,
550    pub if_not_exists: bool,
551}
552
553#[derive(Debug)]
554pub struct CreateSchemaPlan {
555    pub database_spec: ResolvedDatabaseSpecifier,
556    pub schema_name: String,
557    pub if_not_exists: bool,
558}
559
560#[derive(Debug)]
561pub struct CreateRolePlan {
562    pub name: String,
563    pub attributes: RoleAttributesRaw,
564}
565
566#[derive(Debug, PartialEq, Eq, Clone)]
567pub struct CreateClusterPlan {
568    pub name: String,
569    pub variant: CreateClusterVariant,
570    pub workload_class: Option<String>,
571}
572
573#[derive(Debug, PartialEq, Eq, Clone)]
574pub enum CreateClusterVariant {
575    Managed(CreateClusterManagedPlan),
576    Unmanaged(CreateClusterUnmanagedPlan),
577}
578
579#[derive(Debug, PartialEq, Eq, Clone)]
580pub struct CreateClusterUnmanagedPlan {
581    pub replicas: Vec<(String, ReplicaConfig)>,
582}
583
584#[derive(Debug, PartialEq, Eq, Clone)]
585pub struct CreateClusterManagedPlan {
586    pub replication_factor: u32,
587    pub size: String,
588    pub availability_zones: Vec<String>,
589    pub compute: ComputeReplicaConfig,
590    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
591    pub schedule: ClusterSchedule,
592}
593
594#[derive(Debug)]
595pub struct CreateClusterReplicaPlan {
596    pub cluster_id: ClusterId,
597    pub name: String,
598    pub config: ReplicaConfig,
599}
600
601/// Configuration of introspection for a cluster replica.
602#[derive(
603    Clone,
604    Copy,
605    Debug,
606    Serialize,
607    Deserialize,
608    PartialOrd,
609    Ord,
610    PartialEq,
611    Eq
612)]
613pub struct ComputeReplicaIntrospectionConfig {
614    /// Whether to introspect the introspection.
615    pub debugging: bool,
616    /// The interval at which to introspect.
617    pub interval: Duration,
618}
619
620#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
621pub struct ComputeReplicaConfig {
622    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
623}
624
625#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
626pub enum ReplicaConfig {
627    Unorchestrated {
628        storagectl_addrs: Vec<String>,
629        computectl_addrs: Vec<String>,
630        compute: ComputeReplicaConfig,
631    },
632    Orchestrated {
633        size: String,
634        availability_zone: Option<String>,
635        compute: ComputeReplicaConfig,
636        internal: bool,
637        billed_as: Option<String>,
638    },
639}
640
641#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
642pub enum ClusterSchedule {
643    /// The system won't automatically turn the cluster On or Off.
644    Manual,
645    /// The cluster will be On when a REFRESH materialized view on it needs to refresh.
646    /// `hydration_time_estimate` determines how much time before a refresh to turn the
647    /// cluster On, so that it can rehydrate already before the refresh time.
648    Refresh { hydration_time_estimate: Duration },
649}
650
651impl Default for ClusterSchedule {
652    fn default() -> Self {
653        // (Has to be consistent with `impl Default for ClusterScheduleOptionValue`.)
654        ClusterSchedule::Manual
655    }
656}
657
658#[derive(Debug)]
659pub struct CreateSourcePlan {
660    pub name: QualifiedItemName,
661    pub source: Source,
662    pub if_not_exists: bool,
663    pub timeline: Timeline,
664    // None for subsources, which run on the parent cluster.
665    pub in_cluster: Option<ClusterId>,
666}
667
668#[derive(Clone, Debug, PartialEq, Eq)]
669pub struct SourceReferences {
670    pub updated_at: u64,
671    pub references: Vec<SourceReference>,
672}
673
674/// An available external reference for a source and if possible to retrieve,
675/// any column names it contains.
676#[derive(Clone, Debug, PartialEq, Eq)]
677pub struct SourceReference {
678    pub name: String,
679    pub namespace: Option<String>,
680    pub columns: Vec<String>,
681}
682
683/// A [`CreateSourcePlan`] and the metadata necessary to sequence it.
684#[derive(Debug)]
685pub struct CreateSourcePlanBundle {
686    /// ID of this source in the Catalog.
687    pub item_id: CatalogItemId,
688    /// ID used to reference this source from outside the catalog, e.g. compute.
689    pub global_id: GlobalId,
690    /// Details of the source to create.
691    pub plan: CreateSourcePlan,
692    /// Other catalog objects that are referenced by this source, determined at name resolution.
693    pub resolved_ids: ResolvedIds,
694    /// All the available upstream references for this source.
695    /// Populated for top-level sources that can contain subsources/tables
696    /// and used during sequencing to populate the appropriate catalog fields.
697    pub available_source_references: Option<SourceReferences>,
698}
699
700#[derive(Debug)]
701pub struct CreateConnectionPlan {
702    pub name: QualifiedItemName,
703    pub if_not_exists: bool,
704    pub connection: Connection,
705    pub validate: bool,
706}
707
708#[derive(Debug)]
709pub struct ValidateConnectionPlan {
710    /// ID of the connection in the Catalog.
711    pub id: CatalogItemId,
712    /// The connection to validate.
713    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
714}
715
716#[derive(Debug)]
717pub struct CreateSecretPlan {
718    pub name: QualifiedItemName,
719    pub secret: Secret,
720    pub if_not_exists: bool,
721}
722
723#[derive(Debug)]
724pub struct CreateSinkPlan {
725    pub name: QualifiedItemName,
726    pub sink: Sink,
727    pub with_snapshot: bool,
728    pub if_not_exists: bool,
729    pub in_cluster: ClusterId,
730}
731
732#[derive(Debug)]
733pub struct CreateTablePlan {
734    pub name: QualifiedItemName,
735    pub table: Table,
736    pub if_not_exists: bool,
737}
738
739#[derive(Debug, Clone)]
740pub struct CreateViewPlan {
741    pub name: QualifiedItemName,
742    pub view: View,
743    /// The Catalog objects that this view is replacing, if any.
744    pub replace: Option<CatalogItemId>,
745    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
746    pub drop_ids: Vec<CatalogItemId>,
747    pub if_not_exists: bool,
748    /// True if the view contains an expression that can make the exact column list
749    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
750    pub ambiguous_columns: bool,
751}
752
753#[derive(Debug, Clone)]
754pub struct CreateMaterializedViewPlan {
755    pub name: QualifiedItemName,
756    pub materialized_view: MaterializedView,
757    /// The Catalog objects that this materialized view is replacing, if any.
758    pub replace: Option<CatalogItemId>,
759    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
760    pub drop_ids: Vec<CatalogItemId>,
761    pub if_not_exists: bool,
762    /// True if the materialized view contains an expression that can make the exact column list
763    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
764    pub ambiguous_columns: bool,
765}
766
767#[derive(Debug, Clone)]
768pub struct CreateContinualTaskPlan {
769    pub name: QualifiedItemName,
770    /// During initial creation, the `LocalId` placeholder for this CT in `continual_task.expr`.
771    /// None on restart.
772    pub placeholder_id: Option<mz_expr::LocalId>,
773    pub desc: RelationDesc,
774    /// ID of the collection we read into this continual task.
775    pub input_id: GlobalId,
776    pub with_snapshot: bool,
777    /// Definition for the continual task.
778    pub continual_task: MaterializedView,
779}
780
781#[derive(Debug, Clone)]
782pub struct CreateNetworkPolicyPlan {
783    pub name: String,
784    pub rules: Vec<NetworkPolicyRule>,
785}
786
787#[derive(Debug, Clone)]
788pub struct AlterNetworkPolicyPlan {
789    pub id: NetworkPolicyId,
790    pub name: String,
791    pub rules: Vec<NetworkPolicyRule>,
792}
793
794#[derive(Debug, Clone)]
795pub struct CreateIndexPlan {
796    pub name: QualifiedItemName,
797    pub index: Index,
798    pub if_not_exists: bool,
799}
800
801#[derive(Debug)]
802pub struct CreateTypePlan {
803    pub name: QualifiedItemName,
804    pub typ: Type,
805}
806
807#[derive(Debug)]
808pub struct DropObjectsPlan {
809    /// The IDs of only the objects directly referenced in the `DROP` statement.
810    pub referenced_ids: Vec<ObjectId>,
811    /// All object IDs to drop. Includes `referenced_ids` and all descendants.
812    pub drop_ids: Vec<ObjectId>,
813    /// The type of object that was dropped explicitly in the DROP statement. `ids` may contain
814    /// objects of different types due to CASCADE.
815    pub object_type: ObjectType,
816}
817
818#[derive(Debug)]
819pub struct DropOwnedPlan {
820    /// The role IDs that own the objects.
821    pub role_ids: Vec<RoleId>,
822    /// All object IDs to drop.
823    pub drop_ids: Vec<ObjectId>,
824    /// The privileges to revoke.
825    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
826    /// The default privileges to revoke.
827    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
828}
829
830#[derive(Debug)]
831pub struct ShowVariablePlan {
832    pub name: String,
833}
834
835#[derive(Debug)]
836pub struct InspectShardPlan {
837    /// ID of the storage collection to inspect.
838    pub id: GlobalId,
839}
840
841#[derive(Debug)]
842pub struct SetVariablePlan {
843    pub name: String,
844    pub value: VariableValue,
845    pub local: bool,
846}
847
848#[derive(Debug)]
849pub enum VariableValue {
850    Default,
851    Values(Vec<String>),
852}
853
854#[derive(Debug)]
855pub struct ResetVariablePlan {
856    pub name: String,
857}
858
859#[derive(Debug)]
860pub struct SetTransactionPlan {
861    pub local: bool,
862    pub modes: Vec<TransactionMode>,
863}
864
865/// A plan for select statements.
866#[derive(Clone, Debug)]
867pub struct SelectPlan {
868    /// The `SELECT` statement itself. Used for explain/notices, but not otherwise
869    /// load-bearing. Boxed to save stack space.
870    pub select: Option<Box<SelectStatement<Aug>>>,
871    /// The plan as a HIR.
872    pub source: HirRelationExpr,
873    /// At what time should this select happen?
874    pub when: QueryWhen,
875    /// Instructions how to form the result set.
876    pub finishing: RowSetFinishing,
877    /// For `COPY TO STDOUT`, the format to use.
878    pub copy_to: Option<CopyFormat>,
879}
880
881impl SelectPlan {
882    pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
883        let arity = typ.arity();
884        SelectPlan {
885            select: None,
886            source: HirRelationExpr::Constant { rows, typ },
887            when: QueryWhen::Immediately,
888            finishing: RowSetFinishing::trivial(arity),
889            copy_to: None,
890        }
891    }
892}
893
894#[derive(Debug, Clone)]
895pub enum SubscribeOutput {
896    Diffs,
897    WithinTimestampOrderBy {
898        /// We pretend that mz_diff is prepended to the normal columns, making it index 0
899        order_by: Vec<ColumnOrder>,
900    },
901    EnvelopeUpsert {
902        /// Order by with just keys
903        order_by_keys: Vec<ColumnOrder>,
904    },
905    EnvelopeDebezium {
906        /// Order by with just keys
907        order_by_keys: Vec<ColumnOrder>,
908    },
909}
910
911impl SubscribeOutput {
912    pub fn row_order(&self) -> &[ColumnOrder] {
913        match self {
914            SubscribeOutput::Diffs => &[],
915            // This ordering prepends the diff, so its `order_by` field cannot be applied to rows.
916            SubscribeOutput::WithinTimestampOrderBy { .. } => &[],
917            SubscribeOutput::EnvelopeUpsert { order_by_keys } => order_by_keys,
918            SubscribeOutput::EnvelopeDebezium { order_by_keys } => order_by_keys,
919        }
920    }
921}
922
923#[derive(Debug, Clone)]
924pub struct SubscribePlan {
925    pub from: SubscribeFrom,
926    pub with_snapshot: bool,
927    pub when: QueryWhen,
928    pub up_to: Option<Timestamp>,
929    pub copy_to: Option<CopyFormat>,
930    pub emit_progress: bool,
931    pub output: SubscribeOutput,
932}
933
934#[derive(Debug, Clone)]
935pub enum SubscribeFrom {
936    /// ID of the collection to subscribe to.
937    Id(GlobalId),
938    /// Query to subscribe to.
939    Query {
940        expr: HirRelationExpr,
941        desc: RelationDesc,
942    },
943}
944
945impl SubscribeFrom {
946    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
947        match self {
948            SubscribeFrom::Id(id) => BTreeSet::from([*id]),
949            SubscribeFrom::Query { expr, .. } => expr.depends_on(),
950        }
951    }
952
953    pub fn contains_temporal(&self) -> bool {
954        match self {
955            SubscribeFrom::Id(_) => false,
956            SubscribeFrom::Query { expr, .. } => expr
957                .contains_temporal()
958                .expect("Unexpected error in `visit_scalars` call"),
959        }
960    }
961}
962
963#[derive(Debug)]
964pub struct ShowCreatePlan {
965    pub id: ObjectId,
966    pub row: Row,
967}
968
969#[derive(Debug)]
970pub struct ShowColumnsPlan {
971    pub id: CatalogItemId,
972    pub select_plan: SelectPlan,
973    pub new_resolved_ids: ResolvedIds,
974}
975
976#[derive(Debug)]
977pub struct CopyFromPlan {
978    /// Table we're copying into.
979    pub target_id: CatalogItemId,
980    /// Human-readable full name of the target table.
981    pub target_name: String,
982    /// Source we're copying data from.
983    pub source: CopyFromSource,
984    /// How input columns map to those on the destination table.
985    ///
986    /// TODO(cf2): Remove this field in favor of the mfp.
987    pub columns: Vec<ColumnIndex>,
988    /// [`RelationDesc`] describing the input data.
989    pub source_desc: RelationDesc,
990    /// Changes the shape of the input data to match the destination table.
991    pub mfp: MapFilterProject,
992    /// Format specific params for copying the input data.
993    pub params: CopyFormatParams<'static>,
994    /// Filter for the source files we're copying from, e.g. an S3 prefix.
995    pub filter: Option<CopyFromFilter>,
996}
997
998#[derive(Debug)]
999pub enum CopyFromSource {
1000    /// Copying from a file local to the user, transmitted via pgwire.
1001    Stdin,
1002    /// A remote resource, e.g. HTTP file.
1003    ///
1004    /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource.
1005    Url(HirScalarExpr),
1006    /// A file in an S3 bucket.
1007    AwsS3 {
1008        /// Expression that evaluates to the file we want to copy.
1009        uri: HirScalarExpr,
1010        /// Details for how we connect to AWS S3.
1011        connection: AwsConnection,
1012        /// ID of the connection object.
1013        connection_id: CatalogItemId,
1014    },
1015}
1016
1017#[derive(Debug)]
1018pub enum CopyFromFilter {
1019    Files(Vec<String>),
1020    Pattern(String),
1021}
1022
1023/// `COPY TO S3`
1024///
1025/// (This is a completely different thing from `COPY TO STDOUT`. That is a `Plan::Select` with
1026/// `copy_to` set.)
1027#[derive(Debug, Clone)]
1028pub struct CopyToPlan {
1029    /// The select query plan whose data will be copied to destination uri.
1030    pub select_plan: SelectPlan,
1031    pub desc: RelationDesc,
1032    /// The scalar expression to be resolved to get the destination uri.
1033    pub to: HirScalarExpr,
1034    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
1035    /// The ID of the connection.
1036    pub connection_id: CatalogItemId,
1037    pub format: S3SinkFormat,
1038    pub max_file_size: u64,
1039}
1040
1041#[derive(Clone, Debug)]
1042pub struct ExplainPlanPlan {
1043    pub stage: ExplainStage,
1044    pub format: ExplainFormat,
1045    pub config: ExplainConfig,
1046    pub explainee: Explainee,
1047}
1048
1049/// The type of object to be explained
1050#[derive(Clone, Debug)]
1051pub enum Explainee {
1052    /// Lookup and explain a plan saved for an view.
1053    View(CatalogItemId),
1054    /// Lookup and explain a plan saved for an existing materialized view.
1055    MaterializedView(CatalogItemId),
1056    /// Lookup and explain a plan saved for an existing index.
1057    Index(CatalogItemId),
1058    /// Replan an existing view.
1059    ReplanView(CatalogItemId),
1060    /// Replan an existing materialized view.
1061    ReplanMaterializedView(CatalogItemId),
1062    /// Replan an existing index.
1063    ReplanIndex(CatalogItemId),
1064    /// A SQL statement.
1065    Statement(ExplaineeStatement),
1066}
1067
1068/// Explainee types that are statements.
1069#[derive(Clone, Debug, EnumKind)]
1070#[enum_kind(ExplaineeStatementKind)]
1071pub enum ExplaineeStatement {
1072    /// The object to be explained is a SELECT statement.
1073    Select {
1074        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1075        broken: bool,
1076        plan: plan::SelectPlan,
1077        desc: RelationDesc,
1078    },
1079    /// The object to be explained is a CREATE VIEW.
1080    CreateView {
1081        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1082        broken: bool,
1083        plan: plan::CreateViewPlan,
1084    },
1085    /// The object to be explained is a CREATE MATERIALIZED VIEW.
1086    CreateMaterializedView {
1087        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1088        broken: bool,
1089        plan: plan::CreateMaterializedViewPlan,
1090    },
1091    /// The object to be explained is a CREATE INDEX.
1092    CreateIndex {
1093        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1094        broken: bool,
1095        plan: plan::CreateIndexPlan,
1096    },
1097    /// The object to be explained is a SUBSCRIBE statement.
1098    Subscribe {
1099        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1100        broken: bool,
1101        plan: plan::SubscribePlan,
1102    },
1103}
1104
1105impl ExplaineeStatement {
1106    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1107        match self {
1108            Self::Select { plan, .. } => plan.source.depends_on(),
1109            Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1110            Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1111            Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1112            Self::Subscribe { plan, .. } => plan.from.depends_on(),
1113        }
1114    }
1115
1116    /// Statements that have their `broken` flag set are expected to cause a
1117    /// panic in the optimizer code. In this case:
1118    ///
1119    /// 1. The optimizer pipeline execution will stop, but the panic will be
1120    ///    intercepted and will not propagate to the caller. The partial
1121    ///    optimizer trace collected until this point will be available.
1122    /// 2. The optimizer trace tracing subscriber will delegate regular tracing
1123    ///    spans and events to the default subscriber.
1124    ///
1125    /// This is useful when debugging queries that cause panics.
1126    pub fn broken(&self) -> bool {
1127        match self {
1128            Self::Select { broken, .. } => *broken,
1129            Self::CreateView { broken, .. } => *broken,
1130            Self::CreateMaterializedView { broken, .. } => *broken,
1131            Self::CreateIndex { broken, .. } => *broken,
1132            Self::Subscribe { broken, .. } => *broken,
1133        }
1134    }
1135}
1136
1137impl ExplaineeStatementKind {
1138    pub fn supports(&self, stage: &ExplainStage) -> bool {
1139        use ExplainStage::*;
1140        match self {
1141            Self::Select => true,
1142            Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1143            Self::CreateMaterializedView => true,
1144            Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1145            // SUBSCRIBE doesn't support RAW, DECORRELATED, or LOCAL stages because
1146            // it takes MIR directly rather than going through HIR lowering.
1147            Self::Subscribe => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1148        }
1149    }
1150}
1151
1152impl std::fmt::Display for ExplaineeStatementKind {
1153    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1154        match self {
1155            Self::Select => write!(f, "SELECT"),
1156            Self::CreateView => write!(f, "CREATE VIEW"),
1157            Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1158            Self::CreateIndex => write!(f, "CREATE INDEX"),
1159            Self::Subscribe => write!(f, "SUBSCRIBE"),
1160        }
1161    }
1162}
1163
1164#[derive(Clone, Debug)]
1165pub struct ExplainPushdownPlan {
1166    pub explainee: Explainee,
1167}
1168
1169#[derive(Clone, Debug)]
1170pub struct ExplainTimestampPlan {
1171    pub format: ExplainFormat,
1172    pub raw_plan: HirRelationExpr,
1173    pub when: QueryWhen,
1174}
1175
1176#[derive(Debug)]
1177pub struct ExplainSinkSchemaPlan {
1178    pub sink_from: GlobalId,
1179    pub json_schema: String,
1180}
1181
1182#[derive(Debug)]
1183pub struct SendDiffsPlan {
1184    pub id: CatalogItemId,
1185    pub updates: Vec<(Row, Diff)>,
1186    pub kind: MutationKind,
1187    pub returning: Vec<(Row, NonZeroUsize)>,
1188    pub max_result_size: u64,
1189}
1190
1191#[derive(Debug)]
1192pub struct InsertPlan {
1193    pub id: CatalogItemId,
1194    pub values: HirRelationExpr,
1195    pub returning: Vec<mz_expr::MirScalarExpr>,
1196}
1197
1198#[derive(Debug)]
1199pub struct ReadThenWritePlan {
1200    pub id: CatalogItemId,
1201    pub selection: HirRelationExpr,
1202    pub finishing: RowSetFinishing,
1203    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
1204    pub kind: MutationKind,
1205    pub returning: Vec<mz_expr::MirScalarExpr>,
1206}
1207
1208/// Generated by `ALTER ... IF EXISTS` if the named object did not exist.
1209#[derive(Debug)]
1210pub struct AlterNoopPlan {
1211    pub object_type: ObjectType,
1212}
1213
1214#[derive(Debug)]
1215pub struct AlterSetClusterPlan {
1216    pub id: CatalogItemId,
1217    pub set_cluster: ClusterId,
1218}
1219
1220#[derive(Debug)]
1221pub struct AlterRetainHistoryPlan {
1222    pub id: CatalogItemId,
1223    pub value: Option<Value>,
1224    pub window: CompactionWindow,
1225    pub object_type: ObjectType,
1226}
1227
1228#[derive(Debug)]
1229pub struct AlterSourceTimestampIntervalPlan {
1230    pub id: CatalogItemId,
1231    pub value: Option<Value>,
1232    pub interval: Duration,
1233}
1234
1235#[derive(Debug, Clone)]
1236
1237pub enum AlterOptionParameter<T = String> {
1238    Set(T),
1239    Reset,
1240    Unchanged,
1241}
1242
1243#[derive(Debug)]
1244pub enum AlterConnectionAction {
1245    RotateKeys,
1246    AlterOptions {
1247        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
1248        drop_options: BTreeSet<ConnectionOptionName>,
1249        validate: bool,
1250    },
1251}
1252
1253#[derive(Debug)]
1254pub struct AlterConnectionPlan {
1255    pub id: CatalogItemId,
1256    pub action: AlterConnectionAction,
1257}
1258
1259#[derive(Debug)]
1260pub enum AlterSourceAction {
1261    AddSubsourceExports {
1262        subsources: Vec<CreateSourcePlanBundle>,
1263        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1264    },
1265    RefreshReferences {
1266        references: SourceReferences,
1267    },
1268}
1269
1270#[derive(Debug)]
1271pub struct AlterSourcePlan {
1272    pub item_id: CatalogItemId,
1273    pub ingestion_id: GlobalId,
1274    pub action: AlterSourceAction,
1275}
1276
1277#[derive(Debug, Clone)]
1278pub struct AlterSinkPlan {
1279    pub item_id: CatalogItemId,
1280    pub global_id: GlobalId,
1281    pub sink: Sink,
1282    pub with_snapshot: bool,
1283    pub in_cluster: ClusterId,
1284}
1285
1286#[derive(Debug, Clone)]
1287pub struct AlterClusterPlan {
1288    pub id: ClusterId,
1289    pub name: String,
1290    pub options: PlanClusterOption,
1291    pub strategy: AlterClusterPlanStrategy,
1292}
1293
1294#[derive(Debug)]
1295pub struct AlterClusterRenamePlan {
1296    pub id: ClusterId,
1297    pub name: String,
1298    pub to_name: String,
1299}
1300
1301#[derive(Debug)]
1302pub struct AlterClusterReplicaRenamePlan {
1303    pub cluster_id: ClusterId,
1304    pub replica_id: ReplicaId,
1305    pub name: QualifiedReplica,
1306    pub to_name: String,
1307}
1308
1309#[derive(Debug)]
1310pub struct AlterItemRenamePlan {
1311    pub id: CatalogItemId,
1312    pub current_full_name: FullItemName,
1313    pub to_name: String,
1314    pub object_type: ObjectType,
1315}
1316
1317#[derive(Debug)]
1318pub struct AlterSchemaRenamePlan {
1319    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1320    pub new_schema_name: String,
1321}
1322
1323#[derive(Debug)]
1324pub struct AlterSchemaSwapPlan {
1325    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1326    pub schema_a_name: String,
1327    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1328    pub schema_b_name: String,
1329    pub name_temp: String,
1330}
1331
1332#[derive(Debug)]
1333pub struct AlterClusterSwapPlan {
1334    pub id_a: ClusterId,
1335    pub id_b: ClusterId,
1336    pub name_a: String,
1337    pub name_b: String,
1338    pub name_temp: String,
1339}
1340
1341#[derive(Debug)]
1342pub struct AlterSecretPlan {
1343    pub id: CatalogItemId,
1344    pub secret_as: MirScalarExpr,
1345}
1346
1347#[derive(Debug)]
1348pub struct AlterSystemSetPlan {
1349    pub name: String,
1350    pub value: VariableValue,
1351}
1352
1353#[derive(Debug)]
1354pub struct AlterSystemResetPlan {
1355    pub name: String,
1356}
1357
1358#[derive(Debug)]
1359pub struct AlterSystemResetAllPlan {}
1360
1361#[derive(Debug)]
1362pub struct AlterRolePlan {
1363    pub id: RoleId,
1364    pub name: String,
1365    pub option: PlannedAlterRoleOption,
1366}
1367
1368#[derive(Debug)]
1369pub struct AlterOwnerPlan {
1370    pub id: ObjectId,
1371    pub object_type: ObjectType,
1372    pub new_owner: RoleId,
1373}
1374
1375#[derive(Debug)]
1376pub struct AlterTablePlan {
1377    pub relation_id: CatalogItemId,
1378    pub column_name: ColumnName,
1379    pub column_type: SqlColumnType,
1380    pub raw_sql_type: RawDataType,
1381}
1382
1383#[derive(Debug, Clone)]
1384pub struct AlterMaterializedViewApplyReplacementPlan {
1385    pub id: CatalogItemId,
1386    pub replacement_id: CatalogItemId,
1387}
1388
1389#[derive(Debug)]
1390pub struct DeclarePlan {
1391    pub name: String,
1392    pub stmt: Statement<Raw>,
1393    pub sql: String,
1394    pub params: Params,
1395}
1396
1397#[derive(Debug)]
1398pub struct FetchPlan {
1399    pub name: String,
1400    pub count: Option<FetchDirection>,
1401    pub timeout: ExecuteTimeout,
1402}
1403
1404#[derive(Debug)]
1405pub struct ClosePlan {
1406    pub name: String,
1407}
1408
1409#[derive(Debug)]
1410pub struct PreparePlan {
1411    pub name: String,
1412    pub stmt: Statement<Raw>,
1413    pub sql: String,
1414    pub desc: StatementDesc,
1415}
1416
1417#[derive(Debug)]
1418pub struct ExecutePlan {
1419    pub name: String,
1420    pub params: Params,
1421}
1422
1423#[derive(Debug)]
1424pub struct DeallocatePlan {
1425    pub name: Option<String>,
1426}
1427
1428#[derive(Debug)]
1429pub struct RaisePlan {
1430    pub severity: NoticeSeverity,
1431}
1432
1433#[derive(Debug)]
1434pub struct GrantRolePlan {
1435    /// The roles that are gaining members.
1436    pub role_ids: Vec<RoleId>,
1437    /// The roles that will be added to `role_id`.
1438    pub member_ids: Vec<RoleId>,
1439    /// The role that granted the membership.
1440    pub grantor_id: RoleId,
1441}
1442
1443#[derive(Debug)]
1444pub struct RevokeRolePlan {
1445    /// The roles that are losing members.
1446    pub role_ids: Vec<RoleId>,
1447    /// The roles that will be removed from `role_id`.
1448    pub member_ids: Vec<RoleId>,
1449    /// The role that revoked the membership.
1450    pub grantor_id: RoleId,
1451}
1452
1453#[derive(Debug)]
1454pub struct UpdatePrivilege {
1455    /// The privileges being granted/revoked on an object.
1456    pub acl_mode: AclMode,
1457    /// The ID of the object receiving privileges.
1458    pub target_id: SystemObjectId,
1459    /// The role that is granting the privileges.
1460    pub grantor: RoleId,
1461}
1462
1463#[derive(Debug)]
1464pub struct GrantPrivilegesPlan {
1465    /// Description of each privilege being granted.
1466    pub update_privileges: Vec<UpdatePrivilege>,
1467    /// The roles that will granted the privileges.
1468    pub grantees: Vec<RoleId>,
1469}
1470
1471#[derive(Debug)]
1472pub struct RevokePrivilegesPlan {
1473    /// Description of each privilege being revoked.
1474    pub update_privileges: Vec<UpdatePrivilege>,
1475    /// The roles that will have privileges revoked.
1476    pub revokees: Vec<RoleId>,
1477}
1478#[derive(Debug)]
1479pub struct AlterDefaultPrivilegesPlan {
1480    /// Description of objects that match this default privilege.
1481    pub privilege_objects: Vec<DefaultPrivilegeObject>,
1482    /// The privilege to be granted/revoked from the matching objects.
1483    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
1484    /// Whether this is a grant or revoke.
1485    pub is_grant: bool,
1486}
1487
1488#[derive(Debug)]
1489pub struct ReassignOwnedPlan {
1490    /// The roles whose owned objects are being reassigned.
1491    pub old_roles: Vec<RoleId>,
1492    /// The new owner of the objects.
1493    pub new_role: RoleId,
1494    /// All object IDs to reassign.
1495    pub reassign_ids: Vec<ObjectId>,
1496}
1497
1498#[derive(Debug)]
1499pub struct CommentPlan {
1500    /// The object that this comment is associated with.
1501    pub object_id: CommentObjectId,
1502    /// A sub-component of the object that this comment is associated with, e.g. a column.
1503    ///
1504    /// TODO(parkmycar): <https://github.com/MaterializeInc/database-issues/issues/6711>.
1505    pub sub_component: Option<usize>,
1506    /// The comment itself. If `None` that indicates we should clear the existing comment.
1507    pub comment: Option<String>,
1508}
1509
1510#[derive(Clone, Debug)]
1511pub enum TableDataSource {
1512    /// The table owns data created via INSERT/UPDATE/DELETE statements.
1513    TableWrites { defaults: Vec<Expr<Aug>> },
1514
1515    /// The table receives its data from the identified `DataSourceDesc`.
1516    /// This table type does not support INSERT/UPDATE/DELETE statements.
1517    DataSource {
1518        desc: DataSourceDesc,
1519        timeline: Timeline,
1520    },
1521}
1522
1523#[derive(Clone, Debug)]
1524pub struct Table {
1525    pub create_sql: String,
1526    pub desc: VersionedRelationDesc,
1527    pub temporary: bool,
1528    pub compaction_window: Option<CompactionWindow>,
1529    pub data_source: TableDataSource,
1530}
1531
1532#[derive(Clone, Debug)]
1533pub struct Source {
1534    pub create_sql: String,
1535    pub data_source: DataSourceDesc,
1536    pub desc: RelationDesc,
1537    pub compaction_window: Option<CompactionWindow>,
1538}
1539
1540#[derive(Debug, Clone)]
1541pub enum DataSourceDesc {
1542    /// Receives data from an external system.
1543    Ingestion(SourceDesc<ReferencedConnection>),
1544    /// Receives data from an external system.
1545    OldSyntaxIngestion {
1546        desc: SourceDesc<ReferencedConnection>,
1547        // If we're dealing with an old syntax ingestion the progress id will be some other collection
1548        // and the ingestion itself will have the data from a default external reference
1549        progress_subsource: CatalogItemId,
1550        data_config: SourceExportDataConfig<ReferencedConnection>,
1551        details: SourceExportDetails,
1552    },
1553    /// This source receives its data from the identified ingestion,
1554    /// specifically the output identified by `external_reference`.
1555    IngestionExport {
1556        ingestion_id: CatalogItemId,
1557        external_reference: UnresolvedItemName,
1558        details: SourceExportDetails,
1559        data_config: SourceExportDataConfig<ReferencedConnection>,
1560    },
1561    /// Receives data from the source's reclocking/remapping operations.
1562    Progress,
1563    /// Receives data from HTTP post requests.
1564    Webhook {
1565        validate_using: Option<WebhookValidation>,
1566        body_format: WebhookBodyFormat,
1567        headers: WebhookHeaders,
1568        /// Only `Some` when created via `CREATE TABLE ... FROM WEBHOOK`.
1569        cluster_id: Option<StorageInstanceId>,
1570    },
1571}
1572
1573#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
1574pub struct WebhookValidation {
1575    /// The expression used to validate a request.
1576    pub expression: MirScalarExpr,
1577    /// Description of the source that will be created.
1578    pub relation_desc: RelationDesc,
1579    /// The column index to provide the request body and whether to provide it as bytes.
1580    pub bodies: Vec<(usize, bool)>,
1581    /// The column index to provide the request headers and whether to provide the values as bytes.
1582    pub headers: Vec<(usize, bool)>,
1583    /// Any secrets that are used in that validation.
1584    pub secrets: Vec<WebhookValidationSecret>,
1585}
1586
1587impl WebhookValidation {
1588    const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1589
1590    /// Attempt to reduce the internal [`MirScalarExpr`] into a simpler expression.
1591    ///
1592    /// The reduction happens on a separate thread, we also only wait for
1593    /// `WebhookValidation::MAX_REDUCE_TIME` before timing out and returning an error.
1594    pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1595        let WebhookValidation {
1596            expression,
1597            relation_desc,
1598            ..
1599        } = self;
1600
1601        // On a different thread, attempt to reduce the expression.
1602        let mut expression_ = expression.clone();
1603        let desc_ = relation_desc.clone();
1604        let reduce_task = mz_ore::task::spawn_blocking(
1605            || "webhook-validation-reduce",
1606            move || {
1607                let repr_col_types: Vec<ReprColumnType> = desc_
1608                    .typ()
1609                    .column_types
1610                    .iter()
1611                    .map(ReprColumnType::from)
1612                    .collect();
1613                expression_.reduce(&repr_col_types);
1614                expression_
1615            },
1616        );
1617
1618        match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1619            Ok(reduced_expr) => {
1620                *expression = reduced_expr;
1621                Ok(())
1622            }
1623            Err(_) => Err("timeout"),
1624        }
1625    }
1626}
1627
1628#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
1629pub struct WebhookHeaders {
1630    /// Optionally include a column named `headers` whose content is possibly filtered.
1631    pub header_column: Option<WebhookHeaderFilters>,
1632    /// The column index to provide the specific request header, and whether to provide it as bytes.
1633    pub mapped_headers: BTreeMap<usize, (String, bool)>,
1634}
1635
1636impl WebhookHeaders {
1637    /// Returns the number of columns needed to represent our headers.
1638    pub fn num_columns(&self) -> usize {
1639        let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1640        let mapped_headers = self.mapped_headers.len();
1641
1642        header_column + mapped_headers
1643    }
1644}
1645
1646#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
1647pub struct WebhookHeaderFilters {
1648    pub block: BTreeSet<String>,
1649    pub allow: BTreeSet<String>,
1650}
1651
1652#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Arbitrary)]
1653pub enum WebhookBodyFormat {
1654    Json { array: bool },
1655    Bytes,
1656    Text,
1657}
1658
1659impl From<WebhookBodyFormat> for SqlScalarType {
1660    fn from(value: WebhookBodyFormat) -> Self {
1661        match value {
1662            WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1663            WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1664            WebhookBodyFormat::Text => SqlScalarType::String,
1665        }
1666    }
1667}
1668
1669#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
1670pub struct WebhookValidationSecret {
1671    /// Identifies the secret by [`CatalogItemId`].
1672    pub id: CatalogItemId,
1673    /// Column index for the expression context that this secret was originally evaluated in.
1674    pub column_idx: usize,
1675    /// Whether or not this secret should be provided to the expression as Bytes or a String.
1676    pub use_bytes: bool,
1677}
1678
1679#[derive(Clone, Debug)]
1680pub struct Connection {
1681    pub create_sql: String,
1682    pub details: ConnectionDetails,
1683}
1684
1685#[derive(Clone, Debug, Serialize)]
1686pub enum ConnectionDetails {
1687    Kafka(KafkaConnection<ReferencedConnection>),
1688    Csr(CsrConnection<ReferencedConnection>),
1689    Postgres(PostgresConnection<ReferencedConnection>),
1690    Ssh {
1691        connection: SshConnection,
1692        key_1: SshKey,
1693        key_2: SshKey,
1694    },
1695    Aws(AwsConnection),
1696    AwsPrivatelink(AwsPrivatelinkConnection),
1697    MySql(MySqlConnection<ReferencedConnection>),
1698    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
1699    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
1700}
1701
1702impl ConnectionDetails {
1703    pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1704        match self {
1705            ConnectionDetails::Kafka(c) => {
1706                mz_storage_types::connections::Connection::Kafka(c.clone())
1707            }
1708            ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1709            ConnectionDetails::Postgres(c) => {
1710                mz_storage_types::connections::Connection::Postgres(c.clone())
1711            }
1712            ConnectionDetails::Ssh { connection, .. } => {
1713                mz_storage_types::connections::Connection::Ssh(connection.clone())
1714            }
1715            ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1716            ConnectionDetails::AwsPrivatelink(c) => {
1717                mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1718            }
1719            ConnectionDetails::MySql(c) => {
1720                mz_storage_types::connections::Connection::MySql(c.clone())
1721            }
1722            ConnectionDetails::SqlServer(c) => {
1723                mz_storage_types::connections::Connection::SqlServer(c.clone())
1724            }
1725            ConnectionDetails::IcebergCatalog(c) => {
1726                mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1727            }
1728        }
1729    }
1730}
1731
1732#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1733pub struct NetworkPolicyRule {
1734    pub name: String,
1735    pub action: NetworkPolicyRuleAction,
1736    pub address: PolicyAddress,
1737    pub direction: NetworkPolicyRuleDirection,
1738}
1739
1740#[derive(
1741    Debug,
1742    Clone,
1743    Serialize,
1744    Deserialize,
1745    PartialEq,
1746    Eq,
1747    Ord,
1748    PartialOrd,
1749    Hash
1750)]
1751pub enum NetworkPolicyRuleAction {
1752    Allow,
1753}
1754
1755impl std::fmt::Display for NetworkPolicyRuleAction {
1756    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1757        match self {
1758            Self::Allow => write!(f, "allow"),
1759        }
1760    }
1761}
1762impl TryFrom<&str> for NetworkPolicyRuleAction {
1763    type Error = PlanError;
1764    fn try_from(value: &str) -> Result<Self, Self::Error> {
1765        match value.to_uppercase().as_str() {
1766            "ALLOW" => Ok(Self::Allow),
1767            _ => Err(PlanError::Unstructured(
1768                "Allow is the only valid option".into(),
1769            )),
1770        }
1771    }
1772}
1773
1774#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1775pub enum NetworkPolicyRuleDirection {
1776    Ingress,
1777}
1778impl std::fmt::Display for NetworkPolicyRuleDirection {
1779    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1780        match self {
1781            Self::Ingress => write!(f, "ingress"),
1782        }
1783    }
1784}
1785impl TryFrom<&str> for NetworkPolicyRuleDirection {
1786    type Error = PlanError;
1787    fn try_from(value: &str) -> Result<Self, Self::Error> {
1788        match value.to_uppercase().as_str() {
1789            "INGRESS" => Ok(Self::Ingress),
1790            _ => Err(PlanError::Unstructured(
1791                "Ingress is the only valid option".into(),
1792            )),
1793        }
1794    }
1795}
1796
1797#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1798pub struct PolicyAddress(pub IpNet);
1799impl std::fmt::Display for PolicyAddress {
1800    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1801        write!(f, "{}", &self.0.to_string())
1802    }
1803}
1804impl From<String> for PolicyAddress {
1805    fn from(value: String) -> Self {
1806        Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1807    }
1808}
1809impl TryFrom<&str> for PolicyAddress {
1810    type Error = PlanError;
1811    fn try_from(value: &str) -> Result<Self, Self::Error> {
1812        let net = IpNet::from_str(value)
1813            .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1814        Ok(Self(net))
1815    }
1816}
1817
1818impl Serialize for PolicyAddress {
1819    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1820    where
1821        S: serde::Serializer,
1822    {
1823        serializer.serialize_str(&format!("{}", &self.0))
1824    }
1825}
1826
1827#[derive(Clone, Debug, Serialize)]
1828pub enum SshKey {
1829    PublicOnly(String),
1830    Both(SshKeyPair),
1831}
1832
1833impl SshKey {
1834    pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1835        match self {
1836            SshKey::PublicOnly(_) => None,
1837            SshKey::Both(key_pair) => Some(key_pair),
1838        }
1839    }
1840
1841    pub fn public_key(&self) -> String {
1842        match self {
1843            SshKey::PublicOnly(s) => s.into(),
1844            SshKey::Both(p) => p.ssh_public_key(),
1845        }
1846    }
1847}
1848
1849#[derive(Clone, Debug)]
1850pub struct Secret {
1851    pub create_sql: String,
1852    pub secret_as: MirScalarExpr,
1853}
1854
1855#[derive(Clone, Debug)]
1856pub struct Sink {
1857    /// Parse-able SQL that is stored durably and defines this sink.
1858    pub create_sql: String,
1859    /// Collection we read into this sink.
1860    pub from: GlobalId,
1861    /// Type of connection to the external service we sink into.
1862    pub connection: StorageSinkConnection<ReferencedConnection>,
1863    // TODO(guswynn): this probably should just be in the `connection`.
1864    pub envelope: SinkEnvelope,
1865    pub version: u64,
1866    pub commit_interval: Option<Duration>,
1867}
1868
1869#[derive(Clone, Debug)]
1870pub struct View {
1871    /// Parse-able SQL that is stored durably and defines this view.
1872    pub create_sql: String,
1873    /// Unoptimized high-level expression from parsing the `create_sql`.
1874    pub expr: HirRelationExpr,
1875    /// All of the catalog objects that are referenced by this view, according to the `expr`.
1876    pub dependencies: DependencyIds,
1877    /// Columns of this view.
1878    pub column_names: Vec<ColumnName>,
1879    /// If this view is created in the temporary schema, e.g. `CREATE TEMPORARY ...`.
1880    pub temporary: bool,
1881}
1882
1883#[derive(Clone, Debug)]
1884pub struct MaterializedView {
1885    /// Parse-able SQL that is stored durably and defines this materialized view.
1886    pub create_sql: String,
1887    /// Unoptimized high-level expression from parsing the `create_sql`.
1888    pub expr: HirRelationExpr,
1889    /// All of the catalog objects that are referenced by this materialized view, according to the `expr`.
1890    pub dependencies: DependencyIds,
1891    /// Columns of this view.
1892    pub column_names: Vec<ColumnName>,
1893    pub replacement_target: Option<CatalogItemId>,
1894    /// Cluster this materialized view will get installed on.
1895    pub cluster_id: ClusterId,
1896    /// If set, only install this materialized view's dataflow on the specified replica.
1897    pub target_replica: Option<ReplicaId>,
1898    pub non_null_assertions: Vec<usize>,
1899    pub compaction_window: Option<CompactionWindow>,
1900    pub refresh_schedule: Option<RefreshSchedule>,
1901    pub as_of: Option<Timestamp>,
1902}
1903
1904#[derive(Clone, Debug)]
1905pub struct Index {
1906    /// Parse-able SQL that is stored durably and defines this index.
1907    pub create_sql: String,
1908    /// Collection this index is on top of.
1909    pub on: GlobalId,
1910    pub keys: Vec<mz_expr::MirScalarExpr>,
1911    pub compaction_window: Option<CompactionWindow>,
1912    pub cluster_id: ClusterId,
1913}
1914
1915#[derive(Clone, Debug)]
1916pub struct Type {
1917    pub create_sql: String,
1918    pub inner: CatalogType<IdReference>,
1919}
1920
1921/// Specifies when a `Peek` or `Subscribe` should occur.
1922#[derive(Deserialize, Clone, Debug, PartialEq)]
1923pub enum QueryWhen {
1924    /// The peek should occur at the latest possible timestamp that allows the
1925    /// peek to complete immediately.
1926    Immediately,
1927    /// The peek should occur at a timestamp that allows the peek to see all
1928    /// data written to tables within Materialize.
1929    FreshestTableWrite,
1930    /// The peek should occur at the timestamp described by the specified
1931    /// expression.
1932    ///
1933    /// The expression may have any type.
1934    AtTimestamp(Timestamp),
1935    /// Same as Immediately, but will also advance to at least the specified
1936    /// expression.
1937    AtLeastTimestamp(Timestamp),
1938}
1939
1940impl QueryWhen {
1941    /// Returns a timestamp to which the candidate must be advanced.
1942    pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1943        match self {
1944            QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1945            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1946        }
1947    }
1948    /// Returns whether the candidate's upper bound is constrained.
1949    /// This is only true for `AtTimestamp` since it is the only variant that
1950    /// specifies a timestamp.
1951    pub fn constrains_upper(&self) -> bool {
1952        match self {
1953            QueryWhen::AtTimestamp(_) => true,
1954            QueryWhen::AtLeastTimestamp(_)
1955            | QueryWhen::Immediately
1956            | QueryWhen::FreshestTableWrite => false,
1957        }
1958    }
1959    /// Returns whether the candidate must be advanced to the since.
1960    pub fn advance_to_since(&self) -> bool {
1961        match self {
1962            QueryWhen::Immediately
1963            | QueryWhen::AtLeastTimestamp(_)
1964            | QueryWhen::FreshestTableWrite => true,
1965            QueryWhen::AtTimestamp(_) => false,
1966        }
1967    }
1968    /// Returns whether the candidate can be advanced to the upper.
1969    pub fn can_advance_to_upper(&self) -> bool {
1970        match self {
1971            QueryWhen::Immediately => true,
1972            QueryWhen::FreshestTableWrite
1973            | QueryWhen::AtTimestamp(_)
1974            | QueryWhen::AtLeastTimestamp(_) => false,
1975        }
1976    }
1977
1978    /// Returns whether the candidate can be advanced to the timeline's timestamp.
1979    pub fn can_advance_to_timeline_ts(&self) -> bool {
1980        match self {
1981            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1982            QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1983        }
1984    }
1985    /// Returns whether the candidate must be advanced to the timeline's timestamp.
1986    pub fn must_advance_to_timeline_ts(&self) -> bool {
1987        match self {
1988            QueryWhen::FreshestTableWrite => true,
1989            QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1990                false
1991            }
1992        }
1993    }
1994    /// Returns whether the selected timestamp should be tracked within the current transaction.
1995    pub fn is_transactional(&self) -> bool {
1996        match self {
1997            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1998            QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1999        }
2000    }
2001}
2002
2003#[derive(Debug, Copy, Clone)]
2004pub enum MutationKind {
2005    Insert,
2006    Update,
2007    Delete,
2008}
2009
2010#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
2011pub enum CopyFormat {
2012    Text,
2013    Csv,
2014    Binary,
2015    Parquet,
2016}
2017
2018#[derive(Debug, Copy, Clone)]
2019pub enum ExecuteTimeout {
2020    None,
2021    Seconds(f64),
2022    WaitOnce,
2023}
2024
2025#[derive(Clone, Debug)]
2026pub enum IndexOption {
2027    /// Configures the logical compaction window for an index.
2028    RetainHistory(CompactionWindow),
2029}
2030
2031#[derive(Clone, Debug)]
2032pub enum TableOption {
2033    /// Configures the logical compaction window for a table.
2034    RetainHistory(CompactionWindow),
2035}
2036
2037#[derive(Clone, Debug)]
2038pub struct PlanClusterOption {
2039    pub availability_zones: AlterOptionParameter<Vec<String>>,
2040    pub introspection_debugging: AlterOptionParameter<bool>,
2041    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
2042    pub managed: AlterOptionParameter<bool>,
2043    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
2044    pub replication_factor: AlterOptionParameter<u32>,
2045    pub size: AlterOptionParameter,
2046    pub schedule: AlterOptionParameter<ClusterSchedule>,
2047    pub workload_class: AlterOptionParameter<Option<String>>,
2048}
2049
2050impl Default for PlanClusterOption {
2051    fn default() -> Self {
2052        Self {
2053            availability_zones: AlterOptionParameter::Unchanged,
2054            introspection_debugging: AlterOptionParameter::Unchanged,
2055            introspection_interval: AlterOptionParameter::Unchanged,
2056            managed: AlterOptionParameter::Unchanged,
2057            replicas: AlterOptionParameter::Unchanged,
2058            replication_factor: AlterOptionParameter::Unchanged,
2059            size: AlterOptionParameter::Unchanged,
2060            schedule: AlterOptionParameter::Unchanged,
2061            workload_class: AlterOptionParameter::Unchanged,
2062        }
2063    }
2064}
2065
2066#[derive(Clone, Debug, PartialEq, Eq)]
2067pub enum AlterClusterPlanStrategy {
2068    None,
2069    For(Duration),
2070    UntilReady {
2071        on_timeout: OnTimeoutAction,
2072        timeout: Duration,
2073    },
2074}
2075
2076#[derive(Clone, Debug, PartialEq, Eq)]
2077pub enum OnTimeoutAction {
2078    Commit,
2079    Rollback,
2080}
2081
2082impl Default for OnTimeoutAction {
2083    fn default() -> Self {
2084        Self::Commit
2085    }
2086}
2087
2088impl TryFrom<&str> for OnTimeoutAction {
2089    type Error = PlanError;
2090    fn try_from(value: &str) -> Result<Self, Self::Error> {
2091        match value.to_uppercase().as_str() {
2092            "COMMIT" => Ok(Self::Commit),
2093            "ROLLBACK" => Ok(Self::Rollback),
2094            _ => Err(PlanError::Unstructured(
2095                "Valid options are COMMIT, ROLLBACK".into(),
2096            )),
2097        }
2098    }
2099}
2100
2101impl AlterClusterPlanStrategy {
2102    pub fn is_none(&self) -> bool {
2103        matches!(self, Self::None)
2104    }
2105    pub fn is_some(&self) -> bool {
2106        !matches!(self, Self::None)
2107    }
2108}
2109
2110impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2111    type Error = PlanError;
2112
2113    fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2114        Ok(match value.wait {
2115            Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2116            Some(ClusterAlterOptionValue::UntilReady(options)) => {
2117                let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2118                Self::UntilReady {
2119                    timeout: match extracted.timeout {
2120                        Some(d) => d,
2121                        None => Err(PlanError::UntilReadyTimeoutRequired)?,
2122                    },
2123                    on_timeout: match extracted.on_timeout {
2124                        Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2125                            PlanError::InvalidOptionValue {
2126                                option_name: "ON TIMEOUT".into(),
2127                                err: Box::new(e),
2128                            }
2129                        })?,
2130                        None => OnTimeoutAction::default(),
2131                    },
2132                }
2133            }
2134            None => Self::None,
2135        })
2136    }
2137}
2138
2139/// A vector of values to which parameter references should be bound.
2140#[derive(Debug, Clone)]
2141pub struct Params {
2142    /// The datums that were provided in the EXECUTE statement.
2143    pub datums: Row,
2144    /// The types of the datums provided in the EXECUTE statement.
2145    pub execute_types: Vec<SqlScalarType>,
2146    /// The types that the prepared statement expects based on its definition.
2147    pub expected_types: Vec<SqlScalarType>,
2148}
2149
2150impl Params {
2151    /// Returns a `Params` with no parameters.
2152    pub fn empty() -> Params {
2153        Params {
2154            datums: Row::pack_slice(&[]),
2155            execute_types: vec![],
2156            expected_types: vec![],
2157        }
2158    }
2159}
2160
2161/// Controls planning of a SQL query.
2162#[derive(
2163    Ord,
2164    PartialOrd,
2165    Clone,
2166    Debug,
2167    Eq,
2168    PartialEq,
2169    Serialize,
2170    Deserialize,
2171    Hash,
2172    Copy
2173)]
2174pub struct PlanContext {
2175    pub wall_time: DateTime<Utc>,
2176    pub ignore_if_exists_errors: bool,
2177}
2178
2179impl PlanContext {
2180    pub fn new(wall_time: DateTime<Utc>) -> Self {
2181        Self {
2182            wall_time,
2183            ignore_if_exists_errors: false,
2184        }
2185    }
2186
2187    /// Return a PlanContext with zero values. This should only be used when
2188    /// planning is required but unused (like in `plan_create_table()`) or in
2189    /// tests.
2190    pub fn zero() -> Self {
2191        PlanContext {
2192            wall_time: now::to_datetime(NOW_ZERO()),
2193            ignore_if_exists_errors: false,
2194        }
2195    }
2196
2197    pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2198        self.ignore_if_exists_errors = value;
2199        self
2200    }
2201}