Skip to main content

mz_sql/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL planning.
11//!
12//! SQL planning is the process of taking the abstract syntax tree of a
13//! [`Statement`] and turning it into a [`Plan`] that the dataflow layer can
14//! execute.
15//!
16//! Statements must be purified before they can be planned. See the
17//! [`pure`](crate::pure) module for details.
18
19// Internal module layout.
20//
21// The entry point for planning is `statement::handle_statement`. That function
22// dispatches to a more specific `handle` function for the particular statement
23// type. For most statements, this `handle` function is uninteresting and short,
24// but anything involving a `SELECT` statement gets complicated. `SELECT`
25// queries wind through the functions in the `query` module, starting with
26// `plan_root_query` and fanning out based on the contents of the `SELECT`
27// statement.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{CollectionPlan, ColumnOrder, MapFilterProject, MirScalarExpr, RowSetFinishing};
41use mz_ore::now::{self, NOW_ZERO};
42use mz_pgcopy::CopyFormatParams;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
44use mz_repr::explain::{ExplainConfig, ExplainFormat};
45use mz_repr::network_policy_id::NetworkPolicyId;
46use mz_repr::optimize::OptimizerFeatureOverrides;
47use mz_repr::refresh_schedule::RefreshSchedule;
48use mz_repr::role_id::RoleId;
49use mz_repr::{
50    CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, ReprColumnType, Row,
51    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
52};
53use mz_sql_parser::ast::{
54    AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
55    RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
56    Value, WithOptionValue,
57};
58use mz_ssh_util::keys::SshKeyPair;
59use mz_storage_types::connections::aws::AwsConnection;
60use mz_storage_types::connections::gcp::{GcpConnection, GcpServiceAccountKeyTokenUri};
61use mz_storage_types::connections::inline::ReferencedConnection;
62use mz_storage_types::connections::{
63    AwsPrivatelinkConnection, CsrConnection, GlueSchemaRegistryConnection,
64    IcebergCatalogConnection, KafkaConnection, MySqlConnection, PostgresConnection,
65    SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70    SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76    ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77    TransactionAccessMode,
78};
79use crate::catalog::{
80    CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81    RoleAttributesRaw,
82};
83use crate::names::{
84    Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110    AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111    WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119    AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120    PlannedAlterRoleOption, PlannedRoleAttributes, PlannedRoleVariable,
121    SqlServerConfigOptionExtracted,
122};
123pub use statement::{
124    StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
125    resolve_cluster_for_materialized_view,
126};
127
128use self::statement::ddl::ClusterAlterOptionExtracted;
129use self::with_options::TryFromValue;
130
131/// Instructions for executing a SQL query.
132#[derive(Debug, EnumKind)]
133#[enum_kind(PlanKind)]
134pub enum Plan {
135    CreateConnection(CreateConnectionPlan),
136    CreateDatabase(CreateDatabasePlan),
137    CreateSchema(CreateSchemaPlan),
138    CreateRole(CreateRolePlan),
139    CreateCluster(CreateClusterPlan),
140    CreateClusterReplica(CreateClusterReplicaPlan),
141    CreateSource(CreateSourcePlan),
142    CreateSources(Vec<CreateSourcePlanBundle>),
143    CreateSecret(CreateSecretPlan),
144    CreateSink(CreateSinkPlan),
145    CreateTable(CreateTablePlan),
146    CreateView(CreateViewPlan),
147    CreateMaterializedView(CreateMaterializedViewPlan),
148    CreateNetworkPolicy(CreateNetworkPolicyPlan),
149    CreateIndex(CreateIndexPlan),
150    CreateType(CreateTypePlan),
151    Comment(CommentPlan),
152    DiscardTemp,
153    DiscardAll,
154    DropObjects(DropObjectsPlan),
155    DropOwned(DropOwnedPlan),
156    EmptyQuery,
157    ShowAllVariables,
158    ShowCreate(ShowCreatePlan),
159    ShowColumns(ShowColumnsPlan),
160    ShowVariable(ShowVariablePlan),
161    InspectShard(InspectShardPlan),
162    SetVariable(SetVariablePlan),
163    ResetVariable(ResetVariablePlan),
164    SetTransaction(SetTransactionPlan),
165    StartTransaction(StartTransactionPlan),
166    CommitTransaction(CommitTransactionPlan),
167    AbortTransaction(AbortTransactionPlan),
168    Select(SelectPlan),
169    Subscribe(SubscribePlan),
170    CopyFrom(CopyFromPlan),
171    CopyTo(CopyToPlan),
172    ExplainPlan(ExplainPlanPlan),
173    ExplainPushdown(ExplainPushdownPlan),
174    ExplainTimestamp(ExplainTimestampPlan),
175    ExplainSinkSchema(ExplainSinkSchemaPlan),
176    Insert(InsertPlan),
177    AlterCluster(AlterClusterPlan),
178    AlterClusterSwap(AlterClusterSwapPlan),
179    AlterNoop(AlterNoopPlan),
180    AlterSetCluster(AlterSetClusterPlan),
181    AlterConnection(AlterConnectionPlan),
182    AlterSource(AlterSourcePlan),
183    AlterClusterRename(AlterClusterRenamePlan),
184    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
185    AlterItemRename(AlterItemRenamePlan),
186    AlterSchemaRename(AlterSchemaRenamePlan),
187    AlterSchemaSwap(AlterSchemaSwapPlan),
188    AlterSecret(AlterSecretPlan),
189    AlterSink(AlterSinkPlan),
190    AlterSystemSet(AlterSystemSetPlan),
191    AlterSystemReset(AlterSystemResetPlan),
192    AlterSystemResetAll(AlterSystemResetAllPlan),
193    AlterRole(AlterRolePlan),
194    AlterOwner(AlterOwnerPlan),
195    AlterTableAddColumn(AlterTablePlan),
196    AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
197    AlterNetworkPolicy(AlterNetworkPolicyPlan),
198    Declare(DeclarePlan),
199    Fetch(FetchPlan),
200    Close(ClosePlan),
201    ReadThenWrite(ReadThenWritePlan),
202    Prepare(PreparePlan),
203    Execute(ExecutePlan),
204    Deallocate(DeallocatePlan),
205    Raise(RaisePlan),
206    GrantRole(GrantRolePlan),
207    RevokeRole(RevokeRolePlan),
208    GrantPrivileges(GrantPrivilegesPlan),
209    RevokePrivileges(RevokePrivilegesPlan),
210    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
211    ReassignOwned(ReassignOwnedPlan),
212    SideEffectingFunc(SideEffectingFunc),
213    ValidateConnection(ValidateConnectionPlan),
214    AlterRetainHistory(AlterRetainHistoryPlan),
215    AlterSourceTimestampInterval(AlterSourceTimestampIntervalPlan),
216}
217
218impl Plan {
219    /// Expresses which [`StatementKind`] can generate which set of
220    /// [`PlanKind`].
221    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
222        match stmt {
223            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
224            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
225            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
226            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
227            StatementKind::AlterObjectRename => &[
228                PlanKind::AlterClusterRename,
229                PlanKind::AlterClusterReplicaRename,
230                PlanKind::AlterItemRename,
231                PlanKind::AlterSchemaRename,
232                PlanKind::AlterNoop,
233            ],
234            StatementKind::AlterObjectSwap => &[
235                PlanKind::AlterClusterSwap,
236                PlanKind::AlterSchemaSwap,
237                PlanKind::AlterNoop,
238            ],
239            StatementKind::AlterRole => &[PlanKind::AlterRole],
240            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
241            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
242            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
243            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
244            StatementKind::AlterSource => &[
245                PlanKind::AlterNoop,
246                PlanKind::AlterSource,
247                PlanKind::AlterRetainHistory,
248                PlanKind::AlterSourceTimestampInterval,
249            ],
250            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
251            StatementKind::AlterSystemResetAll => {
252                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
253            }
254            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
255            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
256            StatementKind::AlterTableAddColumn => {
257                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
258            }
259            StatementKind::AlterMaterializedViewApplyReplacement => &[
260                PlanKind::AlterNoop,
261                PlanKind::AlterMaterializedViewApplyReplacement,
262            ],
263            StatementKind::Close => &[PlanKind::Close],
264            StatementKind::Comment => &[PlanKind::Comment],
265            StatementKind::Commit => &[PlanKind::CommitTransaction],
266            StatementKind::Copy => &[
267                PlanKind::CopyFrom,
268                PlanKind::Select,
269                PlanKind::Subscribe,
270                PlanKind::CopyTo,
271            ],
272            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
273            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
274            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
275            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
276            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
277            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
278            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
279            StatementKind::CreateRole => &[PlanKind::CreateRole],
280            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
281            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
282            StatementKind::CreateSink => &[PlanKind::CreateSink],
283            StatementKind::CreateSource | StatementKind::CreateSubsource => {
284                &[PlanKind::CreateSource]
285            }
286            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
287            StatementKind::CreateTable => &[PlanKind::CreateTable],
288            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
289            StatementKind::CreateType => &[PlanKind::CreateType],
290            StatementKind::CreateView => &[PlanKind::CreateView],
291            StatementKind::Deallocate => &[PlanKind::Deallocate],
292            StatementKind::Declare => &[PlanKind::Declare],
293            StatementKind::Delete => &[PlanKind::ReadThenWrite],
294            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
295            StatementKind::DropObjects => &[PlanKind::DropObjects],
296            StatementKind::DropOwned => &[PlanKind::DropOwned],
297            StatementKind::Execute => &[PlanKind::Execute],
298            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
299            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
300            StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
301            StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
302            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
303            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
304            StatementKind::Fetch => &[PlanKind::Fetch],
305            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
306            StatementKind::GrantRole => &[PlanKind::GrantRole],
307            StatementKind::Insert => &[PlanKind::Insert],
308            StatementKind::Prepare => &[PlanKind::Prepare],
309            StatementKind::Raise => &[PlanKind::Raise],
310            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
311            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
312            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
313            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
314            StatementKind::Rollback => &[PlanKind::AbortTransaction],
315            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
316            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
317            StatementKind::SetVariable => &[PlanKind::SetVariable],
318            StatementKind::Show => &[
319                PlanKind::Select,
320                PlanKind::ShowVariable,
321                PlanKind::ShowCreate,
322                PlanKind::ShowColumns,
323                PlanKind::ShowAllVariables,
324                PlanKind::InspectShard,
325            ],
326            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
327            StatementKind::Subscribe => &[PlanKind::Subscribe],
328            StatementKind::Update => &[PlanKind::ReadThenWrite],
329            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
330            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
331            StatementKind::ExecuteUnitTest => &[],
332        }
333    }
334
335    /// Returns a human readable name of the plan. Meant for use in messages sent back to a user.
336    pub fn name(&self) -> &str {
337        match self {
338            Plan::CreateConnection(_) => "create connection",
339            Plan::CreateDatabase(_) => "create database",
340            Plan::CreateSchema(_) => "create schema",
341            Plan::CreateRole(_) => "create role",
342            Plan::CreateCluster(_) => "create cluster",
343            Plan::CreateClusterReplica(_) => "create cluster replica",
344            Plan::CreateSource(_) => "create source",
345            Plan::CreateSources(_) => "create source",
346            Plan::CreateSecret(_) => "create secret",
347            Plan::CreateSink(_) => "create sink",
348            Plan::CreateTable(_) => "create table",
349            Plan::CreateView(_) => "create view",
350            Plan::CreateMaterializedView(_) => "create materialized view",
351            Plan::CreateIndex(_) => "create index",
352            Plan::CreateType(_) => "create type",
353            Plan::CreateNetworkPolicy(_) => "create network policy",
354            Plan::Comment(_) => "comment",
355            Plan::DiscardTemp => "discard temp",
356            Plan::DiscardAll => "discard all",
357            Plan::DropObjects(plan) => match plan.object_type {
358                ObjectType::Table => "drop table",
359                ObjectType::View => "drop view",
360                ObjectType::MaterializedView => "drop materialized view",
361                ObjectType::Source => "drop source",
362                ObjectType::Sink => "drop sink",
363                ObjectType::Index => "drop index",
364                ObjectType::Type => "drop type",
365                ObjectType::Role => "drop roles",
366                ObjectType::Cluster => "drop clusters",
367                ObjectType::ClusterReplica => "drop cluster replicas",
368                ObjectType::Secret => "drop secret",
369                ObjectType::Connection => "drop connection",
370                ObjectType::Database => "drop database",
371                ObjectType::Schema => "drop schema",
372                ObjectType::Func => "drop function",
373                ObjectType::NetworkPolicy => "drop network policy",
374            },
375            Plan::DropOwned(_) => "drop owned",
376            Plan::EmptyQuery => "do nothing",
377            Plan::ShowAllVariables => "show all variables",
378            Plan::ShowCreate(_) => "show create",
379            Plan::ShowColumns(_) => "show columns",
380            Plan::ShowVariable(_) => "show variable",
381            Plan::InspectShard(_) => "inspect shard",
382            Plan::SetVariable(_) => "set variable",
383            Plan::ResetVariable(_) => "reset variable",
384            Plan::SetTransaction(_) => "set transaction",
385            Plan::StartTransaction(_) => "start transaction",
386            Plan::CommitTransaction(_) => "commit",
387            Plan::AbortTransaction(_) => "abort",
388            Plan::Select(_) => "select",
389            Plan::Subscribe(_) => "subscribe",
390            Plan::CopyFrom(_) => "copy from",
391            Plan::CopyTo(_) => "copy to",
392            Plan::ExplainPlan(_) => "explain plan",
393            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
394            Plan::ExplainTimestamp(_) => "explain timestamp",
395            Plan::ExplainSinkSchema(_) => "explain schema",
396            Plan::Insert(_) => "insert",
397            Plan::AlterNoop(plan) => match plan.object_type {
398                ObjectType::Table => "alter table",
399                ObjectType::View => "alter view",
400                ObjectType::MaterializedView => "alter materialized view",
401                ObjectType::Source => "alter source",
402                ObjectType::Sink => "alter sink",
403                ObjectType::Index => "alter index",
404                ObjectType::Type => "alter type",
405                ObjectType::Role => "alter role",
406                ObjectType::Cluster => "alter cluster",
407                ObjectType::ClusterReplica => "alter cluster replica",
408                ObjectType::Secret => "alter secret",
409                ObjectType::Connection => "alter connection",
410                ObjectType::Database => "alter database",
411                ObjectType::Schema => "alter schema",
412                ObjectType::Func => "alter function",
413                ObjectType::NetworkPolicy => "alter network policy",
414            },
415            Plan::AlterCluster(_) => "alter cluster",
416            Plan::AlterClusterRename(_) => "alter cluster rename",
417            Plan::AlterClusterSwap(_) => "alter cluster swap",
418            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
419            Plan::AlterSetCluster(_) => "alter set cluster",
420            Plan::AlterConnection(_) => "alter connection",
421            Plan::AlterSource(_) => "alter source",
422            Plan::AlterItemRename(_) => "rename item",
423            Plan::AlterSchemaRename(_) => "alter rename schema",
424            Plan::AlterSchemaSwap(_) => "alter swap schema",
425            Plan::AlterSecret(_) => "alter secret",
426            Plan::AlterSink(_) => "alter sink",
427            Plan::AlterSystemSet(_) => "alter system",
428            Plan::AlterSystemReset(_) => "alter system",
429            Plan::AlterSystemResetAll(_) => "alter system",
430            Plan::AlterRole(_) => "alter role",
431            Plan::AlterNetworkPolicy(_) => "alter network policy",
432            Plan::AlterOwner(plan) => match plan.object_type {
433                ObjectType::Table => "alter table owner",
434                ObjectType::View => "alter view owner",
435                ObjectType::MaterializedView => "alter materialized view owner",
436                ObjectType::Source => "alter source owner",
437                ObjectType::Sink => "alter sink owner",
438                ObjectType::Index => "alter index owner",
439                ObjectType::Type => "alter type owner",
440                ObjectType::Role => "alter role owner",
441                ObjectType::Cluster => "alter cluster owner",
442                ObjectType::ClusterReplica => "alter cluster replica owner",
443                ObjectType::Secret => "alter secret owner",
444                ObjectType::Connection => "alter connection owner",
445                ObjectType::Database => "alter database owner",
446                ObjectType::Schema => "alter schema owner",
447                ObjectType::Func => "alter function owner",
448                ObjectType::NetworkPolicy => "alter network policy owner",
449            },
450            Plan::AlterTableAddColumn(_) => "alter table add column",
451            Plan::AlterMaterializedViewApplyReplacement(_) => {
452                "alter materialized view apply replacement"
453            }
454            Plan::Declare(_) => "declare",
455            Plan::Fetch(_) => "fetch",
456            Plan::Close(_) => "close",
457            Plan::ReadThenWrite(plan) => match plan.kind {
458                MutationKind::Insert => "insert into select",
459                MutationKind::Update => "update",
460                MutationKind::Delete => "delete",
461            },
462            Plan::Prepare(_) => "prepare",
463            Plan::Execute(_) => "execute",
464            Plan::Deallocate(_) => "deallocate",
465            Plan::Raise(_) => "raise",
466            Plan::GrantRole(_) => "grant role",
467            Plan::RevokeRole(_) => "revoke role",
468            Plan::GrantPrivileges(_) => "grant privilege",
469            Plan::RevokePrivileges(_) => "revoke privilege",
470            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
471            Plan::ReassignOwned(_) => "reassign owned",
472            Plan::SideEffectingFunc(_) => "side effecting func",
473            Plan::ValidateConnection(_) => "validate connection",
474            Plan::AlterRetainHistory(_) => "alter retain history",
475            Plan::AlterSourceTimestampInterval(_) => "alter source timestamp interval",
476        }
477    }
478
479    /// Returns `true` iff this `Plan` is allowed to be executed in read-only
480    /// mode.
481    ///
482    /// We use an explicit allow-list, to avoid future additions automatically
483    /// falling into the `true` category.
484    pub fn allowed_in_read_only(&self) -> bool {
485        match self {
486            // These two set non-durable session variables, so are okay in
487            // read-only mode.
488            Plan::SetVariable(_) => true,
489            Plan::ResetVariable(_) => true,
490            Plan::SetTransaction(_) => true,
491            Plan::StartTransaction(_) => true,
492            Plan::CommitTransaction(_) => true,
493            Plan::AbortTransaction(_) => true,
494            Plan::Select(_) => true,
495            Plan::EmptyQuery => true,
496            Plan::ShowAllVariables => true,
497            Plan::ShowCreate(_) => true,
498            Plan::ShowColumns(_) => true,
499            Plan::ShowVariable(_) => true,
500            Plan::InspectShard(_) => true,
501            Plan::Subscribe(_) => true,
502            Plan::CopyTo(_) => true,
503            Plan::ExplainPlan(_) => true,
504            Plan::ExplainPushdown(_) => true,
505            Plan::ExplainTimestamp(_) => true,
506            Plan::ExplainSinkSchema(_) => true,
507            Plan::ValidateConnection(_) => true,
508            _ => false,
509        }
510    }
511}
512
513#[derive(Debug)]
514pub struct StartTransactionPlan {
515    pub access: Option<TransactionAccessMode>,
516    pub isolation_level: Option<TransactionIsolationLevel>,
517}
518
519#[derive(Debug)]
520pub enum TransactionType {
521    Explicit,
522    Implicit,
523}
524
525impl TransactionType {
526    pub fn is_explicit(&self) -> bool {
527        matches!(self, TransactionType::Explicit)
528    }
529
530    pub fn is_implicit(&self) -> bool {
531        matches!(self, TransactionType::Implicit)
532    }
533}
534
535#[derive(Debug)]
536pub struct CommitTransactionPlan {
537    pub transaction_type: TransactionType,
538}
539
540#[derive(Debug)]
541pub struct AbortTransactionPlan {
542    pub transaction_type: TransactionType,
543}
544
545#[derive(Debug)]
546pub struct CreateDatabasePlan {
547    pub name: String,
548    pub if_not_exists: bool,
549}
550
551#[derive(Debug)]
552pub struct CreateSchemaPlan {
553    pub database_spec: ResolvedDatabaseSpecifier,
554    pub schema_name: String,
555    pub if_not_exists: bool,
556}
557
558#[derive(Debug)]
559pub struct CreateRolePlan {
560    pub name: String,
561    pub attributes: RoleAttributesRaw,
562}
563
564#[derive(Debug, PartialEq, Eq, Clone)]
565pub struct CreateClusterPlan {
566    pub name: String,
567    pub variant: CreateClusterVariant,
568    pub workload_class: Option<String>,
569}
570
571#[derive(Debug, PartialEq, Eq, Clone)]
572pub enum CreateClusterVariant {
573    Managed(CreateClusterManagedPlan),
574    Unmanaged(CreateClusterUnmanagedPlan),
575}
576
577#[derive(Debug, PartialEq, Eq, Clone)]
578pub struct CreateClusterUnmanagedPlan {
579    pub replicas: Vec<(String, ReplicaConfig)>,
580}
581
582#[derive(Debug, PartialEq, Eq, Clone)]
583pub struct CreateClusterManagedPlan {
584    pub replication_factor: u32,
585    pub size: String,
586    pub availability_zones: Vec<String>,
587    pub compute: ComputeReplicaConfig,
588    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
589    pub schedule: ClusterSchedule,
590}
591
592#[derive(Debug)]
593pub struct CreateClusterReplicaPlan {
594    pub cluster_id: ClusterId,
595    pub name: String,
596    pub config: ReplicaConfig,
597}
598
599/// Configuration of introspection for a cluster replica.
600#[derive(
601    Clone,
602    Copy,
603    Debug,
604    Serialize,
605    Deserialize,
606    PartialOrd,
607    Ord,
608    PartialEq,
609    Eq
610)]
611pub struct ComputeReplicaIntrospectionConfig {
612    /// Whether to introspect the introspection.
613    pub debugging: bool,
614    /// The interval at which to introspect.
615    pub interval: Duration,
616}
617
618#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
619pub struct ComputeReplicaConfig {
620    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
621}
622
623#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
624pub enum ReplicaConfig {
625    Unorchestrated {
626        storagectl_addrs: Vec<String>,
627        computectl_addrs: Vec<String>,
628        compute: ComputeReplicaConfig,
629    },
630    Orchestrated {
631        size: String,
632        availability_zone: Option<String>,
633        compute: ComputeReplicaConfig,
634        internal: bool,
635        billed_as: Option<String>,
636    },
637}
638
639#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
640pub enum ClusterSchedule {
641    /// The system won't automatically turn the cluster On or Off.
642    Manual,
643    /// The cluster will be On when a REFRESH materialized view on it needs to refresh.
644    /// `hydration_time_estimate` determines how much time before a refresh to turn the
645    /// cluster On, so that it can rehydrate already before the refresh time.
646    Refresh { hydration_time_estimate: Duration },
647}
648
649impl Default for ClusterSchedule {
650    fn default() -> Self {
651        // (Has to be consistent with `impl Default for ClusterScheduleOptionValue`.)
652        ClusterSchedule::Manual
653    }
654}
655
656/// The user-configured autoscaling policy of a managed cluster.
657///
658/// Extensible: future strategies are added as additional optional sub-policies,
659/// so the block as a whole can grow without changing existing ones.
660#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
661pub struct AutoScalingStrategy {
662    pub on_hydration: Option<OnHydration>,
663}
664
665/// The `ON HYDRATION` autoscaling sub-policy: while objects are un-hydrated, run
666/// an extra replica at `hydration_size` to accelerate hydration.
667#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
668pub struct OnHydration {
669    pub hydration_size: String,
670    /// How long the burst replica lingers after the steady-state replicas
671    /// hydrate. `None` falls back to the system default at the controller.
672    pub linger_duration: Option<Duration>,
673}
674
675#[derive(Debug)]
676pub struct CreateSourcePlan {
677    pub name: QualifiedItemName,
678    pub source: Source,
679    pub if_not_exists: bool,
680    pub timeline: Timeline,
681    // None for subsources, which run on the parent cluster.
682    pub in_cluster: Option<ClusterId>,
683}
684
685#[derive(Clone, Debug, PartialEq, Eq)]
686pub struct SourceReferences {
687    pub updated_at: u64,
688    pub references: Vec<SourceReference>,
689}
690
691/// An available external reference for a source and if possible to retrieve,
692/// any column names it contains.
693#[derive(Clone, Debug, PartialEq, Eq)]
694pub struct SourceReference {
695    pub name: String,
696    pub namespace: Option<String>,
697    pub columns: Vec<String>,
698}
699
700/// A [`CreateSourcePlan`] and the metadata necessary to sequence it.
701#[derive(Debug)]
702pub struct CreateSourcePlanBundle {
703    /// ID of this source in the Catalog.
704    pub item_id: CatalogItemId,
705    /// ID used to reference this source from outside the catalog, e.g. compute.
706    pub global_id: GlobalId,
707    /// Details of the source to create.
708    pub plan: CreateSourcePlan,
709    /// Other catalog objects that are referenced by this source, determined at name resolution.
710    pub resolved_ids: ResolvedIds,
711    /// All the available upstream references for this source.
712    /// Populated for top-level sources that can contain subsources/tables
713    /// and used during sequencing to populate the appropriate catalog fields.
714    pub available_source_references: Option<SourceReferences>,
715}
716
717#[derive(Debug)]
718pub struct CreateConnectionPlan {
719    pub name: QualifiedItemName,
720    pub if_not_exists: bool,
721    pub connection: Connection,
722    pub validate: bool,
723}
724
725#[derive(Debug)]
726pub struct ValidateConnectionPlan {
727    /// ID of the connection in the Catalog.
728    pub id: CatalogItemId,
729    /// The connection to validate.
730    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
731}
732
733#[derive(Debug)]
734pub struct CreateSecretPlan {
735    pub name: QualifiedItemName,
736    pub secret: Secret,
737    pub if_not_exists: bool,
738}
739
740#[derive(Debug)]
741pub struct CreateSinkPlan {
742    pub name: QualifiedItemName,
743    pub sink: Sink,
744    pub with_snapshot: bool,
745    pub if_not_exists: bool,
746    pub in_cluster: ClusterId,
747}
748
749#[derive(Debug)]
750pub struct CreateTablePlan {
751    pub name: QualifiedItemName,
752    pub table: Table,
753    pub if_not_exists: bool,
754}
755
756#[derive(Debug, Clone)]
757pub struct CreateViewPlan {
758    pub name: QualifiedItemName,
759    pub view: View,
760    /// The Catalog objects that this view is replacing, if any.
761    pub replace: Option<CatalogItemId>,
762    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
763    pub drop_ids: Vec<CatalogItemId>,
764    pub if_not_exists: bool,
765    /// True if the view contains an expression that can make the exact column list
766    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
767    pub ambiguous_columns: bool,
768}
769
770#[derive(Debug, Clone)]
771pub struct CreateMaterializedViewPlan {
772    pub name: QualifiedItemName,
773    pub materialized_view: MaterializedView,
774    /// The Catalog objects that this materialized view is replacing, if any.
775    pub replace: Option<CatalogItemId>,
776    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
777    pub drop_ids: Vec<CatalogItemId>,
778    pub if_not_exists: bool,
779    /// True if the materialized view contains an expression that can make the exact column list
780    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
781    pub ambiguous_columns: bool,
782}
783
784#[derive(Debug, Clone)]
785pub struct CreateNetworkPolicyPlan {
786    pub name: String,
787    pub rules: Vec<NetworkPolicyRule>,
788}
789
790#[derive(Debug, Clone)]
791pub struct AlterNetworkPolicyPlan {
792    pub id: NetworkPolicyId,
793    pub name: String,
794    pub rules: Vec<NetworkPolicyRule>,
795}
796
797#[derive(Debug, Clone)]
798pub struct CreateIndexPlan {
799    pub name: QualifiedItemName,
800    pub index: Index,
801    pub if_not_exists: bool,
802}
803
804#[derive(Debug)]
805pub struct CreateTypePlan {
806    pub name: QualifiedItemName,
807    pub typ: Type,
808}
809
810#[derive(Debug)]
811pub struct DropObjectsPlan {
812    /// The IDs of only the objects directly referenced in the `DROP` statement.
813    pub referenced_ids: Vec<ObjectId>,
814    /// All object IDs to drop. Includes `referenced_ids` and all descendants.
815    pub drop_ids: Vec<ObjectId>,
816    /// The type of object that was dropped explicitly in the DROP statement. `ids` may contain
817    /// objects of different types due to CASCADE.
818    pub object_type: ObjectType,
819}
820
821#[derive(Debug)]
822pub struct DropOwnedPlan {
823    /// The role IDs that own the objects.
824    pub role_ids: Vec<RoleId>,
825    /// All object IDs to drop.
826    pub drop_ids: Vec<ObjectId>,
827    /// The privileges to revoke.
828    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
829    /// The default privileges to revoke.
830    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
831}
832
833#[derive(Debug)]
834pub struct ShowVariablePlan {
835    pub name: String,
836}
837
838#[derive(Debug)]
839pub struct InspectShardPlan {
840    /// ID of the storage collection to inspect.
841    pub id: GlobalId,
842}
843
844#[derive(Debug)]
845pub struct SetVariablePlan {
846    pub name: String,
847    pub value: VariableValue,
848    pub local: bool,
849}
850
851#[derive(Debug)]
852pub enum VariableValue {
853    Default,
854    Values(Vec<String>),
855}
856
857#[derive(Debug)]
858pub struct ResetVariablePlan {
859    pub name: String,
860}
861
862#[derive(Debug)]
863pub struct SetTransactionPlan {
864    pub local: bool,
865    pub modes: Vec<TransactionMode>,
866}
867
868/// A plan for select statements.
869#[derive(Clone, Debug)]
870pub struct SelectPlan {
871    /// The `SELECT` statement itself. Used for explain/notices, but not otherwise
872    /// load-bearing. Boxed to save stack space.
873    pub select: Option<Box<SelectStatement<Aug>>>,
874    /// The plan as a HIR.
875    pub source: HirRelationExpr,
876    /// At what time should this select happen?
877    pub when: QueryWhen,
878    /// Instructions how to form the result set.
879    pub finishing: RowSetFinishing,
880    /// For `COPY TO STDOUT`, the format to use.
881    pub copy_to: Option<CopyFormat>,
882}
883
884impl SelectPlan {
885    pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
886        let arity = typ.arity();
887        SelectPlan {
888            select: None,
889            source: HirRelationExpr::Constant { rows, typ },
890            when: QueryWhen::Immediately,
891            finishing: RowSetFinishing::trivial(arity),
892            copy_to: None,
893        }
894    }
895}
896
897#[derive(Debug, Clone)]
898pub enum SubscribeOutput {
899    Diffs,
900    WithinTimestampOrderBy {
901        /// We pretend that mz_diff is prepended to the normal columns, making it index 0
902        order_by: Vec<ColumnOrder>,
903    },
904    EnvelopeUpsert {
905        /// Order by with just keys
906        order_by_keys: Vec<ColumnOrder>,
907    },
908    EnvelopeDebezium {
909        /// Order by with just keys
910        order_by_keys: Vec<ColumnOrder>,
911    },
912}
913
914impl SubscribeOutput {
915    pub fn row_order(&self) -> &[ColumnOrder] {
916        match self {
917            SubscribeOutput::Diffs => &[],
918            // This ordering prepends the diff, so its `order_by` field cannot be applied to rows.
919            SubscribeOutput::WithinTimestampOrderBy { .. } => &[],
920            SubscribeOutput::EnvelopeUpsert { order_by_keys } => order_by_keys,
921            SubscribeOutput::EnvelopeDebezium { order_by_keys } => order_by_keys,
922        }
923    }
924}
925
926#[derive(Debug, Clone)]
927pub struct SubscribePlan {
928    pub from: SubscribeFrom,
929    pub with_snapshot: bool,
930    pub when: QueryWhen,
931    pub up_to: Option<Timestamp>,
932    pub copy_to: Option<CopyFormat>,
933    pub emit_progress: bool,
934    pub output: SubscribeOutput,
935}
936
937#[derive(Debug, Clone)]
938pub enum SubscribeFrom {
939    /// ID of the collection to subscribe to.
940    Id(GlobalId),
941    /// Query to subscribe to.
942    Query {
943        expr: HirRelationExpr,
944        desc: RelationDesc,
945    },
946}
947
948impl SubscribeFrom {
949    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
950        match self {
951            SubscribeFrom::Id(id) => BTreeSet::from([*id]),
952            SubscribeFrom::Query { expr, .. } => expr.depends_on(),
953        }
954    }
955
956    pub fn contains_temporal(&self) -> bool {
957        match self {
958            SubscribeFrom::Id(_) => false,
959            SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
960        }
961    }
962}
963
964#[derive(Debug)]
965pub struct ShowCreatePlan {
966    pub id: ObjectId,
967    pub row: Row,
968}
969
970#[derive(Debug)]
971pub struct ShowColumnsPlan {
972    pub id: CatalogItemId,
973    pub select_plan: SelectPlan,
974    pub new_resolved_ids: ResolvedIds,
975}
976
977#[derive(Debug)]
978pub struct CopyFromPlan {
979    /// Table we're copying into.
980    pub target_id: CatalogItemId,
981    /// Human-readable full name of the target table.
982    pub target_name: String,
983    /// Source we're copying data from.
984    pub source: CopyFromSource,
985    /// How input columns map to those on the destination table.
986    ///
987    /// TODO(cf2): Remove this field in favor of the mfp.
988    pub columns: Vec<ColumnIndex>,
989    /// [`RelationDesc`] describing the input data.
990    pub source_desc: RelationDesc,
991    /// Changes the shape of the input data to match the destination table.
992    pub mfp: MapFilterProject,
993    /// Format specific params for copying the input data.
994    pub params: CopyFormatParams<'static>,
995    /// Filter for the source files we're copying from, e.g. an S3 prefix.
996    pub filter: Option<CopyFromFilter>,
997}
998
999#[derive(Debug)]
1000pub enum CopyFromSource {
1001    /// Copying from a file local to the user, transmitted via pgwire.
1002    Stdin,
1003    /// A remote resource, e.g. HTTP file.
1004    ///
1005    /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource.
1006    Url(HirScalarExpr),
1007    /// A file in an S3 bucket.
1008    AwsS3 {
1009        /// Expression that evaluates to the file we want to copy.
1010        uri: HirScalarExpr,
1011        /// Details for how we connect to AWS S3.
1012        connection: AwsConnection,
1013        /// ID of the connection object.
1014        connection_id: CatalogItemId,
1015    },
1016}
1017
1018#[derive(Debug)]
1019pub enum CopyFromFilter {
1020    Files(Vec<String>),
1021    Pattern(String),
1022}
1023
1024/// `COPY TO S3`
1025///
1026/// (This is a completely different thing from `COPY TO STDOUT`. That is a `Plan::Select` with
1027/// `copy_to` set.)
1028#[derive(Debug, Clone)]
1029pub struct CopyToPlan {
1030    /// The select query plan whose data will be copied to destination uri.
1031    pub select_plan: SelectPlan,
1032    pub desc: RelationDesc,
1033    /// The scalar expression to be resolved to get the destination uri.
1034    pub to: HirScalarExpr,
1035    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
1036    /// The ID of the connection.
1037    pub connection_id: CatalogItemId,
1038    pub format: S3SinkFormat,
1039    pub max_file_size: u64,
1040}
1041
1042#[derive(Clone, Debug)]
1043pub struct ExplainPlanPlan {
1044    pub stage: ExplainStage,
1045    pub format: ExplainFormat,
1046    pub config: ExplainConfig,
1047    pub explainee: Explainee,
1048}
1049
1050/// The type of object to be explained
1051#[derive(Clone, Debug)]
1052pub enum Explainee {
1053    /// Lookup and explain a plan saved for an view.
1054    View(CatalogItemId),
1055    /// Lookup and explain a plan saved for an existing materialized view.
1056    MaterializedView(CatalogItemId),
1057    /// Lookup and explain a plan saved for an existing index.
1058    Index(CatalogItemId),
1059    /// Replan an existing view.
1060    ReplanView(CatalogItemId),
1061    /// Replan an existing materialized view.
1062    ReplanMaterializedView(CatalogItemId),
1063    /// Replan an existing index.
1064    ReplanIndex(CatalogItemId),
1065    /// A SQL statement.
1066    Statement(ExplaineeStatement),
1067}
1068
1069/// Explainee types that are statements.
1070#[derive(Clone, Debug, EnumKind)]
1071#[enum_kind(ExplaineeStatementKind)]
1072pub enum ExplaineeStatement {
1073    /// The object to be explained is a SELECT statement.
1074    Select {
1075        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1076        broken: bool,
1077        plan: plan::SelectPlan,
1078        desc: RelationDesc,
1079    },
1080    /// The object to be explained is a CREATE VIEW.
1081    CreateView {
1082        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1083        broken: bool,
1084        plan: plan::CreateViewPlan,
1085    },
1086    /// The object to be explained is a CREATE MATERIALIZED VIEW.
1087    CreateMaterializedView {
1088        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1089        broken: bool,
1090        plan: plan::CreateMaterializedViewPlan,
1091    },
1092    /// The object to be explained is a CREATE INDEX.
1093    CreateIndex {
1094        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1095        broken: bool,
1096        plan: plan::CreateIndexPlan,
1097    },
1098    /// The object to be explained is a SUBSCRIBE statement.
1099    Subscribe {
1100        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1101        broken: bool,
1102        plan: plan::SubscribePlan,
1103    },
1104}
1105
1106impl ExplaineeStatement {
1107    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1108        match self {
1109            Self::Select { plan, .. } => plan.source.depends_on(),
1110            Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1111            Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1112            Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1113            Self::Subscribe { plan, .. } => plan.from.depends_on(),
1114        }
1115    }
1116
1117    /// Statements that have their `broken` flag set are expected to cause a
1118    /// panic in the optimizer code. In this case:
1119    ///
1120    /// 1. The optimizer pipeline execution will stop, but the panic will be
1121    ///    intercepted and will not propagate to the caller. The partial
1122    ///    optimizer trace collected until this point will be available.
1123    /// 2. The optimizer trace tracing subscriber will delegate regular tracing
1124    ///    spans and events to the default subscriber.
1125    ///
1126    /// This is useful when debugging queries that cause panics.
1127    pub fn broken(&self) -> bool {
1128        match self {
1129            Self::Select { broken, .. } => *broken,
1130            Self::CreateView { broken, .. } => *broken,
1131            Self::CreateMaterializedView { broken, .. } => *broken,
1132            Self::CreateIndex { broken, .. } => *broken,
1133            Self::Subscribe { broken, .. } => *broken,
1134        }
1135    }
1136}
1137
1138impl ExplaineeStatementKind {
1139    pub fn supports(&self, stage: &ExplainStage) -> bool {
1140        use ExplainStage::*;
1141        match self {
1142            Self::Select => true,
1143            Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1144            Self::CreateMaterializedView => true,
1145            Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1146            // SUBSCRIBE doesn't support RAW, DECORRELATED, or LOCAL stages because
1147            // it takes MIR directly rather than going through HIR lowering.
1148            Self::Subscribe => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1149        }
1150    }
1151}
1152
1153impl std::fmt::Display for ExplaineeStatementKind {
1154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1155        match self {
1156            Self::Select => write!(f, "SELECT"),
1157            Self::CreateView => write!(f, "CREATE VIEW"),
1158            Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1159            Self::CreateIndex => write!(f, "CREATE INDEX"),
1160            Self::Subscribe => write!(f, "SUBSCRIBE"),
1161        }
1162    }
1163}
1164
1165#[derive(Clone, Debug)]
1166pub struct ExplainPushdownPlan {
1167    pub explainee: Explainee,
1168}
1169
1170#[derive(Clone, Debug)]
1171pub struct ExplainTimestampPlan {
1172    pub format: ExplainFormat,
1173    pub raw_plan: HirRelationExpr,
1174    pub when: QueryWhen,
1175}
1176
1177#[derive(Debug)]
1178pub struct ExplainSinkSchemaPlan {
1179    pub sink_from: GlobalId,
1180    pub json_schema: String,
1181}
1182
1183#[derive(Debug)]
1184pub struct SendDiffsPlan {
1185    pub id: CatalogItemId,
1186    pub updates: Vec<(Row, Diff)>,
1187    pub kind: MutationKind,
1188    pub returning: Vec<(Row, NonZeroUsize)>,
1189    pub max_result_size: u64,
1190}
1191
1192#[derive(Debug)]
1193pub struct InsertPlan {
1194    pub id: CatalogItemId,
1195    pub values: HirRelationExpr,
1196    pub returning: Vec<mz_expr::MirScalarExpr>,
1197}
1198
1199#[derive(Debug)]
1200pub struct ReadThenWritePlan {
1201    pub id: CatalogItemId,
1202    pub selection: HirRelationExpr,
1203    pub finishing: RowSetFinishing,
1204    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
1205    pub kind: MutationKind,
1206    pub returning: Vec<mz_expr::MirScalarExpr>,
1207}
1208
1209/// Generated by `ALTER ... IF EXISTS` if the named object did not exist.
1210#[derive(Debug)]
1211pub struct AlterNoopPlan {
1212    pub object_type: ObjectType,
1213}
1214
1215#[derive(Debug)]
1216pub struct AlterSetClusterPlan {
1217    pub id: CatalogItemId,
1218    pub set_cluster: ClusterId,
1219}
1220
1221#[derive(Debug)]
1222pub struct AlterRetainHistoryPlan {
1223    pub id: CatalogItemId,
1224    pub value: Option<Value>,
1225    pub window: CompactionWindow,
1226    pub object_type: ObjectType,
1227}
1228
1229#[derive(Debug)]
1230pub struct AlterSourceTimestampIntervalPlan {
1231    pub id: CatalogItemId,
1232    pub value: Option<Value>,
1233    pub interval: Duration,
1234}
1235
1236#[derive(Debug, Clone)]
1237
1238pub enum AlterOptionParameter<T = String> {
1239    Set(T),
1240    Reset,
1241    Unchanged,
1242}
1243
1244#[derive(Debug)]
1245pub enum AlterConnectionAction {
1246    RotateKeys,
1247    AlterOptions {
1248        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
1249        drop_options: BTreeSet<ConnectionOptionName>,
1250        validate: bool,
1251    },
1252}
1253
1254#[derive(Debug)]
1255pub struct AlterConnectionPlan {
1256    pub id: CatalogItemId,
1257    pub action: AlterConnectionAction,
1258}
1259
1260#[derive(Debug)]
1261pub enum AlterSourceAction {
1262    AddSubsourceExports {
1263        subsources: Vec<CreateSourcePlanBundle>,
1264        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1265    },
1266    RefreshReferences {
1267        references: SourceReferences,
1268    },
1269}
1270
1271#[derive(Debug)]
1272pub struct AlterSourcePlan {
1273    pub item_id: CatalogItemId,
1274    pub ingestion_id: GlobalId,
1275    pub action: AlterSourceAction,
1276}
1277
1278#[derive(Debug, Clone)]
1279pub struct AlterSinkPlan {
1280    pub item_id: CatalogItemId,
1281    pub global_id: GlobalId,
1282    pub sink: Sink,
1283    pub with_snapshot: bool,
1284    pub in_cluster: ClusterId,
1285}
1286
1287#[derive(Debug, Clone)]
1288pub struct AlterClusterPlan {
1289    pub id: ClusterId,
1290    pub name: String,
1291    pub options: PlanClusterOption,
1292    pub strategy: AlterClusterPlanStrategy,
1293}
1294
1295#[derive(Debug)]
1296pub struct AlterClusterRenamePlan {
1297    pub id: ClusterId,
1298    pub name: String,
1299    pub to_name: String,
1300}
1301
1302#[derive(Debug)]
1303pub struct AlterClusterReplicaRenamePlan {
1304    pub cluster_id: ClusterId,
1305    pub replica_id: ReplicaId,
1306    pub name: QualifiedReplica,
1307    pub to_name: String,
1308}
1309
1310#[derive(Debug)]
1311pub struct AlterItemRenamePlan {
1312    pub id: CatalogItemId,
1313    pub current_full_name: FullItemName,
1314    pub to_name: String,
1315    pub object_type: ObjectType,
1316}
1317
1318#[derive(Debug)]
1319pub struct AlterSchemaRenamePlan {
1320    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1321    pub new_schema_name: String,
1322}
1323
1324#[derive(Debug)]
1325pub struct AlterSchemaSwapPlan {
1326    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1327    pub schema_a_name: String,
1328    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1329    pub schema_b_name: String,
1330    pub name_temp: String,
1331}
1332
1333#[derive(Debug)]
1334pub struct AlterClusterSwapPlan {
1335    pub id_a: ClusterId,
1336    pub id_b: ClusterId,
1337    pub name_a: String,
1338    pub name_b: String,
1339    pub name_temp: String,
1340}
1341
1342#[derive(Debug)]
1343pub struct AlterSecretPlan {
1344    pub id: CatalogItemId,
1345    pub secret_as: MirScalarExpr,
1346}
1347
1348#[derive(Debug)]
1349pub struct AlterSystemSetPlan {
1350    pub name: String,
1351    pub value: VariableValue,
1352}
1353
1354#[derive(Debug)]
1355pub struct AlterSystemResetPlan {
1356    pub name: String,
1357}
1358
1359#[derive(Debug)]
1360pub struct AlterSystemResetAllPlan {}
1361
1362#[derive(Debug)]
1363pub struct AlterRolePlan {
1364    pub id: RoleId,
1365    pub name: String,
1366    pub option: PlannedAlterRoleOption,
1367}
1368
1369#[derive(Debug)]
1370pub struct AlterOwnerPlan {
1371    pub id: ObjectId,
1372    pub object_type: ObjectType,
1373    pub new_owner: RoleId,
1374}
1375
1376#[derive(Debug)]
1377pub struct AlterTablePlan {
1378    pub relation_id: CatalogItemId,
1379    pub column_name: ColumnName,
1380    pub column_type: SqlColumnType,
1381    pub raw_sql_type: RawDataType,
1382}
1383
1384#[derive(Debug, Clone)]
1385pub struct AlterMaterializedViewApplyReplacementPlan {
1386    pub id: CatalogItemId,
1387    pub replacement_id: CatalogItemId,
1388}
1389
1390#[derive(Debug)]
1391pub struct DeclarePlan {
1392    pub name: String,
1393    pub stmt: Statement<Raw>,
1394    pub sql: String,
1395    pub params: Params,
1396}
1397
1398#[derive(Debug)]
1399pub struct FetchPlan {
1400    pub name: String,
1401    pub count: Option<FetchDirection>,
1402    pub timeout: ExecuteTimeout,
1403}
1404
1405#[derive(Debug)]
1406pub struct ClosePlan {
1407    pub name: String,
1408}
1409
1410#[derive(Debug)]
1411pub struct PreparePlan {
1412    pub name: String,
1413    pub stmt: Statement<Raw>,
1414    pub sql: String,
1415    pub desc: StatementDesc,
1416}
1417
1418#[derive(Debug)]
1419pub struct ExecutePlan {
1420    pub name: String,
1421    pub params: Params,
1422}
1423
1424#[derive(Debug)]
1425pub struct DeallocatePlan {
1426    pub name: Option<String>,
1427}
1428
1429#[derive(Debug)]
1430pub struct RaisePlan {
1431    pub severity: NoticeSeverity,
1432}
1433
1434#[derive(Debug)]
1435pub struct GrantRolePlan {
1436    /// The roles that are gaining members.
1437    pub role_ids: Vec<RoleId>,
1438    /// The roles that will be added to `role_id`.
1439    pub member_ids: Vec<RoleId>,
1440    /// The role that granted the membership.
1441    pub grantor_id: RoleId,
1442}
1443
1444#[derive(Debug)]
1445pub struct RevokeRolePlan {
1446    /// The roles that are losing members.
1447    pub role_ids: Vec<RoleId>,
1448    /// The roles that will be removed from `role_id`.
1449    pub member_ids: Vec<RoleId>,
1450    /// The role that revoked the membership.
1451    pub grantor_id: RoleId,
1452}
1453
1454#[derive(Debug)]
1455pub struct UpdatePrivilege {
1456    /// The privileges being granted/revoked on an object.
1457    pub acl_mode: AclMode,
1458    /// The ID of the object receiving privileges.
1459    pub target_id: SystemObjectId,
1460    /// The role that is granting the privileges.
1461    pub grantor: RoleId,
1462    /// Whether `acl_mode` was derived from the `ALL [PRIVILEGES]` shorthand.
1463    /// Used to suppress the `NonApplicablePrivilegeTypes` notice in that
1464    /// case: the shorthand is not the user explicitly naming a privilege
1465    /// that doesn't apply to the object type, so warning would be noisy.
1466    pub acl_from_all: bool,
1467}
1468
1469#[derive(Debug)]
1470pub struct GrantPrivilegesPlan {
1471    /// Description of each privilege being granted.
1472    pub update_privileges: Vec<UpdatePrivilege>,
1473    /// The roles that will granted the privileges.
1474    pub grantees: Vec<RoleId>,
1475}
1476
1477#[derive(Debug)]
1478pub struct RevokePrivilegesPlan {
1479    /// Description of each privilege being revoked.
1480    pub update_privileges: Vec<UpdatePrivilege>,
1481    /// The roles that will have privileges revoked.
1482    pub revokees: Vec<RoleId>,
1483}
1484#[derive(Debug)]
1485pub struct AlterDefaultPrivilegesPlan {
1486    /// Description of objects that match this default privilege.
1487    pub privilege_objects: Vec<DefaultPrivilegeObject>,
1488    /// The privilege to be granted/revoked from the matching objects.
1489    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
1490    /// Whether this is a grant or revoke.
1491    pub is_grant: bool,
1492}
1493
1494#[derive(Debug)]
1495pub struct ReassignOwnedPlan {
1496    /// The roles whose owned objects are being reassigned.
1497    pub old_roles: Vec<RoleId>,
1498    /// The new owner of the objects.
1499    pub new_role: RoleId,
1500    /// All object IDs to reassign.
1501    pub reassign_ids: Vec<ObjectId>,
1502}
1503
1504#[derive(Debug)]
1505pub struct CommentPlan {
1506    /// The object that this comment is associated with.
1507    pub object_id: CommentObjectId,
1508    /// A sub-component of the object that this comment is associated with, e.g. a column.
1509    ///
1510    /// TODO(parkmycar): <https://github.com/MaterializeInc/database-issues/issues/6711>.
1511    pub sub_component: Option<usize>,
1512    /// The comment itself. If `None` that indicates we should clear the existing comment.
1513    pub comment: Option<String>,
1514}
1515
1516#[derive(Clone, Debug)]
1517pub enum TableDataSource {
1518    /// The table owns data created via INSERT/UPDATE/DELETE statements.
1519    TableWrites { defaults: Vec<Expr<Aug>> },
1520
1521    /// The table receives its data from the identified `DataSourceDesc`.
1522    /// This table type does not support INSERT/UPDATE/DELETE statements.
1523    DataSource {
1524        desc: DataSourceDesc,
1525        timeline: Timeline,
1526    },
1527}
1528
1529#[derive(Clone, Debug)]
1530pub struct Table {
1531    pub create_sql: String,
1532    pub desc: VersionedRelationDesc,
1533    pub temporary: bool,
1534    pub compaction_window: Option<CompactionWindow>,
1535    pub data_source: TableDataSource,
1536}
1537
1538#[derive(Clone, Debug)]
1539pub struct Source {
1540    pub create_sql: String,
1541    pub data_source: DataSourceDesc,
1542    pub desc: RelationDesc,
1543    pub compaction_window: Option<CompactionWindow>,
1544}
1545
1546#[derive(Debug, Clone)]
1547pub enum DataSourceDesc {
1548    /// Receives data from an external system.
1549    Ingestion(SourceDesc<ReferencedConnection>),
1550    /// Receives data from an external system.
1551    OldSyntaxIngestion {
1552        desc: SourceDesc<ReferencedConnection>,
1553        // If we're dealing with an old syntax ingestion the progress id will be some other collection
1554        // and the ingestion itself will have the data from a default external reference
1555        progress_subsource: CatalogItemId,
1556        data_config: SourceExportDataConfig<ReferencedConnection>,
1557        details: SourceExportDetails,
1558    },
1559    /// This source receives its data from the identified ingestion,
1560    /// specifically the output identified by `external_reference`.
1561    IngestionExport {
1562        ingestion_id: CatalogItemId,
1563        external_reference: UnresolvedItemName,
1564        details: SourceExportDetails,
1565        data_config: SourceExportDataConfig<ReferencedConnection>,
1566    },
1567    /// Receives data from the source's reclocking/remapping operations.
1568    Progress,
1569    /// Receives data from HTTP post requests.
1570    Webhook {
1571        validate_using: Option<WebhookValidation>,
1572        body_format: WebhookBodyFormat,
1573        headers: WebhookHeaders,
1574        /// Only `Some` when created via `CREATE TABLE ... FROM WEBHOOK`.
1575        cluster_id: Option<StorageInstanceId>,
1576    },
1577}
1578
1579#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
1580pub struct WebhookValidation {
1581    /// The expression used to validate a request.
1582    pub expression: MirScalarExpr,
1583    /// Description of the source that will be created.
1584    pub relation_desc: RelationDesc,
1585    /// The column index to provide the request body and whether to provide it as bytes.
1586    pub bodies: Vec<(usize, bool)>,
1587    /// The column index to provide the request headers and whether to provide the values as bytes.
1588    pub headers: Vec<(usize, bool)>,
1589    /// Any secrets that are used in that validation.
1590    pub secrets: Vec<WebhookValidationSecret>,
1591}
1592
1593impl WebhookValidation {
1594    const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1595
1596    /// Attempt to reduce the internal [`MirScalarExpr`] into a simpler expression.
1597    ///
1598    /// The reduction happens on a separate thread, we also only wait for
1599    /// `WebhookValidation::MAX_REDUCE_TIME` before timing out and returning an error.
1600    pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1601        let WebhookValidation {
1602            expression,
1603            relation_desc,
1604            ..
1605        } = self;
1606
1607        // On a different thread, attempt to reduce the expression.
1608        let mut expression_ = expression.clone();
1609        let desc_ = relation_desc.clone();
1610        let reduce_task = mz_ore::task::spawn_blocking(
1611            || "webhook-validation-reduce",
1612            move || {
1613                let repr_col_types: Vec<ReprColumnType> = desc_
1614                    .typ()
1615                    .column_types
1616                    .iter()
1617                    .map(ReprColumnType::from)
1618                    .collect();
1619                expression_.reduce(&repr_col_types);
1620                expression_
1621            },
1622        );
1623
1624        match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1625            Ok(reduced_expr) => {
1626                *expression = reduced_expr;
1627                Ok(())
1628            }
1629            Err(_) => Err("timeout"),
1630        }
1631    }
1632}
1633
1634#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
1635pub struct WebhookHeaders {
1636    /// Optionally include a column named `headers` whose content is possibly filtered.
1637    pub header_column: Option<WebhookHeaderFilters>,
1638    /// The column index to provide the specific request header, and whether to provide it as bytes.
1639    pub mapped_headers: BTreeMap<usize, (String, bool)>,
1640}
1641
1642impl WebhookHeaders {
1643    /// Returns the number of columns needed to represent our headers.
1644    pub fn num_columns(&self) -> usize {
1645        let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1646        let mapped_headers = self.mapped_headers.len();
1647
1648        header_column + mapped_headers
1649    }
1650}
1651
1652#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
1653pub struct WebhookHeaderFilters {
1654    pub block: BTreeSet<String>,
1655    pub allow: BTreeSet<String>,
1656}
1657
1658#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Arbitrary)]
1659pub enum WebhookBodyFormat {
1660    Json { array: bool },
1661    Bytes,
1662    Text,
1663}
1664
1665impl From<WebhookBodyFormat> for SqlScalarType {
1666    fn from(value: WebhookBodyFormat) -> Self {
1667        match value {
1668            WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1669            WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1670            WebhookBodyFormat::Text => SqlScalarType::String,
1671        }
1672    }
1673}
1674
1675#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
1676pub struct WebhookValidationSecret {
1677    /// Identifies the secret by [`CatalogItemId`].
1678    pub id: CatalogItemId,
1679    /// Column index for the expression context that this secret was originally evaluated in.
1680    pub column_idx: usize,
1681    /// Whether or not this secret should be provided to the expression as Bytes or a String.
1682    pub use_bytes: bool,
1683}
1684
1685#[derive(Clone, Debug)]
1686pub struct Connection {
1687    pub create_sql: String,
1688    pub details: ConnectionDetails,
1689}
1690
1691#[derive(Clone, Debug, Serialize)]
1692pub enum ConnectionDetails {
1693    Kafka(KafkaConnection<ReferencedConnection>),
1694    Csr(CsrConnection<ReferencedConnection>),
1695    GlueSchemaRegistry(GlueSchemaRegistryConnection<ReferencedConnection>),
1696    Postgres(PostgresConnection<ReferencedConnection>),
1697    Ssh {
1698        connection: SshConnection,
1699        key_1: SshKey,
1700        key_2: SshKey,
1701    },
1702    Aws(AwsConnection),
1703    AwsPrivatelink(AwsPrivatelinkConnection),
1704    Gcp(GcpConnection),
1705    MySql(MySqlConnection<ReferencedConnection>),
1706    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
1707    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
1708}
1709
1710impl ConnectionDetails {
1711    pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1712        match self {
1713            ConnectionDetails::Kafka(c) => {
1714                mz_storage_types::connections::Connection::Kafka(c.clone())
1715            }
1716            ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1717            ConnectionDetails::GlueSchemaRegistry(c) => {
1718                mz_storage_types::connections::Connection::GlueSchemaRegistry(c.clone())
1719            }
1720            ConnectionDetails::Postgres(c) => {
1721                mz_storage_types::connections::Connection::Postgres(c.clone())
1722            }
1723            ConnectionDetails::Ssh { connection, .. } => {
1724                mz_storage_types::connections::Connection::Ssh(connection.clone())
1725            }
1726            ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1727            ConnectionDetails::AwsPrivatelink(c) => {
1728                mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1729            }
1730            ConnectionDetails::Gcp(c) => mz_storage_types::connections::Connection::Gcp(c.clone()),
1731            ConnectionDetails::MySql(c) => {
1732                mz_storage_types::connections::Connection::MySql(c.clone())
1733            }
1734            ConnectionDetails::SqlServer(c) => {
1735                mz_storage_types::connections::Connection::SqlServer(c.clone())
1736            }
1737            ConnectionDetails::IcebergCatalog(c) => {
1738                mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1739            }
1740        }
1741    }
1742
1743    /// Secrets whose *contents* this connection places requirements on, paired
1744    /// with the check to apply. Callers must re-apply these checks whenever the
1745    /// connection is created or altered, and whenever the contents of one of
1746    /// the returned secrets change (e.g. `ALTER SECRET`).
1747    ///
1748    /// We rely on the caller to actually execute these checks because we don't know:
1749    /// - which secrets the caller cares about
1750    /// - which secrets require an async operation to fetch
1751    ///
1752    /// For example, the ALTER SECRET caller should only perform checks on its own secret,
1753    /// while the ALTER CONNECTION caller fetches and checks every secret from its connection.
1754    pub fn secret_content_guards(
1755        &self,
1756    ) -> Vec<(CatalogItemId, fn(&str) -> Result<(), anyhow::Error>)> {
1757        match self {
1758            // A service-account key defines its own OAuth2 token URI. We only
1759            // want to send requests to the actual Google OAuth2 token API.
1760            ConnectionDetails::Gcp(gcp) => vec![(
1761                gcp.credentials_json,
1762                GcpServiceAccountKeyTokenUri::validate_json,
1763            )],
1764            _ => vec![],
1765        }
1766    }
1767}
1768
1769#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1770pub struct NetworkPolicyRule {
1771    pub name: String,
1772    pub action: NetworkPolicyRuleAction,
1773    pub address: PolicyAddress,
1774    pub direction: NetworkPolicyRuleDirection,
1775}
1776
1777#[derive(
1778    Debug,
1779    Clone,
1780    Serialize,
1781    Deserialize,
1782    PartialEq,
1783    Eq,
1784    Ord,
1785    PartialOrd,
1786    Hash
1787)]
1788pub enum NetworkPolicyRuleAction {
1789    Allow,
1790}
1791
1792impl std::fmt::Display for NetworkPolicyRuleAction {
1793    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1794        match self {
1795            Self::Allow => write!(f, "allow"),
1796        }
1797    }
1798}
1799impl TryFrom<&str> for NetworkPolicyRuleAction {
1800    type Error = PlanError;
1801    fn try_from(value: &str) -> Result<Self, Self::Error> {
1802        match value.to_uppercase().as_str() {
1803            "ALLOW" => Ok(Self::Allow),
1804            _ => Err(PlanError::Unstructured(
1805                "Allow is the only valid option".into(),
1806            )),
1807        }
1808    }
1809}
1810
1811#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1812pub enum NetworkPolicyRuleDirection {
1813    Ingress,
1814}
1815impl std::fmt::Display for NetworkPolicyRuleDirection {
1816    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1817        match self {
1818            Self::Ingress => write!(f, "ingress"),
1819        }
1820    }
1821}
1822impl TryFrom<&str> for NetworkPolicyRuleDirection {
1823    type Error = PlanError;
1824    fn try_from(value: &str) -> Result<Self, Self::Error> {
1825        match value.to_uppercase().as_str() {
1826            "INGRESS" => Ok(Self::Ingress),
1827            _ => Err(PlanError::Unstructured(
1828                "Ingress is the only valid option".into(),
1829            )),
1830        }
1831    }
1832}
1833
1834#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1835pub struct PolicyAddress(pub IpNet);
1836impl std::fmt::Display for PolicyAddress {
1837    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1838        write!(f, "{}", &self.0.to_string())
1839    }
1840}
1841impl From<String> for PolicyAddress {
1842    fn from(value: String) -> Self {
1843        Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1844    }
1845}
1846impl TryFrom<&str> for PolicyAddress {
1847    type Error = PlanError;
1848    fn try_from(value: &str) -> Result<Self, Self::Error> {
1849        let net = IpNet::from_str(value)
1850            .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1851        Ok(Self(net))
1852    }
1853}
1854
1855impl Serialize for PolicyAddress {
1856    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1857    where
1858        S: serde::Serializer,
1859    {
1860        serializer.serialize_str(&format!("{}", &self.0))
1861    }
1862}
1863
1864#[derive(Clone, Debug, Serialize)]
1865pub enum SshKey {
1866    PublicOnly(String),
1867    Both(SshKeyPair),
1868}
1869
1870impl SshKey {
1871    pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1872        match self {
1873            SshKey::PublicOnly(_) => None,
1874            SshKey::Both(key_pair) => Some(key_pair),
1875        }
1876    }
1877
1878    pub fn public_key(&self) -> String {
1879        match self {
1880            SshKey::PublicOnly(s) => s.into(),
1881            SshKey::Both(p) => p.ssh_public_key(),
1882        }
1883    }
1884}
1885
1886#[derive(Clone, Debug)]
1887pub struct Secret {
1888    pub create_sql: String,
1889    pub secret_as: MirScalarExpr,
1890}
1891
1892#[derive(Clone, Debug)]
1893pub struct Sink {
1894    /// Parse-able SQL that is stored durably and defines this sink.
1895    pub create_sql: String,
1896    /// Collection we read into this sink.
1897    pub from: GlobalId,
1898    /// Type of connection to the external service we sink into.
1899    pub connection: StorageSinkConnection<ReferencedConnection>,
1900    // TODO(guswynn): this probably should just be in the `connection`.
1901    pub envelope: SinkEnvelope,
1902    pub version: u64,
1903    pub commit_interval: Option<Duration>,
1904}
1905
1906#[derive(Clone, Debug)]
1907pub struct View {
1908    /// Parse-able SQL that is stored durably and defines this view.
1909    pub create_sql: String,
1910    /// Unoptimized high-level expression from parsing the `create_sql`.
1911    pub expr: HirRelationExpr,
1912    /// All of the catalog objects that are referenced by this view, according to the `expr`.
1913    pub dependencies: DependencyIds,
1914    /// Columns of this view.
1915    pub column_names: Vec<ColumnName>,
1916    /// If this view is created in the temporary schema, e.g. `CREATE TEMPORARY ...`.
1917    pub temporary: bool,
1918}
1919
1920#[derive(Clone, Debug)]
1921pub struct MaterializedView {
1922    /// Parse-able SQL that is stored durably and defines this materialized view.
1923    pub create_sql: String,
1924    /// Unoptimized high-level expression from parsing the `create_sql`.
1925    pub expr: HirRelationExpr,
1926    /// All of the catalog objects that are referenced by this materialized view, according to the `expr`.
1927    pub dependencies: DependencyIds,
1928    /// Columns of this view.
1929    pub column_names: Vec<ColumnName>,
1930    pub replacement_target: Option<CatalogItemId>,
1931    /// Cluster this materialized view will get installed on.
1932    pub cluster_id: ClusterId,
1933    /// If set, only install this materialized view's dataflow on the specified replica.
1934    pub target_replica: Option<ReplicaId>,
1935    pub non_null_assertions: Vec<usize>,
1936    pub compaction_window: Option<CompactionWindow>,
1937    pub refresh_schedule: Option<RefreshSchedule>,
1938    pub as_of: Option<Timestamp>,
1939}
1940
1941#[derive(Clone, Debug)]
1942pub struct Index {
1943    /// Parse-able SQL that is stored durably and defines this index.
1944    pub create_sql: String,
1945    /// Collection this index is on top of.
1946    pub on: GlobalId,
1947    pub keys: Vec<mz_expr::MirScalarExpr>,
1948    pub compaction_window: Option<CompactionWindow>,
1949    pub cluster_id: ClusterId,
1950}
1951
1952#[derive(Clone, Debug)]
1953pub struct Type {
1954    pub create_sql: String,
1955    pub inner: CatalogType<IdReference>,
1956}
1957
1958/// Specifies when a `Peek` or `Subscribe` should occur.
1959#[derive(Deserialize, Clone, Debug, PartialEq)]
1960pub enum QueryWhen {
1961    /// The peek should occur at the latest possible timestamp that allows the
1962    /// peek to complete immediately.
1963    Immediately,
1964    /// The peek should occur at a timestamp that allows the peek to see all
1965    /// data written to tables within Materialize.
1966    FreshestTableWrite,
1967    /// The peek should occur at the timestamp described by the specified
1968    /// expression.
1969    ///
1970    /// The expression may have any type.
1971    AtTimestamp(Timestamp),
1972    /// Same as Immediately, but will also advance to at least the specified
1973    /// expression.
1974    AtLeastTimestamp(Timestamp),
1975}
1976
1977impl QueryWhen {
1978    /// Returns a timestamp to which the candidate must be advanced.
1979    pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1980        match self {
1981            QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1982            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1983        }
1984    }
1985    /// Returns whether the candidate's upper bound is constrained.
1986    /// This is only true for `AtTimestamp` since it is the only variant that
1987    /// specifies a timestamp.
1988    pub fn constrains_upper(&self) -> bool {
1989        match self {
1990            QueryWhen::AtTimestamp(_) => true,
1991            QueryWhen::AtLeastTimestamp(_)
1992            | QueryWhen::Immediately
1993            | QueryWhen::FreshestTableWrite => false,
1994        }
1995    }
1996    /// Returns whether the candidate must be advanced to the since.
1997    pub fn advance_to_since(&self) -> bool {
1998        match self {
1999            QueryWhen::Immediately
2000            | QueryWhen::AtLeastTimestamp(_)
2001            | QueryWhen::FreshestTableWrite => true,
2002            QueryWhen::AtTimestamp(_) => false,
2003        }
2004    }
2005    /// Returns whether the candidate can be advanced to the upper.
2006    pub fn can_advance_to_upper(&self) -> bool {
2007        match self {
2008            QueryWhen::Immediately => true,
2009            QueryWhen::FreshestTableWrite
2010            | QueryWhen::AtTimestamp(_)
2011            | QueryWhen::AtLeastTimestamp(_) => false,
2012        }
2013    }
2014
2015    /// Returns whether the candidate can be advanced to the timeline's timestamp.
2016    pub fn can_advance_to_timeline_ts(&self) -> bool {
2017        match self {
2018            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
2019            QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
2020        }
2021    }
2022    /// Returns whether the candidate must be advanced to the timeline's timestamp.
2023    pub fn must_advance_to_timeline_ts(&self) -> bool {
2024        match self {
2025            QueryWhen::FreshestTableWrite => true,
2026            QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
2027                false
2028            }
2029        }
2030    }
2031    /// Returns whether the selected timestamp should be tracked within the current transaction.
2032    pub fn is_transactional(&self) -> bool {
2033        match self {
2034            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
2035            QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
2036        }
2037    }
2038}
2039
2040#[derive(Debug, Copy, Clone)]
2041pub enum MutationKind {
2042    Insert,
2043    Update,
2044    Delete,
2045}
2046
2047#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
2048pub enum CopyFormat {
2049    Text,
2050    Csv,
2051    Binary,
2052    Parquet,
2053}
2054
2055#[derive(Debug, Copy, Clone)]
2056pub enum ExecuteTimeout {
2057    None,
2058    Seconds(f64),
2059    WaitOnce,
2060}
2061
2062#[derive(Clone, Debug)]
2063pub enum IndexOption {
2064    /// Configures the logical compaction window for an index.
2065    RetainHistory(CompactionWindow),
2066}
2067
2068#[derive(Clone, Debug)]
2069pub enum TableOption {
2070    /// Configures the logical compaction window for a table.
2071    RetainHistory(CompactionWindow),
2072}
2073
2074#[derive(Clone, Debug)]
2075pub struct PlanClusterOption {
2076    pub availability_zones: AlterOptionParameter<Vec<String>>,
2077    pub introspection_debugging: AlterOptionParameter<bool>,
2078    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
2079    pub managed: AlterOptionParameter<bool>,
2080    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
2081    pub replication_factor: AlterOptionParameter<u32>,
2082    pub size: AlterOptionParameter,
2083    pub schedule: AlterOptionParameter<ClusterSchedule>,
2084    pub workload_class: AlterOptionParameter<Option<String>>,
2085}
2086
2087impl Default for PlanClusterOption {
2088    fn default() -> Self {
2089        Self {
2090            availability_zones: AlterOptionParameter::Unchanged,
2091            introspection_debugging: AlterOptionParameter::Unchanged,
2092            introspection_interval: AlterOptionParameter::Unchanged,
2093            managed: AlterOptionParameter::Unchanged,
2094            replicas: AlterOptionParameter::Unchanged,
2095            replication_factor: AlterOptionParameter::Unchanged,
2096            size: AlterOptionParameter::Unchanged,
2097            schedule: AlterOptionParameter::Unchanged,
2098            workload_class: AlterOptionParameter::Unchanged,
2099        }
2100    }
2101}
2102
2103#[derive(Clone, Debug, PartialEq, Eq)]
2104pub enum AlterClusterPlanStrategy {
2105    None,
2106    For(Duration),
2107    UntilReady {
2108        on_timeout: OnTimeoutAction,
2109        timeout: Duration,
2110    },
2111}
2112
2113#[derive(
2114    Clone,
2115    Copy,
2116    Debug,
2117    Deserialize,
2118    Serialize,
2119    PartialOrd,
2120    PartialEq,
2121    Eq,
2122    Ord
2123)]
2124pub enum OnTimeoutAction {
2125    /// Cut over to the target shape even though it has not hydrated.
2126    Commit,
2127    /// Drop the target replicas and keep the pre-reconfiguration set.
2128    Rollback,
2129}
2130
2131impl Default for OnTimeoutAction {
2132    fn default() -> Self {
2133        Self::Commit
2134    }
2135}
2136
2137impl TryFrom<&str> for OnTimeoutAction {
2138    type Error = PlanError;
2139    fn try_from(value: &str) -> Result<Self, Self::Error> {
2140        match value.to_uppercase().as_str() {
2141            "COMMIT" => Ok(Self::Commit),
2142            "ROLLBACK" => Ok(Self::Rollback),
2143            _ => Err(PlanError::Unstructured(
2144                "Valid options are COMMIT, ROLLBACK".into(),
2145            )),
2146        }
2147    }
2148}
2149
2150impl AlterClusterPlanStrategy {
2151    pub fn is_none(&self) -> bool {
2152        matches!(self, Self::None)
2153    }
2154    pub fn is_some(&self) -> bool {
2155        !matches!(self, Self::None)
2156    }
2157}
2158
2159impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2160    type Error = PlanError;
2161
2162    fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2163        Ok(match value.wait {
2164            Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2165            Some(ClusterAlterOptionValue::UntilReady(options)) => {
2166                let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2167                Self::UntilReady {
2168                    timeout: match extracted.timeout {
2169                        Some(d) => d,
2170                        None => Err(PlanError::UntilReadyTimeoutRequired)?,
2171                    },
2172                    on_timeout: match extracted.on_timeout {
2173                        Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2174                            PlanError::InvalidOptionValue {
2175                                option_name: "ON TIMEOUT".into(),
2176                                err: Box::new(e),
2177                            }
2178                        })?,
2179                        None => OnTimeoutAction::default(),
2180                    },
2181                }
2182            }
2183            None => Self::None,
2184        })
2185    }
2186}
2187
2188/// A vector of values to which parameter references should be bound.
2189#[derive(Debug, Clone)]
2190pub struct Params {
2191    /// The datums that were provided in the EXECUTE statement.
2192    pub datums: Row,
2193    /// The types of the datums provided in the EXECUTE statement.
2194    pub execute_types: Vec<SqlScalarType>,
2195    /// The types that the prepared statement expects based on its definition.
2196    pub expected_types: Vec<SqlScalarType>,
2197}
2198
2199impl Params {
2200    /// Returns a `Params` with no parameters.
2201    pub fn empty() -> Params {
2202        Params {
2203            datums: Row::pack_slice(&[]),
2204            execute_types: vec![],
2205            expected_types: vec![],
2206        }
2207    }
2208}
2209
2210/// Controls planning of a SQL query.
2211#[derive(
2212    Ord,
2213    PartialOrd,
2214    Clone,
2215    Debug,
2216    Eq,
2217    PartialEq,
2218    Serialize,
2219    Deserialize,
2220    Hash,
2221    Copy
2222)]
2223pub struct PlanContext {
2224    pub wall_time: DateTime<Utc>,
2225    pub ignore_if_exists_errors: bool,
2226}
2227
2228impl PlanContext {
2229    pub fn new(wall_time: DateTime<Utc>) -> Self {
2230        Self {
2231            wall_time,
2232            ignore_if_exists_errors: false,
2233        }
2234    }
2235
2236    /// Return a PlanContext with zero values. This should only be used when
2237    /// planning is required but unused (like in `plan_create_table()`) or in
2238    /// tests.
2239    pub fn zero() -> Self {
2240        PlanContext {
2241            wall_time: now::to_datetime(NOW_ZERO()),
2242            ignore_if_exists_errors: false,
2243        }
2244    }
2245
2246    pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2247        self.ignore_if_exists_errors = value;
2248        self
2249    }
2250}