Skip to main content

mz_sql/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL planning.
11//!
12//! SQL planning is the process of taking the abstract syntax tree of a
13//! [`Statement`] and turning it into a [`Plan`] that the dataflow layer can
14//! execute.
15//!
16//! Statements must be purified before they can be planned. See the
17//! [`pure`](crate::pure) module for details.
18
19// Internal module layout.
20//
21// The entry point for planning is `statement::handle_statement`. That function
22// dispatches to a more specific `handle` function for the particular statement
23// type. For most statements, this `handle` function is uninteresting and short,
24// but anything involving a `SELECT` statement gets complicated. `SELECT`
25// queries wind through the functions in the `query` module, starting with
26// `plan_root_query` and fanning out based on the contents of the `SELECT`
27// statement.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{
41    CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing,
42};
43use mz_ore::now::{self, NOW_ZERO};
44use mz_pgcopy::CopyFormatParams;
45use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
46use mz_repr::explain::{ExplainConfig, ExplainFormat};
47use mz_repr::network_policy_id::NetworkPolicyId;
48use mz_repr::optimize::OptimizerFeatureOverrides;
49use mz_repr::refresh_schedule::RefreshSchedule;
50use mz_repr::role_id::RoleId;
51use mz_repr::{
52    CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, Row, SqlColumnType,
53    SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
54};
55use mz_sql_parser::ast::{
56    AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
57    RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
58    Value, WithOptionValue,
59};
60use mz_ssh_util::keys::SshKeyPair;
61use mz_storage_types::connections::aws::AwsConnection;
62use mz_storage_types::connections::inline::ReferencedConnection;
63use mz_storage_types::connections::{
64    AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
65    MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70    SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76    ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77    TransactionAccessMode,
78};
79use crate::catalog::{
80    CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81    RoleAttributesRaw,
82};
83use crate::names::{
84    Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110    AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111    WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119    AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120    PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123    StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124    resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
130/// Instructions for executing a SQL query.
131#[derive(Debug, EnumKind)]
132#[enum_kind(PlanKind)]
133pub enum Plan {
134    CreateConnection(CreateConnectionPlan),
135    CreateDatabase(CreateDatabasePlan),
136    CreateSchema(CreateSchemaPlan),
137    CreateRole(CreateRolePlan),
138    CreateCluster(CreateClusterPlan),
139    CreateClusterReplica(CreateClusterReplicaPlan),
140    CreateSource(CreateSourcePlan),
141    CreateSources(Vec<CreateSourcePlanBundle>),
142    CreateSecret(CreateSecretPlan),
143    CreateSink(CreateSinkPlan),
144    CreateTable(CreateTablePlan),
145    CreateView(CreateViewPlan),
146    CreateMaterializedView(CreateMaterializedViewPlan),
147    CreateContinualTask(CreateContinualTaskPlan),
148    CreateNetworkPolicy(CreateNetworkPolicyPlan),
149    CreateIndex(CreateIndexPlan),
150    CreateType(CreateTypePlan),
151    Comment(CommentPlan),
152    DiscardTemp,
153    DiscardAll,
154    DropObjects(DropObjectsPlan),
155    DropOwned(DropOwnedPlan),
156    EmptyQuery,
157    ShowAllVariables,
158    ShowCreate(ShowCreatePlan),
159    ShowColumns(ShowColumnsPlan),
160    ShowVariable(ShowVariablePlan),
161    InspectShard(InspectShardPlan),
162    SetVariable(SetVariablePlan),
163    ResetVariable(ResetVariablePlan),
164    SetTransaction(SetTransactionPlan),
165    StartTransaction(StartTransactionPlan),
166    CommitTransaction(CommitTransactionPlan),
167    AbortTransaction(AbortTransactionPlan),
168    Select(SelectPlan),
169    Subscribe(SubscribePlan),
170    CopyFrom(CopyFromPlan),
171    CopyTo(CopyToPlan),
172    ExplainPlan(ExplainPlanPlan),
173    ExplainPushdown(ExplainPushdownPlan),
174    ExplainTimestamp(ExplainTimestampPlan),
175    ExplainSinkSchema(ExplainSinkSchemaPlan),
176    Insert(InsertPlan),
177    AlterCluster(AlterClusterPlan),
178    AlterClusterSwap(AlterClusterSwapPlan),
179    AlterNoop(AlterNoopPlan),
180    AlterSetCluster(AlterSetClusterPlan),
181    AlterConnection(AlterConnectionPlan),
182    AlterSource(AlterSourcePlan),
183    AlterClusterRename(AlterClusterRenamePlan),
184    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
185    AlterItemRename(AlterItemRenamePlan),
186    AlterSchemaRename(AlterSchemaRenamePlan),
187    AlterSchemaSwap(AlterSchemaSwapPlan),
188    AlterSecret(AlterSecretPlan),
189    AlterSink(AlterSinkPlan),
190    AlterSystemSet(AlterSystemSetPlan),
191    AlterSystemReset(AlterSystemResetPlan),
192    AlterSystemResetAll(AlterSystemResetAllPlan),
193    AlterRole(AlterRolePlan),
194    AlterOwner(AlterOwnerPlan),
195    AlterTableAddColumn(AlterTablePlan),
196    AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
197    AlterNetworkPolicy(AlterNetworkPolicyPlan),
198    Declare(DeclarePlan),
199    Fetch(FetchPlan),
200    Close(ClosePlan),
201    ReadThenWrite(ReadThenWritePlan),
202    Prepare(PreparePlan),
203    Execute(ExecutePlan),
204    Deallocate(DeallocatePlan),
205    Raise(RaisePlan),
206    GrantRole(GrantRolePlan),
207    RevokeRole(RevokeRolePlan),
208    GrantPrivileges(GrantPrivilegesPlan),
209    RevokePrivileges(RevokePrivilegesPlan),
210    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
211    ReassignOwned(ReassignOwnedPlan),
212    SideEffectingFunc(SideEffectingFunc),
213    ValidateConnection(ValidateConnectionPlan),
214    AlterRetainHistory(AlterRetainHistoryPlan),
215}
216
217impl Plan {
218    /// Expresses which [`StatementKind`] can generate which set of
219    /// [`PlanKind`].
220    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
221        match stmt {
222            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
223            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
224            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
225            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
226            StatementKind::AlterObjectRename => &[
227                PlanKind::AlterClusterRename,
228                PlanKind::AlterClusterReplicaRename,
229                PlanKind::AlterItemRename,
230                PlanKind::AlterSchemaRename,
231                PlanKind::AlterNoop,
232            ],
233            StatementKind::AlterObjectSwap => &[
234                PlanKind::AlterClusterSwap,
235                PlanKind::AlterSchemaSwap,
236                PlanKind::AlterNoop,
237            ],
238            StatementKind::AlterRole => &[PlanKind::AlterRole],
239            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
240            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
241            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
242            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
243            StatementKind::AlterSource => &[
244                PlanKind::AlterNoop,
245                PlanKind::AlterSource,
246                PlanKind::AlterRetainHistory,
247            ],
248            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
249            StatementKind::AlterSystemResetAll => {
250                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
251            }
252            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
253            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
254            StatementKind::AlterTableAddColumn => {
255                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
256            }
257            StatementKind::AlterMaterializedViewApplyReplacement => &[
258                PlanKind::AlterNoop,
259                PlanKind::AlterMaterializedViewApplyReplacement,
260            ],
261            StatementKind::Close => &[PlanKind::Close],
262            StatementKind::Comment => &[PlanKind::Comment],
263            StatementKind::Commit => &[PlanKind::CommitTransaction],
264            StatementKind::Copy => &[
265                PlanKind::CopyFrom,
266                PlanKind::Select,
267                PlanKind::Subscribe,
268                PlanKind::CopyTo,
269            ],
270            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
271            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
272            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
273            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
274            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
275            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
276            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
277            StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
278            StatementKind::CreateRole => &[PlanKind::CreateRole],
279            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
280            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
281            StatementKind::CreateSink => &[PlanKind::CreateSink],
282            StatementKind::CreateSource | StatementKind::CreateSubsource => {
283                &[PlanKind::CreateSource]
284            }
285            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
286            StatementKind::CreateTable => &[PlanKind::CreateTable],
287            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
288            StatementKind::CreateType => &[PlanKind::CreateType],
289            StatementKind::CreateView => &[PlanKind::CreateView],
290            StatementKind::Deallocate => &[PlanKind::Deallocate],
291            StatementKind::Declare => &[PlanKind::Declare],
292            StatementKind::Delete => &[PlanKind::ReadThenWrite],
293            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
294            StatementKind::DropObjects => &[PlanKind::DropObjects],
295            StatementKind::DropOwned => &[PlanKind::DropOwned],
296            StatementKind::Execute => &[PlanKind::Execute],
297            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
298            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
299            StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
300            StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
301            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
302            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
303            StatementKind::Fetch => &[PlanKind::Fetch],
304            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
305            StatementKind::GrantRole => &[PlanKind::GrantRole],
306            StatementKind::Insert => &[PlanKind::Insert],
307            StatementKind::Prepare => &[PlanKind::Prepare],
308            StatementKind::Raise => &[PlanKind::Raise],
309            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
310            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
311            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
312            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
313            StatementKind::Rollback => &[PlanKind::AbortTransaction],
314            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
315            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
316            StatementKind::SetVariable => &[PlanKind::SetVariable],
317            StatementKind::Show => &[
318                PlanKind::Select,
319                PlanKind::ShowVariable,
320                PlanKind::ShowCreate,
321                PlanKind::ShowColumns,
322                PlanKind::ShowAllVariables,
323                PlanKind::InspectShard,
324            ],
325            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
326            StatementKind::Subscribe => &[PlanKind::Subscribe],
327            StatementKind::Update => &[PlanKind::ReadThenWrite],
328            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
329            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
330        }
331    }
332
333    /// Returns a human readable name of the plan. Meant for use in messages sent back to a user.
334    pub fn name(&self) -> &str {
335        match self {
336            Plan::CreateConnection(_) => "create connection",
337            Plan::CreateDatabase(_) => "create database",
338            Plan::CreateSchema(_) => "create schema",
339            Plan::CreateRole(_) => "create role",
340            Plan::CreateCluster(_) => "create cluster",
341            Plan::CreateClusterReplica(_) => "create cluster replica",
342            Plan::CreateSource(_) => "create source",
343            Plan::CreateSources(_) => "create source",
344            Plan::CreateSecret(_) => "create secret",
345            Plan::CreateSink(_) => "create sink",
346            Plan::CreateTable(_) => "create table",
347            Plan::CreateView(_) => "create view",
348            Plan::CreateMaterializedView(_) => "create materialized view",
349            Plan::CreateContinualTask(_) => "create continual task",
350            Plan::CreateIndex(_) => "create index",
351            Plan::CreateType(_) => "create type",
352            Plan::CreateNetworkPolicy(_) => "create network policy",
353            Plan::Comment(_) => "comment",
354            Plan::DiscardTemp => "discard temp",
355            Plan::DiscardAll => "discard all",
356            Plan::DropObjects(plan) => match plan.object_type {
357                ObjectType::Table => "drop table",
358                ObjectType::View => "drop view",
359                ObjectType::MaterializedView => "drop materialized view",
360                ObjectType::Source => "drop source",
361                ObjectType::Sink => "drop sink",
362                ObjectType::Index => "drop index",
363                ObjectType::Type => "drop type",
364                ObjectType::Role => "drop roles",
365                ObjectType::Cluster => "drop clusters",
366                ObjectType::ClusterReplica => "drop cluster replicas",
367                ObjectType::Secret => "drop secret",
368                ObjectType::Connection => "drop connection",
369                ObjectType::Database => "drop database",
370                ObjectType::Schema => "drop schema",
371                ObjectType::Func => "drop function",
372                ObjectType::ContinualTask => "drop continual task",
373                ObjectType::NetworkPolicy => "drop network policy",
374            },
375            Plan::DropOwned(_) => "drop owned",
376            Plan::EmptyQuery => "do nothing",
377            Plan::ShowAllVariables => "show all variables",
378            Plan::ShowCreate(_) => "show create",
379            Plan::ShowColumns(_) => "show columns",
380            Plan::ShowVariable(_) => "show variable",
381            Plan::InspectShard(_) => "inspect shard",
382            Plan::SetVariable(_) => "set variable",
383            Plan::ResetVariable(_) => "reset variable",
384            Plan::SetTransaction(_) => "set transaction",
385            Plan::StartTransaction(_) => "start transaction",
386            Plan::CommitTransaction(_) => "commit",
387            Plan::AbortTransaction(_) => "abort",
388            Plan::Select(_) => "select",
389            Plan::Subscribe(_) => "subscribe",
390            Plan::CopyFrom(_) => "copy from",
391            Plan::CopyTo(_) => "copy to",
392            Plan::ExplainPlan(_) => "explain plan",
393            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
394            Plan::ExplainTimestamp(_) => "explain timestamp",
395            Plan::ExplainSinkSchema(_) => "explain schema",
396            Plan::Insert(_) => "insert",
397            Plan::AlterNoop(plan) => match plan.object_type {
398                ObjectType::Table => "alter table",
399                ObjectType::View => "alter view",
400                ObjectType::MaterializedView => "alter materialized view",
401                ObjectType::Source => "alter source",
402                ObjectType::Sink => "alter sink",
403                ObjectType::Index => "alter index",
404                ObjectType::Type => "alter type",
405                ObjectType::Role => "alter role",
406                ObjectType::Cluster => "alter cluster",
407                ObjectType::ClusterReplica => "alter cluster replica",
408                ObjectType::Secret => "alter secret",
409                ObjectType::Connection => "alter connection",
410                ObjectType::Database => "alter database",
411                ObjectType::Schema => "alter schema",
412                ObjectType::Func => "alter function",
413                ObjectType::ContinualTask => "alter continual task",
414                ObjectType::NetworkPolicy => "alter network policy",
415            },
416            Plan::AlterCluster(_) => "alter cluster",
417            Plan::AlterClusterRename(_) => "alter cluster rename",
418            Plan::AlterClusterSwap(_) => "alter cluster swap",
419            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
420            Plan::AlterSetCluster(_) => "alter set cluster",
421            Plan::AlterConnection(_) => "alter connection",
422            Plan::AlterSource(_) => "alter source",
423            Plan::AlterItemRename(_) => "rename item",
424            Plan::AlterSchemaRename(_) => "alter rename schema",
425            Plan::AlterSchemaSwap(_) => "alter swap schema",
426            Plan::AlterSecret(_) => "alter secret",
427            Plan::AlterSink(_) => "alter sink",
428            Plan::AlterSystemSet(_) => "alter system",
429            Plan::AlterSystemReset(_) => "alter system",
430            Plan::AlterSystemResetAll(_) => "alter system",
431            Plan::AlterRole(_) => "alter role",
432            Plan::AlterNetworkPolicy(_) => "alter network policy",
433            Plan::AlterOwner(plan) => match plan.object_type {
434                ObjectType::Table => "alter table owner",
435                ObjectType::View => "alter view owner",
436                ObjectType::MaterializedView => "alter materialized view owner",
437                ObjectType::Source => "alter source owner",
438                ObjectType::Sink => "alter sink owner",
439                ObjectType::Index => "alter index owner",
440                ObjectType::Type => "alter type owner",
441                ObjectType::Role => "alter role owner",
442                ObjectType::Cluster => "alter cluster owner",
443                ObjectType::ClusterReplica => "alter cluster replica owner",
444                ObjectType::Secret => "alter secret owner",
445                ObjectType::Connection => "alter connection owner",
446                ObjectType::Database => "alter database owner",
447                ObjectType::Schema => "alter schema owner",
448                ObjectType::Func => "alter function owner",
449                ObjectType::ContinualTask => "alter continual task owner",
450                ObjectType::NetworkPolicy => "alter network policy owner",
451            },
452            Plan::AlterTableAddColumn(_) => "alter table add column",
453            Plan::AlterMaterializedViewApplyReplacement(_) => {
454                "alter materialized view apply replacement"
455            }
456            Plan::Declare(_) => "declare",
457            Plan::Fetch(_) => "fetch",
458            Plan::Close(_) => "close",
459            Plan::ReadThenWrite(plan) => match plan.kind {
460                MutationKind::Insert => "insert into select",
461                MutationKind::Update => "update",
462                MutationKind::Delete => "delete",
463            },
464            Plan::Prepare(_) => "prepare",
465            Plan::Execute(_) => "execute",
466            Plan::Deallocate(_) => "deallocate",
467            Plan::Raise(_) => "raise",
468            Plan::GrantRole(_) => "grant role",
469            Plan::RevokeRole(_) => "revoke role",
470            Plan::GrantPrivileges(_) => "grant privilege",
471            Plan::RevokePrivileges(_) => "revoke privilege",
472            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
473            Plan::ReassignOwned(_) => "reassign owned",
474            Plan::SideEffectingFunc(_) => "side effecting func",
475            Plan::ValidateConnection(_) => "validate connection",
476            Plan::AlterRetainHistory(_) => "alter retain history",
477        }
478    }
479
480    /// Returns `true` iff this `Plan` is allowed to be executed in read-only
481    /// mode.
482    ///
483    /// We use an explicit allow-list, to avoid future additions automatically
484    /// falling into the `true` category.
485    pub fn allowed_in_read_only(&self) -> bool {
486        match self {
487            // These two set non-durable session variables, so are okay in
488            // read-only mode.
489            Plan::SetVariable(_) => true,
490            Plan::ResetVariable(_) => true,
491            Plan::SetTransaction(_) => true,
492            Plan::StartTransaction(_) => true,
493            Plan::CommitTransaction(_) => true,
494            Plan::AbortTransaction(_) => true,
495            Plan::Select(_) => true,
496            Plan::EmptyQuery => true,
497            Plan::ShowAllVariables => true,
498            Plan::ShowCreate(_) => true,
499            Plan::ShowColumns(_) => true,
500            Plan::ShowVariable(_) => true,
501            Plan::InspectShard(_) => true,
502            Plan::Subscribe(_) => true,
503            Plan::CopyTo(_) => true,
504            Plan::ExplainPlan(_) => true,
505            Plan::ExplainPushdown(_) => true,
506            Plan::ExplainTimestamp(_) => true,
507            Plan::ExplainSinkSchema(_) => true,
508            Plan::ValidateConnection(_) => true,
509            _ => false,
510        }
511    }
512}
513
514#[derive(Debug)]
515pub struct StartTransactionPlan {
516    pub access: Option<TransactionAccessMode>,
517    pub isolation_level: Option<TransactionIsolationLevel>,
518}
519
520#[derive(Debug)]
521pub enum TransactionType {
522    Explicit,
523    Implicit,
524}
525
526impl TransactionType {
527    pub fn is_explicit(&self) -> bool {
528        matches!(self, TransactionType::Explicit)
529    }
530
531    pub fn is_implicit(&self) -> bool {
532        matches!(self, TransactionType::Implicit)
533    }
534}
535
536#[derive(Debug)]
537pub struct CommitTransactionPlan {
538    pub transaction_type: TransactionType,
539}
540
541#[derive(Debug)]
542pub struct AbortTransactionPlan {
543    pub transaction_type: TransactionType,
544}
545
546#[derive(Debug)]
547pub struct CreateDatabasePlan {
548    pub name: String,
549    pub if_not_exists: bool,
550}
551
552#[derive(Debug)]
553pub struct CreateSchemaPlan {
554    pub database_spec: ResolvedDatabaseSpecifier,
555    pub schema_name: String,
556    pub if_not_exists: bool,
557}
558
559#[derive(Debug)]
560pub struct CreateRolePlan {
561    pub name: String,
562    pub attributes: RoleAttributesRaw,
563}
564
565#[derive(Debug, PartialEq, Eq, Clone)]
566pub struct CreateClusterPlan {
567    pub name: String,
568    pub variant: CreateClusterVariant,
569    pub workload_class: Option<String>,
570}
571
572#[derive(Debug, PartialEq, Eq, Clone)]
573pub enum CreateClusterVariant {
574    Managed(CreateClusterManagedPlan),
575    Unmanaged(CreateClusterUnmanagedPlan),
576}
577
578#[derive(Debug, PartialEq, Eq, Clone)]
579pub struct CreateClusterUnmanagedPlan {
580    pub replicas: Vec<(String, ReplicaConfig)>,
581}
582
583#[derive(Debug, PartialEq, Eq, Clone)]
584pub struct CreateClusterManagedPlan {
585    pub replication_factor: u32,
586    pub size: String,
587    pub availability_zones: Vec<String>,
588    pub compute: ComputeReplicaConfig,
589    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
590    pub schedule: ClusterSchedule,
591}
592
593#[derive(Debug)]
594pub struct CreateClusterReplicaPlan {
595    pub cluster_id: ClusterId,
596    pub name: String,
597    pub config: ReplicaConfig,
598}
599
600/// Configuration of introspection for a cluster replica.
601#[derive(
602    Clone,
603    Copy,
604    Debug,
605    Serialize,
606    Deserialize,
607    PartialOrd,
608    Ord,
609    PartialEq,
610    Eq
611)]
612pub struct ComputeReplicaIntrospectionConfig {
613    /// Whether to introspect the introspection.
614    pub debugging: bool,
615    /// The interval at which to introspect.
616    pub interval: Duration,
617}
618
619#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
620pub struct ComputeReplicaConfig {
621    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
622}
623
624#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
625pub enum ReplicaConfig {
626    Unorchestrated {
627        storagectl_addrs: Vec<String>,
628        computectl_addrs: Vec<String>,
629        compute: ComputeReplicaConfig,
630    },
631    Orchestrated {
632        size: String,
633        availability_zone: Option<String>,
634        compute: ComputeReplicaConfig,
635        internal: bool,
636        billed_as: Option<String>,
637    },
638}
639
640#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
641pub enum ClusterSchedule {
642    /// The system won't automatically turn the cluster On or Off.
643    Manual,
644    /// The cluster will be On when a REFRESH materialized view on it needs to refresh.
645    /// `hydration_time_estimate` determines how much time before a refresh to turn the
646    /// cluster On, so that it can rehydrate already before the refresh time.
647    Refresh { hydration_time_estimate: Duration },
648}
649
650impl Default for ClusterSchedule {
651    fn default() -> Self {
652        // (Has to be consistent with `impl Default for ClusterScheduleOptionValue`.)
653        ClusterSchedule::Manual
654    }
655}
656
657#[derive(Debug)]
658pub struct CreateSourcePlan {
659    pub name: QualifiedItemName,
660    pub source: Source,
661    pub if_not_exists: bool,
662    pub timeline: Timeline,
663    // None for subsources, which run on the parent cluster.
664    pub in_cluster: Option<ClusterId>,
665}
666
667#[derive(Clone, Debug, PartialEq, Eq)]
668pub struct SourceReferences {
669    pub updated_at: u64,
670    pub references: Vec<SourceReference>,
671}
672
673/// An available external reference for a source and if possible to retrieve,
674/// any column names it contains.
675#[derive(Clone, Debug, PartialEq, Eq)]
676pub struct SourceReference {
677    pub name: String,
678    pub namespace: Option<String>,
679    pub columns: Vec<String>,
680}
681
682/// A [`CreateSourcePlan`] and the metadata necessary to sequence it.
683#[derive(Debug)]
684pub struct CreateSourcePlanBundle {
685    /// ID of this source in the Catalog.
686    pub item_id: CatalogItemId,
687    /// ID used to reference this source from outside the catalog, e.g. compute.
688    pub global_id: GlobalId,
689    /// Details of the source to create.
690    pub plan: CreateSourcePlan,
691    /// Other catalog objects that are referenced by this source, determined at name resolution.
692    pub resolved_ids: ResolvedIds,
693    /// All the available upstream references for this source.
694    /// Populated for top-level sources that can contain subsources/tables
695    /// and used during sequencing to populate the appropriate catalog fields.
696    pub available_source_references: Option<SourceReferences>,
697}
698
699#[derive(Debug)]
700pub struct CreateConnectionPlan {
701    pub name: QualifiedItemName,
702    pub if_not_exists: bool,
703    pub connection: Connection,
704    pub validate: bool,
705}
706
707#[derive(Debug)]
708pub struct ValidateConnectionPlan {
709    /// ID of the connection in the Catalog.
710    pub id: CatalogItemId,
711    /// The connection to validate.
712    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
713}
714
715#[derive(Debug)]
716pub struct CreateSecretPlan {
717    pub name: QualifiedItemName,
718    pub secret: Secret,
719    pub if_not_exists: bool,
720}
721
722#[derive(Debug)]
723pub struct CreateSinkPlan {
724    pub name: QualifiedItemName,
725    pub sink: Sink,
726    pub with_snapshot: bool,
727    pub if_not_exists: bool,
728    pub in_cluster: ClusterId,
729}
730
731#[derive(Debug)]
732pub struct CreateTablePlan {
733    pub name: QualifiedItemName,
734    pub table: Table,
735    pub if_not_exists: bool,
736}
737
738#[derive(Debug, Clone)]
739pub struct CreateViewPlan {
740    pub name: QualifiedItemName,
741    pub view: View,
742    /// The Catalog objects that this view is replacing, if any.
743    pub replace: Option<CatalogItemId>,
744    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
745    pub drop_ids: Vec<CatalogItemId>,
746    pub if_not_exists: bool,
747    /// True if the view contains an expression that can make the exact column list
748    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
749    pub ambiguous_columns: bool,
750}
751
752#[derive(Debug, Clone)]
753pub struct CreateMaterializedViewPlan {
754    pub name: QualifiedItemName,
755    pub materialized_view: MaterializedView,
756    /// The Catalog objects that this materialized view is replacing, if any.
757    pub replace: Option<CatalogItemId>,
758    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
759    pub drop_ids: Vec<CatalogItemId>,
760    pub if_not_exists: bool,
761    /// True if the materialized view contains an expression that can make the exact column list
762    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
763    pub ambiguous_columns: bool,
764}
765
766#[derive(Debug, Clone)]
767pub struct CreateContinualTaskPlan {
768    pub name: QualifiedItemName,
769    /// During initial creation, the `LocalId` placeholder for this CT in `continual_task.expr`.
770    /// None on restart.
771    pub placeholder_id: Option<mz_expr::LocalId>,
772    pub desc: RelationDesc,
773    /// ID of the collection we read into this continual task.
774    pub input_id: GlobalId,
775    pub with_snapshot: bool,
776    /// Definition for the continual task.
777    pub continual_task: MaterializedView,
778}
779
780#[derive(Debug, Clone)]
781pub struct CreateNetworkPolicyPlan {
782    pub name: String,
783    pub rules: Vec<NetworkPolicyRule>,
784}
785
786#[derive(Debug, Clone)]
787pub struct AlterNetworkPolicyPlan {
788    pub id: NetworkPolicyId,
789    pub name: String,
790    pub rules: Vec<NetworkPolicyRule>,
791}
792
793#[derive(Debug, Clone)]
794pub struct CreateIndexPlan {
795    pub name: QualifiedItemName,
796    pub index: Index,
797    pub if_not_exists: bool,
798}
799
800#[derive(Debug)]
801pub struct CreateTypePlan {
802    pub name: QualifiedItemName,
803    pub typ: Type,
804}
805
806#[derive(Debug)]
807pub struct DropObjectsPlan {
808    /// The IDs of only the objects directly referenced in the `DROP` statement.
809    pub referenced_ids: Vec<ObjectId>,
810    /// All object IDs to drop. Includes `referenced_ids` and all descendants.
811    pub drop_ids: Vec<ObjectId>,
812    /// The type of object that was dropped explicitly in the DROP statement. `ids` may contain
813    /// objects of different types due to CASCADE.
814    pub object_type: ObjectType,
815}
816
817#[derive(Debug)]
818pub struct DropOwnedPlan {
819    /// The role IDs that own the objects.
820    pub role_ids: Vec<RoleId>,
821    /// All object IDs to drop.
822    pub drop_ids: Vec<ObjectId>,
823    /// The privileges to revoke.
824    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
825    /// The default privileges to revoke.
826    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
827}
828
829#[derive(Debug)]
830pub struct ShowVariablePlan {
831    pub name: String,
832}
833
834#[derive(Debug)]
835pub struct InspectShardPlan {
836    /// ID of the storage collection to inspect.
837    pub id: GlobalId,
838}
839
840#[derive(Debug)]
841pub struct SetVariablePlan {
842    pub name: String,
843    pub value: VariableValue,
844    pub local: bool,
845}
846
847#[derive(Debug)]
848pub enum VariableValue {
849    Default,
850    Values(Vec<String>),
851}
852
853#[derive(Debug)]
854pub struct ResetVariablePlan {
855    pub name: String,
856}
857
858#[derive(Debug)]
859pub struct SetTransactionPlan {
860    pub local: bool,
861    pub modes: Vec<TransactionMode>,
862}
863
864/// A plan for select statements.
865#[derive(Clone, Debug)]
866pub struct SelectPlan {
867    /// The `SELECT` statement itself. Used for explain/notices, but not otherwise
868    /// load-bearing. Boxed to save stack space.
869    pub select: Option<Box<SelectStatement<Aug>>>,
870    /// The plan as a HIR.
871    pub source: HirRelationExpr,
872    /// At what time should this select happen?
873    pub when: QueryWhen,
874    /// Instructions how to form the result set.
875    pub finishing: RowSetFinishing,
876    /// For `COPY TO STDOUT`, the format to use.
877    pub copy_to: Option<CopyFormat>,
878}
879
880impl SelectPlan {
881    pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
882        let arity = typ.arity();
883        SelectPlan {
884            select: None,
885            source: HirRelationExpr::Constant { rows, typ },
886            when: QueryWhen::Immediately,
887            finishing: RowSetFinishing::trivial(arity),
888            copy_to: None,
889        }
890    }
891}
892
893#[derive(Debug, Clone)]
894pub enum SubscribeOutput {
895    Diffs,
896    WithinTimestampOrderBy {
897        /// We pretend that mz_diff is prepended to the normal columns, making it index 0
898        order_by: Vec<ColumnOrder>,
899    },
900    EnvelopeUpsert {
901        /// Order by with just keys
902        order_by_keys: Vec<ColumnOrder>,
903    },
904    EnvelopeDebezium {
905        /// Order by with just keys
906        order_by_keys: Vec<ColumnOrder>,
907    },
908}
909
910#[derive(Debug, Clone)]
911pub struct SubscribePlan {
912    pub from: SubscribeFrom,
913    pub with_snapshot: bool,
914    pub when: QueryWhen,
915    pub up_to: Option<Timestamp>,
916    pub copy_to: Option<CopyFormat>,
917    pub emit_progress: bool,
918    pub output: SubscribeOutput,
919}
920
921#[derive(Debug, Clone)]
922pub enum SubscribeFrom {
923    /// ID of the collection to subscribe to.
924    Id(GlobalId),
925    /// Query to subscribe to.
926    Query {
927        expr: MirRelationExpr,
928        desc: RelationDesc,
929    },
930}
931
932impl SubscribeFrom {
933    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
934        match self {
935            SubscribeFrom::Id(id) => BTreeSet::from([*id]),
936            SubscribeFrom::Query { expr, .. } => expr.depends_on(),
937        }
938    }
939
940    pub fn contains_temporal(&self) -> bool {
941        match self {
942            SubscribeFrom::Id(_) => false,
943            SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
944        }
945    }
946}
947
948#[derive(Debug)]
949pub struct ShowCreatePlan {
950    pub id: ObjectId,
951    pub row: Row,
952}
953
954#[derive(Debug)]
955pub struct ShowColumnsPlan {
956    pub id: CatalogItemId,
957    pub select_plan: SelectPlan,
958    pub new_resolved_ids: ResolvedIds,
959}
960
961#[derive(Debug)]
962pub struct CopyFromPlan {
963    /// Table we're copying into.
964    pub target_id: CatalogItemId,
965    /// Human-readable full name of the target table.
966    pub target_name: String,
967    /// Source we're copying data from.
968    pub source: CopyFromSource,
969    /// How input columns map to those on the destination table.
970    ///
971    /// TODO(cf2): Remove this field in favor of the mfp.
972    pub columns: Vec<ColumnIndex>,
973    /// [`RelationDesc`] describing the input data.
974    pub source_desc: RelationDesc,
975    /// Changes the shape of the input data to match the destination table.
976    pub mfp: MapFilterProject,
977    /// Format specific params for copying the input data.
978    pub params: CopyFormatParams<'static>,
979    /// Filter for the source files we're copying from, e.g. an S3 prefix.
980    pub filter: Option<CopyFromFilter>,
981}
982
983#[derive(Debug)]
984pub enum CopyFromSource {
985    /// Copying from a file local to the user, transmitted via pgwire.
986    Stdin,
987    /// A remote resource, e.g. HTTP file.
988    ///
989    /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource.
990    Url(HirScalarExpr),
991    /// A file in an S3 bucket.
992    AwsS3 {
993        /// Expression that evaluates to the file we want to copy.
994        uri: HirScalarExpr,
995        /// Details for how we connect to AWS S3.
996        connection: AwsConnection,
997        /// ID of the connection object.
998        connection_id: CatalogItemId,
999    },
1000}
1001
1002#[derive(Debug)]
1003pub enum CopyFromFilter {
1004    Files(Vec<String>),
1005    Pattern(String),
1006}
1007
1008/// `COPY TO S3`
1009///
1010/// (This is a completely different thing from `COPY TO STDOUT`. That is a `Plan::Select` with
1011/// `copy_to` set.)
1012#[derive(Debug, Clone)]
1013pub struct CopyToPlan {
1014    /// The select query plan whose data will be copied to destination uri.
1015    pub select_plan: SelectPlan,
1016    pub desc: RelationDesc,
1017    /// The scalar expression to be resolved to get the destination uri.
1018    pub to: HirScalarExpr,
1019    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
1020    /// The ID of the connection.
1021    pub connection_id: CatalogItemId,
1022    pub format: S3SinkFormat,
1023    pub max_file_size: u64,
1024}
1025
1026#[derive(Clone, Debug)]
1027pub struct ExplainPlanPlan {
1028    pub stage: ExplainStage,
1029    pub format: ExplainFormat,
1030    pub config: ExplainConfig,
1031    pub explainee: Explainee,
1032}
1033
1034/// The type of object to be explained
1035#[derive(Clone, Debug)]
1036pub enum Explainee {
1037    /// Lookup and explain a plan saved for an view.
1038    View(CatalogItemId),
1039    /// Lookup and explain a plan saved for an existing materialized view.
1040    MaterializedView(CatalogItemId),
1041    /// Lookup and explain a plan saved for an existing index.
1042    Index(CatalogItemId),
1043    /// Replan an existing view.
1044    ReplanView(CatalogItemId),
1045    /// Replan an existing materialized view.
1046    ReplanMaterializedView(CatalogItemId),
1047    /// Replan an existing index.
1048    ReplanIndex(CatalogItemId),
1049    /// A SQL statement.
1050    Statement(ExplaineeStatement),
1051}
1052
1053/// Explainee types that are statements.
1054#[derive(Clone, Debug, EnumKind)]
1055#[enum_kind(ExplaineeStatementKind)]
1056pub enum ExplaineeStatement {
1057    /// The object to be explained is a SELECT statement.
1058    Select {
1059        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1060        broken: bool,
1061        plan: plan::SelectPlan,
1062        desc: RelationDesc,
1063    },
1064    /// The object to be explained is a CREATE VIEW.
1065    CreateView {
1066        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1067        broken: bool,
1068        plan: plan::CreateViewPlan,
1069    },
1070    /// The object to be explained is a CREATE MATERIALIZED VIEW.
1071    CreateMaterializedView {
1072        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1073        broken: bool,
1074        plan: plan::CreateMaterializedViewPlan,
1075    },
1076    /// The object to be explained is a CREATE INDEX.
1077    CreateIndex {
1078        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1079        broken: bool,
1080        plan: plan::CreateIndexPlan,
1081    },
1082    /// The object to be explained is a SUBSCRIBE statement.
1083    Subscribe {
1084        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1085        broken: bool,
1086        plan: plan::SubscribePlan,
1087    },
1088}
1089
1090impl ExplaineeStatement {
1091    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1092        match self {
1093            Self::Select { plan, .. } => plan.source.depends_on(),
1094            Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1095            Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1096            Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1097            Self::Subscribe { plan, .. } => plan.from.depends_on(),
1098        }
1099    }
1100
1101    /// Statements that have their `broken` flag set are expected to cause a
1102    /// panic in the optimizer code. In this case:
1103    ///
1104    /// 1. The optimizer pipeline execution will stop, but the panic will be
1105    ///    intercepted and will not propagate to the caller. The partial
1106    ///    optimizer trace collected until this point will be available.
1107    /// 2. The optimizer trace tracing subscriber will delegate regular tracing
1108    ///    spans and events to the default subscriber.
1109    ///
1110    /// This is useful when debugging queries that cause panics.
1111    pub fn broken(&self) -> bool {
1112        match self {
1113            Self::Select { broken, .. } => *broken,
1114            Self::CreateView { broken, .. } => *broken,
1115            Self::CreateMaterializedView { broken, .. } => *broken,
1116            Self::CreateIndex { broken, .. } => *broken,
1117            Self::Subscribe { broken, .. } => *broken,
1118        }
1119    }
1120}
1121
1122impl ExplaineeStatementKind {
1123    pub fn supports(&self, stage: &ExplainStage) -> bool {
1124        use ExplainStage::*;
1125        match self {
1126            Self::Select => true,
1127            Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1128            Self::CreateMaterializedView => true,
1129            Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1130            // SUBSCRIBE doesn't support RAW, DECORRELATED, or LOCAL stages because
1131            // it takes MIR directly rather than going through HIR lowering.
1132            Self::Subscribe => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1133        }
1134    }
1135}
1136
1137impl std::fmt::Display for ExplaineeStatementKind {
1138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1139        match self {
1140            Self::Select => write!(f, "SELECT"),
1141            Self::CreateView => write!(f, "CREATE VIEW"),
1142            Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1143            Self::CreateIndex => write!(f, "CREATE INDEX"),
1144            Self::Subscribe => write!(f, "SUBSCRIBE"),
1145        }
1146    }
1147}
1148
1149#[derive(Clone, Debug)]
1150pub struct ExplainPushdownPlan {
1151    pub explainee: Explainee,
1152}
1153
1154#[derive(Clone, Debug)]
1155pub struct ExplainTimestampPlan {
1156    pub format: ExplainFormat,
1157    pub raw_plan: HirRelationExpr,
1158    pub when: QueryWhen,
1159}
1160
1161#[derive(Debug)]
1162pub struct ExplainSinkSchemaPlan {
1163    pub sink_from: GlobalId,
1164    pub json_schema: String,
1165}
1166
1167#[derive(Debug)]
1168pub struct SendDiffsPlan {
1169    pub id: CatalogItemId,
1170    pub updates: Vec<(Row, Diff)>,
1171    pub kind: MutationKind,
1172    pub returning: Vec<(Row, NonZeroUsize)>,
1173    pub max_result_size: u64,
1174}
1175
1176#[derive(Debug)]
1177pub struct InsertPlan {
1178    pub id: CatalogItemId,
1179    pub values: HirRelationExpr,
1180    pub returning: Vec<mz_expr::MirScalarExpr>,
1181}
1182
1183#[derive(Debug)]
1184pub struct ReadThenWritePlan {
1185    pub id: CatalogItemId,
1186    pub selection: HirRelationExpr,
1187    pub finishing: RowSetFinishing,
1188    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
1189    pub kind: MutationKind,
1190    pub returning: Vec<mz_expr::MirScalarExpr>,
1191}
1192
1193/// Generated by `ALTER ... IF EXISTS` if the named object did not exist.
1194#[derive(Debug)]
1195pub struct AlterNoopPlan {
1196    pub object_type: ObjectType,
1197}
1198
1199#[derive(Debug)]
1200pub struct AlterSetClusterPlan {
1201    pub id: CatalogItemId,
1202    pub set_cluster: ClusterId,
1203}
1204
1205#[derive(Debug)]
1206pub struct AlterRetainHistoryPlan {
1207    pub id: CatalogItemId,
1208    pub value: Option<Value>,
1209    pub window: CompactionWindow,
1210    pub object_type: ObjectType,
1211}
1212
1213#[derive(Debug, Clone)]
1214
1215pub enum AlterOptionParameter<T = String> {
1216    Set(T),
1217    Reset,
1218    Unchanged,
1219}
1220
1221#[derive(Debug)]
1222pub enum AlterConnectionAction {
1223    RotateKeys,
1224    AlterOptions {
1225        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
1226        drop_options: BTreeSet<ConnectionOptionName>,
1227        validate: bool,
1228    },
1229}
1230
1231#[derive(Debug)]
1232pub struct AlterConnectionPlan {
1233    pub id: CatalogItemId,
1234    pub action: AlterConnectionAction,
1235}
1236
1237#[derive(Debug)]
1238pub enum AlterSourceAction {
1239    AddSubsourceExports {
1240        subsources: Vec<CreateSourcePlanBundle>,
1241        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1242    },
1243    RefreshReferences {
1244        references: SourceReferences,
1245    },
1246}
1247
1248#[derive(Debug)]
1249pub struct AlterSourcePlan {
1250    pub item_id: CatalogItemId,
1251    pub ingestion_id: GlobalId,
1252    pub action: AlterSourceAction,
1253}
1254
1255#[derive(Debug, Clone)]
1256pub struct AlterSinkPlan {
1257    pub item_id: CatalogItemId,
1258    pub global_id: GlobalId,
1259    pub sink: Sink,
1260    pub with_snapshot: bool,
1261    pub in_cluster: ClusterId,
1262}
1263
1264#[derive(Debug, Clone)]
1265pub struct AlterClusterPlan {
1266    pub id: ClusterId,
1267    pub name: String,
1268    pub options: PlanClusterOption,
1269    pub strategy: AlterClusterPlanStrategy,
1270}
1271
1272#[derive(Debug)]
1273pub struct AlterClusterRenamePlan {
1274    pub id: ClusterId,
1275    pub name: String,
1276    pub to_name: String,
1277}
1278
1279#[derive(Debug)]
1280pub struct AlterClusterReplicaRenamePlan {
1281    pub cluster_id: ClusterId,
1282    pub replica_id: ReplicaId,
1283    pub name: QualifiedReplica,
1284    pub to_name: String,
1285}
1286
1287#[derive(Debug)]
1288pub struct AlterItemRenamePlan {
1289    pub id: CatalogItemId,
1290    pub current_full_name: FullItemName,
1291    pub to_name: String,
1292    pub object_type: ObjectType,
1293}
1294
1295#[derive(Debug)]
1296pub struct AlterSchemaRenamePlan {
1297    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1298    pub new_schema_name: String,
1299}
1300
1301#[derive(Debug)]
1302pub struct AlterSchemaSwapPlan {
1303    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1304    pub schema_a_name: String,
1305    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1306    pub schema_b_name: String,
1307    pub name_temp: String,
1308}
1309
1310#[derive(Debug)]
1311pub struct AlterClusterSwapPlan {
1312    pub id_a: ClusterId,
1313    pub id_b: ClusterId,
1314    pub name_a: String,
1315    pub name_b: String,
1316    pub name_temp: String,
1317}
1318
1319#[derive(Debug)]
1320pub struct AlterSecretPlan {
1321    pub id: CatalogItemId,
1322    pub secret_as: MirScalarExpr,
1323}
1324
1325#[derive(Debug)]
1326pub struct AlterSystemSetPlan {
1327    pub name: String,
1328    pub value: VariableValue,
1329}
1330
1331#[derive(Debug)]
1332pub struct AlterSystemResetPlan {
1333    pub name: String,
1334}
1335
1336#[derive(Debug)]
1337pub struct AlterSystemResetAllPlan {}
1338
1339#[derive(Debug)]
1340pub struct AlterRolePlan {
1341    pub id: RoleId,
1342    pub name: String,
1343    pub option: PlannedAlterRoleOption,
1344}
1345
1346#[derive(Debug)]
1347pub struct AlterOwnerPlan {
1348    pub id: ObjectId,
1349    pub object_type: ObjectType,
1350    pub new_owner: RoleId,
1351}
1352
1353#[derive(Debug)]
1354pub struct AlterTablePlan {
1355    pub relation_id: CatalogItemId,
1356    pub column_name: ColumnName,
1357    pub column_type: SqlColumnType,
1358    pub raw_sql_type: RawDataType,
1359}
1360
1361#[derive(Debug, Clone)]
1362pub struct AlterMaterializedViewApplyReplacementPlan {
1363    pub id: CatalogItemId,
1364    pub replacement_id: CatalogItemId,
1365}
1366
1367#[derive(Debug)]
1368pub struct DeclarePlan {
1369    pub name: String,
1370    pub stmt: Statement<Raw>,
1371    pub sql: String,
1372    pub params: Params,
1373}
1374
1375#[derive(Debug)]
1376pub struct FetchPlan {
1377    pub name: String,
1378    pub count: Option<FetchDirection>,
1379    pub timeout: ExecuteTimeout,
1380}
1381
1382#[derive(Debug)]
1383pub struct ClosePlan {
1384    pub name: String,
1385}
1386
1387#[derive(Debug)]
1388pub struct PreparePlan {
1389    pub name: String,
1390    pub stmt: Statement<Raw>,
1391    pub sql: String,
1392    pub desc: StatementDesc,
1393}
1394
1395#[derive(Debug)]
1396pub struct ExecutePlan {
1397    pub name: String,
1398    pub params: Params,
1399}
1400
1401#[derive(Debug)]
1402pub struct DeallocatePlan {
1403    pub name: Option<String>,
1404}
1405
1406#[derive(Debug)]
1407pub struct RaisePlan {
1408    pub severity: NoticeSeverity,
1409}
1410
1411#[derive(Debug)]
1412pub struct GrantRolePlan {
1413    /// The roles that are gaining members.
1414    pub role_ids: Vec<RoleId>,
1415    /// The roles that will be added to `role_id`.
1416    pub member_ids: Vec<RoleId>,
1417    /// The role that granted the membership.
1418    pub grantor_id: RoleId,
1419}
1420
1421#[derive(Debug)]
1422pub struct RevokeRolePlan {
1423    /// The roles that are losing members.
1424    pub role_ids: Vec<RoleId>,
1425    /// The roles that will be removed from `role_id`.
1426    pub member_ids: Vec<RoleId>,
1427    /// The role that revoked the membership.
1428    pub grantor_id: RoleId,
1429}
1430
1431#[derive(Debug)]
1432pub struct UpdatePrivilege {
1433    /// The privileges being granted/revoked on an object.
1434    pub acl_mode: AclMode,
1435    /// The ID of the object receiving privileges.
1436    pub target_id: SystemObjectId,
1437    /// The role that is granting the privileges.
1438    pub grantor: RoleId,
1439}
1440
1441#[derive(Debug)]
1442pub struct GrantPrivilegesPlan {
1443    /// Description of each privilege being granted.
1444    pub update_privileges: Vec<UpdatePrivilege>,
1445    /// The roles that will granted the privileges.
1446    pub grantees: Vec<RoleId>,
1447}
1448
1449#[derive(Debug)]
1450pub struct RevokePrivilegesPlan {
1451    /// Description of each privilege being revoked.
1452    pub update_privileges: Vec<UpdatePrivilege>,
1453    /// The roles that will have privileges revoked.
1454    pub revokees: Vec<RoleId>,
1455}
1456#[derive(Debug)]
1457pub struct AlterDefaultPrivilegesPlan {
1458    /// Description of objects that match this default privilege.
1459    pub privilege_objects: Vec<DefaultPrivilegeObject>,
1460    /// The privilege to be granted/revoked from the matching objects.
1461    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
1462    /// Whether this is a grant or revoke.
1463    pub is_grant: bool,
1464}
1465
1466#[derive(Debug)]
1467pub struct ReassignOwnedPlan {
1468    /// The roles whose owned objects are being reassigned.
1469    pub old_roles: Vec<RoleId>,
1470    /// The new owner of the objects.
1471    pub new_role: RoleId,
1472    /// All object IDs to reassign.
1473    pub reassign_ids: Vec<ObjectId>,
1474}
1475
1476#[derive(Debug)]
1477pub struct CommentPlan {
1478    /// The object that this comment is associated with.
1479    pub object_id: CommentObjectId,
1480    /// A sub-component of the object that this comment is associated with, e.g. a column.
1481    ///
1482    /// TODO(parkmycar): <https://github.com/MaterializeInc/database-issues/issues/6711>.
1483    pub sub_component: Option<usize>,
1484    /// The comment itself. If `None` that indicates we should clear the existing comment.
1485    pub comment: Option<String>,
1486}
1487
1488#[derive(Clone, Debug)]
1489pub enum TableDataSource {
1490    /// The table owns data created via INSERT/UPDATE/DELETE statements.
1491    TableWrites { defaults: Vec<Expr<Aug>> },
1492
1493    /// The table receives its data from the identified `DataSourceDesc`.
1494    /// This table type does not support INSERT/UPDATE/DELETE statements.
1495    DataSource {
1496        desc: DataSourceDesc,
1497        timeline: Timeline,
1498    },
1499}
1500
1501#[derive(Clone, Debug)]
1502pub struct Table {
1503    pub create_sql: String,
1504    pub desc: VersionedRelationDesc,
1505    pub temporary: bool,
1506    pub compaction_window: Option<CompactionWindow>,
1507    pub data_source: TableDataSource,
1508}
1509
1510#[derive(Clone, Debug)]
1511pub struct Source {
1512    pub create_sql: String,
1513    pub data_source: DataSourceDesc,
1514    pub desc: RelationDesc,
1515    pub compaction_window: Option<CompactionWindow>,
1516}
1517
1518#[derive(Debug, Clone)]
1519pub enum DataSourceDesc {
1520    /// Receives data from an external system.
1521    Ingestion(SourceDesc<ReferencedConnection>),
1522    /// Receives data from an external system.
1523    OldSyntaxIngestion {
1524        desc: SourceDesc<ReferencedConnection>,
1525        // If we're dealing with an old syntax ingestion the progress id will be some other collection
1526        // and the ingestion itself will have the data from a default external reference
1527        progress_subsource: CatalogItemId,
1528        data_config: SourceExportDataConfig<ReferencedConnection>,
1529        details: SourceExportDetails,
1530    },
1531    /// This source receives its data from the identified ingestion,
1532    /// specifically the output identified by `external_reference`.
1533    IngestionExport {
1534        ingestion_id: CatalogItemId,
1535        external_reference: UnresolvedItemName,
1536        details: SourceExportDetails,
1537        data_config: SourceExportDataConfig<ReferencedConnection>,
1538    },
1539    /// Receives data from the source's reclocking/remapping operations.
1540    Progress,
1541    /// Receives data from HTTP post requests.
1542    Webhook {
1543        validate_using: Option<WebhookValidation>,
1544        body_format: WebhookBodyFormat,
1545        headers: WebhookHeaders,
1546        /// Only `Some` when created via `CREATE TABLE ... FROM WEBHOOK`.
1547        cluster_id: Option<StorageInstanceId>,
1548    },
1549}
1550
1551#[derive(Clone, Debug, Serialize)]
1552pub struct WebhookValidation {
1553    /// The expression used to validate a request.
1554    pub expression: MirScalarExpr,
1555    /// Description of the source that will be created.
1556    pub relation_desc: RelationDesc,
1557    /// The column index to provide the request body and whether to provide it as bytes.
1558    pub bodies: Vec<(usize, bool)>,
1559    /// The column index to provide the request headers and whether to provide the values as bytes.
1560    pub headers: Vec<(usize, bool)>,
1561    /// Any secrets that are used in that validation.
1562    pub secrets: Vec<WebhookValidationSecret>,
1563}
1564
1565impl WebhookValidation {
1566    const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1567
1568    /// Attempt to reduce the internal [`MirScalarExpr`] into a simpler expression.
1569    ///
1570    /// The reduction happens on a separate thread, we also only wait for
1571    /// `WebhookValidation::MAX_REDUCE_TIME` before timing out and returning an error.
1572    pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1573        let WebhookValidation {
1574            expression,
1575            relation_desc,
1576            ..
1577        } = self;
1578
1579        // On a different thread, attempt to reduce the expression.
1580        let mut expression_ = expression.clone();
1581        let desc_ = relation_desc.clone();
1582        let reduce_task = mz_ore::task::spawn_blocking(
1583            || "webhook-validation-reduce",
1584            move || {
1585                expression_.reduce(&desc_.typ().column_types);
1586                expression_
1587            },
1588        );
1589
1590        match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1591            Ok(reduced_expr) => {
1592                *expression = reduced_expr;
1593                Ok(())
1594            }
1595            Err(_) => Err("timeout"),
1596        }
1597    }
1598}
1599
1600#[derive(Clone, Debug, Default, Serialize)]
1601pub struct WebhookHeaders {
1602    /// Optionally include a column named `headers` whose content is possibly filtered.
1603    pub header_column: Option<WebhookHeaderFilters>,
1604    /// The column index to provide the specific request header, and whether to provide it as bytes.
1605    pub mapped_headers: BTreeMap<usize, (String, bool)>,
1606}
1607
1608impl WebhookHeaders {
1609    /// Returns the number of columns needed to represent our headers.
1610    pub fn num_columns(&self) -> usize {
1611        let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1612        let mapped_headers = self.mapped_headers.len();
1613
1614        header_column + mapped_headers
1615    }
1616}
1617
1618#[derive(Clone, Debug, Default, Serialize)]
1619pub struct WebhookHeaderFilters {
1620    pub block: BTreeSet<String>,
1621    pub allow: BTreeSet<String>,
1622}
1623
1624#[derive(Copy, Clone, Debug, Serialize, Arbitrary)]
1625pub enum WebhookBodyFormat {
1626    Json { array: bool },
1627    Bytes,
1628    Text,
1629}
1630
1631impl From<WebhookBodyFormat> for SqlScalarType {
1632    fn from(value: WebhookBodyFormat) -> Self {
1633        match value {
1634            WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1635            WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1636            WebhookBodyFormat::Text => SqlScalarType::String,
1637        }
1638    }
1639}
1640
1641#[derive(Clone, Debug, Serialize)]
1642pub struct WebhookValidationSecret {
1643    /// Identifies the secret by [`CatalogItemId`].
1644    pub id: CatalogItemId,
1645    /// Column index for the expression context that this secret was originally evaluated in.
1646    pub column_idx: usize,
1647    /// Whether or not this secret should be provided to the expression as Bytes or a String.
1648    pub use_bytes: bool,
1649}
1650
1651#[derive(Clone, Debug)]
1652pub struct Connection {
1653    pub create_sql: String,
1654    pub details: ConnectionDetails,
1655}
1656
1657#[derive(Clone, Debug, Serialize)]
1658pub enum ConnectionDetails {
1659    Kafka(KafkaConnection<ReferencedConnection>),
1660    Csr(CsrConnection<ReferencedConnection>),
1661    Postgres(PostgresConnection<ReferencedConnection>),
1662    Ssh {
1663        connection: SshConnection,
1664        key_1: SshKey,
1665        key_2: SshKey,
1666    },
1667    Aws(AwsConnection),
1668    AwsPrivatelink(AwsPrivatelinkConnection),
1669    MySql(MySqlConnection<ReferencedConnection>),
1670    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
1671    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
1672}
1673
1674impl ConnectionDetails {
1675    pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1676        match self {
1677            ConnectionDetails::Kafka(c) => {
1678                mz_storage_types::connections::Connection::Kafka(c.clone())
1679            }
1680            ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1681            ConnectionDetails::Postgres(c) => {
1682                mz_storage_types::connections::Connection::Postgres(c.clone())
1683            }
1684            ConnectionDetails::Ssh { connection, .. } => {
1685                mz_storage_types::connections::Connection::Ssh(connection.clone())
1686            }
1687            ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1688            ConnectionDetails::AwsPrivatelink(c) => {
1689                mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1690            }
1691            ConnectionDetails::MySql(c) => {
1692                mz_storage_types::connections::Connection::MySql(c.clone())
1693            }
1694            ConnectionDetails::SqlServer(c) => {
1695                mz_storage_types::connections::Connection::SqlServer(c.clone())
1696            }
1697            ConnectionDetails::IcebergCatalog(c) => {
1698                mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1699            }
1700        }
1701    }
1702}
1703
1704#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1705pub struct NetworkPolicyRule {
1706    pub name: String,
1707    pub action: NetworkPolicyRuleAction,
1708    pub address: PolicyAddress,
1709    pub direction: NetworkPolicyRuleDirection,
1710}
1711
1712#[derive(
1713    Debug,
1714    Clone,
1715    Serialize,
1716    Deserialize,
1717    PartialEq,
1718    Eq,
1719    Ord,
1720    PartialOrd,
1721    Hash
1722)]
1723pub enum NetworkPolicyRuleAction {
1724    Allow,
1725}
1726
1727impl std::fmt::Display for NetworkPolicyRuleAction {
1728    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1729        match self {
1730            Self::Allow => write!(f, "allow"),
1731        }
1732    }
1733}
1734impl TryFrom<&str> for NetworkPolicyRuleAction {
1735    type Error = PlanError;
1736    fn try_from(value: &str) -> Result<Self, Self::Error> {
1737        match value.to_uppercase().as_str() {
1738            "ALLOW" => Ok(Self::Allow),
1739            _ => Err(PlanError::Unstructured(
1740                "Allow is the only valid option".into(),
1741            )),
1742        }
1743    }
1744}
1745
1746#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1747pub enum NetworkPolicyRuleDirection {
1748    Ingress,
1749}
1750impl std::fmt::Display for NetworkPolicyRuleDirection {
1751    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1752        match self {
1753            Self::Ingress => write!(f, "ingress"),
1754        }
1755    }
1756}
1757impl TryFrom<&str> for NetworkPolicyRuleDirection {
1758    type Error = PlanError;
1759    fn try_from(value: &str) -> Result<Self, Self::Error> {
1760        match value.to_uppercase().as_str() {
1761            "INGRESS" => Ok(Self::Ingress),
1762            _ => Err(PlanError::Unstructured(
1763                "Ingress is the only valid option".into(),
1764            )),
1765        }
1766    }
1767}
1768
1769#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1770pub struct PolicyAddress(pub IpNet);
1771impl std::fmt::Display for PolicyAddress {
1772    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1773        write!(f, "{}", &self.0.to_string())
1774    }
1775}
1776impl From<String> for PolicyAddress {
1777    fn from(value: String) -> Self {
1778        Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1779    }
1780}
1781impl TryFrom<&str> for PolicyAddress {
1782    type Error = PlanError;
1783    fn try_from(value: &str) -> Result<Self, Self::Error> {
1784        let net = IpNet::from_str(value)
1785            .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1786        Ok(Self(net))
1787    }
1788}
1789
1790impl Serialize for PolicyAddress {
1791    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1792    where
1793        S: serde::Serializer,
1794    {
1795        serializer.serialize_str(&format!("{}", &self.0))
1796    }
1797}
1798
1799#[derive(Clone, Debug, Serialize)]
1800pub enum SshKey {
1801    PublicOnly(String),
1802    Both(SshKeyPair),
1803}
1804
1805impl SshKey {
1806    pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1807        match self {
1808            SshKey::PublicOnly(_) => None,
1809            SshKey::Both(key_pair) => Some(key_pair),
1810        }
1811    }
1812
1813    pub fn public_key(&self) -> String {
1814        match self {
1815            SshKey::PublicOnly(s) => s.into(),
1816            SshKey::Both(p) => p.ssh_public_key(),
1817        }
1818    }
1819}
1820
1821#[derive(Clone, Debug)]
1822pub struct Secret {
1823    pub create_sql: String,
1824    pub secret_as: MirScalarExpr,
1825}
1826
1827#[derive(Clone, Debug)]
1828pub struct Sink {
1829    /// Parse-able SQL that is stored durably and defines this sink.
1830    pub create_sql: String,
1831    /// Collection we read into this sink.
1832    pub from: GlobalId,
1833    /// Type of connection to the external service we sink into.
1834    pub connection: StorageSinkConnection<ReferencedConnection>,
1835    // TODO(guswynn): this probably should just be in the `connection`.
1836    pub envelope: SinkEnvelope,
1837    pub version: u64,
1838    pub commit_interval: Option<Duration>,
1839}
1840
1841#[derive(Clone, Debug)]
1842pub struct View {
1843    /// Parse-able SQL that is stored durably and defines this view.
1844    pub create_sql: String,
1845    /// Unoptimized high-level expression from parsing the `create_sql`.
1846    pub expr: HirRelationExpr,
1847    /// All of the catalog objects that are referenced by this view, according to the `expr`.
1848    pub dependencies: DependencyIds,
1849    /// Columns of this view.
1850    pub column_names: Vec<ColumnName>,
1851    /// If this view is created in the temporary schema, e.g. `CREATE TEMPORARY ...`.
1852    pub temporary: bool,
1853}
1854
1855#[derive(Clone, Debug)]
1856pub struct MaterializedView {
1857    /// Parse-able SQL that is stored durably and defines this materialized view.
1858    pub create_sql: String,
1859    /// Unoptimized high-level expression from parsing the `create_sql`.
1860    pub expr: HirRelationExpr,
1861    /// All of the catalog objects that are referenced by this materialized view, according to the `expr`.
1862    pub dependencies: DependencyIds,
1863    /// Columns of this view.
1864    pub column_names: Vec<ColumnName>,
1865    pub replacement_target: Option<CatalogItemId>,
1866    /// Cluster this materialized view will get installed on.
1867    pub cluster_id: ClusterId,
1868    pub non_null_assertions: Vec<usize>,
1869    pub compaction_window: Option<CompactionWindow>,
1870    pub refresh_schedule: Option<RefreshSchedule>,
1871    pub as_of: Option<Timestamp>,
1872}
1873
1874#[derive(Clone, Debug)]
1875pub struct Index {
1876    /// Parse-able SQL that is stored durably and defines this index.
1877    pub create_sql: String,
1878    /// Collection this index is on top of.
1879    pub on: GlobalId,
1880    pub keys: Vec<mz_expr::MirScalarExpr>,
1881    pub compaction_window: Option<CompactionWindow>,
1882    pub cluster_id: ClusterId,
1883}
1884
1885#[derive(Clone, Debug)]
1886pub struct Type {
1887    pub create_sql: String,
1888    pub inner: CatalogType<IdReference>,
1889}
1890
1891/// Specifies when a `Peek` or `Subscribe` should occur.
1892#[derive(Deserialize, Clone, Debug, PartialEq)]
1893pub enum QueryWhen {
1894    /// The peek should occur at the latest possible timestamp that allows the
1895    /// peek to complete immediately.
1896    Immediately,
1897    /// The peek should occur at a timestamp that allows the peek to see all
1898    /// data written to tables within Materialize.
1899    FreshestTableWrite,
1900    /// The peek should occur at the timestamp described by the specified
1901    /// expression.
1902    ///
1903    /// The expression may have any type.
1904    AtTimestamp(Timestamp),
1905    /// Same as Immediately, but will also advance to at least the specified
1906    /// expression.
1907    AtLeastTimestamp(Timestamp),
1908}
1909
1910impl QueryWhen {
1911    /// Returns a timestamp to which the candidate must be advanced.
1912    pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1913        match self {
1914            QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1915            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1916        }
1917    }
1918    /// Returns whether the candidate's upper bound is constrained.
1919    /// This is only true for `AtTimestamp` since it is the only variant that
1920    /// specifies a timestamp.
1921    pub fn constrains_upper(&self) -> bool {
1922        match self {
1923            QueryWhen::AtTimestamp(_) => true,
1924            QueryWhen::AtLeastTimestamp(_)
1925            | QueryWhen::Immediately
1926            | QueryWhen::FreshestTableWrite => false,
1927        }
1928    }
1929    /// Returns whether the candidate must be advanced to the since.
1930    pub fn advance_to_since(&self) -> bool {
1931        match self {
1932            QueryWhen::Immediately
1933            | QueryWhen::AtLeastTimestamp(_)
1934            | QueryWhen::FreshestTableWrite => true,
1935            QueryWhen::AtTimestamp(_) => false,
1936        }
1937    }
1938    /// Returns whether the candidate can be advanced to the upper.
1939    pub fn can_advance_to_upper(&self) -> bool {
1940        match self {
1941            QueryWhen::Immediately => true,
1942            QueryWhen::FreshestTableWrite
1943            | QueryWhen::AtTimestamp(_)
1944            | QueryWhen::AtLeastTimestamp(_) => false,
1945        }
1946    }
1947
1948    /// Returns whether the candidate can be advanced to the timeline's timestamp.
1949    pub fn can_advance_to_timeline_ts(&self) -> bool {
1950        match self {
1951            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1952            QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1953        }
1954    }
1955    /// Returns whether the candidate must be advanced to the timeline's timestamp.
1956    pub fn must_advance_to_timeline_ts(&self) -> bool {
1957        match self {
1958            QueryWhen::FreshestTableWrite => true,
1959            QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1960                false
1961            }
1962        }
1963    }
1964    /// Returns whether the selected timestamp should be tracked within the current transaction.
1965    pub fn is_transactional(&self) -> bool {
1966        match self {
1967            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1968            QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1969        }
1970    }
1971}
1972
1973#[derive(Debug, Copy, Clone)]
1974pub enum MutationKind {
1975    Insert,
1976    Update,
1977    Delete,
1978}
1979
1980#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
1981pub enum CopyFormat {
1982    Text,
1983    Csv,
1984    Binary,
1985    Parquet,
1986}
1987
1988#[derive(Debug, Copy, Clone)]
1989pub enum ExecuteTimeout {
1990    None,
1991    Seconds(f64),
1992    WaitOnce,
1993}
1994
1995#[derive(Clone, Debug)]
1996pub enum IndexOption {
1997    /// Configures the logical compaction window for an index.
1998    RetainHistory(CompactionWindow),
1999}
2000
2001#[derive(Clone, Debug)]
2002pub enum TableOption {
2003    /// Configures the logical compaction window for a table.
2004    RetainHistory(CompactionWindow),
2005}
2006
2007#[derive(Clone, Debug)]
2008pub struct PlanClusterOption {
2009    pub availability_zones: AlterOptionParameter<Vec<String>>,
2010    pub introspection_debugging: AlterOptionParameter<bool>,
2011    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
2012    pub managed: AlterOptionParameter<bool>,
2013    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
2014    pub replication_factor: AlterOptionParameter<u32>,
2015    pub size: AlterOptionParameter,
2016    pub schedule: AlterOptionParameter<ClusterSchedule>,
2017    pub workload_class: AlterOptionParameter<Option<String>>,
2018}
2019
2020impl Default for PlanClusterOption {
2021    fn default() -> Self {
2022        Self {
2023            availability_zones: AlterOptionParameter::Unchanged,
2024            introspection_debugging: AlterOptionParameter::Unchanged,
2025            introspection_interval: AlterOptionParameter::Unchanged,
2026            managed: AlterOptionParameter::Unchanged,
2027            replicas: AlterOptionParameter::Unchanged,
2028            replication_factor: AlterOptionParameter::Unchanged,
2029            size: AlterOptionParameter::Unchanged,
2030            schedule: AlterOptionParameter::Unchanged,
2031            workload_class: AlterOptionParameter::Unchanged,
2032        }
2033    }
2034}
2035
2036#[derive(Clone, Debug, PartialEq, Eq)]
2037pub enum AlterClusterPlanStrategy {
2038    None,
2039    For(Duration),
2040    UntilReady {
2041        on_timeout: OnTimeoutAction,
2042        timeout: Duration,
2043    },
2044}
2045
2046#[derive(Clone, Debug, PartialEq, Eq)]
2047pub enum OnTimeoutAction {
2048    Commit,
2049    Rollback,
2050}
2051
2052impl Default for OnTimeoutAction {
2053    fn default() -> Self {
2054        Self::Commit
2055    }
2056}
2057
2058impl TryFrom<&str> for OnTimeoutAction {
2059    type Error = PlanError;
2060    fn try_from(value: &str) -> Result<Self, Self::Error> {
2061        match value.to_uppercase().as_str() {
2062            "COMMIT" => Ok(Self::Commit),
2063            "ROLLBACK" => Ok(Self::Rollback),
2064            _ => Err(PlanError::Unstructured(
2065                "Valid options are COMMIT, ROLLBACK".into(),
2066            )),
2067        }
2068    }
2069}
2070
2071impl AlterClusterPlanStrategy {
2072    pub fn is_none(&self) -> bool {
2073        matches!(self, Self::None)
2074    }
2075    pub fn is_some(&self) -> bool {
2076        !matches!(self, Self::None)
2077    }
2078}
2079
2080impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2081    type Error = PlanError;
2082
2083    fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2084        Ok(match value.wait {
2085            Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2086            Some(ClusterAlterOptionValue::UntilReady(options)) => {
2087                let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2088                Self::UntilReady {
2089                    timeout: match extracted.timeout {
2090                        Some(d) => d,
2091                        None => Err(PlanError::UntilReadyTimeoutRequired)?,
2092                    },
2093                    on_timeout: match extracted.on_timeout {
2094                        Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2095                            PlanError::InvalidOptionValue {
2096                                option_name: "ON TIMEOUT".into(),
2097                                err: Box::new(e),
2098                            }
2099                        })?,
2100                        None => OnTimeoutAction::default(),
2101                    },
2102                }
2103            }
2104            None => Self::None,
2105        })
2106    }
2107}
2108
2109/// A vector of values to which parameter references should be bound.
2110#[derive(Debug, Clone)]
2111pub struct Params {
2112    /// The datums that were provided in the EXECUTE statement.
2113    pub datums: Row,
2114    /// The types of the datums provided in the EXECUTE statement.
2115    pub execute_types: Vec<SqlScalarType>,
2116    /// The types that the prepared statement expects based on its definition.
2117    pub expected_types: Vec<SqlScalarType>,
2118}
2119
2120impl Params {
2121    /// Returns a `Params` with no parameters.
2122    pub fn empty() -> Params {
2123        Params {
2124            datums: Row::pack_slice(&[]),
2125            execute_types: vec![],
2126            expected_types: vec![],
2127        }
2128    }
2129}
2130
2131/// Controls planning of a SQL query.
2132#[derive(
2133    Ord,
2134    PartialOrd,
2135    Clone,
2136    Debug,
2137    Eq,
2138    PartialEq,
2139    Serialize,
2140    Deserialize,
2141    Hash,
2142    Copy
2143)]
2144pub struct PlanContext {
2145    pub wall_time: DateTime<Utc>,
2146    pub ignore_if_exists_errors: bool,
2147}
2148
2149impl PlanContext {
2150    pub fn new(wall_time: DateTime<Utc>) -> Self {
2151        Self {
2152            wall_time,
2153            ignore_if_exists_errors: false,
2154        }
2155    }
2156
2157    /// Return a PlanContext with zero values. This should only be used when
2158    /// planning is required but unused (like in `plan_create_table()`) or in
2159    /// tests.
2160    pub fn zero() -> Self {
2161        PlanContext {
2162            wall_time: now::to_datetime(NOW_ZERO()),
2163            ignore_if_exists_errors: false,
2164        }
2165    }
2166
2167    pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2168        self.ignore_if_exists_errors = value;
2169        self
2170    }
2171}