mz_sql/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL planning.
11//!
12//! SQL planning is the process of taking the abstract syntax tree of a
13//! [`Statement`] and turning it into a [`Plan`] that the dataflow layer can
14//! execute.
15//!
16//! Statements must be purified before they can be planned. See the
17//! [`pure`](crate::pure) module for details.
18
19// Internal module layout.
20//
21// The entry point for planning is `statement::handle_statement`. That function
22// dispatches to a more specific `handle` function for the particular statement
23// type. For most statements, this `handle` function is uninteresting and short,
24// but anything involving a `SELECT` statement gets complicated. `SELECT`
25// queries wind through the functions in the `query` module, starting with
26// `plan_root_query` and fanning out based on the contents of the `SELECT`
27// statement.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{
41    CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing,
42};
43use mz_ore::now::{self, NOW_ZERO};
44use mz_pgcopy::CopyFormatParams;
45use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
46use mz_repr::explain::{ExplainConfig, ExplainFormat};
47use mz_repr::network_policy_id::NetworkPolicyId;
48use mz_repr::optimize::OptimizerFeatureOverrides;
49use mz_repr::refresh_schedule::RefreshSchedule;
50use mz_repr::role_id::RoleId;
51use mz_repr::{
52    CatalogItemId, ColumnIndex, ColumnName, ColumnType, Diff, GlobalId, RelationDesc, RelationType,
53    Row, ScalarType, Timestamp, VersionedRelationDesc,
54};
55use mz_sql_parser::ast::{
56    AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
57    RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
58    Value, WithOptionValue,
59};
60use mz_ssh_util::keys::SshKeyPair;
61use mz_storage_types::connections::aws::AwsConnection;
62use mz_storage_types::connections::inline::ReferencedConnection;
63use mz_storage_types::connections::{
64    AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
65    MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70    SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76    ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77    TransactionAccessMode,
78};
79use crate::catalog::{
80    CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81    RoleAttributes,
82};
83use crate::names::{
84    Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110    AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111    WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119    AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120    PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123    StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124    resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
130/// Instructions for executing a SQL query.
131#[derive(Debug, EnumKind)]
132#[enum_kind(PlanKind)]
133pub enum Plan {
134    CreateConnection(CreateConnectionPlan),
135    CreateDatabase(CreateDatabasePlan),
136    CreateSchema(CreateSchemaPlan),
137    CreateRole(CreateRolePlan),
138    CreateCluster(CreateClusterPlan),
139    CreateClusterReplica(CreateClusterReplicaPlan),
140    CreateSource(CreateSourcePlan),
141    CreateSources(Vec<CreateSourcePlanBundle>),
142    CreateSecret(CreateSecretPlan),
143    CreateSink(CreateSinkPlan),
144    CreateTable(CreateTablePlan),
145    CreateView(CreateViewPlan),
146    CreateMaterializedView(CreateMaterializedViewPlan),
147    CreateContinualTask(CreateContinualTaskPlan),
148    CreateNetworkPolicy(CreateNetworkPolicyPlan),
149    CreateIndex(CreateIndexPlan),
150    CreateType(CreateTypePlan),
151    Comment(CommentPlan),
152    DiscardTemp,
153    DiscardAll,
154    DropObjects(DropObjectsPlan),
155    DropOwned(DropOwnedPlan),
156    EmptyQuery,
157    ShowAllVariables,
158    ShowCreate(ShowCreatePlan),
159    ShowColumns(ShowColumnsPlan),
160    ShowVariable(ShowVariablePlan),
161    InspectShard(InspectShardPlan),
162    SetVariable(SetVariablePlan),
163    ResetVariable(ResetVariablePlan),
164    SetTransaction(SetTransactionPlan),
165    StartTransaction(StartTransactionPlan),
166    CommitTransaction(CommitTransactionPlan),
167    AbortTransaction(AbortTransactionPlan),
168    Select(SelectPlan),
169    Subscribe(SubscribePlan),
170    CopyFrom(CopyFromPlan),
171    CopyTo(CopyToPlan),
172    ExplainPlan(ExplainPlanPlan),
173    ExplainPushdown(ExplainPushdownPlan),
174    ExplainTimestamp(ExplainTimestampPlan),
175    ExplainSinkSchema(ExplainSinkSchemaPlan),
176    Insert(InsertPlan),
177    AlterCluster(AlterClusterPlan),
178    AlterClusterSwap(AlterClusterSwapPlan),
179    AlterNoop(AlterNoopPlan),
180    AlterSetCluster(AlterSetClusterPlan),
181    AlterConnection(AlterConnectionPlan),
182    AlterSource(AlterSourcePlan),
183    AlterClusterRename(AlterClusterRenamePlan),
184    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
185    AlterItemRename(AlterItemRenamePlan),
186    AlterSchemaRename(AlterSchemaRenamePlan),
187    AlterSchemaSwap(AlterSchemaSwapPlan),
188    AlterSecret(AlterSecretPlan),
189    AlterSink(AlterSinkPlan),
190    AlterSystemSet(AlterSystemSetPlan),
191    AlterSystemReset(AlterSystemResetPlan),
192    AlterSystemResetAll(AlterSystemResetAllPlan),
193    AlterRole(AlterRolePlan),
194    AlterOwner(AlterOwnerPlan),
195    AlterTableAddColumn(AlterTablePlan),
196    AlterNetworkPolicy(AlterNetworkPolicyPlan),
197    Declare(DeclarePlan),
198    Fetch(FetchPlan),
199    Close(ClosePlan),
200    ReadThenWrite(ReadThenWritePlan),
201    Prepare(PreparePlan),
202    Execute(ExecutePlan),
203    Deallocate(DeallocatePlan),
204    Raise(RaisePlan),
205    GrantRole(GrantRolePlan),
206    RevokeRole(RevokeRolePlan),
207    GrantPrivileges(GrantPrivilegesPlan),
208    RevokePrivileges(RevokePrivilegesPlan),
209    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
210    ReassignOwned(ReassignOwnedPlan),
211    SideEffectingFunc(SideEffectingFunc),
212    ValidateConnection(ValidateConnectionPlan),
213    AlterRetainHistory(AlterRetainHistoryPlan),
214}
215
216impl Plan {
217    /// Expresses which [`StatementKind`] can generate which set of
218    /// [`PlanKind`].
219    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
220        match stmt {
221            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
222            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
223            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
224            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
225            StatementKind::AlterObjectRename => &[
226                PlanKind::AlterClusterRename,
227                PlanKind::AlterClusterReplicaRename,
228                PlanKind::AlterItemRename,
229                PlanKind::AlterSchemaRename,
230                PlanKind::AlterNoop,
231            ],
232            StatementKind::AlterObjectSwap => &[
233                PlanKind::AlterClusterSwap,
234                PlanKind::AlterSchemaSwap,
235                PlanKind::AlterNoop,
236            ],
237            StatementKind::AlterRole => &[PlanKind::AlterRole],
238            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
239            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
240            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
241            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
242            StatementKind::AlterSource => &[
243                PlanKind::AlterNoop,
244                PlanKind::AlterSource,
245                PlanKind::AlterRetainHistory,
246            ],
247            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
248            StatementKind::AlterSystemResetAll => {
249                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
250            }
251            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
252            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
253            StatementKind::AlterTableAddColumn => {
254                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
255            }
256            StatementKind::Close => &[PlanKind::Close],
257            StatementKind::Comment => &[PlanKind::Comment],
258            StatementKind::Commit => &[PlanKind::CommitTransaction],
259            StatementKind::Copy => &[
260                PlanKind::CopyFrom,
261                PlanKind::Select,
262                PlanKind::Subscribe,
263                PlanKind::CopyTo,
264            ],
265            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
266            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
267            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
268            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
269            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
270            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
271            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
272            StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
273            StatementKind::CreateRole => &[PlanKind::CreateRole],
274            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
275            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
276            StatementKind::CreateSink => &[PlanKind::CreateSink],
277            StatementKind::CreateSource | StatementKind::CreateSubsource => {
278                &[PlanKind::CreateSource]
279            }
280            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
281            StatementKind::CreateTable => &[PlanKind::CreateTable],
282            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
283            StatementKind::CreateType => &[PlanKind::CreateType],
284            StatementKind::CreateView => &[PlanKind::CreateView],
285            StatementKind::Deallocate => &[PlanKind::Deallocate],
286            StatementKind::Declare => &[PlanKind::Declare],
287            StatementKind::Delete => &[PlanKind::ReadThenWrite],
288            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
289            StatementKind::DropObjects => &[PlanKind::DropObjects],
290            StatementKind::DropOwned => &[PlanKind::DropOwned],
291            StatementKind::Execute => &[PlanKind::Execute],
292            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
293            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
294            StatementKind::ExplainAnalyze => &[PlanKind::Select],
295            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
296            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
297            StatementKind::Fetch => &[PlanKind::Fetch],
298            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
299            StatementKind::GrantRole => &[PlanKind::GrantRole],
300            StatementKind::Insert => &[PlanKind::Insert],
301            StatementKind::Prepare => &[PlanKind::Prepare],
302            StatementKind::Raise => &[PlanKind::Raise],
303            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
304            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
305            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
306            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
307            StatementKind::Rollback => &[PlanKind::AbortTransaction],
308            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
309            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
310            StatementKind::SetVariable => &[PlanKind::SetVariable],
311            StatementKind::Show => &[
312                PlanKind::Select,
313                PlanKind::ShowVariable,
314                PlanKind::ShowCreate,
315                PlanKind::ShowColumns,
316                PlanKind::ShowAllVariables,
317                PlanKind::InspectShard,
318            ],
319            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
320            StatementKind::Subscribe => &[PlanKind::Subscribe],
321            StatementKind::Update => &[PlanKind::ReadThenWrite],
322            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
323            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
324        }
325    }
326
327    /// Returns a human readable name of the plan. Meant for use in messages sent back to a user.
328    pub fn name(&self) -> &str {
329        match self {
330            Plan::CreateConnection(_) => "create connection",
331            Plan::CreateDatabase(_) => "create database",
332            Plan::CreateSchema(_) => "create schema",
333            Plan::CreateRole(_) => "create role",
334            Plan::CreateCluster(_) => "create cluster",
335            Plan::CreateClusterReplica(_) => "create cluster replica",
336            Plan::CreateSource(_) => "create source",
337            Plan::CreateSources(_) => "create source",
338            Plan::CreateSecret(_) => "create secret",
339            Plan::CreateSink(_) => "create sink",
340            Plan::CreateTable(_) => "create table",
341            Plan::CreateView(_) => "create view",
342            Plan::CreateMaterializedView(_) => "create materialized view",
343            Plan::CreateContinualTask(_) => "create continual task",
344            Plan::CreateIndex(_) => "create index",
345            Plan::CreateType(_) => "create type",
346            Plan::CreateNetworkPolicy(_) => "create network policy",
347            Plan::Comment(_) => "comment",
348            Plan::DiscardTemp => "discard temp",
349            Plan::DiscardAll => "discard all",
350            Plan::DropObjects(plan) => match plan.object_type {
351                ObjectType::Table => "drop table",
352                ObjectType::View => "drop view",
353                ObjectType::MaterializedView => "drop materialized view",
354                ObjectType::Source => "drop source",
355                ObjectType::Sink => "drop sink",
356                ObjectType::Index => "drop index",
357                ObjectType::Type => "drop type",
358                ObjectType::Role => "drop roles",
359                ObjectType::Cluster => "drop clusters",
360                ObjectType::ClusterReplica => "drop cluster replicas",
361                ObjectType::Secret => "drop secret",
362                ObjectType::Connection => "drop connection",
363                ObjectType::Database => "drop database",
364                ObjectType::Schema => "drop schema",
365                ObjectType::Func => "drop function",
366                ObjectType::ContinualTask => "drop continual task",
367                ObjectType::NetworkPolicy => "drop network policy",
368            },
369            Plan::DropOwned(_) => "drop owned",
370            Plan::EmptyQuery => "do nothing",
371            Plan::ShowAllVariables => "show all variables",
372            Plan::ShowCreate(_) => "show create",
373            Plan::ShowColumns(_) => "show columns",
374            Plan::ShowVariable(_) => "show variable",
375            Plan::InspectShard(_) => "inspect shard",
376            Plan::SetVariable(_) => "set variable",
377            Plan::ResetVariable(_) => "reset variable",
378            Plan::SetTransaction(_) => "set transaction",
379            Plan::StartTransaction(_) => "start transaction",
380            Plan::CommitTransaction(_) => "commit",
381            Plan::AbortTransaction(_) => "abort",
382            Plan::Select(_) => "select",
383            Plan::Subscribe(_) => "subscribe",
384            Plan::CopyFrom(_) => "copy from",
385            Plan::CopyTo(_) => "copy to",
386            Plan::ExplainPlan(_) => "explain plan",
387            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
388            Plan::ExplainTimestamp(_) => "explain timestamp",
389            Plan::ExplainSinkSchema(_) => "explain schema",
390            Plan::Insert(_) => "insert",
391            Plan::AlterNoop(plan) => match plan.object_type {
392                ObjectType::Table => "alter table",
393                ObjectType::View => "alter view",
394                ObjectType::MaterializedView => "alter materialized view",
395                ObjectType::Source => "alter source",
396                ObjectType::Sink => "alter sink",
397                ObjectType::Index => "alter index",
398                ObjectType::Type => "alter type",
399                ObjectType::Role => "alter role",
400                ObjectType::Cluster => "alter cluster",
401                ObjectType::ClusterReplica => "alter cluster replica",
402                ObjectType::Secret => "alter secret",
403                ObjectType::Connection => "alter connection",
404                ObjectType::Database => "alter database",
405                ObjectType::Schema => "alter schema",
406                ObjectType::Func => "alter function",
407                ObjectType::ContinualTask => "alter continual task",
408                ObjectType::NetworkPolicy => "alter network policy",
409            },
410            Plan::AlterCluster(_) => "alter cluster",
411            Plan::AlterClusterRename(_) => "alter cluster rename",
412            Plan::AlterClusterSwap(_) => "alter cluster swap",
413            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
414            Plan::AlterSetCluster(_) => "alter set cluster",
415            Plan::AlterConnection(_) => "alter connection",
416            Plan::AlterSource(_) => "alter source",
417            Plan::AlterItemRename(_) => "rename item",
418            Plan::AlterSchemaRename(_) => "alter rename schema",
419            Plan::AlterSchemaSwap(_) => "alter swap schema",
420            Plan::AlterSecret(_) => "alter secret",
421            Plan::AlterSink(_) => "alter sink",
422            Plan::AlterSystemSet(_) => "alter system",
423            Plan::AlterSystemReset(_) => "alter system",
424            Plan::AlterSystemResetAll(_) => "alter system",
425            Plan::AlterRole(_) => "alter role",
426            Plan::AlterNetworkPolicy(_) => "alter network policy",
427            Plan::AlterOwner(plan) => match plan.object_type {
428                ObjectType::Table => "alter table owner",
429                ObjectType::View => "alter view owner",
430                ObjectType::MaterializedView => "alter materialized view owner",
431                ObjectType::Source => "alter source owner",
432                ObjectType::Sink => "alter sink owner",
433                ObjectType::Index => "alter index owner",
434                ObjectType::Type => "alter type owner",
435                ObjectType::Role => "alter role owner",
436                ObjectType::Cluster => "alter cluster owner",
437                ObjectType::ClusterReplica => "alter cluster replica owner",
438                ObjectType::Secret => "alter secret owner",
439                ObjectType::Connection => "alter connection owner",
440                ObjectType::Database => "alter database owner",
441                ObjectType::Schema => "alter schema owner",
442                ObjectType::Func => "alter function owner",
443                ObjectType::ContinualTask => "alter continual task owner",
444                ObjectType::NetworkPolicy => "alter network policy owner",
445            },
446            Plan::AlterTableAddColumn(_) => "alter table add column",
447            Plan::Declare(_) => "declare",
448            Plan::Fetch(_) => "fetch",
449            Plan::Close(_) => "close",
450            Plan::ReadThenWrite(plan) => match plan.kind {
451                MutationKind::Insert => "insert into select",
452                MutationKind::Update => "update",
453                MutationKind::Delete => "delete",
454            },
455            Plan::Prepare(_) => "prepare",
456            Plan::Execute(_) => "execute",
457            Plan::Deallocate(_) => "deallocate",
458            Plan::Raise(_) => "raise",
459            Plan::GrantRole(_) => "grant role",
460            Plan::RevokeRole(_) => "revoke role",
461            Plan::GrantPrivileges(_) => "grant privilege",
462            Plan::RevokePrivileges(_) => "revoke privilege",
463            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
464            Plan::ReassignOwned(_) => "reassign owned",
465            Plan::SideEffectingFunc(_) => "side effecting func",
466            Plan::ValidateConnection(_) => "validate connection",
467            Plan::AlterRetainHistory(_) => "alter retain history",
468        }
469    }
470
471    /// Returns `true` iff this `Plan` is allowed to be executed in read-only
472    /// mode.
473    ///
474    /// We use an explicit allow-list, to avoid future additions automatically
475    /// falling into the `true` category.
476    pub fn allowed_in_read_only(&self) -> bool {
477        match self {
478            // These two set non-durable session variables, so are okay in
479            // read-only mode.
480            Plan::SetVariable(_) => true,
481            Plan::ResetVariable(_) => true,
482            Plan::SetTransaction(_) => true,
483            Plan::StartTransaction(_) => true,
484            Plan::CommitTransaction(_) => true,
485            Plan::AbortTransaction(_) => true,
486            Plan::Select(_) => true,
487            Plan::EmptyQuery => true,
488            Plan::ShowAllVariables => true,
489            Plan::ShowCreate(_) => true,
490            Plan::ShowColumns(_) => true,
491            Plan::ShowVariable(_) => true,
492            Plan::InspectShard(_) => true,
493            Plan::Subscribe(_) => true,
494            Plan::CopyTo(_) => true,
495            Plan::ExplainPlan(_) => true,
496            Plan::ExplainPushdown(_) => true,
497            Plan::ExplainTimestamp(_) => true,
498            Plan::ExplainSinkSchema(_) => true,
499            Plan::ValidateConnection(_) => true,
500            _ => false,
501        }
502    }
503}
504
505#[derive(Debug)]
506pub struct StartTransactionPlan {
507    pub access: Option<TransactionAccessMode>,
508    pub isolation_level: Option<TransactionIsolationLevel>,
509}
510
511#[derive(Debug)]
512pub enum TransactionType {
513    Explicit,
514    Implicit,
515}
516
517impl TransactionType {
518    pub fn is_explicit(&self) -> bool {
519        matches!(self, TransactionType::Explicit)
520    }
521
522    pub fn is_implicit(&self) -> bool {
523        matches!(self, TransactionType::Implicit)
524    }
525}
526
527#[derive(Debug)]
528pub struct CommitTransactionPlan {
529    pub transaction_type: TransactionType,
530}
531
532#[derive(Debug)]
533pub struct AbortTransactionPlan {
534    pub transaction_type: TransactionType,
535}
536
537#[derive(Debug)]
538pub struct CreateDatabasePlan {
539    pub name: String,
540    pub if_not_exists: bool,
541}
542
543#[derive(Debug)]
544pub struct CreateSchemaPlan {
545    pub database_spec: ResolvedDatabaseSpecifier,
546    pub schema_name: String,
547    pub if_not_exists: bool,
548}
549
550#[derive(Debug)]
551pub struct CreateRolePlan {
552    pub name: String,
553    pub attributes: RoleAttributes,
554}
555
556#[derive(Debug, PartialEq, Eq, Clone)]
557pub struct CreateClusterPlan {
558    pub name: String,
559    pub variant: CreateClusterVariant,
560    pub workload_class: Option<String>,
561}
562
563#[derive(Debug, PartialEq, Eq, Clone)]
564pub enum CreateClusterVariant {
565    Managed(CreateClusterManagedPlan),
566    Unmanaged(CreateClusterUnmanagedPlan),
567}
568
569#[derive(Debug, PartialEq, Eq, Clone)]
570pub struct CreateClusterUnmanagedPlan {
571    pub replicas: Vec<(String, ReplicaConfig)>,
572}
573
574#[derive(Debug, PartialEq, Eq, Clone)]
575pub struct CreateClusterManagedPlan {
576    pub replication_factor: u32,
577    pub size: String,
578    pub availability_zones: Vec<String>,
579    pub compute: ComputeReplicaConfig,
580    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
581    pub schedule: ClusterSchedule,
582}
583
584#[derive(Debug)]
585pub struct CreateClusterReplicaPlan {
586    pub cluster_id: ClusterId,
587    pub name: String,
588    pub config: ReplicaConfig,
589}
590
591/// Configuration of introspection for a cluster replica.
592#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq)]
593pub struct ComputeReplicaIntrospectionConfig {
594    /// Whether to introspect the introspection.
595    pub debugging: bool,
596    /// The interval at which to introspect.
597    pub interval: Duration,
598}
599
600#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
601pub struct ComputeReplicaConfig {
602    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
603}
604
605#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
606pub enum ReplicaConfig {
607    Unorchestrated {
608        storagectl_addrs: Vec<String>,
609        computectl_addrs: Vec<String>,
610        compute: ComputeReplicaConfig,
611    },
612    Orchestrated {
613        size: String,
614        availability_zone: Option<String>,
615        compute: ComputeReplicaConfig,
616        internal: bool,
617        billed_as: Option<String>,
618    },
619}
620
621#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
622pub enum ClusterSchedule {
623    /// The system won't automatically turn the cluster On or Off.
624    Manual,
625    /// The cluster will be On when a REFRESH materialized view on it needs to refresh.
626    /// `hydration_time_estimate` determines how much time before a refresh to turn the
627    /// cluster On, so that it can rehydrate already before the refresh time.
628    Refresh { hydration_time_estimate: Duration },
629}
630
631impl Default for ClusterSchedule {
632    fn default() -> Self {
633        // (Has to be consistent with `impl Default for ClusterScheduleOptionValue`.)
634        ClusterSchedule::Manual
635    }
636}
637
638#[derive(Debug)]
639pub struct CreateSourcePlan {
640    pub name: QualifiedItemName,
641    pub source: Source,
642    pub if_not_exists: bool,
643    pub timeline: Timeline,
644    // None for subsources, which run on the parent cluster.
645    pub in_cluster: Option<ClusterId>,
646}
647
648#[derive(Clone, Debug, PartialEq, Eq)]
649pub struct SourceReferences {
650    pub updated_at: u64,
651    pub references: Vec<SourceReference>,
652}
653
654/// An available external reference for a source and if possible to retrieve,
655/// any column names it contains.
656#[derive(Clone, Debug, PartialEq, Eq)]
657pub struct SourceReference {
658    pub name: String,
659    pub namespace: Option<String>,
660    pub columns: Vec<String>,
661}
662
663/// A [`CreateSourcePlan`] and the metadata necessary to sequence it.
664#[derive(Debug)]
665pub struct CreateSourcePlanBundle {
666    /// ID of this source in the Catalog.
667    pub item_id: CatalogItemId,
668    /// ID used to reference this source from outside the catalog, e.g. compute.
669    pub global_id: GlobalId,
670    /// Details of the source to create.
671    pub plan: CreateSourcePlan,
672    /// Other catalog objects that are referenced by this source, determined at name resolution.
673    pub resolved_ids: ResolvedIds,
674    /// All the available upstream references for this source.
675    /// Populated for top-level sources that can contain subsources/tables
676    /// and used during sequencing to populate the appropriate catalog fields.
677    pub available_source_references: Option<SourceReferences>,
678}
679
680#[derive(Debug)]
681pub struct CreateConnectionPlan {
682    pub name: QualifiedItemName,
683    pub if_not_exists: bool,
684    pub connection: Connection,
685    pub validate: bool,
686}
687
688#[derive(Debug)]
689pub struct ValidateConnectionPlan {
690    /// ID of the connection in the Catalog.
691    pub id: CatalogItemId,
692    /// The connection to validate.
693    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
694}
695
696#[derive(Debug)]
697pub struct CreateSecretPlan {
698    pub name: QualifiedItemName,
699    pub secret: Secret,
700    pub if_not_exists: bool,
701}
702
703#[derive(Debug)]
704pub struct CreateSinkPlan {
705    pub name: QualifiedItemName,
706    pub sink: Sink,
707    pub with_snapshot: bool,
708    pub if_not_exists: bool,
709    pub in_cluster: ClusterId,
710}
711
712#[derive(Debug)]
713pub struct CreateTablePlan {
714    pub name: QualifiedItemName,
715    pub table: Table,
716    pub if_not_exists: bool,
717}
718
719#[derive(Debug, Clone)]
720pub struct CreateViewPlan {
721    pub name: QualifiedItemName,
722    pub view: View,
723    /// The Catalog objects that this view is replacing, if any.
724    pub replace: Option<CatalogItemId>,
725    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
726    pub drop_ids: Vec<CatalogItemId>,
727    pub if_not_exists: bool,
728    /// True if the view contains an expression that can make the exact column list
729    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
730    pub ambiguous_columns: bool,
731}
732
733#[derive(Debug, Clone)]
734pub struct CreateMaterializedViewPlan {
735    pub name: QualifiedItemName,
736    pub materialized_view: MaterializedView,
737    /// The Catalog objects that this materialized view is replacing, if any.
738    pub replace: Option<CatalogItemId>,
739    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
740    pub drop_ids: Vec<CatalogItemId>,
741    pub if_not_exists: bool,
742    /// True if the materialized view contains an expression that can make the exact column list
743    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
744    pub ambiguous_columns: bool,
745}
746
747#[derive(Debug, Clone)]
748pub struct CreateContinualTaskPlan {
749    pub name: QualifiedItemName,
750    /// During initial creation, the `LocalId` placeholder for this CT in `continual_task.expr`.
751    /// None on restart.
752    pub placeholder_id: Option<mz_expr::LocalId>,
753    pub desc: RelationDesc,
754    /// ID of the collection we read into this continual task.
755    pub input_id: GlobalId,
756    pub with_snapshot: bool,
757    /// Definition for the continual task.
758    pub continual_task: MaterializedView,
759}
760
761#[derive(Debug, Clone)]
762pub struct CreateNetworkPolicyPlan {
763    pub name: String,
764    pub rules: Vec<NetworkPolicyRule>,
765}
766
767#[derive(Debug, Clone)]
768pub struct AlterNetworkPolicyPlan {
769    pub id: NetworkPolicyId,
770    pub name: String,
771    pub rules: Vec<NetworkPolicyRule>,
772}
773
774#[derive(Debug, Clone)]
775pub struct CreateIndexPlan {
776    pub name: QualifiedItemName,
777    pub index: Index,
778    pub if_not_exists: bool,
779}
780
781#[derive(Debug)]
782pub struct CreateTypePlan {
783    pub name: QualifiedItemName,
784    pub typ: Type,
785}
786
787#[derive(Debug)]
788pub struct DropObjectsPlan {
789    /// The IDs of only the objects directly referenced in the `DROP` statement.
790    pub referenced_ids: Vec<ObjectId>,
791    /// All object IDs to drop. Includes `referenced_ids` and all descendants.
792    pub drop_ids: Vec<ObjectId>,
793    /// The type of object that was dropped explicitly in the DROP statement. `ids` may contain
794    /// objects of different types due to CASCADE.
795    pub object_type: ObjectType,
796}
797
798#[derive(Debug)]
799pub struct DropOwnedPlan {
800    /// The role IDs that own the objects.
801    pub role_ids: Vec<RoleId>,
802    /// All object IDs to drop.
803    pub drop_ids: Vec<ObjectId>,
804    /// The privileges to revoke.
805    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
806    /// The default privileges to revoke.
807    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
808}
809
810#[derive(Debug)]
811pub struct ShowVariablePlan {
812    pub name: String,
813}
814
815#[derive(Debug)]
816pub struct InspectShardPlan {
817    /// ID of the storage collection to inspect.
818    pub id: GlobalId,
819}
820
821#[derive(Debug)]
822pub struct SetVariablePlan {
823    pub name: String,
824    pub value: VariableValue,
825    pub local: bool,
826}
827
828#[derive(Debug)]
829pub enum VariableValue {
830    Default,
831    Values(Vec<String>),
832}
833
834#[derive(Debug)]
835pub struct ResetVariablePlan {
836    pub name: String,
837}
838
839#[derive(Debug)]
840pub struct SetTransactionPlan {
841    pub local: bool,
842    pub modes: Vec<TransactionMode>,
843}
844
845/// A plan for select statements.
846#[derive(Clone, Debug)]
847pub struct SelectPlan {
848    /// The `SELECT` statement itself. Used for explain/notices, but not otherwise
849    /// load-bearing. Boxed to save stack space.
850    pub select: Option<Box<SelectStatement<Aug>>>,
851    /// The plan as a HIR.
852    pub source: HirRelationExpr,
853    /// At what time should this select happen?
854    pub when: QueryWhen,
855    /// Instructions how to form the result set.
856    pub finishing: RowSetFinishing,
857    /// For `COPY TO`, the format to use.
858    pub copy_to: Option<CopyFormat>,
859}
860
861impl SelectPlan {
862    pub fn immediate(rows: Vec<Row>, typ: RelationType) -> Self {
863        let arity = typ.arity();
864        SelectPlan {
865            select: None,
866            source: HirRelationExpr::Constant { rows, typ },
867            when: QueryWhen::Immediately,
868            finishing: RowSetFinishing::trivial(arity),
869            copy_to: None,
870        }
871    }
872}
873
874#[derive(Debug)]
875pub enum SubscribeOutput {
876    Diffs,
877    WithinTimestampOrderBy {
878        /// We pretend that mz_diff is prepended to the normal columns, making it index 0
879        order_by: Vec<ColumnOrder>,
880    },
881    EnvelopeUpsert {
882        /// Order by with just keys
883        order_by_keys: Vec<ColumnOrder>,
884    },
885    EnvelopeDebezium {
886        /// Order by with just keys
887        order_by_keys: Vec<ColumnOrder>,
888    },
889}
890
891#[derive(Debug)]
892pub struct SubscribePlan {
893    pub from: SubscribeFrom,
894    pub with_snapshot: bool,
895    pub when: QueryWhen,
896    pub up_to: Option<MirScalarExpr>,
897    pub copy_to: Option<CopyFormat>,
898    pub emit_progress: bool,
899    pub output: SubscribeOutput,
900}
901
902#[derive(Debug, Clone)]
903pub enum SubscribeFrom {
904    /// ID of the collection to subscribe to.
905    Id(GlobalId),
906    /// Query to subscribe to.
907    Query {
908        expr: MirRelationExpr,
909        desc: RelationDesc,
910    },
911}
912
913impl SubscribeFrom {
914    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
915        match self {
916            SubscribeFrom::Id(id) => BTreeSet::from([*id]),
917            SubscribeFrom::Query { expr, .. } => expr.depends_on(),
918        }
919    }
920
921    pub fn contains_temporal(&self) -> bool {
922        match self {
923            SubscribeFrom::Id(_) => false,
924            SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
925        }
926    }
927}
928
929#[derive(Debug)]
930pub struct ShowCreatePlan {
931    pub id: ObjectId,
932    pub row: Row,
933}
934
935#[derive(Debug)]
936pub struct ShowColumnsPlan {
937    pub id: CatalogItemId,
938    pub select_plan: SelectPlan,
939    pub new_resolved_ids: ResolvedIds,
940}
941
942#[derive(Debug)]
943pub struct CopyFromPlan {
944    /// Table we're copying into.
945    pub id: CatalogItemId,
946    /// Source we're copying data from.
947    pub source: CopyFromSource,
948    /// How input columns map to those on the destination table.
949    ///
950    /// TODO(cf2): Remove this field in favor of the mfp.
951    pub columns: Vec<ColumnIndex>,
952    /// [`RelationDesc`] describing the input data.
953    pub source_desc: RelationDesc,
954    /// Changes the shape of the input data to match the destination table.
955    pub mfp: MapFilterProject,
956    /// Format specific params for copying the input data.
957    pub params: CopyFormatParams<'static>,
958    /// Filter for the source files we're copying from, e.g. an S3 prefix.
959    pub filter: Option<CopyFromFilter>,
960}
961
962#[derive(Debug)]
963pub enum CopyFromSource {
964    /// Copying from a file local to the user, transmitted via pgwire.
965    Stdin,
966    /// A remote resource, e.g. HTTP file.
967    ///
968    /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource.
969    Url(HirScalarExpr),
970    /// A file in an S3 bucket.
971    AwsS3 {
972        /// Expression that evaluates to the file we want to copy.
973        uri: HirScalarExpr,
974        /// Details for how we connect to AWS S3.
975        connection: AwsConnection,
976        /// ID of the connection object.
977        connection_id: CatalogItemId,
978    },
979}
980
981#[derive(Debug)]
982pub enum CopyFromFilter {
983    Files(Vec<String>),
984    Pattern(String),
985}
986
987#[derive(Debug, Clone)]
988pub struct CopyToPlan {
989    /// The select query plan whose data will be copied to destination uri.
990    pub select_plan: SelectPlan,
991    pub desc: RelationDesc,
992    /// The scalar expression to be resolved to get the destination uri.
993    pub to: HirScalarExpr,
994    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
995    /// The ID of the connection.
996    pub connection_id: CatalogItemId,
997    pub format: S3SinkFormat,
998    pub max_file_size: u64,
999}
1000
1001#[derive(Clone, Debug)]
1002pub struct ExplainPlanPlan {
1003    pub stage: ExplainStage,
1004    pub format: ExplainFormat,
1005    pub config: ExplainConfig,
1006    pub explainee: Explainee,
1007}
1008
1009/// The type of object to be explained
1010#[derive(Clone, Debug)]
1011pub enum Explainee {
1012    /// Lookup and explain a plan saved for an view.
1013    View(CatalogItemId),
1014    /// Lookup and explain a plan saved for an existing materialized view.
1015    MaterializedView(CatalogItemId),
1016    /// Lookup and explain a plan saved for an existing index.
1017    Index(CatalogItemId),
1018    /// Replan an existing view.
1019    ReplanView(CatalogItemId),
1020    /// Replan an existing materialized view.
1021    ReplanMaterializedView(CatalogItemId),
1022    /// Replan an existing index.
1023    ReplanIndex(CatalogItemId),
1024    /// A SQL statement.
1025    Statement(ExplaineeStatement),
1026}
1027
1028/// Explainee types that are statements.
1029#[derive(Clone, Debug, EnumKind)]
1030#[enum_kind(ExplaineeStatementKind)]
1031pub enum ExplaineeStatement {
1032    /// The object to be explained is a SELECT statement.
1033    Select {
1034        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1035        broken: bool,
1036        plan: plan::SelectPlan,
1037        desc: RelationDesc,
1038    },
1039    /// The object to be explained is a CREATE VIEW.
1040    CreateView {
1041        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1042        broken: bool,
1043        plan: plan::CreateViewPlan,
1044    },
1045    /// The object to be explained is a CREATE MATERIALIZED VIEW.
1046    CreateMaterializedView {
1047        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1048        broken: bool,
1049        plan: plan::CreateMaterializedViewPlan,
1050    },
1051    /// The object to be explained is a CREATE INDEX.
1052    CreateIndex {
1053        /// Broken flag (see [`ExplaineeStatement::broken()`]).
1054        broken: bool,
1055        plan: plan::CreateIndexPlan,
1056    },
1057}
1058
1059impl ExplaineeStatement {
1060    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1061        match self {
1062            Self::Select { plan, .. } => plan.source.depends_on(),
1063            Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1064            Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1065            Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1066        }
1067    }
1068
1069    /// Statements that have their `broken` flag set are expected to cause a
1070    /// panic in the optimizer code. In this case:
1071    ///
1072    /// 1. The optimizer pipeline execution will stop, but the panic will be
1073    ///    intercepted and will not propagate to the caller. The partial
1074    ///    optimizer trace collected until this point will be available.
1075    /// 2. The optimizer trace tracing subscriber will delegate regular tracing
1076    ///    spans and events to the default subscriber.
1077    ///
1078    /// This is useful when debugging queries that cause panics.
1079    pub fn broken(&self) -> bool {
1080        match self {
1081            Self::Select { broken, .. } => *broken,
1082            Self::CreateView { broken, .. } => *broken,
1083            Self::CreateMaterializedView { broken, .. } => *broken,
1084            Self::CreateIndex { broken, .. } => *broken,
1085        }
1086    }
1087}
1088
1089impl ExplaineeStatementKind {
1090    pub fn supports(&self, stage: &ExplainStage) -> bool {
1091        use ExplainStage::*;
1092        match self {
1093            Self::Select => true,
1094            Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1095            Self::CreateMaterializedView => true,
1096            Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1097        }
1098    }
1099}
1100
1101impl std::fmt::Display for ExplaineeStatementKind {
1102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1103        match self {
1104            Self::Select => write!(f, "SELECT"),
1105            Self::CreateView => write!(f, "CREATE VIEW"),
1106            Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1107            Self::CreateIndex => write!(f, "CREATE INDEX"),
1108        }
1109    }
1110}
1111
1112#[derive(Clone, Debug)]
1113pub struct ExplainPushdownPlan {
1114    pub explainee: Explainee,
1115}
1116
1117#[derive(Clone, Debug)]
1118pub struct ExplainTimestampPlan {
1119    pub format: ExplainFormat,
1120    pub raw_plan: HirRelationExpr,
1121    pub when: QueryWhen,
1122}
1123
1124#[derive(Debug)]
1125pub struct ExplainSinkSchemaPlan {
1126    pub sink_from: GlobalId,
1127    pub json_schema: String,
1128}
1129
1130#[derive(Debug)]
1131pub struct SendDiffsPlan {
1132    pub id: CatalogItemId,
1133    pub updates: Vec<(Row, Diff)>,
1134    pub kind: MutationKind,
1135    pub returning: Vec<(Row, NonZeroUsize)>,
1136    pub max_result_size: u64,
1137}
1138
1139#[derive(Debug)]
1140pub struct InsertPlan {
1141    pub id: CatalogItemId,
1142    pub values: HirRelationExpr,
1143    pub returning: Vec<mz_expr::MirScalarExpr>,
1144}
1145
1146#[derive(Debug)]
1147pub struct ReadThenWritePlan {
1148    pub id: CatalogItemId,
1149    pub selection: HirRelationExpr,
1150    pub finishing: RowSetFinishing,
1151    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
1152    pub kind: MutationKind,
1153    pub returning: Vec<mz_expr::MirScalarExpr>,
1154}
1155
1156/// Generated by `ALTER ... IF EXISTS` if the named object did not exist.
1157#[derive(Debug)]
1158pub struct AlterNoopPlan {
1159    pub object_type: ObjectType,
1160}
1161
1162#[derive(Debug)]
1163pub struct AlterSetClusterPlan {
1164    pub id: CatalogItemId,
1165    pub set_cluster: ClusterId,
1166}
1167
1168#[derive(Debug)]
1169pub struct AlterRetainHistoryPlan {
1170    pub id: CatalogItemId,
1171    pub value: Option<Value>,
1172    pub window: CompactionWindow,
1173    pub object_type: ObjectType,
1174}
1175
1176#[derive(Debug, Clone)]
1177
1178pub enum AlterOptionParameter<T = String> {
1179    Set(T),
1180    Reset,
1181    Unchanged,
1182}
1183
1184#[derive(Debug)]
1185pub enum AlterConnectionAction {
1186    RotateKeys,
1187    AlterOptions {
1188        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
1189        drop_options: BTreeSet<ConnectionOptionName>,
1190        validate: bool,
1191    },
1192}
1193
1194#[derive(Debug)]
1195pub struct AlterConnectionPlan {
1196    pub id: CatalogItemId,
1197    pub action: AlterConnectionAction,
1198}
1199
1200#[derive(Debug)]
1201pub enum AlterSourceAction {
1202    AddSubsourceExports {
1203        subsources: Vec<CreateSourcePlanBundle>,
1204        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1205    },
1206    RefreshReferences {
1207        references: SourceReferences,
1208    },
1209}
1210
1211#[derive(Debug)]
1212pub struct AlterSourcePlan {
1213    pub item_id: CatalogItemId,
1214    pub ingestion_id: GlobalId,
1215    pub action: AlterSourceAction,
1216}
1217
1218#[derive(Debug, Clone)]
1219pub struct AlterSinkPlan {
1220    pub item_id: CatalogItemId,
1221    pub global_id: GlobalId,
1222    pub sink: Sink,
1223    pub with_snapshot: bool,
1224    pub in_cluster: ClusterId,
1225}
1226
1227#[derive(Debug, Clone)]
1228pub struct AlterClusterPlan {
1229    pub id: ClusterId,
1230    pub name: String,
1231    pub options: PlanClusterOption,
1232    pub strategy: AlterClusterPlanStrategy,
1233}
1234
1235#[derive(Debug)]
1236pub struct AlterClusterRenamePlan {
1237    pub id: ClusterId,
1238    pub name: String,
1239    pub to_name: String,
1240}
1241
1242#[derive(Debug)]
1243pub struct AlterClusterReplicaRenamePlan {
1244    pub cluster_id: ClusterId,
1245    pub replica_id: ReplicaId,
1246    pub name: QualifiedReplica,
1247    pub to_name: String,
1248}
1249
1250#[derive(Debug)]
1251pub struct AlterItemRenamePlan {
1252    pub id: CatalogItemId,
1253    pub current_full_name: FullItemName,
1254    pub to_name: String,
1255    pub object_type: ObjectType,
1256}
1257
1258#[derive(Debug)]
1259pub struct AlterSchemaRenamePlan {
1260    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1261    pub new_schema_name: String,
1262}
1263
1264#[derive(Debug)]
1265pub struct AlterSchemaSwapPlan {
1266    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1267    pub schema_a_name: String,
1268    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1269    pub schema_b_name: String,
1270    pub name_temp: String,
1271}
1272
1273#[derive(Debug)]
1274pub struct AlterClusterSwapPlan {
1275    pub id_a: ClusterId,
1276    pub id_b: ClusterId,
1277    pub name_a: String,
1278    pub name_b: String,
1279    pub name_temp: String,
1280}
1281
1282#[derive(Debug)]
1283pub struct AlterSecretPlan {
1284    pub id: CatalogItemId,
1285    pub secret_as: MirScalarExpr,
1286}
1287
1288#[derive(Debug)]
1289pub struct AlterSystemSetPlan {
1290    pub name: String,
1291    pub value: VariableValue,
1292}
1293
1294#[derive(Debug)]
1295pub struct AlterSystemResetPlan {
1296    pub name: String,
1297}
1298
1299#[derive(Debug)]
1300pub struct AlterSystemResetAllPlan {}
1301
1302#[derive(Debug)]
1303pub struct AlterRolePlan {
1304    pub id: RoleId,
1305    pub name: String,
1306    pub option: PlannedAlterRoleOption,
1307}
1308
1309#[derive(Debug)]
1310pub struct AlterOwnerPlan {
1311    pub id: ObjectId,
1312    pub object_type: ObjectType,
1313    pub new_owner: RoleId,
1314}
1315
1316#[derive(Debug)]
1317pub struct AlterTablePlan {
1318    pub relation_id: CatalogItemId,
1319    pub column_name: ColumnName,
1320    pub column_type: ColumnType,
1321    pub raw_sql_type: RawDataType,
1322}
1323
1324#[derive(Debug)]
1325pub struct DeclarePlan {
1326    pub name: String,
1327    pub stmt: Statement<Raw>,
1328    pub sql: String,
1329    pub params: Params,
1330}
1331
1332#[derive(Debug)]
1333pub struct FetchPlan {
1334    pub name: String,
1335    pub count: Option<FetchDirection>,
1336    pub timeout: ExecuteTimeout,
1337}
1338
1339#[derive(Debug)]
1340pub struct ClosePlan {
1341    pub name: String,
1342}
1343
1344#[derive(Debug)]
1345pub struct PreparePlan {
1346    pub name: String,
1347    pub stmt: Statement<Raw>,
1348    pub sql: String,
1349    pub desc: StatementDesc,
1350}
1351
1352#[derive(Debug)]
1353pub struct ExecutePlan {
1354    pub name: String,
1355    pub params: Params,
1356}
1357
1358#[derive(Debug)]
1359pub struct DeallocatePlan {
1360    pub name: Option<String>,
1361}
1362
1363#[derive(Debug)]
1364pub struct RaisePlan {
1365    pub severity: NoticeSeverity,
1366}
1367
1368#[derive(Debug)]
1369pub struct GrantRolePlan {
1370    /// The roles that are gaining members.
1371    pub role_ids: Vec<RoleId>,
1372    /// The roles that will be added to `role_id`.
1373    pub member_ids: Vec<RoleId>,
1374    /// The role that granted the membership.
1375    pub grantor_id: RoleId,
1376}
1377
1378#[derive(Debug)]
1379pub struct RevokeRolePlan {
1380    /// The roles that are losing members.
1381    pub role_ids: Vec<RoleId>,
1382    /// The roles that will be removed from `role_id`.
1383    pub member_ids: Vec<RoleId>,
1384    /// The role that revoked the membership.
1385    pub grantor_id: RoleId,
1386}
1387
1388#[derive(Debug)]
1389pub struct UpdatePrivilege {
1390    /// The privileges being granted/revoked on an object.
1391    pub acl_mode: AclMode,
1392    /// The ID of the object receiving privileges.
1393    pub target_id: SystemObjectId,
1394    /// The role that is granting the privileges.
1395    pub grantor: RoleId,
1396}
1397
1398#[derive(Debug)]
1399pub struct GrantPrivilegesPlan {
1400    /// Description of each privilege being granted.
1401    pub update_privileges: Vec<UpdatePrivilege>,
1402    /// The roles that will granted the privileges.
1403    pub grantees: Vec<RoleId>,
1404}
1405
1406#[derive(Debug)]
1407pub struct RevokePrivilegesPlan {
1408    /// Description of each privilege being revoked.
1409    pub update_privileges: Vec<UpdatePrivilege>,
1410    /// The roles that will have privileges revoked.
1411    pub revokees: Vec<RoleId>,
1412}
1413#[derive(Debug)]
1414pub struct AlterDefaultPrivilegesPlan {
1415    /// Description of objects that match this default privilege.
1416    pub privilege_objects: Vec<DefaultPrivilegeObject>,
1417    /// The privilege to be granted/revoked from the matching objects.
1418    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
1419    /// Whether this is a grant or revoke.
1420    pub is_grant: bool,
1421}
1422
1423#[derive(Debug)]
1424pub struct ReassignOwnedPlan {
1425    /// The roles whose owned objects are being reassigned.
1426    pub old_roles: Vec<RoleId>,
1427    /// The new owner of the objects.
1428    pub new_role: RoleId,
1429    /// All object IDs to reassign.
1430    pub reassign_ids: Vec<ObjectId>,
1431}
1432
1433#[derive(Debug)]
1434pub struct CommentPlan {
1435    /// The object that this comment is associated with.
1436    pub object_id: CommentObjectId,
1437    /// A sub-component of the object that this comment is associated with, e.g. a column.
1438    ///
1439    /// TODO(parkmycar): <https://github.com/MaterializeInc/database-issues/issues/6711>.
1440    pub sub_component: Option<usize>,
1441    /// The comment itself. If `None` that indicates we should clear the existing comment.
1442    pub comment: Option<String>,
1443}
1444
1445#[derive(Clone, Debug)]
1446pub enum TableDataSource {
1447    /// The table owns data created via INSERT/UPDATE/DELETE statements.
1448    TableWrites { defaults: Vec<Expr<Aug>> },
1449
1450    /// The table receives its data from the identified `DataSourceDesc`.
1451    /// This table type does not support INSERT/UPDATE/DELETE statements.
1452    DataSource {
1453        desc: DataSourceDesc,
1454        timeline: Timeline,
1455    },
1456}
1457
1458#[derive(Clone, Debug)]
1459pub struct Table {
1460    pub create_sql: String,
1461    pub desc: VersionedRelationDesc,
1462    pub temporary: bool,
1463    pub compaction_window: Option<CompactionWindow>,
1464    pub data_source: TableDataSource,
1465}
1466
1467#[derive(Clone, Debug)]
1468pub struct Source {
1469    pub create_sql: String,
1470    pub data_source: DataSourceDesc,
1471    pub desc: RelationDesc,
1472    pub compaction_window: Option<CompactionWindow>,
1473}
1474
1475#[derive(Debug, Clone)]
1476pub enum DataSourceDesc {
1477    /// Receives data from an external system.
1478    Ingestion(Ingestion),
1479    /// This source receives its data from the identified ingestion,
1480    /// specifically the output identified by `external_reference`.
1481    IngestionExport {
1482        ingestion_id: CatalogItemId,
1483        external_reference: UnresolvedItemName,
1484        details: SourceExportDetails,
1485        data_config: SourceExportDataConfig<ReferencedConnection>,
1486    },
1487    /// Receives data from the source's reclocking/remapping operations.
1488    Progress,
1489    /// Receives data from HTTP post requests.
1490    Webhook {
1491        validate_using: Option<WebhookValidation>,
1492        body_format: WebhookBodyFormat,
1493        headers: WebhookHeaders,
1494        /// Only `Some` when created via `CREATE TABLE ... FROM WEBHOOK`.
1495        cluster_id: Option<StorageInstanceId>,
1496    },
1497}
1498
1499#[derive(Clone, Debug, Serialize, Deserialize)]
1500pub struct Ingestion {
1501    pub desc: SourceDesc<ReferencedConnection>,
1502    pub progress_subsource: CatalogItemId,
1503}
1504
1505#[derive(Clone, Debug, Serialize)]
1506pub struct WebhookValidation {
1507    /// The expression used to validate a request.
1508    pub expression: MirScalarExpr,
1509    /// Description of the source that will be created.
1510    pub relation_desc: RelationDesc,
1511    /// The column index to provide the request body and whether to provide it as bytes.
1512    pub bodies: Vec<(usize, bool)>,
1513    /// The column index to provide the request headers and whether to provide the values as bytes.
1514    pub headers: Vec<(usize, bool)>,
1515    /// Any secrets that are used in that validation.
1516    pub secrets: Vec<WebhookValidationSecret>,
1517}
1518
1519impl WebhookValidation {
1520    const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1521
1522    /// Attempt to reduce the internal [`MirScalarExpr`] into a simpler expression.
1523    ///
1524    /// The reduction happens on a separate thread, we also only wait for
1525    /// `WebhookValidation::MAX_REDUCE_TIME` before timing out and returning an error.
1526    pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1527        let WebhookValidation {
1528            expression,
1529            relation_desc,
1530            ..
1531        } = self;
1532
1533        // On a different thread, attempt to reduce the expression.
1534        let mut expression_ = expression.clone();
1535        let desc_ = relation_desc.clone();
1536        let reduce_task = mz_ore::task::spawn_blocking(
1537            || "webhook-validation-reduce",
1538            move || {
1539                expression_.reduce(&desc_.typ().column_types);
1540                expression_
1541            },
1542        );
1543
1544        match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1545            Ok(Ok(reduced_expr)) => {
1546                *expression = reduced_expr;
1547                Ok(())
1548            }
1549            Ok(Err(_)) => Err("joining task"),
1550            Err(_) => Err("timeout"),
1551        }
1552    }
1553}
1554
1555#[derive(Clone, Debug, Default, Serialize)]
1556pub struct WebhookHeaders {
1557    /// Optionally include a column named `headers` whose content is possibly filtered.
1558    pub header_column: Option<WebhookHeaderFilters>,
1559    /// The column index to provide the specific request header, and whether to provide it as bytes.
1560    pub mapped_headers: BTreeMap<usize, (String, bool)>,
1561}
1562
1563impl WebhookHeaders {
1564    /// Returns the number of columns needed to represent our headers.
1565    pub fn num_columns(&self) -> usize {
1566        let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1567        let mapped_headers = self.mapped_headers.len();
1568
1569        header_column + mapped_headers
1570    }
1571}
1572
1573#[derive(Clone, Debug, Default, Serialize)]
1574pub struct WebhookHeaderFilters {
1575    pub block: BTreeSet<String>,
1576    pub allow: BTreeSet<String>,
1577}
1578
1579#[derive(Copy, Clone, Debug, Serialize, Arbitrary)]
1580pub enum WebhookBodyFormat {
1581    Json { array: bool },
1582    Bytes,
1583    Text,
1584}
1585
1586impl From<WebhookBodyFormat> for ScalarType {
1587    fn from(value: WebhookBodyFormat) -> Self {
1588        match value {
1589            WebhookBodyFormat::Json { .. } => ScalarType::Jsonb,
1590            WebhookBodyFormat::Bytes => ScalarType::Bytes,
1591            WebhookBodyFormat::Text => ScalarType::String,
1592        }
1593    }
1594}
1595
1596#[derive(Clone, Debug, Serialize)]
1597pub struct WebhookValidationSecret {
1598    /// Identifies the secret by [`CatalogItemId`].
1599    pub id: CatalogItemId,
1600    /// Column index for the expression context that this secret was originally evaluated in.
1601    pub column_idx: usize,
1602    /// Whether or not this secret should be provided to the expression as Bytes or a String.
1603    pub use_bytes: bool,
1604}
1605
1606#[derive(Clone, Debug)]
1607pub struct Connection {
1608    pub create_sql: String,
1609    pub details: ConnectionDetails,
1610}
1611
1612#[derive(Clone, Debug, Serialize)]
1613pub enum ConnectionDetails {
1614    Kafka(KafkaConnection<ReferencedConnection>),
1615    Csr(CsrConnection<ReferencedConnection>),
1616    Postgres(PostgresConnection<ReferencedConnection>),
1617    Ssh {
1618        connection: SshConnection,
1619        key_1: SshKey,
1620        key_2: SshKey,
1621    },
1622    Aws(AwsConnection),
1623    AwsPrivatelink(AwsPrivatelinkConnection),
1624    MySql(MySqlConnection<ReferencedConnection>),
1625    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
1626    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
1627}
1628
1629impl ConnectionDetails {
1630    pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1631        match self {
1632            ConnectionDetails::Kafka(c) => {
1633                mz_storage_types::connections::Connection::Kafka(c.clone())
1634            }
1635            ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1636            ConnectionDetails::Postgres(c) => {
1637                mz_storage_types::connections::Connection::Postgres(c.clone())
1638            }
1639            ConnectionDetails::Ssh { connection, .. } => {
1640                mz_storage_types::connections::Connection::Ssh(connection.clone())
1641            }
1642            ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1643            ConnectionDetails::AwsPrivatelink(c) => {
1644                mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1645            }
1646            ConnectionDetails::MySql(c) => {
1647                mz_storage_types::connections::Connection::MySql(c.clone())
1648            }
1649            ConnectionDetails::SqlServer(c) => {
1650                mz_storage_types::connections::Connection::SqlServer(c.clone())
1651            }
1652            ConnectionDetails::IcebergCatalog(c) => {
1653                mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1654            }
1655        }
1656    }
1657}
1658
1659#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1660pub struct NetworkPolicyRule {
1661    pub name: String,
1662    pub action: NetworkPolicyRuleAction,
1663    pub address: PolicyAddress,
1664    pub direction: NetworkPolicyRuleDirection,
1665}
1666
1667#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1668pub enum NetworkPolicyRuleAction {
1669    Allow,
1670}
1671
1672impl std::fmt::Display for NetworkPolicyRuleAction {
1673    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1674        match self {
1675            Self::Allow => write!(f, "allow"),
1676        }
1677    }
1678}
1679impl TryFrom<&str> for NetworkPolicyRuleAction {
1680    type Error = PlanError;
1681    fn try_from(value: &str) -> Result<Self, Self::Error> {
1682        match value.to_uppercase().as_str() {
1683            "ALLOW" => Ok(Self::Allow),
1684            _ => Err(PlanError::Unstructured(
1685                "Allow is the only valid option".into(),
1686            )),
1687        }
1688    }
1689}
1690
1691#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1692pub enum NetworkPolicyRuleDirection {
1693    Ingress,
1694}
1695impl std::fmt::Display for NetworkPolicyRuleDirection {
1696    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1697        match self {
1698            Self::Ingress => write!(f, "ingress"),
1699        }
1700    }
1701}
1702impl TryFrom<&str> for NetworkPolicyRuleDirection {
1703    type Error = PlanError;
1704    fn try_from(value: &str) -> Result<Self, Self::Error> {
1705        match value.to_uppercase().as_str() {
1706            "INGRESS" => Ok(Self::Ingress),
1707            _ => Err(PlanError::Unstructured(
1708                "Ingress is the only valid option".into(),
1709            )),
1710        }
1711    }
1712}
1713
1714#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1715pub struct PolicyAddress(pub IpNet);
1716impl std::fmt::Display for PolicyAddress {
1717    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1718        write!(f, "{}", &self.0.to_string())
1719    }
1720}
1721impl From<String> for PolicyAddress {
1722    fn from(value: String) -> Self {
1723        Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1724    }
1725}
1726impl TryFrom<&str> for PolicyAddress {
1727    type Error = PlanError;
1728    fn try_from(value: &str) -> Result<Self, Self::Error> {
1729        let net = IpNet::from_str(value)
1730            .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1731        Ok(Self(net))
1732    }
1733}
1734
1735impl Serialize for PolicyAddress {
1736    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1737    where
1738        S: serde::Serializer,
1739    {
1740        serializer.serialize_str(&format!("{}", &self.0))
1741    }
1742}
1743
1744#[derive(Clone, Debug, Serialize)]
1745pub enum SshKey {
1746    PublicOnly(String),
1747    Both(SshKeyPair),
1748}
1749
1750impl SshKey {
1751    pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1752        match self {
1753            SshKey::PublicOnly(_) => None,
1754            SshKey::Both(key_pair) => Some(key_pair),
1755        }
1756    }
1757
1758    pub fn public_key(&self) -> String {
1759        match self {
1760            SshKey::PublicOnly(s) => s.into(),
1761            SshKey::Both(p) => p.ssh_public_key(),
1762        }
1763    }
1764}
1765
1766#[derive(Clone, Debug)]
1767pub struct Secret {
1768    pub create_sql: String,
1769    pub secret_as: MirScalarExpr,
1770}
1771
1772#[derive(Clone, Debug)]
1773pub struct Sink {
1774    /// Parse-able SQL that is stored durably and defines this sink.
1775    pub create_sql: String,
1776    /// Collection we read into this sink.
1777    pub from: GlobalId,
1778    /// Type of connection to the external service we sink into.
1779    pub connection: StorageSinkConnection<ReferencedConnection>,
1780    // TODO(guswynn): this probably should just be in the `connection`.
1781    pub envelope: SinkEnvelope,
1782    pub version: u64,
1783}
1784
1785#[derive(Clone, Debug)]
1786pub struct View {
1787    /// Parse-able SQL that is stored durably and defines this view.
1788    pub create_sql: String,
1789    /// Unoptimized high-level expression from parsing the `create_sql`.
1790    pub expr: HirRelationExpr,
1791    /// All of the catalog objects that are referenced by this view, according to the `expr`.
1792    pub dependencies: DependencyIds,
1793    /// Columns of this view.
1794    pub column_names: Vec<ColumnName>,
1795    /// If this view is created in the temporary schema, e.g. `CREATE TEMPORARY ...`.
1796    pub temporary: bool,
1797}
1798
1799#[derive(Clone, Debug)]
1800pub struct MaterializedView {
1801    /// Parse-able SQL that is stored durably and defines this materialized view.
1802    pub create_sql: String,
1803    /// Unoptimized high-level expression from parsing the `create_sql`.
1804    pub expr: HirRelationExpr,
1805    /// All of the catalog objects that are referenced by this materialized view, according to the `expr`.
1806    pub dependencies: DependencyIds,
1807    /// Columns of this view.
1808    pub column_names: Vec<ColumnName>,
1809    /// Cluster this materialized view will get installed on.
1810    pub cluster_id: ClusterId,
1811    pub non_null_assertions: Vec<usize>,
1812    pub compaction_window: Option<CompactionWindow>,
1813    pub refresh_schedule: Option<RefreshSchedule>,
1814    pub as_of: Option<Timestamp>,
1815}
1816
1817#[derive(Clone, Debug)]
1818pub struct Index {
1819    /// Parse-able SQL that is stored durably and defines this index.
1820    pub create_sql: String,
1821    /// Collection this index is on top of.
1822    pub on: GlobalId,
1823    pub keys: Vec<mz_expr::MirScalarExpr>,
1824    pub compaction_window: Option<CompactionWindow>,
1825    pub cluster_id: ClusterId,
1826}
1827
1828#[derive(Clone, Debug)]
1829pub struct Type {
1830    pub create_sql: String,
1831    pub inner: CatalogType<IdReference>,
1832}
1833
1834/// Specifies when a `Peek` or `Subscribe` should occur.
1835#[derive(Deserialize, Clone, Debug, PartialEq)]
1836pub enum QueryWhen {
1837    /// The peek should occur at the latest possible timestamp that allows the
1838    /// peek to complete immediately.
1839    Immediately,
1840    /// The peek should occur at a timestamp that allows the peek to see all
1841    /// data written to tables within Materialize.
1842    FreshestTableWrite,
1843    /// The peek should occur at the timestamp described by the specified
1844    /// expression.
1845    ///
1846    /// The expression may have any type.
1847    AtTimestamp(MirScalarExpr),
1848    /// Same as Immediately, but will also advance to at least the specified
1849    /// expression.
1850    AtLeastTimestamp(MirScalarExpr),
1851}
1852
1853impl QueryWhen {
1854    /// Returns a timestamp to which the candidate must be advanced.
1855    pub fn advance_to_timestamp(&self) -> Option<MirScalarExpr> {
1856        match self {
1857            QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1858            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1859        }
1860    }
1861    /// Returns whether the candidate's upper bound is constrained.
1862    /// This is only true for `AtTimestamp` since it is the only variant that
1863    /// specifies a timestamp.
1864    pub fn constrains_upper(&self) -> bool {
1865        match self {
1866            QueryWhen::AtTimestamp(_) => true,
1867            QueryWhen::AtLeastTimestamp(_)
1868            | QueryWhen::Immediately
1869            | QueryWhen::FreshestTableWrite => false,
1870        }
1871    }
1872    /// Returns whether the candidate must be advanced to the since.
1873    pub fn advance_to_since(&self) -> bool {
1874        match self {
1875            QueryWhen::Immediately
1876            | QueryWhen::AtLeastTimestamp(_)
1877            | QueryWhen::FreshestTableWrite => true,
1878            QueryWhen::AtTimestamp(_) => false,
1879        }
1880    }
1881    /// Returns whether the candidate can be advanced to the upper.
1882    pub fn can_advance_to_upper(&self) -> bool {
1883        match self {
1884            QueryWhen::Immediately => true,
1885            QueryWhen::FreshestTableWrite
1886            | QueryWhen::AtTimestamp(_)
1887            | QueryWhen::AtLeastTimestamp(_) => false,
1888        }
1889    }
1890
1891    /// Returns whether the candidate can be advanced to the timeline's timestamp.
1892    pub fn can_advance_to_timeline_ts(&self) -> bool {
1893        match self {
1894            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1895            QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1896        }
1897    }
1898    /// Returns whether the candidate must be advanced to the timeline's timestamp.
1899    pub fn must_advance_to_timeline_ts(&self) -> bool {
1900        match self {
1901            QueryWhen::FreshestTableWrite => true,
1902            QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1903                false
1904            }
1905        }
1906    }
1907    /// Returns whether the selected timestamp should be tracked within the current transaction.
1908    pub fn is_transactional(&self) -> bool {
1909        match self {
1910            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1911            QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1912        }
1913    }
1914}
1915
1916#[derive(Debug, Copy, Clone)]
1917pub enum MutationKind {
1918    Insert,
1919    Update,
1920    Delete,
1921}
1922
1923#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
1924pub enum CopyFormat {
1925    Text,
1926    Csv,
1927    Binary,
1928    Parquet,
1929}
1930
1931#[derive(Debug, Copy, Clone)]
1932pub enum ExecuteTimeout {
1933    None,
1934    Seconds(f64),
1935    WaitOnce,
1936}
1937
1938#[derive(Clone, Debug)]
1939pub enum IndexOption {
1940    /// Configures the logical compaction window for an index.
1941    RetainHistory(CompactionWindow),
1942}
1943
1944#[derive(Clone, Debug)]
1945pub enum TableOption {
1946    /// Configures the logical compaction window for a table.
1947    RetainHistory(CompactionWindow),
1948}
1949
1950#[derive(Clone, Debug)]
1951pub struct PlanClusterOption {
1952    pub availability_zones: AlterOptionParameter<Vec<String>>,
1953    pub introspection_debugging: AlterOptionParameter<bool>,
1954    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
1955    pub managed: AlterOptionParameter<bool>,
1956    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
1957    pub replication_factor: AlterOptionParameter<u32>,
1958    pub size: AlterOptionParameter,
1959    pub schedule: AlterOptionParameter<ClusterSchedule>,
1960    pub workload_class: AlterOptionParameter<Option<String>>,
1961}
1962
1963impl Default for PlanClusterOption {
1964    fn default() -> Self {
1965        Self {
1966            availability_zones: AlterOptionParameter::Unchanged,
1967            introspection_debugging: AlterOptionParameter::Unchanged,
1968            introspection_interval: AlterOptionParameter::Unchanged,
1969            managed: AlterOptionParameter::Unchanged,
1970            replicas: AlterOptionParameter::Unchanged,
1971            replication_factor: AlterOptionParameter::Unchanged,
1972            size: AlterOptionParameter::Unchanged,
1973            schedule: AlterOptionParameter::Unchanged,
1974            workload_class: AlterOptionParameter::Unchanged,
1975        }
1976    }
1977}
1978
1979#[derive(Clone, Debug, PartialEq, Eq)]
1980pub enum AlterClusterPlanStrategy {
1981    None,
1982    For(Duration),
1983    UntilReady {
1984        on_timeout: OnTimeoutAction,
1985        timeout: Duration,
1986    },
1987}
1988
1989#[derive(Clone, Debug, PartialEq, Eq)]
1990pub enum OnTimeoutAction {
1991    Commit,
1992    Rollback,
1993}
1994
1995impl Default for OnTimeoutAction {
1996    fn default() -> Self {
1997        Self::Commit
1998    }
1999}
2000
2001impl TryFrom<&str> for OnTimeoutAction {
2002    type Error = PlanError;
2003    fn try_from(value: &str) -> Result<Self, Self::Error> {
2004        match value.to_uppercase().as_str() {
2005            "COMMIT" => Ok(Self::Commit),
2006            "ROLLBACK" => Ok(Self::Rollback),
2007            _ => Err(PlanError::Unstructured(
2008                "Valid options are COMMIT, ROLLBACK".into(),
2009            )),
2010        }
2011    }
2012}
2013
2014impl AlterClusterPlanStrategy {
2015    pub fn is_none(&self) -> bool {
2016        matches!(self, Self::None)
2017    }
2018    pub fn is_some(&self) -> bool {
2019        !matches!(self, Self::None)
2020    }
2021}
2022
2023impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2024    type Error = PlanError;
2025
2026    fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2027        Ok(match value.wait {
2028            Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2029            Some(ClusterAlterOptionValue::UntilReady(options)) => {
2030                let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2031                Self::UntilReady {
2032                    timeout: match extracted.timeout {
2033                        Some(d) => d,
2034                        None => Err(PlanError::UntilReadyTimeoutRequired)?,
2035                    },
2036                    on_timeout: match extracted.on_timeout {
2037                        Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2038                            PlanError::InvalidOptionValue {
2039                                option_name: "ON TIMEOUT".into(),
2040                                err: Box::new(e),
2041                            }
2042                        })?,
2043                        None => OnTimeoutAction::default(),
2044                    },
2045                }
2046            }
2047            None => Self::None,
2048        })
2049    }
2050}
2051
2052/// A vector of values to which parameter references should be bound.
2053#[derive(Debug, Clone)]
2054pub struct Params {
2055    /// The datums that were provided in the EXECUTE statement.
2056    pub datums: Row,
2057    /// The types of the datums provided in the EXECUTE statement.
2058    pub execute_types: Vec<ScalarType>,
2059    /// The types that the prepared statement expects based on its definition.
2060    pub expected_types: Vec<ScalarType>,
2061}
2062
2063impl Params {
2064    /// Returns a `Params` with no parameters.
2065    pub fn empty() -> Params {
2066        Params {
2067            datums: Row::pack_slice(&[]),
2068            execute_types: vec![],
2069            expected_types: vec![],
2070        }
2071    }
2072}
2073
2074/// Controls planning of a SQL query.
2075#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Copy)]
2076pub struct PlanContext {
2077    pub wall_time: DateTime<Utc>,
2078    pub ignore_if_exists_errors: bool,
2079}
2080
2081impl PlanContext {
2082    pub fn new(wall_time: DateTime<Utc>) -> Self {
2083        Self {
2084            wall_time,
2085            ignore_if_exists_errors: false,
2086        }
2087    }
2088
2089    /// Return a PlanContext with zero values. This should only be used when
2090    /// planning is required but unused (like in `plan_create_table()`) or in
2091    /// tests.
2092    pub fn zero() -> Self {
2093        PlanContext {
2094            wall_time: now::to_datetime(NOW_ZERO()),
2095            ignore_if_exists_errors: false,
2096        }
2097    }
2098
2099    pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2100        self.ignore_if_exists_errors = value;
2101        self
2102    }
2103}