mz_sql/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL planning.
11//!
12//! SQL planning is the process of taking the abstract syntax tree of a
13//! [`Statement`] and turning it into a [`Plan`] that the dataflow layer can
14//! execute.
15//!
16//! Statements must be purified before they can be planned. See the
17//! [`pure`](crate::pure) module for details.
18
19// Internal module layout.
20//
21// The entry point for planning is `statement::handle_statement`. That function
22// dispatches to a more specific `handle` function for the particular statement
23// type. For most statements, this `handle` function is uninteresting and short,
24// but anything involving a `SELECT` statement gets complicated. `SELECT`
25// queries wind through the functions in the `query` module, starting with
26// `plan_root_query` and fanning out based on the contents of the `SELECT`
27// statement.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{
41    CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing,
42};
43use mz_ore::now::{self, NOW_ZERO};
44use mz_pgcopy::CopyFormatParams;
45use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
46use mz_repr::explain::{ExplainConfig, ExplainFormat};
47use mz_repr::network_policy_id::NetworkPolicyId;
48use mz_repr::optimize::OptimizerFeatureOverrides;
49use mz_repr::refresh_schedule::RefreshSchedule;
50use mz_repr::role_id::RoleId;
51use mz_repr::{
52    CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, Row, SqlColumnType,
53    SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
54};
55use mz_sql_parser::ast::{
56    AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
57    RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
58    Value, WithOptionValue,
59};
60use mz_ssh_util::keys::SshKeyPair;
61use mz_storage_types::connections::aws::AwsConnection;
62use mz_storage_types::connections::inline::ReferencedConnection;
63use mz_storage_types::connections::{
64    AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
65    MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70    SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76    ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77    TransactionAccessMode,
78};
79use crate::catalog::{
80    CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81    RoleAttributesRaw,
82};
83use crate::names::{
84    Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110    AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111    WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119    AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120    PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123    StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124    resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
130/// Instructions for executing a SQL query.
131#[derive(Debug, EnumKind)]
132#[enum_kind(PlanKind)]
133pub enum Plan {
134    CreateConnection(CreateConnectionPlan),
135    CreateDatabase(CreateDatabasePlan),
136    CreateSchema(CreateSchemaPlan),
137    CreateRole(CreateRolePlan),
138    CreateCluster(CreateClusterPlan),
139    CreateClusterReplica(CreateClusterReplicaPlan),
140    CreateSource(CreateSourcePlan),
141    CreateSources(Vec<CreateSourcePlanBundle>),
142    CreateSecret(CreateSecretPlan),
143    CreateSink(CreateSinkPlan),
144    CreateTable(CreateTablePlan),
145    CreateView(CreateViewPlan),
146    CreateMaterializedView(CreateMaterializedViewPlan),
147    CreateContinualTask(CreateContinualTaskPlan),
148    CreateNetworkPolicy(CreateNetworkPolicyPlan),
149    CreateIndex(CreateIndexPlan),
150    CreateType(CreateTypePlan),
151    Comment(CommentPlan),
152    DiscardTemp,
153    DiscardAll,
154    DropObjects(DropObjectsPlan),
155    DropOwned(DropOwnedPlan),
156    EmptyQuery,
157    ShowAllVariables,
158    ShowCreate(ShowCreatePlan),
159    ShowColumns(ShowColumnsPlan),
160    ShowVariable(ShowVariablePlan),
161    InspectShard(InspectShardPlan),
162    SetVariable(SetVariablePlan),
163    ResetVariable(ResetVariablePlan),
164    SetTransaction(SetTransactionPlan),
165    StartTransaction(StartTransactionPlan),
166    CommitTransaction(CommitTransactionPlan),
167    AbortTransaction(AbortTransactionPlan),
168    Select(SelectPlan),
169    Subscribe(SubscribePlan),
170    CopyFrom(CopyFromPlan),
171    CopyTo(CopyToPlan),
172    ExplainPlan(ExplainPlanPlan),
173    ExplainPushdown(ExplainPushdownPlan),
174    ExplainTimestamp(ExplainTimestampPlan),
175    ExplainSinkSchema(ExplainSinkSchemaPlan),
176    Insert(InsertPlan),
177    AlterCluster(AlterClusterPlan),
178    AlterClusterSwap(AlterClusterSwapPlan),
179    AlterNoop(AlterNoopPlan),
180    AlterSetCluster(AlterSetClusterPlan),
181    AlterConnection(AlterConnectionPlan),
182    AlterSource(AlterSourcePlan),
183    AlterClusterRename(AlterClusterRenamePlan),
184    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
185    AlterItemRename(AlterItemRenamePlan),
186    AlterSchemaRename(AlterSchemaRenamePlan),
187    AlterSchemaSwap(AlterSchemaSwapPlan),
188    AlterSecret(AlterSecretPlan),
189    AlterSink(AlterSinkPlan),
190    AlterSystemSet(AlterSystemSetPlan),
191    AlterSystemReset(AlterSystemResetPlan),
192    AlterSystemResetAll(AlterSystemResetAllPlan),
193    AlterRole(AlterRolePlan),
194    AlterOwner(AlterOwnerPlan),
195    AlterTableAddColumn(AlterTablePlan),
196    AlterNetworkPolicy(AlterNetworkPolicyPlan),
197    Declare(DeclarePlan),
198    Fetch(FetchPlan),
199    Close(ClosePlan),
200    ReadThenWrite(ReadThenWritePlan),
201    Prepare(PreparePlan),
202    Execute(ExecutePlan),
203    Deallocate(DeallocatePlan),
204    Raise(RaisePlan),
205    GrantRole(GrantRolePlan),
206    RevokeRole(RevokeRolePlan),
207    GrantPrivileges(GrantPrivilegesPlan),
208    RevokePrivileges(RevokePrivilegesPlan),
209    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
210    ReassignOwned(ReassignOwnedPlan),
211    SideEffectingFunc(SideEffectingFunc),
212    ValidateConnection(ValidateConnectionPlan),
213    AlterRetainHistory(AlterRetainHistoryPlan),
214}
215
216impl Plan {
217    /// Expresses which [`StatementKind`] can generate which set of
218    /// [`PlanKind`].
219    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
220        match stmt {
221            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
222            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
223            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
224            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
225            StatementKind::AlterObjectRename => &[
226                PlanKind::AlterClusterRename,
227                PlanKind::AlterClusterReplicaRename,
228                PlanKind::AlterItemRename,
229                PlanKind::AlterSchemaRename,
230                PlanKind::AlterNoop,
231            ],
232            StatementKind::AlterObjectSwap => &[
233                PlanKind::AlterClusterSwap,
234                PlanKind::AlterSchemaSwap,
235                PlanKind::AlterNoop,
236            ],
237            StatementKind::AlterRole => &[PlanKind::AlterRole],
238            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
239            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
240            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
241            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
242            StatementKind::AlterSource => &[
243                PlanKind::AlterNoop,
244                PlanKind::AlterSource,
245                PlanKind::AlterRetainHistory,
246            ],
247            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
248            StatementKind::AlterSystemResetAll => {
249                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
250            }
251            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
252            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
253            StatementKind::AlterTableAddColumn => {
254                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
255            }
256            StatementKind::Close => &[PlanKind::Close],
257            StatementKind::Comment => &[PlanKind::Comment],
258            StatementKind::Commit => &[PlanKind::CommitTransaction],
259            StatementKind::Copy => &[
260                PlanKind::CopyFrom,
261                PlanKind::Select,
262                PlanKind::Subscribe,
263                PlanKind::CopyTo,
264            ],
265            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
266            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
267            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
268            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
269            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
270            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
271            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
272            StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
273            StatementKind::CreateRole => &[PlanKind::CreateRole],
274            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
275            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
276            StatementKind::CreateSink => &[PlanKind::CreateSink],
277            StatementKind::CreateSource | StatementKind::CreateSubsource => {
278                &[PlanKind::CreateSource]
279            }
280            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
281            StatementKind::CreateTable => &[PlanKind::CreateTable],
282            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
283            StatementKind::CreateType => &[PlanKind::CreateType],
284            StatementKind::CreateView => &[PlanKind::CreateView],
285            StatementKind::Deallocate => &[PlanKind::Deallocate],
286            StatementKind::Declare => &[PlanKind::Declare],
287            StatementKind::Delete => &[PlanKind::ReadThenWrite],
288            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
289            StatementKind::DropObjects => &[PlanKind::DropObjects],
290            StatementKind::DropOwned => &[PlanKind::DropOwned],
291            StatementKind::Execute => &[PlanKind::Execute],
292            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
293            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
294            StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
295            StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
296            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
297            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
298            StatementKind::Fetch => &[PlanKind::Fetch],
299            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
300            StatementKind::GrantRole => &[PlanKind::GrantRole],
301            StatementKind::Insert => &[PlanKind::Insert],
302            StatementKind::Prepare => &[PlanKind::Prepare],
303            StatementKind::Raise => &[PlanKind::Raise],
304            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
305            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
306            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
307            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
308            StatementKind::Rollback => &[PlanKind::AbortTransaction],
309            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
310            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
311            StatementKind::SetVariable => &[PlanKind::SetVariable],
312            StatementKind::Show => &[
313                PlanKind::Select,
314                PlanKind::ShowVariable,
315                PlanKind::ShowCreate,
316                PlanKind::ShowColumns,
317                PlanKind::ShowAllVariables,
318                PlanKind::InspectShard,
319            ],
320            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
321            StatementKind::Subscribe => &[PlanKind::Subscribe],
322            StatementKind::Update => &[PlanKind::ReadThenWrite],
323            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
324            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
325        }
326    }
327
328    /// Returns a human readable name of the plan. Meant for use in messages sent back to a user.
329    pub fn name(&self) -> &str {
330        match self {
331            Plan::CreateConnection(_) => "create connection",
332            Plan::CreateDatabase(_) => "create database",
333            Plan::CreateSchema(_) => "create schema",
334            Plan::CreateRole(_) => "create role",
335            Plan::CreateCluster(_) => "create cluster",
336            Plan::CreateClusterReplica(_) => "create cluster replica",
337            Plan::CreateSource(_) => "create source",
338            Plan::CreateSources(_) => "create source",
339            Plan::CreateSecret(_) => "create secret",
340            Plan::CreateSink(_) => "create sink",
341            Plan::CreateTable(_) => "create table",
342            Plan::CreateView(_) => "create view",
343            Plan::CreateMaterializedView(_) => "create materialized view",
344            Plan::CreateContinualTask(_) => "create continual task",
345            Plan::CreateIndex(_) => "create index",
346            Plan::CreateType(_) => "create type",
347            Plan::CreateNetworkPolicy(_) => "create network policy",
348            Plan::Comment(_) => "comment",
349            Plan::DiscardTemp => "discard temp",
350            Plan::DiscardAll => "discard all",
351            Plan::DropObjects(plan) => match plan.object_type {
352                ObjectType::Table => "drop table",
353                ObjectType::View => "drop view",
354                ObjectType::MaterializedView => "drop materialized view",
355                ObjectType::Source => "drop source",
356                ObjectType::Sink => "drop sink",
357                ObjectType::Index => "drop index",
358                ObjectType::Type => "drop type",
359                ObjectType::Role => "drop roles",
360                ObjectType::Cluster => "drop clusters",
361                ObjectType::ClusterReplica => "drop cluster replicas",
362                ObjectType::Secret => "drop secret",
363                ObjectType::Connection => "drop connection",
364                ObjectType::Database => "drop database",
365                ObjectType::Schema => "drop schema",
366                ObjectType::Func => "drop function",
367                ObjectType::ContinualTask => "drop continual task",
368                ObjectType::NetworkPolicy => "drop network policy",
369            },
370            Plan::DropOwned(_) => "drop owned",
371            Plan::EmptyQuery => "do nothing",
372            Plan::ShowAllVariables => "show all variables",
373            Plan::ShowCreate(_) => "show create",
374            Plan::ShowColumns(_) => "show columns",
375            Plan::ShowVariable(_) => "show variable",
376            Plan::InspectShard(_) => "inspect shard",
377            Plan::SetVariable(_) => "set variable",
378            Plan::ResetVariable(_) => "reset variable",
379            Plan::SetTransaction(_) => "set transaction",
380            Plan::StartTransaction(_) => "start transaction",
381            Plan::CommitTransaction(_) => "commit",
382            Plan::AbortTransaction(_) => "abort",
383            Plan::Select(_) => "select",
384            Plan::Subscribe(_) => "subscribe",
385            Plan::CopyFrom(_) => "copy from",
386            Plan::CopyTo(_) => "copy to",
387            Plan::ExplainPlan(_) => "explain plan",
388            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
389            Plan::ExplainTimestamp(_) => "explain timestamp",
390            Plan::ExplainSinkSchema(_) => "explain schema",
391            Plan::Insert(_) => "insert",
392            Plan::AlterNoop(plan) => match plan.object_type {
393                ObjectType::Table => "alter table",
394                ObjectType::View => "alter view",
395                ObjectType::MaterializedView => "alter materialized view",
396                ObjectType::Source => "alter source",
397                ObjectType::Sink => "alter sink",
398                ObjectType::Index => "alter index",
399                ObjectType::Type => "alter type",
400                ObjectType::Role => "alter role",
401                ObjectType::Cluster => "alter cluster",
402                ObjectType::ClusterReplica => "alter cluster replica",
403                ObjectType::Secret => "alter secret",
404                ObjectType::Connection => "alter connection",
405                ObjectType::Database => "alter database",
406                ObjectType::Schema => "alter schema",
407                ObjectType::Func => "alter function",
408                ObjectType::ContinualTask => "alter continual task",
409                ObjectType::NetworkPolicy => "alter network policy",
410            },
411            Plan::AlterCluster(_) => "alter cluster",
412            Plan::AlterClusterRename(_) => "alter cluster rename",
413            Plan::AlterClusterSwap(_) => "alter cluster swap",
414            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
415            Plan::AlterSetCluster(_) => "alter set cluster",
416            Plan::AlterConnection(_) => "alter connection",
417            Plan::AlterSource(_) => "alter source",
418            Plan::AlterItemRename(_) => "rename item",
419            Plan::AlterSchemaRename(_) => "alter rename schema",
420            Plan::AlterSchemaSwap(_) => "alter swap schema",
421            Plan::AlterSecret(_) => "alter secret",
422            Plan::AlterSink(_) => "alter sink",
423            Plan::AlterSystemSet(_) => "alter system",
424            Plan::AlterSystemReset(_) => "alter system",
425            Plan::AlterSystemResetAll(_) => "alter system",
426            Plan::AlterRole(_) => "alter role",
427            Plan::AlterNetworkPolicy(_) => "alter network policy",
428            Plan::AlterOwner(plan) => match plan.object_type {
429                ObjectType::Table => "alter table owner",
430                ObjectType::View => "alter view owner",
431                ObjectType::MaterializedView => "alter materialized view owner",
432                ObjectType::Source => "alter source owner",
433                ObjectType::Sink => "alter sink owner",
434                ObjectType::Index => "alter index owner",
435                ObjectType::Type => "alter type owner",
436                ObjectType::Role => "alter role owner",
437                ObjectType::Cluster => "alter cluster owner",
438                ObjectType::ClusterReplica => "alter cluster replica owner",
439                ObjectType::Secret => "alter secret owner",
440                ObjectType::Connection => "alter connection owner",
441                ObjectType::Database => "alter database owner",
442                ObjectType::Schema => "alter schema owner",
443                ObjectType::Func => "alter function owner",
444                ObjectType::ContinualTask => "alter continual task owner",
445                ObjectType::NetworkPolicy => "alter network policy owner",
446            },
447            Plan::AlterTableAddColumn(_) => "alter table add column",
448            Plan::Declare(_) => "declare",
449            Plan::Fetch(_) => "fetch",
450            Plan::Close(_) => "close",
451            Plan::ReadThenWrite(plan) => match plan.kind {
452                MutationKind::Insert => "insert into select",
453                MutationKind::Update => "update",
454                MutationKind::Delete => "delete",
455            },
456            Plan::Prepare(_) => "prepare",
457            Plan::Execute(_) => "execute",
458            Plan::Deallocate(_) => "deallocate",
459            Plan::Raise(_) => "raise",
460            Plan::GrantRole(_) => "grant role",
461            Plan::RevokeRole(_) => "revoke role",
462            Plan::GrantPrivileges(_) => "grant privilege",
463            Plan::RevokePrivileges(_) => "revoke privilege",
464            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
465            Plan::ReassignOwned(_) => "reassign owned",
466            Plan::SideEffectingFunc(_) => "side effecting func",
467            Plan::ValidateConnection(_) => "validate connection",
468            Plan::AlterRetainHistory(_) => "alter retain history",
469        }
470    }
471
472    /// Returns `true` iff this `Plan` is allowed to be executed in read-only
473    /// mode.
474    ///
475    /// We use an explicit allow-list, to avoid future additions automatically
476    /// falling into the `true` category.
477    pub fn allowed_in_read_only(&self) -> bool {
478        match self {
479            // These two set non-durable session variables, so are okay in
480            // read-only mode.
481            Plan::SetVariable(_) => true,
482            Plan::ResetVariable(_) => true,
483            Plan::SetTransaction(_) => true,
484            Plan::StartTransaction(_) => true,
485            Plan::CommitTransaction(_) => true,
486            Plan::AbortTransaction(_) => true,
487            Plan::Select(_) => true,
488            Plan::EmptyQuery => true,
489            Plan::ShowAllVariables => true,
490            Plan::ShowCreate(_) => true,
491            Plan::ShowColumns(_) => true,
492            Plan::ShowVariable(_) => true,
493            Plan::InspectShard(_) => true,
494            Plan::Subscribe(_) => true,
495            Plan::CopyTo(_) => true,
496            Plan::ExplainPlan(_) => true,
497            Plan::ExplainPushdown(_) => true,
498            Plan::ExplainTimestamp(_) => true,
499            Plan::ExplainSinkSchema(_) => true,
500            Plan::ValidateConnection(_) => true,
501            _ => false,
502        }
503    }
504}
505
506#[derive(Debug)]
507pub struct StartTransactionPlan {
508    pub access: Option<TransactionAccessMode>,
509    pub isolation_level: Option<TransactionIsolationLevel>,
510}
511
512#[derive(Debug)]
513pub enum TransactionType {
514    Explicit,
515    Implicit,
516}
517
518impl TransactionType {
519    pub fn is_explicit(&self) -> bool {
520        matches!(self, TransactionType::Explicit)
521    }
522
523    pub fn is_implicit(&self) -> bool {
524        matches!(self, TransactionType::Implicit)
525    }
526}
527
528#[derive(Debug)]
529pub struct CommitTransactionPlan {
530    pub transaction_type: TransactionType,
531}
532
533#[derive(Debug)]
534pub struct AbortTransactionPlan {
535    pub transaction_type: TransactionType,
536}
537
538#[derive(Debug)]
539pub struct CreateDatabasePlan {
540    pub name: String,
541    pub if_not_exists: bool,
542}
543
544#[derive(Debug)]
545pub struct CreateSchemaPlan {
546    pub database_spec: ResolvedDatabaseSpecifier,
547    pub schema_name: String,
548    pub if_not_exists: bool,
549}
550
551#[derive(Debug)]
552pub struct CreateRolePlan {
553    pub name: String,
554    pub attributes: RoleAttributesRaw,
555}
556
557#[derive(Debug, PartialEq, Eq, Clone)]
558pub struct CreateClusterPlan {
559    pub name: String,
560    pub variant: CreateClusterVariant,
561    pub workload_class: Option<String>,
562}
563
564#[derive(Debug, PartialEq, Eq, Clone)]
565pub enum CreateClusterVariant {
566    Managed(CreateClusterManagedPlan),
567    Unmanaged(CreateClusterUnmanagedPlan),
568}
569
570#[derive(Debug, PartialEq, Eq, Clone)]
571pub struct CreateClusterUnmanagedPlan {
572    pub replicas: Vec<(String, ReplicaConfig)>,
573}
574
575#[derive(Debug, PartialEq, Eq, Clone)]
576pub struct CreateClusterManagedPlan {
577    pub replication_factor: u32,
578    pub size: String,
579    pub availability_zones: Vec<String>,
580    pub compute: ComputeReplicaConfig,
581    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
582    pub schedule: ClusterSchedule,
583}
584
585#[derive(Debug)]
586pub struct CreateClusterReplicaPlan {
587    pub cluster_id: ClusterId,
588    pub name: String,
589    pub config: ReplicaConfig,
590}
591
592/// Configuration of introspection for a cluster replica.
593#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq)]
594pub struct ComputeReplicaIntrospectionConfig {
595    /// Whether to introspect the introspection.
596    pub debugging: bool,
597    /// The interval at which to introspect.
598    pub interval: Duration,
599}
600
601#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
602pub struct ComputeReplicaConfig {
603    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
604}
605
606#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
607pub enum ReplicaConfig {
608    Unorchestrated {
609        storagectl_addrs: Vec<String>,
610        computectl_addrs: Vec<String>,
611        compute: ComputeReplicaConfig,
612    },
613    Orchestrated {
614        size: String,
615        availability_zone: Option<String>,
616        compute: ComputeReplicaConfig,
617        internal: bool,
618        billed_as: Option<String>,
619    },
620}
621
622#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
623pub enum ClusterSchedule {
624    /// The system won't automatically turn the cluster On or Off.
625    Manual,
626    /// The cluster will be On when a REFRESH materialized view on it needs to refresh.
627    /// `hydration_time_estimate` determines how much time before a refresh to turn the
628    /// cluster On, so that it can rehydrate already before the refresh time.
629    Refresh { hydration_time_estimate: Duration },
630}
631
632impl Default for ClusterSchedule {
633    fn default() -> Self {
634        // (Has to be consistent with `impl Default for ClusterScheduleOptionValue`.)
635        ClusterSchedule::Manual
636    }
637}
638
639#[derive(Debug)]
640pub struct CreateSourcePlan {
641    pub name: QualifiedItemName,
642    pub source: Source,
643    pub if_not_exists: bool,
644    pub timeline: Timeline,
645    // None for subsources, which run on the parent cluster.
646    pub in_cluster: Option<ClusterId>,
647}
648
649#[derive(Clone, Debug, PartialEq, Eq)]
650pub struct SourceReferences {
651    pub updated_at: u64,
652    pub references: Vec<SourceReference>,
653}
654
655/// An available external reference for a source and if possible to retrieve,
656/// any column names it contains.
657#[derive(Clone, Debug, PartialEq, Eq)]
658pub struct SourceReference {
659    pub name: String,
660    pub namespace: Option<String>,
661    pub columns: Vec<String>,
662}
663
664/// A [`CreateSourcePlan`] and the metadata necessary to sequence it.
665#[derive(Debug)]
666pub struct CreateSourcePlanBundle {
667    /// ID of this source in the Catalog.
668    pub item_id: CatalogItemId,
669    /// ID used to reference this source from outside the catalog, e.g. compute.
670    pub global_id: GlobalId,
671    /// Details of the source to create.
672    pub plan: CreateSourcePlan,
673    /// Other catalog objects that are referenced by this source, determined at name resolution.
674    pub resolved_ids: ResolvedIds,
675    /// All the available upstream references for this source.
676    /// Populated for top-level sources that can contain subsources/tables
677    /// and used during sequencing to populate the appropriate catalog fields.
678    pub available_source_references: Option<SourceReferences>,
679}
680
681#[derive(Debug)]
682pub struct CreateConnectionPlan {
683    pub name: QualifiedItemName,
684    pub if_not_exists: bool,
685    pub connection: Connection,
686    pub validate: bool,
687}
688
689#[derive(Debug)]
690pub struct ValidateConnectionPlan {
691    /// ID of the connection in the Catalog.
692    pub id: CatalogItemId,
693    /// The connection to validate.
694    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
695}
696
697#[derive(Debug)]
698pub struct CreateSecretPlan {
699    pub name: QualifiedItemName,
700    pub secret: Secret,
701    pub if_not_exists: bool,
702}
703
704#[derive(Debug)]
705pub struct CreateSinkPlan {
706    pub name: QualifiedItemName,
707    pub sink: Sink,
708    pub with_snapshot: bool,
709    pub if_not_exists: bool,
710    pub in_cluster: ClusterId,
711}
712
713#[derive(Debug)]
714pub struct CreateTablePlan {
715    pub name: QualifiedItemName,
716    pub table: Table,
717    pub if_not_exists: bool,
718}
719
720#[derive(Debug, Clone)]
721pub struct CreateViewPlan {
722    pub name: QualifiedItemName,
723    pub view: View,
724    /// The Catalog objects that this view is replacing, if any.
725    pub replace: Option<CatalogItemId>,
726    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
727    pub drop_ids: Vec<CatalogItemId>,
728    pub if_not_exists: bool,
729    /// True if the view contains an expression that can make the exact column list
730    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
731    pub ambiguous_columns: bool,
732}
733
734#[derive(Debug, Clone)]
735pub struct CreateMaterializedViewPlan {
736    pub name: QualifiedItemName,
737    pub materialized_view: MaterializedView,
738    /// The Catalog objects that this materialized view is replacing, if any.
739    pub replace: Option<CatalogItemId>,
740    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
741    pub drop_ids: Vec<CatalogItemId>,
742    pub if_not_exists: bool,
743    /// True if the materialized view contains an expression that can make the exact column list
744    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
745    pub ambiguous_columns: bool,
746}
747
748#[derive(Debug, Clone)]
749pub struct CreateContinualTaskPlan {
750    pub name: QualifiedItemName,
751    /// During initial creation, the `LocalId` placeholder for this CT in `continual_task.expr`.
752    /// None on restart.
753    pub placeholder_id: Option<mz_expr::LocalId>,
754    pub desc: RelationDesc,
755    /// ID of the collection we read into this continual task.
756    pub input_id: GlobalId,
757    pub with_snapshot: bool,
758    /// Definition for the continual task.
759    pub continual_task: MaterializedView,
760}
761
762#[derive(Debug, Clone)]
763pub struct CreateNetworkPolicyPlan {
764    pub name: String,
765    pub rules: Vec<NetworkPolicyRule>,
766}
767
768#[derive(Debug, Clone)]
769pub struct AlterNetworkPolicyPlan {
770    pub id: NetworkPolicyId,
771    pub name: String,
772    pub rules: Vec<NetworkPolicyRule>,
773}
774
775#[derive(Debug, Clone)]
776pub struct CreateIndexPlan {
777    pub name: QualifiedItemName,
778    pub index: Index,
779    pub if_not_exists: bool,
780}
781
782#[derive(Debug)]
783pub struct CreateTypePlan {
784    pub name: QualifiedItemName,
785    pub typ: Type,
786}
787
788#[derive(Debug)]
789pub struct DropObjectsPlan {
790    /// The IDs of only the objects directly referenced in the `DROP` statement.
791    pub referenced_ids: Vec<ObjectId>,
792    /// All object IDs to drop. Includes `referenced_ids` and all descendants.
793    pub drop_ids: Vec<ObjectId>,
794    /// The type of object that was dropped explicitly in the DROP statement. `ids` may contain
795    /// objects of different types due to CASCADE.
796    pub object_type: ObjectType,
797}
798
799#[derive(Debug)]
800pub struct DropOwnedPlan {
801    /// The role IDs that own the objects.
802    pub role_ids: Vec<RoleId>,
803    /// All object IDs to drop.
804    pub drop_ids: Vec<ObjectId>,
805    /// The privileges to revoke.
806    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
807    /// The default privileges to revoke.
808    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
809}
810
811#[derive(Debug)]
812pub struct ShowVariablePlan {
813    pub name: String,
814}
815
816#[derive(Debug)]
817pub struct InspectShardPlan {
818    /// ID of the storage collection to inspect.
819    pub id: GlobalId,
820}
821
822#[derive(Debug)]
823pub struct SetVariablePlan {
824    pub name: String,
825    pub value: VariableValue,
826    pub local: bool,
827}
828
829#[derive(Debug)]
830pub enum VariableValue {
831    Default,
832    Values(Vec<String>),
833}
834
835#[derive(Debug)]
836pub struct ResetVariablePlan {
837    pub name: String,
838}
839
840#[derive(Debug)]
841pub struct SetTransactionPlan {
842    pub local: bool,
843    pub modes: Vec<TransactionMode>,
844}
845
846/// A plan for select statements.
847#[derive(Clone, Debug)]
848pub struct SelectPlan {
849    /// The `SELECT` statement itself. Used for explain/notices, but not otherwise
850    /// load-bearing. Boxed to save stack space.
851    pub select: Option<Box<SelectStatement<Aug>>>,
852    /// The plan as a HIR.
853    pub source: HirRelationExpr,
854    /// At what time should this select happen?
855    pub when: QueryWhen,
856    /// Instructions how to form the result set.
857    pub finishing: RowSetFinishing,
858    /// For `COPY TO`, the format to use.
859    pub copy_to: Option<CopyFormat>,
860}
861
862impl SelectPlan {
863    pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
864        let arity = typ.arity();
865        SelectPlan {
866            select: None,
867            source: HirRelationExpr::Constant { rows, typ },
868            when: QueryWhen::Immediately,
869            finishing: RowSetFinishing::trivial(arity),
870            copy_to: None,
871        }
872    }
873}
874
875#[derive(Debug)]
876pub enum SubscribeOutput {
877    Diffs,
878    WithinTimestampOrderBy {
879        /// We pretend that mz_diff is prepended to the normal columns, making it index 0
880        order_by: Vec<ColumnOrder>,
881    },
882    EnvelopeUpsert {
883        /// Order by with just keys
884        order_by_keys: Vec<ColumnOrder>,
885    },
886    EnvelopeDebezium {
887        /// Order by with just keys
888        order_by_keys: Vec<ColumnOrder>,
889    },
890}
891
892#[derive(Debug)]
893pub struct SubscribePlan {
894    pub from: SubscribeFrom,
895    pub with_snapshot: bool,
896    pub when: QueryWhen,
897    pub up_to: Option<Timestamp>,
898    pub copy_to: Option<CopyFormat>,
899    pub emit_progress: bool,
900    pub output: SubscribeOutput,
901}
902
903#[derive(Debug, Clone)]
904pub enum SubscribeFrom {
905    /// ID of the collection to subscribe to.
906    Id(GlobalId),
907    /// Query to subscribe to.
908    Query {
909        expr: MirRelationExpr,
910        desc: RelationDesc,
911    },
912}
913
914impl SubscribeFrom {
915    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
916        match self {
917            SubscribeFrom::Id(id) => BTreeSet::from([*id]),
918            SubscribeFrom::Query { expr, .. } => expr.depends_on(),
919        }
920    }
921
922    pub fn contains_temporal(&self) -> bool {
923        match self {
924            SubscribeFrom::Id(_) => false,
925            SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
926        }
927    }
928}
929
930#[derive(Debug)]
931pub struct ShowCreatePlan {
932    pub id: ObjectId,
933    pub row: Row,
934}
935
936#[derive(Debug)]
937pub struct ShowColumnsPlan {
938    pub id: CatalogItemId,
939    pub select_plan: SelectPlan,
940    pub new_resolved_ids: ResolvedIds,
941}
942
943#[derive(Debug)]
944pub struct CopyFromPlan {
945    /// Table we're copying into.
946    pub target_id: CatalogItemId,
947    /// Human-readable full name of the target table.
948    pub target_name: String,
949    /// Source we're copying data from.
950    pub source: CopyFromSource,
951    /// How input columns map to those on the destination table.
952    ///
953    /// TODO(cf2): Remove this field in favor of the mfp.
954    pub columns: Vec<ColumnIndex>,
955    /// [`RelationDesc`] describing the input data.
956    pub source_desc: RelationDesc,
957    /// Changes the shape of the input data to match the destination table.
958    pub mfp: MapFilterProject,
959    /// Format specific params for copying the input data.
960    pub params: CopyFormatParams<'static>,
961    /// Filter for the source files we're copying from, e.g. an S3 prefix.
962    pub filter: Option<CopyFromFilter>,
963}
964
965#[derive(Debug)]
966pub enum CopyFromSource {
967    /// Copying from a file local to the user, transmitted via pgwire.
968    Stdin,
969    /// A remote resource, e.g. HTTP file.
970    ///
971    /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource.
972    Url(HirScalarExpr),
973    /// A file in an S3 bucket.
974    AwsS3 {
975        /// Expression that evaluates to the file we want to copy.
976        uri: HirScalarExpr,
977        /// Details for how we connect to AWS S3.
978        connection: AwsConnection,
979        /// ID of the connection object.
980        connection_id: CatalogItemId,
981    },
982}
983
984#[derive(Debug)]
985pub enum CopyFromFilter {
986    Files(Vec<String>),
987    Pattern(String),
988}
989
990#[derive(Debug, Clone)]
991pub struct CopyToPlan {
992    /// The select query plan whose data will be copied to destination uri.
993    pub select_plan: SelectPlan,
994    pub desc: RelationDesc,
995    /// The scalar expression to be resolved to get the destination uri.
996    pub to: HirScalarExpr,
997    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
998    /// The ID of the connection.
999    pub connection_id: CatalogItemId,
1000    pub format: S3SinkFormat,
1001    pub max_file_size: u64,
1002}
1003
1004#[derive(Clone, Debug)]
1005pub struct ExplainPlanPlan {
1006    pub stage: ExplainStage,
1007    pub format: ExplainFormat,
1008    pub config: ExplainConfig,
1009    pub explainee: Explainee,
1010}
1011
1012/// The type of object to be explained
1013#[derive(Clone, Debug)]
1014pub enum Explainee {
1015    /// Lookup and explain a plan saved for an view.
1016    View(CatalogItemId),
1017    /// Lookup and explain a plan saved for an existing materialized view.
1018    MaterializedView(CatalogItemId),
1019    /// Lookup and explain a plan saved for an existing index.
1020    Index(CatalogItemId),
1021    /// Replan an existing view.
1022    ReplanView(CatalogItemId),
1023    /// Replan an existing materialized view.
1024    ReplanMaterializedView(CatalogItemId),
1025    /// Replan an existing index.
1026    ReplanIndex(CatalogItemId),
1027    /// A SQL statement.
1028    Statement(ExplaineeStatement),
1029}
1030
1031/// Explainee types that are statements.
1032#[derive(Clone, Debug, EnumKind)]
1033#[enum_kind(ExplaineeStatementKind)]
1034pub enum ExplaineeStatement {
1035    /// The object to be explained is a SELECT statement.
1036    Select {
1037        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1038        broken: bool,
1039        plan: plan::SelectPlan,
1040        desc: RelationDesc,
1041    },
1042    /// The object to be explained is a CREATE VIEW.
1043    CreateView {
1044        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1045        broken: bool,
1046        plan: plan::CreateViewPlan,
1047    },
1048    /// The object to be explained is a CREATE MATERIALIZED VIEW.
1049    CreateMaterializedView {
1050        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1051        broken: bool,
1052        plan: plan::CreateMaterializedViewPlan,
1053    },
1054    /// The object to be explained is a CREATE INDEX.
1055    CreateIndex {
1056        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1057        broken: bool,
1058        plan: plan::CreateIndexPlan,
1059    },
1060}
1061
1062impl ExplaineeStatement {
1063    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1064        match self {
1065            Self::Select { plan, .. } => plan.source.depends_on(),
1066            Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1067            Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1068            Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1069        }
1070    }
1071
1072    /// Statements that have their `broken` flag set are expected to cause a
1073    /// panic in the optimizer code. In this case:
1074    ///
1075    /// 1. The optimizer pipeline execution will stop, but the panic will be
1076    ///    intercepted and will not propagate to the caller. The partial
1077    ///    optimizer trace collected until this point will be available.
1078    /// 2. The optimizer trace tracing subscriber will delegate regular tracing
1079    ///    spans and events to the default subscriber.
1080    ///
1081    /// This is useful when debugging queries that cause panics.
1082    pub fn broken(&self) -> bool {
1083        match self {
1084            Self::Select { broken, .. } => *broken,
1085            Self::CreateView { broken, .. } => *broken,
1086            Self::CreateMaterializedView { broken, .. } => *broken,
1087            Self::CreateIndex { broken, .. } => *broken,
1088        }
1089    }
1090}
1091
1092impl ExplaineeStatementKind {
1093    pub fn supports(&self, stage: &ExplainStage) -> bool {
1094        use ExplainStage::*;
1095        match self {
1096            Self::Select => true,
1097            Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1098            Self::CreateMaterializedView => true,
1099            Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1100        }
1101    }
1102}
1103
1104impl std::fmt::Display for ExplaineeStatementKind {
1105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1106        match self {
1107            Self::Select => write!(f, "SELECT"),
1108            Self::CreateView => write!(f, "CREATE VIEW"),
1109            Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1110            Self::CreateIndex => write!(f, "CREATE INDEX"),
1111        }
1112    }
1113}
1114
1115#[derive(Clone, Debug)]
1116pub struct ExplainPushdownPlan {
1117    pub explainee: Explainee,
1118}
1119
1120#[derive(Clone, Debug)]
1121pub struct ExplainTimestampPlan {
1122    pub format: ExplainFormat,
1123    pub raw_plan: HirRelationExpr,
1124    pub when: QueryWhen,
1125}
1126
1127#[derive(Debug)]
1128pub struct ExplainSinkSchemaPlan {
1129    pub sink_from: GlobalId,
1130    pub json_schema: String,
1131}
1132
1133#[derive(Debug)]
1134pub struct SendDiffsPlan {
1135    pub id: CatalogItemId,
1136    pub updates: Vec<(Row, Diff)>,
1137    pub kind: MutationKind,
1138    pub returning: Vec<(Row, NonZeroUsize)>,
1139    pub max_result_size: u64,
1140}
1141
1142#[derive(Debug)]
1143pub struct InsertPlan {
1144    pub id: CatalogItemId,
1145    pub values: HirRelationExpr,
1146    pub returning: Vec<mz_expr::MirScalarExpr>,
1147}
1148
1149#[derive(Debug)]
1150pub struct ReadThenWritePlan {
1151    pub id: CatalogItemId,
1152    pub selection: HirRelationExpr,
1153    pub finishing: RowSetFinishing,
1154    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
1155    pub kind: MutationKind,
1156    pub returning: Vec<mz_expr::MirScalarExpr>,
1157}
1158
1159/// Generated by `ALTER ... IF EXISTS` if the named object did not exist.
1160#[derive(Debug)]
1161pub struct AlterNoopPlan {
1162    pub object_type: ObjectType,
1163}
1164
1165#[derive(Debug)]
1166pub struct AlterSetClusterPlan {
1167    pub id: CatalogItemId,
1168    pub set_cluster: ClusterId,
1169}
1170
1171#[derive(Debug)]
1172pub struct AlterRetainHistoryPlan {
1173    pub id: CatalogItemId,
1174    pub value: Option<Value>,
1175    pub window: CompactionWindow,
1176    pub object_type: ObjectType,
1177}
1178
1179#[derive(Debug, Clone)]
1180
1181pub enum AlterOptionParameter<T = String> {
1182    Set(T),
1183    Reset,
1184    Unchanged,
1185}
1186
1187#[derive(Debug)]
1188pub enum AlterConnectionAction {
1189    RotateKeys,
1190    AlterOptions {
1191        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
1192        drop_options: BTreeSet<ConnectionOptionName>,
1193        validate: bool,
1194    },
1195}
1196
1197#[derive(Debug)]
1198pub struct AlterConnectionPlan {
1199    pub id: CatalogItemId,
1200    pub action: AlterConnectionAction,
1201}
1202
1203#[derive(Debug)]
1204pub enum AlterSourceAction {
1205    AddSubsourceExports {
1206        subsources: Vec<CreateSourcePlanBundle>,
1207        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1208    },
1209    RefreshReferences {
1210        references: SourceReferences,
1211    },
1212}
1213
1214#[derive(Debug)]
1215pub struct AlterSourcePlan {
1216    pub item_id: CatalogItemId,
1217    pub ingestion_id: GlobalId,
1218    pub action: AlterSourceAction,
1219}
1220
1221#[derive(Debug, Clone)]
1222pub struct AlterSinkPlan {
1223    pub item_id: CatalogItemId,
1224    pub global_id: GlobalId,
1225    pub sink: Sink,
1226    pub with_snapshot: bool,
1227    pub in_cluster: ClusterId,
1228}
1229
1230#[derive(Debug, Clone)]
1231pub struct AlterClusterPlan {
1232    pub id: ClusterId,
1233    pub name: String,
1234    pub options: PlanClusterOption,
1235    pub strategy: AlterClusterPlanStrategy,
1236}
1237
1238#[derive(Debug)]
1239pub struct AlterClusterRenamePlan {
1240    pub id: ClusterId,
1241    pub name: String,
1242    pub to_name: String,
1243}
1244
1245#[derive(Debug)]
1246pub struct AlterClusterReplicaRenamePlan {
1247    pub cluster_id: ClusterId,
1248    pub replica_id: ReplicaId,
1249    pub name: QualifiedReplica,
1250    pub to_name: String,
1251}
1252
1253#[derive(Debug)]
1254pub struct AlterItemRenamePlan {
1255    pub id: CatalogItemId,
1256    pub current_full_name: FullItemName,
1257    pub to_name: String,
1258    pub object_type: ObjectType,
1259}
1260
1261#[derive(Debug)]
1262pub struct AlterSchemaRenamePlan {
1263    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1264    pub new_schema_name: String,
1265}
1266
1267#[derive(Debug)]
1268pub struct AlterSchemaSwapPlan {
1269    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1270    pub schema_a_name: String,
1271    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1272    pub schema_b_name: String,
1273    pub name_temp: String,
1274}
1275
1276#[derive(Debug)]
1277pub struct AlterClusterSwapPlan {
1278    pub id_a: ClusterId,
1279    pub id_b: ClusterId,
1280    pub name_a: String,
1281    pub name_b: String,
1282    pub name_temp: String,
1283}
1284
1285#[derive(Debug)]
1286pub struct AlterSecretPlan {
1287    pub id: CatalogItemId,
1288    pub secret_as: MirScalarExpr,
1289}
1290
1291#[derive(Debug)]
1292pub struct AlterSystemSetPlan {
1293    pub name: String,
1294    pub value: VariableValue,
1295}
1296
1297#[derive(Debug)]
1298pub struct AlterSystemResetPlan {
1299    pub name: String,
1300}
1301
1302#[derive(Debug)]
1303pub struct AlterSystemResetAllPlan {}
1304
1305#[derive(Debug)]
1306pub struct AlterRolePlan {
1307    pub id: RoleId,
1308    pub name: String,
1309    pub option: PlannedAlterRoleOption,
1310}
1311
1312#[derive(Debug)]
1313pub struct AlterOwnerPlan {
1314    pub id: ObjectId,
1315    pub object_type: ObjectType,
1316    pub new_owner: RoleId,
1317}
1318
1319#[derive(Debug)]
1320pub struct AlterTablePlan {
1321    pub relation_id: CatalogItemId,
1322    pub column_name: ColumnName,
1323    pub column_type: SqlColumnType,
1324    pub raw_sql_type: RawDataType,
1325}
1326
1327#[derive(Debug)]
1328pub struct DeclarePlan {
1329    pub name: String,
1330    pub stmt: Statement<Raw>,
1331    pub sql: String,
1332    pub params: Params,
1333}
1334
1335#[derive(Debug)]
1336pub struct FetchPlan {
1337    pub name: String,
1338    pub count: Option<FetchDirection>,
1339    pub timeout: ExecuteTimeout,
1340}
1341
1342#[derive(Debug)]
1343pub struct ClosePlan {
1344    pub name: String,
1345}
1346
1347#[derive(Debug)]
1348pub struct PreparePlan {
1349    pub name: String,
1350    pub stmt: Statement<Raw>,
1351    pub sql: String,
1352    pub desc: StatementDesc,
1353}
1354
1355#[derive(Debug)]
1356pub struct ExecutePlan {
1357    pub name: String,
1358    pub params: Params,
1359}
1360
1361#[derive(Debug)]
1362pub struct DeallocatePlan {
1363    pub name: Option<String>,
1364}
1365
1366#[derive(Debug)]
1367pub struct RaisePlan {
1368    pub severity: NoticeSeverity,
1369}
1370
1371#[derive(Debug)]
1372pub struct GrantRolePlan {
1373    /// The roles that are gaining members.
1374    pub role_ids: Vec<RoleId>,
1375    /// The roles that will be added to `role_id`.
1376    pub member_ids: Vec<RoleId>,
1377    /// The role that granted the membership.
1378    pub grantor_id: RoleId,
1379}
1380
1381#[derive(Debug)]
1382pub struct RevokeRolePlan {
1383    /// The roles that are losing members.
1384    pub role_ids: Vec<RoleId>,
1385    /// The roles that will be removed from `role_id`.
1386    pub member_ids: Vec<RoleId>,
1387    /// The role that revoked the membership.
1388    pub grantor_id: RoleId,
1389}
1390
1391#[derive(Debug)]
1392pub struct UpdatePrivilege {
1393    /// The privileges being granted/revoked on an object.
1394    pub acl_mode: AclMode,
1395    /// The ID of the object receiving privileges.
1396    pub target_id: SystemObjectId,
1397    /// The role that is granting the privileges.
1398    pub grantor: RoleId,
1399}
1400
1401#[derive(Debug)]
1402pub struct GrantPrivilegesPlan {
1403    /// Description of each privilege being granted.
1404    pub update_privileges: Vec<UpdatePrivilege>,
1405    /// The roles that will granted the privileges.
1406    pub grantees: Vec<RoleId>,
1407}
1408
1409#[derive(Debug)]
1410pub struct RevokePrivilegesPlan {
1411    /// Description of each privilege being revoked.
1412    pub update_privileges: Vec<UpdatePrivilege>,
1413    /// The roles that will have privileges revoked.
1414    pub revokees: Vec<RoleId>,
1415}
1416#[derive(Debug)]
1417pub struct AlterDefaultPrivilegesPlan {
1418    /// Description of objects that match this default privilege.
1419    pub privilege_objects: Vec<DefaultPrivilegeObject>,
1420    /// The privilege to be granted/revoked from the matching objects.
1421    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
1422    /// Whether this is a grant or revoke.
1423    pub is_grant: bool,
1424}
1425
1426#[derive(Debug)]
1427pub struct ReassignOwnedPlan {
1428    /// The roles whose owned objects are being reassigned.
1429    pub old_roles: Vec<RoleId>,
1430    /// The new owner of the objects.
1431    pub new_role: RoleId,
1432    /// All object IDs to reassign.
1433    pub reassign_ids: Vec<ObjectId>,
1434}
1435
1436#[derive(Debug)]
1437pub struct CommentPlan {
1438    /// The object that this comment is associated with.
1439    pub object_id: CommentObjectId,
1440    /// A sub-component of the object that this comment is associated with, e.g. a column.
1441    ///
1442    /// TODO(parkmycar): <https://github.com/MaterializeInc/database-issues/issues/6711>.
1443    pub sub_component: Option<usize>,
1444    /// The comment itself. If `None` that indicates we should clear the existing comment.
1445    pub comment: Option<String>,
1446}
1447
1448#[derive(Clone, Debug)]
1449pub enum TableDataSource {
1450    /// The table owns data created via INSERT/UPDATE/DELETE statements.
1451    TableWrites { defaults: Vec<Expr<Aug>> },
1452
1453    /// The table receives its data from the identified `DataSourceDesc`.
1454    /// This table type does not support INSERT/UPDATE/DELETE statements.
1455    DataSource {
1456        desc: DataSourceDesc,
1457        timeline: Timeline,
1458    },
1459}
1460
1461#[derive(Clone, Debug)]
1462pub struct Table {
1463    pub create_sql: String,
1464    pub desc: VersionedRelationDesc,
1465    pub temporary: bool,
1466    pub compaction_window: Option<CompactionWindow>,
1467    pub data_source: TableDataSource,
1468}
1469
1470#[derive(Clone, Debug)]
1471pub struct Source {
1472    pub create_sql: String,
1473    pub data_source: DataSourceDesc,
1474    pub desc: RelationDesc,
1475    pub compaction_window: Option<CompactionWindow>,
1476}
1477
1478#[derive(Debug, Clone)]
1479pub enum DataSourceDesc {
1480    /// Receives data from an external system.
1481    Ingestion(SourceDesc<ReferencedConnection>),
1482    /// Receives data from an external system.
1483    OldSyntaxIngestion {
1484        desc: SourceDesc<ReferencedConnection>,
1485        // If we're dealing with an old syntax ingestion the progress id will be some other collection
1486        // and the ingestion itself will have the data from a default external reference
1487        progress_subsource: CatalogItemId,
1488        data_config: SourceExportDataConfig<ReferencedConnection>,
1489        details: SourceExportDetails,
1490    },
1491    /// This source receives its data from the identified ingestion,
1492    /// specifically the output identified by `external_reference`.
1493    IngestionExport {
1494        ingestion_id: CatalogItemId,
1495        external_reference: UnresolvedItemName,
1496        details: SourceExportDetails,
1497        data_config: SourceExportDataConfig<ReferencedConnection>,
1498    },
1499    /// Receives data from the source's reclocking/remapping operations.
1500    Progress,
1501    /// Receives data from HTTP post requests.
1502    Webhook {
1503        validate_using: Option<WebhookValidation>,
1504        body_format: WebhookBodyFormat,
1505        headers: WebhookHeaders,
1506        /// Only `Some` when created via `CREATE TABLE ... FROM WEBHOOK`.
1507        cluster_id: Option<StorageInstanceId>,
1508    },
1509}
1510
1511#[derive(Clone, Debug, Serialize)]
1512pub struct WebhookValidation {
1513    /// The expression used to validate a request.
1514    pub expression: MirScalarExpr,
1515    /// Description of the source that will be created.
1516    pub relation_desc: RelationDesc,
1517    /// The column index to provide the request body and whether to provide it as bytes.
1518    pub bodies: Vec<(usize, bool)>,
1519    /// The column index to provide the request headers and whether to provide the values as bytes.
1520    pub headers: Vec<(usize, bool)>,
1521    /// Any secrets that are used in that validation.
1522    pub secrets: Vec<WebhookValidationSecret>,
1523}
1524
1525impl WebhookValidation {
1526    const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1527
1528    /// Attempt to reduce the internal [`MirScalarExpr`] into a simpler expression.
1529    ///
1530    /// The reduction happens on a separate thread, we also only wait for
1531    /// `WebhookValidation::MAX_REDUCE_TIME` before timing out and returning an error.
1532    pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1533        let WebhookValidation {
1534            expression,
1535            relation_desc,
1536            ..
1537        } = self;
1538
1539        // On a different thread, attempt to reduce the expression.
1540        let mut expression_ = expression.clone();
1541        let desc_ = relation_desc.clone();
1542        let reduce_task = mz_ore::task::spawn_blocking(
1543            || "webhook-validation-reduce",
1544            move || {
1545                expression_.reduce(&desc_.typ().column_types);
1546                expression_
1547            },
1548        );
1549
1550        match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1551            Ok(Ok(reduced_expr)) => {
1552                *expression = reduced_expr;
1553                Ok(())
1554            }
1555            Ok(Err(_)) => Err("joining task"),
1556            Err(_) => Err("timeout"),
1557        }
1558    }
1559}
1560
1561#[derive(Clone, Debug, Default, Serialize)]
1562pub struct WebhookHeaders {
1563    /// Optionally include a column named `headers` whose content is possibly filtered.
1564    pub header_column: Option<WebhookHeaderFilters>,
1565    /// The column index to provide the specific request header, and whether to provide it as bytes.
1566    pub mapped_headers: BTreeMap<usize, (String, bool)>,
1567}
1568
1569impl WebhookHeaders {
1570    /// Returns the number of columns needed to represent our headers.
1571    pub fn num_columns(&self) -> usize {
1572        let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1573        let mapped_headers = self.mapped_headers.len();
1574
1575        header_column + mapped_headers
1576    }
1577}
1578
1579#[derive(Clone, Debug, Default, Serialize)]
1580pub struct WebhookHeaderFilters {
1581    pub block: BTreeSet<String>,
1582    pub allow: BTreeSet<String>,
1583}
1584
1585#[derive(Copy, Clone, Debug, Serialize, Arbitrary)]
1586pub enum WebhookBodyFormat {
1587    Json { array: bool },
1588    Bytes,
1589    Text,
1590}
1591
1592impl From<WebhookBodyFormat> for SqlScalarType {
1593    fn from(value: WebhookBodyFormat) -> Self {
1594        match value {
1595            WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1596            WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1597            WebhookBodyFormat::Text => SqlScalarType::String,
1598        }
1599    }
1600}
1601
1602#[derive(Clone, Debug, Serialize)]
1603pub struct WebhookValidationSecret {
1604    /// Identifies the secret by [`CatalogItemId`].
1605    pub id: CatalogItemId,
1606    /// Column index for the expression context that this secret was originally evaluated in.
1607    pub column_idx: usize,
1608    /// Whether or not this secret should be provided to the expression as Bytes or a String.
1609    pub use_bytes: bool,
1610}
1611
1612#[derive(Clone, Debug)]
1613pub struct Connection {
1614    pub create_sql: String,
1615    pub details: ConnectionDetails,
1616}
1617
1618#[derive(Clone, Debug, Serialize)]
1619pub enum ConnectionDetails {
1620    Kafka(KafkaConnection<ReferencedConnection>),
1621    Csr(CsrConnection<ReferencedConnection>),
1622    Postgres(PostgresConnection<ReferencedConnection>),
1623    Ssh {
1624        connection: SshConnection,
1625        key_1: SshKey,
1626        key_2: SshKey,
1627    },
1628    Aws(AwsConnection),
1629    AwsPrivatelink(AwsPrivatelinkConnection),
1630    MySql(MySqlConnection<ReferencedConnection>),
1631    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
1632    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
1633}
1634
1635impl ConnectionDetails {
1636    pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1637        match self {
1638            ConnectionDetails::Kafka(c) => {
1639                mz_storage_types::connections::Connection::Kafka(c.clone())
1640            }
1641            ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1642            ConnectionDetails::Postgres(c) => {
1643                mz_storage_types::connections::Connection::Postgres(c.clone())
1644            }
1645            ConnectionDetails::Ssh { connection, .. } => {
1646                mz_storage_types::connections::Connection::Ssh(connection.clone())
1647            }
1648            ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1649            ConnectionDetails::AwsPrivatelink(c) => {
1650                mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1651            }
1652            ConnectionDetails::MySql(c) => {
1653                mz_storage_types::connections::Connection::MySql(c.clone())
1654            }
1655            ConnectionDetails::SqlServer(c) => {
1656                mz_storage_types::connections::Connection::SqlServer(c.clone())
1657            }
1658            ConnectionDetails::IcebergCatalog(c) => {
1659                mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1660            }
1661        }
1662    }
1663}
1664
1665#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1666pub struct NetworkPolicyRule {
1667    pub name: String,
1668    pub action: NetworkPolicyRuleAction,
1669    pub address: PolicyAddress,
1670    pub direction: NetworkPolicyRuleDirection,
1671}
1672
1673#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1674pub enum NetworkPolicyRuleAction {
1675    Allow,
1676}
1677
1678impl std::fmt::Display for NetworkPolicyRuleAction {
1679    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1680        match self {
1681            Self::Allow => write!(f, "allow"),
1682        }
1683    }
1684}
1685impl TryFrom<&str> for NetworkPolicyRuleAction {
1686    type Error = PlanError;
1687    fn try_from(value: &str) -> Result<Self, Self::Error> {
1688        match value.to_uppercase().as_str() {
1689            "ALLOW" => Ok(Self::Allow),
1690            _ => Err(PlanError::Unstructured(
1691                "Allow is the only valid option".into(),
1692            )),
1693        }
1694    }
1695}
1696
1697#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1698pub enum NetworkPolicyRuleDirection {
1699    Ingress,
1700}
1701impl std::fmt::Display for NetworkPolicyRuleDirection {
1702    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1703        match self {
1704            Self::Ingress => write!(f, "ingress"),
1705        }
1706    }
1707}
1708impl TryFrom<&str> for NetworkPolicyRuleDirection {
1709    type Error = PlanError;
1710    fn try_from(value: &str) -> Result<Self, Self::Error> {
1711        match value.to_uppercase().as_str() {
1712            "INGRESS" => Ok(Self::Ingress),
1713            _ => Err(PlanError::Unstructured(
1714                "Ingress is the only valid option".into(),
1715            )),
1716        }
1717    }
1718}
1719
1720#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1721pub struct PolicyAddress(pub IpNet);
1722impl std::fmt::Display for PolicyAddress {
1723    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1724        write!(f, "{}", &self.0.to_string())
1725    }
1726}
1727impl From<String> for PolicyAddress {
1728    fn from(value: String) -> Self {
1729        Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1730    }
1731}
1732impl TryFrom<&str> for PolicyAddress {
1733    type Error = PlanError;
1734    fn try_from(value: &str) -> Result<Self, Self::Error> {
1735        let net = IpNet::from_str(value)
1736            .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1737        Ok(Self(net))
1738    }
1739}
1740
1741impl Serialize for PolicyAddress {
1742    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1743    where
1744        S: serde::Serializer,
1745    {
1746        serializer.serialize_str(&format!("{}", &self.0))
1747    }
1748}
1749
1750#[derive(Clone, Debug, Serialize)]
1751pub enum SshKey {
1752    PublicOnly(String),
1753    Both(SshKeyPair),
1754}
1755
1756impl SshKey {
1757    pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1758        match self {
1759            SshKey::PublicOnly(_) => None,
1760            SshKey::Both(key_pair) => Some(key_pair),
1761        }
1762    }
1763
1764    pub fn public_key(&self) -> String {
1765        match self {
1766            SshKey::PublicOnly(s) => s.into(),
1767            SshKey::Both(p) => p.ssh_public_key(),
1768        }
1769    }
1770}
1771
1772#[derive(Clone, Debug)]
1773pub struct Secret {
1774    pub create_sql: String,
1775    pub secret_as: MirScalarExpr,
1776}
1777
1778#[derive(Clone, Debug)]
1779pub struct Sink {
1780    /// Parse-able SQL that is stored durably and defines this sink.
1781    pub create_sql: String,
1782    /// Collection we read into this sink.
1783    pub from: GlobalId,
1784    /// Type of connection to the external service we sink into.
1785    pub connection: StorageSinkConnection<ReferencedConnection>,
1786    // TODO(guswynn): this probably should just be in the `connection`.
1787    pub envelope: SinkEnvelope,
1788    pub version: u64,
1789}
1790
1791#[derive(Clone, Debug)]
1792pub struct View {
1793    /// Parse-able SQL that is stored durably and defines this view.
1794    pub create_sql: String,
1795    /// Unoptimized high-level expression from parsing the `create_sql`.
1796    pub expr: HirRelationExpr,
1797    /// All of the catalog objects that are referenced by this view, according to the `expr`.
1798    pub dependencies: DependencyIds,
1799    /// Columns of this view.
1800    pub column_names: Vec<ColumnName>,
1801    /// If this view is created in the temporary schema, e.g. `CREATE TEMPORARY ...`.
1802    pub temporary: bool,
1803}
1804
1805#[derive(Clone, Debug)]
1806pub struct MaterializedView {
1807    /// Parse-able SQL that is stored durably and defines this materialized view.
1808    pub create_sql: String,
1809    /// Unoptimized high-level expression from parsing the `create_sql`.
1810    pub expr: HirRelationExpr,
1811    /// All of the catalog objects that are referenced by this materialized view, according to the `expr`.
1812    pub dependencies: DependencyIds,
1813    /// Columns of this view.
1814    pub column_names: Vec<ColumnName>,
1815    /// Cluster this materialized view will get installed on.
1816    pub cluster_id: ClusterId,
1817    pub non_null_assertions: Vec<usize>,
1818    pub compaction_window: Option<CompactionWindow>,
1819    pub refresh_schedule: Option<RefreshSchedule>,
1820    pub as_of: Option<Timestamp>,
1821}
1822
1823#[derive(Clone, Debug)]
1824pub struct Index {
1825    /// Parse-able SQL that is stored durably and defines this index.
1826    pub create_sql: String,
1827    /// Collection this index is on top of.
1828    pub on: GlobalId,
1829    pub keys: Vec<mz_expr::MirScalarExpr>,
1830    pub compaction_window: Option<CompactionWindow>,
1831    pub cluster_id: ClusterId,
1832}
1833
1834#[derive(Clone, Debug)]
1835pub struct Type {
1836    pub create_sql: String,
1837    pub inner: CatalogType<IdReference>,
1838}
1839
1840/// Specifies when a `Peek` or `Subscribe` should occur.
1841#[derive(Deserialize, Clone, Debug, PartialEq)]
1842pub enum QueryWhen {
1843    /// The peek should occur at the latest possible timestamp that allows the
1844    /// peek to complete immediately.
1845    Immediately,
1846    /// The peek should occur at a timestamp that allows the peek to see all
1847    /// data written to tables within Materialize.
1848    FreshestTableWrite,
1849    /// The peek should occur at the timestamp described by the specified
1850    /// expression.
1851    ///
1852    /// The expression may have any type.
1853    AtTimestamp(Timestamp),
1854    /// Same as Immediately, but will also advance to at least the specified
1855    /// expression.
1856    AtLeastTimestamp(Timestamp),
1857}
1858
1859impl QueryWhen {
1860    /// Returns a timestamp to which the candidate must be advanced.
1861    pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1862        match self {
1863            QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1864            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1865        }
1866    }
1867    /// Returns whether the candidate's upper bound is constrained.
1868    /// This is only true for `AtTimestamp` since it is the only variant that
1869    /// specifies a timestamp.
1870    pub fn constrains_upper(&self) -> bool {
1871        match self {
1872            QueryWhen::AtTimestamp(_) => true,
1873            QueryWhen::AtLeastTimestamp(_)
1874            | QueryWhen::Immediately
1875            | QueryWhen::FreshestTableWrite => false,
1876        }
1877    }
1878    /// Returns whether the candidate must be advanced to the since.
1879    pub fn advance_to_since(&self) -> bool {
1880        match self {
1881            QueryWhen::Immediately
1882            | QueryWhen::AtLeastTimestamp(_)
1883            | QueryWhen::FreshestTableWrite => true,
1884            QueryWhen::AtTimestamp(_) => false,
1885        }
1886    }
1887    /// Returns whether the candidate can be advanced to the upper.
1888    pub fn can_advance_to_upper(&self) -> bool {
1889        match self {
1890            QueryWhen::Immediately => true,
1891            QueryWhen::FreshestTableWrite
1892            | QueryWhen::AtTimestamp(_)
1893            | QueryWhen::AtLeastTimestamp(_) => false,
1894        }
1895    }
1896
1897    /// Returns whether the candidate can be advanced to the timeline's timestamp.
1898    pub fn can_advance_to_timeline_ts(&self) -> bool {
1899        match self {
1900            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1901            QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1902        }
1903    }
1904    /// Returns whether the candidate must be advanced to the timeline's timestamp.
1905    pub fn must_advance_to_timeline_ts(&self) -> bool {
1906        match self {
1907            QueryWhen::FreshestTableWrite => true,
1908            QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1909                false
1910            }
1911        }
1912    }
1913    /// Returns whether the selected timestamp should be tracked within the current transaction.
1914    pub fn is_transactional(&self) -> bool {
1915        match self {
1916            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1917            QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1918        }
1919    }
1920}
1921
1922#[derive(Debug, Copy, Clone)]
1923pub enum MutationKind {
1924    Insert,
1925    Update,
1926    Delete,
1927}
1928
1929#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
1930pub enum CopyFormat {
1931    Text,
1932    Csv,
1933    Binary,
1934    Parquet,
1935}
1936
1937#[derive(Debug, Copy, Clone)]
1938pub enum ExecuteTimeout {
1939    None,
1940    Seconds(f64),
1941    WaitOnce,
1942}
1943
1944#[derive(Clone, Debug)]
1945pub enum IndexOption {
1946    /// Configures the logical compaction window for an index.
1947    RetainHistory(CompactionWindow),
1948}
1949
1950#[derive(Clone, Debug)]
1951pub enum TableOption {
1952    /// Configures the logical compaction window for a table.
1953    RetainHistory(CompactionWindow),
1954}
1955
1956#[derive(Clone, Debug)]
1957pub struct PlanClusterOption {
1958    pub availability_zones: AlterOptionParameter<Vec<String>>,
1959    pub introspection_debugging: AlterOptionParameter<bool>,
1960    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
1961    pub managed: AlterOptionParameter<bool>,
1962    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
1963    pub replication_factor: AlterOptionParameter<u32>,
1964    pub size: AlterOptionParameter,
1965    pub schedule: AlterOptionParameter<ClusterSchedule>,
1966    pub workload_class: AlterOptionParameter<Option<String>>,
1967}
1968
1969impl Default for PlanClusterOption {
1970    fn default() -> Self {
1971        Self {
1972            availability_zones: AlterOptionParameter::Unchanged,
1973            introspection_debugging: AlterOptionParameter::Unchanged,
1974            introspection_interval: AlterOptionParameter::Unchanged,
1975            managed: AlterOptionParameter::Unchanged,
1976            replicas: AlterOptionParameter::Unchanged,
1977            replication_factor: AlterOptionParameter::Unchanged,
1978            size: AlterOptionParameter::Unchanged,
1979            schedule: AlterOptionParameter::Unchanged,
1980            workload_class: AlterOptionParameter::Unchanged,
1981        }
1982    }
1983}
1984
1985#[derive(Clone, Debug, PartialEq, Eq)]
1986pub enum AlterClusterPlanStrategy {
1987    None,
1988    For(Duration),
1989    UntilReady {
1990        on_timeout: OnTimeoutAction,
1991        timeout: Duration,
1992    },
1993}
1994
1995#[derive(Clone, Debug, PartialEq, Eq)]
1996pub enum OnTimeoutAction {
1997    Commit,
1998    Rollback,
1999}
2000
2001impl Default for OnTimeoutAction {
2002    fn default() -> Self {
2003        Self::Commit
2004    }
2005}
2006
2007impl TryFrom<&str> for OnTimeoutAction {
2008    type Error = PlanError;
2009    fn try_from(value: &str) -> Result<Self, Self::Error> {
2010        match value.to_uppercase().as_str() {
2011            "COMMIT" => Ok(Self::Commit),
2012            "ROLLBACK" => Ok(Self::Rollback),
2013            _ => Err(PlanError::Unstructured(
2014                "Valid options are COMMIT, ROLLBACK".into(),
2015            )),
2016        }
2017    }
2018}
2019
2020impl AlterClusterPlanStrategy {
2021    pub fn is_none(&self) -> bool {
2022        matches!(self, Self::None)
2023    }
2024    pub fn is_some(&self) -> bool {
2025        !matches!(self, Self::None)
2026    }
2027}
2028
2029impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2030    type Error = PlanError;
2031
2032    fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2033        Ok(match value.wait {
2034            Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2035            Some(ClusterAlterOptionValue::UntilReady(options)) => {
2036                let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2037                Self::UntilReady {
2038                    timeout: match extracted.timeout {
2039                        Some(d) => d,
2040                        None => Err(PlanError::UntilReadyTimeoutRequired)?,
2041                    },
2042                    on_timeout: match extracted.on_timeout {
2043                        Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2044                            PlanError::InvalidOptionValue {
2045                                option_name: "ON TIMEOUT".into(),
2046                                err: Box::new(e),
2047                            }
2048                        })?,
2049                        None => OnTimeoutAction::default(),
2050                    },
2051                }
2052            }
2053            None => Self::None,
2054        })
2055    }
2056}
2057
2058/// A vector of values to which parameter references should be bound.
2059#[derive(Debug, Clone)]
2060pub struct Params {
2061    /// The datums that were provided in the EXECUTE statement.
2062    pub datums: Row,
2063    /// The types of the datums provided in the EXECUTE statement.
2064    pub execute_types: Vec<SqlScalarType>,
2065    /// The types that the prepared statement expects based on its definition.
2066    pub expected_types: Vec<SqlScalarType>,
2067}
2068
2069impl Params {
2070    /// Returns a `Params` with no parameters.
2071    pub fn empty() -> Params {
2072        Params {
2073            datums: Row::pack_slice(&[]),
2074            execute_types: vec![],
2075            expected_types: vec![],
2076        }
2077    }
2078}
2079
2080/// Controls planning of a SQL query.
2081#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Copy)]
2082pub struct PlanContext {
2083    pub wall_time: DateTime<Utc>,
2084    pub ignore_if_exists_errors: bool,
2085}
2086
2087impl PlanContext {
2088    pub fn new(wall_time: DateTime<Utc>) -> Self {
2089        Self {
2090            wall_time,
2091            ignore_if_exists_errors: false,
2092        }
2093    }
2094
2095    /// Return a PlanContext with zero values. This should only be used when
2096    /// planning is required but unused (like in `plan_create_table()`) or in
2097    /// tests.
2098    pub fn zero() -> Self {
2099        PlanContext {
2100            wall_time: now::to_datetime(NOW_ZERO()),
2101            ignore_if_exists_errors: false,
2102        }
2103    }
2104
2105    pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2106        self.ignore_if_exists_errors = value;
2107        self
2108    }
2109}