mz_sql/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL planning.
11//!
12//! SQL planning is the process of taking the abstract syntax tree of a
13//! [`Statement`] and turning it into a [`Plan`] that the dataflow layer can
14//! execute.
15//!
16//! Statements must be purified before they can be planned. See the
17//! [`pure`](crate::pure) module for details.
18
19// Internal module layout.
20//
21// The entry point for planning is `statement::handle_statement`. That function
22// dispatches to a more specific `handle` function for the particular statement
23// type. For most statements, this `handle` function is uninteresting and short,
24// but anything involving a `SELECT` statement gets complicated. `SELECT`
25// queries wind through the functions in the `query` module, starting with
26// `plan_root_query` and fanning out based on the contents of the `SELECT`
27// statement.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{
41    CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing,
42};
43use mz_ore::now::{self, NOW_ZERO};
44use mz_pgcopy::CopyFormatParams;
45use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
46use mz_repr::explain::{ExplainConfig, ExplainFormat};
47use mz_repr::network_policy_id::NetworkPolicyId;
48use mz_repr::optimize::OptimizerFeatureOverrides;
49use mz_repr::refresh_schedule::RefreshSchedule;
50use mz_repr::role_id::RoleId;
51use mz_repr::{
52    CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, Row, SqlColumnType,
53    SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
54};
55use mz_sql_parser::ast::{
56    AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
57    RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
58    Value, WithOptionValue,
59};
60use mz_ssh_util::keys::SshKeyPair;
61use mz_storage_types::connections::aws::AwsConnection;
62use mz_storage_types::connections::inline::ReferencedConnection;
63use mz_storage_types::connections::{
64    AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
65    MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70    SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76    ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77    TransactionAccessMode,
78};
79use crate::catalog::{
80    CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81    RoleAttributesRaw,
82};
83use crate::names::{
84    Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110    AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111    WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119    AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120    PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123    StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124    resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
/// Instructions for executing a SQL query.
///
/// Each variant is one kind of plan. Most variants wrap a dedicated payload
/// type; `DiscardTemp`, `DiscardAll`, `EmptyQuery` and `ShowAllVariables`
/// carry no data.
///
/// The `EnumKind` derive together with `#[enum_kind(PlanKind)]` supplies the
/// companion `PlanKind` type whose values [`Plan::generated_from`] returns.
#[derive(Debug, EnumKind)]
#[enum_kind(PlanKind)]
pub enum Plan {
    CreateConnection(CreateConnectionPlan),
    CreateDatabase(CreateDatabasePlan),
    CreateSchema(CreateSchemaPlan),
    CreateRole(CreateRolePlan),
    CreateCluster(CreateClusterPlan),
    CreateClusterReplica(CreateClusterReplicaPlan),
    CreateSource(CreateSourcePlan),
    // Several sources at once, each bundled with the metadata needed to
    // sequence it.
    CreateSources(Vec<CreateSourcePlanBundle>),
    CreateSecret(CreateSecretPlan),
    CreateSink(CreateSinkPlan),
    CreateTable(CreateTablePlan),
    CreateView(CreateViewPlan),
    CreateMaterializedView(CreateMaterializedViewPlan),
    CreateContinualTask(CreateContinualTaskPlan),
    CreateNetworkPolicy(CreateNetworkPolicyPlan),
    CreateIndex(CreateIndexPlan),
    CreateType(CreateTypePlan),
    Comment(CommentPlan),
    DiscardTemp,
    DiscardAll,
    DropObjects(DropObjectsPlan),
    DropOwned(DropOwnedPlan),
    EmptyQuery,
    ShowAllVariables,
    ShowCreate(ShowCreatePlan),
    ShowColumns(ShowColumnsPlan),
    ShowVariable(ShowVariablePlan),
    InspectShard(InspectShardPlan),
    SetVariable(SetVariablePlan),
    ResetVariable(ResetVariablePlan),
    SetTransaction(SetTransactionPlan),
    StartTransaction(StartTransactionPlan),
    CommitTransaction(CommitTransactionPlan),
    AbortTransaction(AbortTransactionPlan),
    Select(SelectPlan),
    Subscribe(SubscribePlan),
    CopyFrom(CopyFromPlan),
    CopyTo(CopyToPlan),
    ExplainPlan(ExplainPlanPlan),
    ExplainPushdown(ExplainPushdownPlan),
    ExplainTimestamp(ExplainTimestampPlan),
    ExplainSinkSchema(ExplainSinkSchemaPlan),
    Insert(InsertPlan),
    AlterCluster(AlterClusterPlan),
    AlterClusterSwap(AlterClusterSwapPlan),
    AlterNoop(AlterNoopPlan),
    AlterSetCluster(AlterSetClusterPlan),
    AlterConnection(AlterConnectionPlan),
    AlterSource(AlterSourcePlan),
    AlterClusterRename(AlterClusterRenamePlan),
    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
    AlterItemRename(AlterItemRenamePlan),
    AlterSchemaRename(AlterSchemaRenamePlan),
    AlterSchemaSwap(AlterSchemaSwapPlan),
    AlterSecret(AlterSecretPlan),
    AlterSink(AlterSinkPlan),
    AlterSystemSet(AlterSystemSetPlan),
    AlterSystemReset(AlterSystemResetPlan),
    AlterSystemResetAll(AlterSystemResetAllPlan),
    AlterRole(AlterRolePlan),
    AlterOwner(AlterOwnerPlan),
    // Note: the payload type is `AlterTablePlan`, not a type named after the
    // variant.
    AlterTableAddColumn(AlterTablePlan),
    AlterNetworkPolicy(AlterNetworkPolicyPlan),
    Declare(DeclarePlan),
    Fetch(FetchPlan),
    Close(ClosePlan),
    ReadThenWrite(ReadThenWritePlan),
    Prepare(PreparePlan),
    Execute(ExecutePlan),
    Deallocate(DeallocatePlan),
    Raise(RaisePlan),
    GrantRole(GrantRolePlan),
    RevokeRole(RevokeRolePlan),
    GrantPrivileges(GrantPrivilegesPlan),
    RevokePrivileges(RevokePrivilegesPlan),
    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
    ReassignOwned(ReassignOwnedPlan),
    // The payload is the `SideEffectingFunc` type itself rather than a
    // `*Plan` wrapper.
    SideEffectingFunc(SideEffectingFunc),
    ValidateConnection(ValidateConnectionPlan),
    AlterRetainHistory(AlterRetainHistoryPlan),
}
215
216impl Plan {
217    /// Expresses which [`StatementKind`] can generate which set of
218    /// [`PlanKind`].
219    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
220        match stmt {
221            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
222            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
223            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
224            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
225            StatementKind::AlterObjectRename => &[
226                PlanKind::AlterClusterRename,
227                PlanKind::AlterClusterReplicaRename,
228                PlanKind::AlterItemRename,
229                PlanKind::AlterSchemaRename,
230                PlanKind::AlterNoop,
231            ],
232            StatementKind::AlterObjectSwap => &[
233                PlanKind::AlterClusterSwap,
234                PlanKind::AlterSchemaSwap,
235                PlanKind::AlterNoop,
236            ],
237            StatementKind::AlterRole => &[PlanKind::AlterRole],
238            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
239            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
240            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
241            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
242            StatementKind::AlterSource => &[
243                PlanKind::AlterNoop,
244                PlanKind::AlterSource,
245                PlanKind::AlterRetainHistory,
246            ],
247            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
248            StatementKind::AlterSystemResetAll => {
249                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
250            }
251            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
252            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
253            StatementKind::AlterTableAddColumn => {
254                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
255            }
256            StatementKind::Close => &[PlanKind::Close],
257            StatementKind::Comment => &[PlanKind::Comment],
258            StatementKind::Commit => &[PlanKind::CommitTransaction],
259            StatementKind::Copy => &[
260                PlanKind::CopyFrom,
261                PlanKind::Select,
262                PlanKind::Subscribe,
263                PlanKind::CopyTo,
264            ],
265            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
266            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
267            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
268            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
269            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
270            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
271            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
272            StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
273            StatementKind::CreateRole => &[PlanKind::CreateRole],
274            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
275            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
276            StatementKind::CreateSink => &[PlanKind::CreateSink],
277            StatementKind::CreateSource | StatementKind::CreateSubsource => {
278                &[PlanKind::CreateSource]
279            }
280            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
281            StatementKind::CreateTable => &[PlanKind::CreateTable],
282            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
283            StatementKind::CreateType => &[PlanKind::CreateType],
284            StatementKind::CreateView => &[PlanKind::CreateView],
285            StatementKind::Deallocate => &[PlanKind::Deallocate],
286            StatementKind::Declare => &[PlanKind::Declare],
287            StatementKind::Delete => &[PlanKind::ReadThenWrite],
288            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
289            StatementKind::DropObjects => &[PlanKind::DropObjects],
290            StatementKind::DropOwned => &[PlanKind::DropOwned],
291            StatementKind::Execute => &[PlanKind::Execute],
292            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
293            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
294            StatementKind::ExplainAnalyze => &[PlanKind::Select],
295            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
296            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
297            StatementKind::Fetch => &[PlanKind::Fetch],
298            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
299            StatementKind::GrantRole => &[PlanKind::GrantRole],
300            StatementKind::Insert => &[PlanKind::Insert],
301            StatementKind::Prepare => &[PlanKind::Prepare],
302            StatementKind::Raise => &[PlanKind::Raise],
303            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
304            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
305            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
306            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
307            StatementKind::Rollback => &[PlanKind::AbortTransaction],
308            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
309            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
310            StatementKind::SetVariable => &[PlanKind::SetVariable],
311            StatementKind::Show => &[
312                PlanKind::Select,
313                PlanKind::ShowVariable,
314                PlanKind::ShowCreate,
315                PlanKind::ShowColumns,
316                PlanKind::ShowAllVariables,
317                PlanKind::InspectShard,
318            ],
319            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
320            StatementKind::Subscribe => &[PlanKind::Subscribe],
321            StatementKind::Update => &[PlanKind::ReadThenWrite],
322            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
323            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
324        }
325    }
326
327    /// Returns a human readable name of the plan. Meant for use in messages sent back to a user.
328    pub fn name(&self) -> &str {
329        match self {
330            Plan::CreateConnection(_) => "create connection",
331            Plan::CreateDatabase(_) => "create database",
332            Plan::CreateSchema(_) => "create schema",
333            Plan::CreateRole(_) => "create role",
334            Plan::CreateCluster(_) => "create cluster",
335            Plan::CreateClusterReplica(_) => "create cluster replica",
336            Plan::CreateSource(_) => "create source",
337            Plan::CreateSources(_) => "create source",
338            Plan::CreateSecret(_) => "create secret",
339            Plan::CreateSink(_) => "create sink",
340            Plan::CreateTable(_) => "create table",
341            Plan::CreateView(_) => "create view",
342            Plan::CreateMaterializedView(_) => "create materialized view",
343            Plan::CreateContinualTask(_) => "create continual task",
344            Plan::CreateIndex(_) => "create index",
345            Plan::CreateType(_) => "create type",
346            Plan::CreateNetworkPolicy(_) => "create network policy",
347            Plan::Comment(_) => "comment",
348            Plan::DiscardTemp => "discard temp",
349            Plan::DiscardAll => "discard all",
350            Plan::DropObjects(plan) => match plan.object_type {
351                ObjectType::Table => "drop table",
352                ObjectType::View => "drop view",
353                ObjectType::MaterializedView => "drop materialized view",
354                ObjectType::Source => "drop source",
355                ObjectType::Sink => "drop sink",
356                ObjectType::Index => "drop index",
357                ObjectType::Type => "drop type",
358                ObjectType::Role => "drop roles",
359                ObjectType::Cluster => "drop clusters",
360                ObjectType::ClusterReplica => "drop cluster replicas",
361                ObjectType::Secret => "drop secret",
362                ObjectType::Connection => "drop connection",
363                ObjectType::Database => "drop database",
364                ObjectType::Schema => "drop schema",
365                ObjectType::Func => "drop function",
366                ObjectType::ContinualTask => "drop continual task",
367                ObjectType::NetworkPolicy => "drop network policy",
368            },
369            Plan::DropOwned(_) => "drop owned",
370            Plan::EmptyQuery => "do nothing",
371            Plan::ShowAllVariables => "show all variables",
372            Plan::ShowCreate(_) => "show create",
373            Plan::ShowColumns(_) => "show columns",
374            Plan::ShowVariable(_) => "show variable",
375            Plan::InspectShard(_) => "inspect shard",
376            Plan::SetVariable(_) => "set variable",
377            Plan::ResetVariable(_) => "reset variable",
378            Plan::SetTransaction(_) => "set transaction",
379            Plan::StartTransaction(_) => "start transaction",
380            Plan::CommitTransaction(_) => "commit",
381            Plan::AbortTransaction(_) => "abort",
382            Plan::Select(_) => "select",
383            Plan::Subscribe(_) => "subscribe",
384            Plan::CopyFrom(_) => "copy from",
385            Plan::CopyTo(_) => "copy to",
386            Plan::ExplainPlan(_) => "explain plan",
387            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
388            Plan::ExplainTimestamp(_) => "explain timestamp",
389            Plan::ExplainSinkSchema(_) => "explain schema",
390            Plan::Insert(_) => "insert",
391            Plan::AlterNoop(plan) => match plan.object_type {
392                ObjectType::Table => "alter table",
393                ObjectType::View => "alter view",
394                ObjectType::MaterializedView => "alter materialized view",
395                ObjectType::Source => "alter source",
396                ObjectType::Sink => "alter sink",
397                ObjectType::Index => "alter index",
398                ObjectType::Type => "alter type",
399                ObjectType::Role => "alter role",
400                ObjectType::Cluster => "alter cluster",
401                ObjectType::ClusterReplica => "alter cluster replica",
402                ObjectType::Secret => "alter secret",
403                ObjectType::Connection => "alter connection",
404                ObjectType::Database => "alter database",
405                ObjectType::Schema => "alter schema",
406                ObjectType::Func => "alter function",
407                ObjectType::ContinualTask => "alter continual task",
408                ObjectType::NetworkPolicy => "alter network policy",
409            },
410            Plan::AlterCluster(_) => "alter cluster",
411            Plan::AlterClusterRename(_) => "alter cluster rename",
412            Plan::AlterClusterSwap(_) => "alter cluster swap",
413            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
414            Plan::AlterSetCluster(_) => "alter set cluster",
415            Plan::AlterConnection(_) => "alter connection",
416            Plan::AlterSource(_) => "alter source",
417            Plan::AlterItemRename(_) => "rename item",
418            Plan::AlterSchemaRename(_) => "alter rename schema",
419            Plan::AlterSchemaSwap(_) => "alter swap schema",
420            Plan::AlterSecret(_) => "alter secret",
421            Plan::AlterSink(_) => "alter sink",
422            Plan::AlterSystemSet(_) => "alter system",
423            Plan::AlterSystemReset(_) => "alter system",
424            Plan::AlterSystemResetAll(_) => "alter system",
425            Plan::AlterRole(_) => "alter role",
426            Plan::AlterNetworkPolicy(_) => "alter network policy",
427            Plan::AlterOwner(plan) => match plan.object_type {
428                ObjectType::Table => "alter table owner",
429                ObjectType::View => "alter view owner",
430                ObjectType::MaterializedView => "alter materialized view owner",
431                ObjectType::Source => "alter source owner",
432                ObjectType::Sink => "alter sink owner",
433                ObjectType::Index => "alter index owner",
434                ObjectType::Type => "alter type owner",
435                ObjectType::Role => "alter role owner",
436                ObjectType::Cluster => "alter cluster owner",
437                ObjectType::ClusterReplica => "alter cluster replica owner",
438                ObjectType::Secret => "alter secret owner",
439                ObjectType::Connection => "alter connection owner",
440                ObjectType::Database => "alter database owner",
441                ObjectType::Schema => "alter schema owner",
442                ObjectType::Func => "alter function owner",
443                ObjectType::ContinualTask => "alter continual task owner",
444                ObjectType::NetworkPolicy => "alter network policy owner",
445            },
446            Plan::AlterTableAddColumn(_) => "alter table add column",
447            Plan::Declare(_) => "declare",
448            Plan::Fetch(_) => "fetch",
449            Plan::Close(_) => "close",
450            Plan::ReadThenWrite(plan) => match plan.kind {
451                MutationKind::Insert => "insert into select",
452                MutationKind::Update => "update",
453                MutationKind::Delete => "delete",
454            },
455            Plan::Prepare(_) => "prepare",
456            Plan::Execute(_) => "execute",
457            Plan::Deallocate(_) => "deallocate",
458            Plan::Raise(_) => "raise",
459            Plan::GrantRole(_) => "grant role",
460            Plan::RevokeRole(_) => "revoke role",
461            Plan::GrantPrivileges(_) => "grant privilege",
462            Plan::RevokePrivileges(_) => "revoke privilege",
463            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
464            Plan::ReassignOwned(_) => "reassign owned",
465            Plan::SideEffectingFunc(_) => "side effecting func",
466            Plan::ValidateConnection(_) => "validate connection",
467            Plan::AlterRetainHistory(_) => "alter retain history",
468        }
469    }
470
471    /// Returns `true` iff this `Plan` is allowed to be executed in read-only
472    /// mode.
473    ///
474    /// We use an explicit allow-list, to avoid future additions automatically
475    /// falling into the `true` category.
476    pub fn allowed_in_read_only(&self) -> bool {
477        match self {
478            // These two set non-durable session variables, so are okay in
479            // read-only mode.
480            Plan::SetVariable(_) => true,
481            Plan::ResetVariable(_) => true,
482            Plan::SetTransaction(_) => true,
483            Plan::StartTransaction(_) => true,
484            Plan::CommitTransaction(_) => true,
485            Plan::AbortTransaction(_) => true,
486            Plan::Select(_) => true,
487            Plan::EmptyQuery => true,
488            Plan::ShowAllVariables => true,
489            Plan::ShowCreate(_) => true,
490            Plan::ShowColumns(_) => true,
491            Plan::ShowVariable(_) => true,
492            Plan::InspectShard(_) => true,
493            Plan::Subscribe(_) => true,
494            Plan::CopyTo(_) => true,
495            Plan::ExplainPlan(_) => true,
496            Plan::ExplainPushdown(_) => true,
497            Plan::ExplainTimestamp(_) => true,
498            Plan::ExplainSinkSchema(_) => true,
499            Plan::ValidateConnection(_) => true,
500            _ => false,
501        }
502    }
503}
504
/// Payload of [`Plan::StartTransaction`].
#[derive(Debug)]
pub struct StartTransactionPlan {
    /// Access mode for the transaction.
    /// NOTE(review): `None` presumably means the statement did not specify
    /// one — confirm against the planner.
    pub access: Option<TransactionAccessMode>,
    /// Isolation level for the transaction.
    /// NOTE(review): `None` presumably means the statement did not specify
    /// one — confirm against the planner.
    pub isolation_level: Option<TransactionIsolationLevel>,
}
510
/// Tags a transaction as either explicit or implicit.
#[derive(Debug)]
pub enum TransactionType {
    Explicit,
    Implicit,
}

impl TransactionType {
    /// Returns `true` only for [`TransactionType::Explicit`].
    pub fn is_explicit(&self) -> bool {
        match self {
            Self::Explicit => true,
            Self::Implicit => false,
        }
    }

    /// Returns `true` only for [`TransactionType::Implicit`].
    pub fn is_implicit(&self) -> bool {
        match self {
            Self::Implicit => true,
            Self::Explicit => false,
        }
    }
}
526
/// Payload of [`Plan::CommitTransaction`].
#[derive(Debug)]
pub struct CommitTransactionPlan {
    /// The [`TransactionType`] (explicit or implicit) attached to this commit.
    pub transaction_type: TransactionType,
}
531
/// Payload of [`Plan::AbortTransaction`].
#[derive(Debug)]
pub struct AbortTransactionPlan {
    /// The [`TransactionType`] (explicit or implicit) attached to this abort.
    pub transaction_type: TransactionType,
}
536
/// Payload of [`Plan::CreateDatabase`].
#[derive(Debug)]
pub struct CreateDatabasePlan {
    /// Name of the database.
    pub name: String,
    /// NOTE(review): presumably reflects an `IF NOT EXISTS` clause on the
    /// statement — confirm against the planner.
    pub if_not_exists: bool,
}
542
/// Payload of [`Plan::CreateSchema`].
#[derive(Debug)]
pub struct CreateSchemaPlan {
    /// Resolved specifier of the database for the schema.
    pub database_spec: ResolvedDatabaseSpecifier,
    /// Name of the schema.
    pub schema_name: String,
    /// NOTE(review): presumably reflects an `IF NOT EXISTS` clause on the
    /// statement — confirm against the planner.
    pub if_not_exists: bool,
}
549
/// Payload of [`Plan::CreateRole`].
#[derive(Debug)]
pub struct CreateRolePlan {
    /// Name of the role.
    pub name: String,
    /// Attributes for the role, in their raw ([`RoleAttributesRaw`]) form.
    pub attributes: RoleAttributesRaw,
}
555
/// Payload of [`Plan::CreateCluster`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterPlan {
    /// Name of the cluster.
    pub name: String,
    /// Managed or unmanaged flavor of the cluster, with the settings specific
    /// to that flavor.
    pub variant: CreateClusterVariant,
    /// Optional workload class string.
    /// NOTE(review): its meaning is not visible in this file — confirm with
    /// the consumer of this plan.
    pub workload_class: Option<String>,
}
562
/// The two flavors a [`CreateClusterPlan`] can take, each with its own
/// settings type.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum CreateClusterVariant {
    /// Settings described by a [`CreateClusterManagedPlan`].
    Managed(CreateClusterManagedPlan),
    /// Settings described by a [`CreateClusterUnmanagedPlan`].
    Unmanaged(CreateClusterUnmanagedPlan),
}
568
/// Settings carried by [`CreateClusterVariant::Unmanaged`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterUnmanagedPlan {
    /// One entry per replica: a `String` paired with that replica's
    /// [`ReplicaConfig`].
    /// NOTE(review): the `String` is presumably the replica name — confirm.
    pub replicas: Vec<(String, ReplicaConfig)>,
}
573
/// Settings carried by [`CreateClusterVariant::Managed`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterManagedPlan {
    /// NOTE(review): presumably the number of replicas to maintain — confirm.
    pub replication_factor: u32,
    /// Size, as a string.
    /// NOTE(review): the set of valid size names is not visible here.
    pub size: String,
    /// Availability zones, as strings.
    pub availability_zones: Vec<String>,
    /// Compute-related replica configuration.
    pub compute: ComputeReplicaConfig,
    /// Optimizer feature overrides for the cluster.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Scheduling policy for the cluster; see [`ClusterSchedule`].
    pub schedule: ClusterSchedule,
}
583
/// Payload of [`Plan::CreateClusterReplica`].
#[derive(Debug)]
pub struct CreateClusterReplicaPlan {
    /// ID of the cluster.
    /// NOTE(review): presumably the cluster the replica is added to — confirm.
    pub cluster_id: ClusterId,
    /// Name of the replica.
    pub name: String,
    /// Configuration of the replica.
    pub config: ReplicaConfig,
}
590
/// Configuration of introspection for a cluster replica.
///
/// Stored as an `Option` in [`ComputeReplicaConfig::introspection`].
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq)]
pub struct ComputeReplicaIntrospectionConfig {
    /// Whether to introspect the introspection.
    pub debugging: bool,
    /// The interval at which to introspect.
    pub interval: Duration,
}
599
/// Compute-related configuration of a replica.
///
/// Embedded in both [`ReplicaConfig`] variants and in
/// [`CreateClusterManagedPlan`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComputeReplicaConfig {
    /// Introspection settings.
    /// NOTE(review): `None` presumably means introspection is disabled —
    /// confirm with the consumer of this config.
    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
}
604
/// Configuration of a cluster replica, as carried by
/// [`CreateClusterReplicaPlan`] and [`CreateClusterUnmanagedPlan`].
///
/// Both variants carry a [`ComputeReplicaConfig`]; they differ in how the
/// replica is located or sized.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicaConfig {
    /// A replica described by explicit address lists.
    Unorchestrated {
        /// Addresses, as strings.
        /// NOTE(review): presumably the storage controller endpoints — confirm.
        storagectl_addrs: Vec<String>,
        /// Addresses, as strings.
        /// NOTE(review): presumably the compute controller endpoints — confirm.
        computectl_addrs: Vec<String>,
        /// Compute-related replica configuration.
        compute: ComputeReplicaConfig,
    },
    /// A replica described by a size and an optional availability zone.
    Orchestrated {
        /// Size, as a string.
        size: String,
        /// Availability zone, if one is set.
        availability_zone: Option<String>,
        /// Compute-related replica configuration.
        compute: ComputeReplicaConfig,
        /// NOTE(review): presumably marks system-internal replicas — confirm.
        internal: bool,
        /// NOTE(review): presumably an alternative size name used for
        /// billing — confirm.
        billed_as: Option<String>,
    },
}
620
/// Scheduling policy of a cluster, as stored in
/// [`CreateClusterManagedPlan::schedule`].
///
/// The `Default` implementation in this file yields
/// [`ClusterSchedule::Manual`].
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterSchedule {
    /// The system won't automatically turn the cluster On or Off.
    Manual,
    /// The cluster will be On when a REFRESH materialized view on it needs to refresh.
    /// `hydration_time_estimate` determines how long before a refresh the cluster is
    /// turned On, so that it has already rehydrated by the refresh time.
    Refresh { hydration_time_estimate: Duration },
}
630
631impl Default for ClusterSchedule {
632    fn default() -> Self {
633        // (Has to be consistent with `impl Default for ClusterScheduleOptionValue`.)
634        ClusterSchedule::Manual
635    }
636}
637
/// Payload of [`Plan::CreateSource`]; also wrapped, together with sequencing
/// metadata, by [`CreateSourcePlanBundle`].
#[derive(Debug)]
pub struct CreateSourcePlan {
    /// Qualified name of the source.
    pub name: QualifiedItemName,
    /// Definition of the source.
    pub source: Source,
    /// NOTE(review): presumably reflects an `IF NOT EXISTS` clause on the
    /// statement — confirm against the planner.
    pub if_not_exists: bool,
    /// Timeline of the source.
    pub timeline: Timeline,
    /// Cluster for the source.
    /// None for subsources, which run on the parent cluster.
    pub in_cluster: Option<ClusterId>,
}
647
/// A list of [`SourceReference`]s together with an `updated_at` value.
///
/// Carried as an `Option` in
/// [`CreateSourcePlanBundle::available_source_references`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReferences {
    /// NOTE(review): presumably a timestamp of when the references were last
    /// refreshed; its unit and epoch are not visible here — confirm.
    pub updated_at: u64,
    /// The individual references.
    pub references: Vec<SourceReference>,
}
653
/// An available external reference for a source and if possible to retrieve,
/// any column names it contains.
///
/// Collected in [`SourceReferences::references`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReference {
    /// Name of the reference.
    pub name: String,
    /// Namespace of the reference, if it has one.
    pub namespace: Option<String>,
    /// Column names of the reference, where they could be retrieved.
    pub columns: Vec<String>,
}
662
/// A [`CreateSourcePlan`] and the metadata necessary to sequence it.
///
/// [`Plan::CreateSources`] carries a `Vec` of these bundles.
#[derive(Debug)]
pub struct CreateSourcePlanBundle {
    /// ID of this source in the Catalog.
    pub item_id: CatalogItemId,
    /// ID used to reference this source from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// Details of the source to create.
    pub plan: CreateSourcePlan,
    /// Other catalog objects that are referenced by this source, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All the available upstream references for this source.
    /// Populated for top-level sources that can contain subsources/tables
    /// and used during sequencing to populate the appropriate catalog fields.
    pub available_source_references: Option<SourceReferences>,
}
679
/// Payload of [`Plan::CreateConnection`].
#[derive(Debug)]
pub struct CreateConnectionPlan {
    /// Qualified name of the connection.
    pub name: QualifiedItemName,
    /// NOTE(review): presumably reflects an `IF NOT EXISTS` clause on the
    /// statement — confirm against the planner.
    pub if_not_exists: bool,
    /// Definition of the connection.
    pub connection: Connection,
    /// NOTE(review): presumably requests validation of the connection as part
    /// of creating it — confirm with the sequencing code.
    pub validate: bool,
}
687
/// Payload of [`Plan::ValidateConnection`].
#[derive(Debug)]
pub struct ValidateConnectionPlan {
    /// ID of the connection in the Catalog.
    pub id: CatalogItemId,
    /// The connection to validate.
    ///
    /// This is the storage-layer connection type, parameterized by
    /// `ReferencedConnection`.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
}
695
/// Payload of [`Plan::CreateSecret`].
#[derive(Debug)]
pub struct CreateSecretPlan {
    /// Qualified name of the secret.
    pub name: QualifiedItemName,
    /// Definition of the secret.
    pub secret: Secret,
    /// NOTE(review): presumably reflects an `IF NOT EXISTS` clause on the
    /// statement — confirm against the planner.
    pub if_not_exists: bool,
}
702
/// Payload of [`Plan::CreateSink`].
#[derive(Debug)]
pub struct CreateSinkPlan {
    /// Qualified name of the sink.
    pub name: QualifiedItemName,
    /// Definition of the sink.
    pub sink: Sink,
    /// NOTE(review): presumably controls whether the sink emits an initial
    /// snapshot of existing data — confirm with the sink implementation.
    pub with_snapshot: bool,
    /// NOTE(review): presumably reflects an `IF NOT EXISTS` clause on the
    /// statement — confirm against the planner.
    pub if_not_exists: bool,
    /// Cluster for the sink. Unlike `CreateSourcePlan::in_cluster`, this is
    /// not optional.
    pub in_cluster: ClusterId,
}
711
/// Payload of [`Plan::CreateTable`].
#[derive(Debug)]
pub struct CreateTablePlan {
    /// Qualified name of the table.
    pub name: QualifiedItemName,
    /// Definition of the table.
    pub table: Table,
    /// NOTE(review): presumably reflects an `IF NOT EXISTS` clause on the
    /// statement — confirm against the planner.
    pub if_not_exists: bool,
}
718
/// Payload of [`Plan::CreateView`].
#[derive(Debug, Clone)]
pub struct CreateViewPlan {
    /// Qualified name of the view.
    pub name: QualifiedItemName,
    /// Definition of the view.
    pub view: View,
    /// The Catalog objects that this view is replacing, if any.
    pub replace: Option<CatalogItemId>,
    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
    pub drop_ids: Vec<CatalogItemId>,
    /// NOTE(review): presumably reflects an `IF NOT EXISTS` clause on the
    /// statement — confirm against the planner.
    pub if_not_exists: bool,
    /// True if the view contains an expression that can make the exact column list
    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
    pub ambiguous_columns: bool,
}
732
/// Plan describing the creation of a materialized view.
#[derive(Debug, Clone)]
pub struct CreateMaterializedViewPlan {
    /// Qualified name under which the materialized view is created.
    pub name: QualifiedItemName,
    /// The planned materialized view.
    pub materialized_view: MaterializedView,
    /// The Catalog object that this materialized view is replacing, if any.
    pub replace: Option<CatalogItemId>,
    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
    pub drop_ids: Vec<CatalogItemId>,
    /// Presumably set when the statement carried `IF NOT EXISTS` — confirm in the planner.
    pub if_not_exists: bool,
    /// True if the materialized view contains an expression that can make the exact column list
    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
    pub ambiguous_columns: bool,
}
746
/// Plan describing the creation of a continual task (CT).
#[derive(Debug, Clone)]
pub struct CreateContinualTaskPlan {
    /// Qualified name under which the continual task is created.
    pub name: QualifiedItemName,
    /// During initial creation, the `LocalId` placeholder for this CT in `continual_task.expr`.
    /// None on restart.
    pub placeholder_id: Option<mz_expr::LocalId>,
    /// Relation description associated with the continual task (presumably its output
    /// shape — confirm against the planner).
    pub desc: RelationDesc,
    /// ID of the collection we read into this continual task.
    pub input_id: GlobalId,
    /// Presumably whether the task starts from a snapshot of its input — confirm in the
    /// sequencer.
    pub with_snapshot: bool,
    /// Definition for the continual task.
    pub continual_task: MaterializedView,
}
760
/// Plan describing the creation of a network policy.
#[derive(Debug, Clone)]
pub struct CreateNetworkPolicyPlan {
    /// Name of the network policy.
    pub name: String,
    /// Rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
}
766
/// Plan describing an alteration of an existing network policy.
#[derive(Debug, Clone)]
pub struct AlterNetworkPolicyPlan {
    /// ID of the network policy being altered.
    pub id: NetworkPolicyId,
    /// Name of the network policy.
    pub name: String,
    /// Rules carried by the plan (presumably the policy's new rule set — confirm in the
    /// sequencer).
    pub rules: Vec<NetworkPolicyRule>,
}
773
/// Plan describing the creation of an index.
#[derive(Debug, Clone)]
pub struct CreateIndexPlan {
    /// Qualified name under which the index is created.
    pub name: QualifiedItemName,
    /// The planned index.
    pub index: Index,
    /// Presumably set when the statement carried `IF NOT EXISTS` — confirm in the planner.
    pub if_not_exists: bool,
}
780
/// Plan describing the creation of a type.
#[derive(Debug)]
pub struct CreateTypePlan {
    /// Qualified name under which the type is created.
    pub name: QualifiedItemName,
    /// The planned type.
    pub typ: Type,
}
786
/// Plan describing the dropping of one or more objects.
#[derive(Debug)]
pub struct DropObjectsPlan {
    /// The IDs of only the objects directly referenced in the `DROP` statement.
    pub referenced_ids: Vec<ObjectId>,
    /// All object IDs to drop. Includes `referenced_ids` and all descendants.
    pub drop_ids: Vec<ObjectId>,
    /// The type of object that was dropped explicitly in the `DROP` statement. `drop_ids` may
    /// contain objects of different types due to `CASCADE`.
    pub object_type: ObjectType,
}
797
/// Plan describing the dropping of everything owned by a set of roles, together with the
/// privileges and default privileges to revoke.
#[derive(Debug)]
pub struct DropOwnedPlan {
    /// The role IDs that own the objects.
    pub role_ids: Vec<RoleId>,
    /// All object IDs to drop.
    pub drop_ids: Vec<ObjectId>,
    /// The privileges to revoke.
    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
    /// The default privileges to revoke.
    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
}
809
/// Plan describing the display of a single variable.
#[derive(Debug)]
pub struct ShowVariablePlan {
    /// Name of the variable to show.
    pub name: String,
}
814
/// Plan describing the inspection of the shard backing a storage collection.
#[derive(Debug)]
pub struct InspectShardPlan {
    /// ID of the storage collection to inspect.
    pub id: GlobalId,
}
820
/// Plan describing the assignment of a variable.
#[derive(Debug)]
pub struct SetVariablePlan {
    /// Name of the variable to set.
    pub name: String,
    /// The value to assign, or a request to use the default.
    pub value: VariableValue,
    /// Presumably mirrors `SET LOCAL` (assignment scoped to the current transaction) —
    /// confirm in the sequencer.
    pub local: bool,
}
827
/// The value carried by a variable-setting plan.
#[derive(Debug)]
pub enum VariableValue {
    /// Use the variable's default rather than an explicit value.
    Default,
    /// One or more explicit values, kept as strings.
    Values(Vec<String>),
}
833
/// Plan describing the reset of a single variable.
#[derive(Debug)]
pub struct ResetVariablePlan {
    /// Name of the variable to reset.
    pub name: String,
}
838
/// Plan describing a change to transaction characteristics.
#[derive(Debug)]
pub struct SetTransactionPlan {
    /// Presumably whether the change applies only to the current transaction — confirm in the
    /// sequencer.
    pub local: bool,
    /// The transaction modes to apply.
    pub modes: Vec<TransactionMode>,
}
844
/// A plan for `SELECT` statements.
///
/// Also built directly for constant result sets via [`SelectPlan::immediate`].
#[derive(Clone, Debug)]
pub struct SelectPlan {
    /// The `SELECT` statement itself. Used for explain/notices, but not otherwise
    /// load-bearing. Boxed to save stack space.
    pub select: Option<Box<SelectStatement<Aug>>>,
    /// The plan as a HIR.
    pub source: HirRelationExpr,
    /// At what time should this select happen?
    pub when: QueryWhen,
    /// Instructions how to form the result set.
    pub finishing: RowSetFinishing,
    /// For `COPY TO`, the format to use.
    pub copy_to: Option<CopyFormat>,
}
860
861impl SelectPlan {
862    pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
863        let arity = typ.arity();
864        SelectPlan {
865            select: None,
866            source: HirRelationExpr::Constant { rows, typ },
867            when: QueryWhen::Immediately,
868            finishing: RowSetFinishing::trivial(arity),
869            copy_to: None,
870        }
871    }
872}
873
/// The output shape requested for a subscribe.
///
/// NOTE(review): variant semantics below are taken from the variant names and the
/// existing field comments; confirm against the subscribe implementation.
#[derive(Debug)]
pub enum SubscribeOutput {
    /// Plain diff output with no additional ordering information.
    Diffs,
    /// Output ordered within each timestamp by the given columns.
    WithinTimestampOrderBy {
        /// We pretend that mz_diff is prepended to the normal columns, making it index 0
        order_by: Vec<ColumnOrder>,
    },
    /// Upsert-envelope output.
    EnvelopeUpsert {
        /// Order by with just keys
        order_by_keys: Vec<ColumnOrder>,
    },
    /// Debezium-envelope output.
    EnvelopeDebezium {
        /// Order by with just keys
        order_by_keys: Vec<ColumnOrder>,
    },
}
890
/// Plan describing a subscribe.
#[derive(Debug)]
pub struct SubscribePlan {
    /// What is being subscribed to: an existing collection or a query.
    pub from: SubscribeFrom,
    /// Presumably whether an initial snapshot is emitted — confirm in the sequencer.
    pub with_snapshot: bool,
    /// At what time the subscribe should begin.
    pub when: QueryWhen,
    /// Optional upper timestamp bound (presumably where the subscribe stops — confirm).
    pub up_to: Option<Timestamp>,
    /// For `COPY TO`, the format to use.
    pub copy_to: Option<CopyFormat>,
    /// Presumably whether progress messages are emitted — confirm in the sequencer.
    pub emit_progress: bool,
    /// The requested output shape.
    pub output: SubscribeOutput,
}
901
/// The target of a subscribe: either an existing collection or an ad-hoc query.
#[derive(Debug, Clone)]
pub enum SubscribeFrom {
    /// ID of the collection to subscribe to.
    Id(GlobalId),
    /// Query to subscribe to.
    Query {
        /// The query as a MIR expression.
        expr: MirRelationExpr,
        /// Relation description accompanying `expr`.
        desc: RelationDesc,
    },
}
912
913impl SubscribeFrom {
914    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
915        match self {
916            SubscribeFrom::Id(id) => BTreeSet::from([*id]),
917            SubscribeFrom::Query { expr, .. } => expr.depends_on(),
918        }
919    }
920
921    pub fn contains_temporal(&self) -> bool {
922        match self {
923            SubscribeFrom::Id(_) => false,
924            SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
925        }
926    }
927}
928
/// Plan describing the display of an object's creation statement.
#[derive(Debug)]
pub struct ShowCreatePlan {
    /// ID of the object being shown.
    pub id: ObjectId,
    /// The precomputed row to return (presumably containing the object's name and its
    /// `CREATE` SQL — confirm in the planner).
    pub row: Row,
}
934
/// Plan describing the display of an item's columns.
#[derive(Debug)]
pub struct ShowColumnsPlan {
    /// ID of the catalog item whose columns are shown.
    pub id: CatalogItemId,
    /// The select plan that produces the result.
    pub select_plan: SelectPlan,
    /// Resolved IDs accompanying `select_plan` (presumably the dependencies of the
    /// rewritten query — confirm in the planner).
    pub new_resolved_ids: ResolvedIds,
}
941
/// Plan describing a `COPY ... FROM` into a table.
#[derive(Debug)]
pub struct CopyFromPlan {
    /// Table we're copying into.
    pub target_id: CatalogItemId,
    /// Human-readable full name of the target table.
    pub target_name: String,
    /// Source we're copying data from.
    pub source: CopyFromSource,
    /// How input columns map to those on the destination table.
    ///
    /// TODO(cf2): Remove this field in favor of the mfp.
    pub columns: Vec<ColumnIndex>,
    /// [`RelationDesc`] describing the input data.
    pub source_desc: RelationDesc,
    /// Changes the shape of the input data to match the destination table.
    pub mfp: MapFilterProject,
    /// Format specific params for copying the input data.
    pub params: CopyFormatParams<'static>,
    /// Filter for the source files we're copying from, e.g. an S3 prefix.
    pub filter: Option<CopyFromFilter>,
}
963
/// Where the data for a `COPY ... FROM` originates.
#[derive(Debug)]
pub enum CopyFromSource {
    /// Copying from a file local to the user, transmitted via pgwire.
    Stdin,
    /// A remote resource, e.g. HTTP file.
    ///
    /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource.
    Url(HirScalarExpr),
    /// A file in an S3 bucket.
    AwsS3 {
        /// Expression that evaluates to the file we want to copy.
        uri: HirScalarExpr,
        /// Details for how we connect to AWS S3.
        connection: AwsConnection,
        /// ID of the connection object.
        connection_id: CatalogItemId,
    },
}
982
/// Restricts which source files a `COPY ... FROM` reads.
#[derive(Debug)]
pub enum CopyFromFilter {
    /// An explicit list of files.
    Files(Vec<String>),
    /// A pattern that files must match (pattern syntax is defined by the consumer — not
    /// visible here).
    Pattern(String),
}
988
/// Plan describing a `COPY ... TO` of query results to an external destination.
#[derive(Debug, Clone)]
pub struct CopyToPlan {
    /// The select query plan whose data will be copied to destination uri.
    pub select_plan: SelectPlan,
    /// Relation description of the copied data.
    pub desc: RelationDesc,
    /// The scalar expression to be resolved to get the destination uri.
    pub to: HirScalarExpr,
    /// The connection used to reach the destination.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
    /// The ID of the connection.
    pub connection_id: CatalogItemId,
    /// Output format of the written files.
    pub format: S3SinkFormat,
    /// Maximum size of each output file (presumably in bytes — confirm against the option
    /// parsing).
    pub max_file_size: u64,
}
1002
/// Plan describing an `EXPLAIN` of a plan at a given optimizer stage.
#[derive(Clone, Debug)]
pub struct ExplainPlanPlan {
    /// The stage whose plan should be explained.
    pub stage: ExplainStage,
    /// Output format of the explanation.
    pub format: ExplainFormat,
    /// Configuration controlling the explanation output.
    pub config: ExplainConfig,
    /// The object or statement being explained.
    pub explainee: Explainee,
}
1010
/// The type of object to be explained.
#[derive(Clone, Debug)]
pub enum Explainee {
    /// Lookup and explain a plan saved for an existing view.
    View(CatalogItemId),
    /// Lookup and explain a plan saved for an existing materialized view.
    MaterializedView(CatalogItemId),
    /// Lookup and explain a plan saved for an existing index.
    Index(CatalogItemId),
    /// Replan an existing view.
    ReplanView(CatalogItemId),
    /// Replan an existing materialized view.
    ReplanMaterializedView(CatalogItemId),
    /// Replan an existing index.
    ReplanIndex(CatalogItemId),
    /// A SQL statement.
    Statement(ExplaineeStatement),
}
1029
/// Explainee types that are statements.
///
/// The `EnumKind` derive also generates the field-less companion enum
/// `ExplaineeStatementKind`.
#[derive(Clone, Debug, EnumKind)]
#[enum_kind(ExplaineeStatementKind)]
pub enum ExplaineeStatement {
    /// The object to be explained is a SELECT statement.
    Select {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The plan of the `SELECT` being explained.
        plan: plan::SelectPlan,
        /// Relation description accompanying `plan`.
        desc: RelationDesc,
    },
    /// The object to be explained is a CREATE VIEW.
    CreateView {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The plan of the `CREATE VIEW` being explained.
        plan: plan::CreateViewPlan,
    },
    /// The object to be explained is a CREATE MATERIALIZED VIEW.
    CreateMaterializedView {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The plan of the `CREATE MATERIALIZED VIEW` being explained.
        plan: plan::CreateMaterializedViewPlan,
    },
    /// The object to be explained is a CREATE INDEX.
    CreateIndex {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The plan of the `CREATE INDEX` being explained.
        plan: plan::CreateIndexPlan,
    },
}
1060
1061impl ExplaineeStatement {
1062    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1063        match self {
1064            Self::Select { plan, .. } => plan.source.depends_on(),
1065            Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1066            Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1067            Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1068        }
1069    }
1070
1071    /// Statements that have their `broken` flag set are expected to cause a
1072    /// panic in the optimizer code. In this case:
1073    ///
1074    /// 1. The optimizer pipeline execution will stop, but the panic will be
1075    ///    intercepted and will not propagate to the caller. The partial
1076    ///    optimizer trace collected until this point will be available.
1077    /// 2. The optimizer trace tracing subscriber will delegate regular tracing
1078    ///    spans and events to the default subscriber.
1079    ///
1080    /// This is useful when debugging queries that cause panics.
1081    pub fn broken(&self) -> bool {
1082        match self {
1083            Self::Select { broken, .. } => *broken,
1084            Self::CreateView { broken, .. } => *broken,
1085            Self::CreateMaterializedView { broken, .. } => *broken,
1086            Self::CreateIndex { broken, .. } => *broken,
1087        }
1088    }
1089}
1090
1091impl ExplaineeStatementKind {
1092    pub fn supports(&self, stage: &ExplainStage) -> bool {
1093        use ExplainStage::*;
1094        match self {
1095            Self::Select => true,
1096            Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1097            Self::CreateMaterializedView => true,
1098            Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1099        }
1100    }
1101}
1102
1103impl std::fmt::Display for ExplaineeStatementKind {
1104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1105        match self {
1106            Self::Select => write!(f, "SELECT"),
1107            Self::CreateView => write!(f, "CREATE VIEW"),
1108            Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1109            Self::CreateIndex => write!(f, "CREATE INDEX"),
1110        }
1111    }
1112}
1113
/// Plan describing an `EXPLAIN` of filter pushdown for an explainee.
#[derive(Clone, Debug)]
pub struct ExplainPushdownPlan {
    /// The object or statement being explained.
    pub explainee: Explainee,
}
1118
/// Plan describing an `EXPLAIN` of timestamp selection for a query.
#[derive(Clone, Debug)]
pub struct ExplainTimestampPlan {
    /// Output format of the explanation.
    pub format: ExplainFormat,
    /// The query being explained, as HIR.
    pub raw_plan: HirRelationExpr,
    /// At what time the query would run.
    pub when: QueryWhen,
}
1125
/// Plan describing an `EXPLAIN` of a sink's schema.
#[derive(Debug)]
pub struct ExplainSinkSchemaPlan {
    /// ID of the collection the sink reads from.
    pub sink_from: GlobalId,
    /// The schema, already rendered as a JSON string.
    pub json_schema: String,
}
1131
/// Plan describing a batch of already-computed row updates to apply to an item.
#[derive(Debug)]
pub struct SendDiffsPlan {
    /// ID of the catalog item receiving the updates.
    pub id: CatalogItemId,
    /// The updates as `(row, diff)` pairs.
    pub updates: Vec<(Row, Diff)>,
    /// The kind of mutation that produced these updates.
    pub kind: MutationKind,
    /// Rows to return to the client, each paired with a non-zero count (presumably its
    /// multiplicity — confirm in the sequencer).
    pub returning: Vec<(Row, NonZeroUsize)>,
    /// Upper bound on the result size (presumably in bytes — confirm against the session
    /// variable it is derived from).
    pub max_result_size: u64,
}
1140
/// Plan describing an insertion into an item.
#[derive(Debug)]
pub struct InsertPlan {
    /// ID of the catalog item being inserted into.
    pub id: CatalogItemId,
    /// Relation expression producing the rows to insert.
    pub values: HirRelationExpr,
    /// Expressions evaluated for the `RETURNING` clause.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1147
/// Plan describing a mutation that first reads rows and then writes changes derived
/// from them.
#[derive(Debug)]
pub struct ReadThenWritePlan {
    /// ID of the catalog item being mutated.
    pub id: CatalogItemId,
    /// Relation expression selecting the rows to operate on.
    pub selection: HirRelationExpr,
    /// Instructions how to form the selected row set.
    pub finishing: RowSetFinishing,
    /// Expressions keyed by `usize` (presumably the index of the column being assigned —
    /// confirm in the planner).
    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
    /// The kind of mutation to perform.
    pub kind: MutationKind,
    /// Expressions evaluated for the `RETURNING` clause.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1157
/// Generated by `ALTER ... IF EXISTS` if the named object did not exist.
#[derive(Debug)]
pub struct AlterNoopPlan {
    /// The type of object the `ALTER` statement targeted.
    pub object_type: ObjectType,
}
1163
/// Plan describing a change of the cluster associated with an item.
#[derive(Debug)]
pub struct AlterSetClusterPlan {
    /// ID of the catalog item being altered.
    pub id: CatalogItemId,
    /// ID of the cluster to set.
    pub set_cluster: ClusterId,
}
1169
/// Plan describing a change of an item's retain-history setting.
#[derive(Debug)]
pub struct AlterRetainHistoryPlan {
    /// ID of the catalog item being altered.
    pub id: CatalogItemId,
    /// The option value as an AST value, if any (presumably `None` on reset — confirm in
    /// the planner).
    pub value: Option<Value>,
    /// The compaction window to apply.
    pub window: CompactionWindow,
    /// The type of the object being altered.
    pub object_type: ObjectType,
}
1177
/// How a single option is affected by an `ALTER`: set to a value, reset, or left alone.
///
/// The payload type defaults to `String`.
#[derive(Debug, Clone)]

pub enum AlterOptionParameter<T = String> {
    /// Set the option to the contained value.
    Set(T),
    /// Reset the option (presumably to its default — confirm in the consumer).
    Reset,
    /// Leave the option as it is.
    Unchanged,
}
1185
/// The action an alteration of a connection performs.
#[derive(Debug)]
pub enum AlterConnectionAction {
    /// Rotate the connection's keys (presumably the SSH key pair — confirm in the
    /// sequencer).
    RotateKeys,
    /// Change the connection's options.
    AlterOptions {
        /// Options to set, keyed by name, with their optional new values.
        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
        /// Options to drop.
        drop_options: BTreeSet<ConnectionOptionName>,
        /// Presumably requests validating the altered connection — confirm in the
        /// sequencer.
        validate: bool,
    },
}
1195
/// Plan describing an alteration of an existing connection.
#[derive(Debug)]
pub struct AlterConnectionPlan {
    /// ID of the connection being altered.
    pub id: CatalogItemId,
    /// The alteration to perform.
    pub action: AlterConnectionAction,
}
1201
/// The action an alteration of a source performs.
#[derive(Debug)]
pub enum AlterSourceAction {
    /// Add new subsource exports to the source.
    AddSubsourceExports {
        /// Plans for the subsources to create.
        subsources: Vec<CreateSourcePlanBundle>,
        /// Options supplied with the statement.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    },
    /// Refresh the source's available upstream references.
    RefreshReferences {
        /// The refreshed references.
        references: SourceReferences,
    },
}
1212
/// Plan describing an alteration of an existing source.
#[derive(Debug)]
pub struct AlterSourcePlan {
    /// Catalog item ID of the source being altered.
    pub item_id: CatalogItemId,
    /// Global ID of the source's ingestion.
    pub ingestion_id: GlobalId,
    /// The alteration to perform.
    pub action: AlterSourceAction,
}
1219
/// Plan describing an alteration of an existing sink.
#[derive(Debug, Clone)]
pub struct AlterSinkPlan {
    /// Catalog item ID of the sink being altered.
    pub item_id: CatalogItemId,
    /// Global ID of the sink being altered.
    pub global_id: GlobalId,
    /// The sink definition carried by the plan (presumably the new definition — confirm
    /// in the sequencer).
    pub sink: Sink,
    /// Presumably whether the sink starts from a snapshot of existing data — confirm in the
    /// sequencer.
    pub with_snapshot: bool,
    /// ID of the cluster associated with the sink.
    pub in_cluster: ClusterId,
}
1228
/// Plan describing an alteration of an existing cluster's options.
#[derive(Debug, Clone)]
pub struct AlterClusterPlan {
    /// ID of the cluster being altered.
    pub id: ClusterId,
    /// Name of the cluster being altered.
    pub name: String,
    /// The option changes to apply.
    pub options: PlanClusterOption,
    /// The strategy used to carry out the alteration.
    pub strategy: AlterClusterPlanStrategy,
}
1236
/// Plan describing the renaming of a cluster.
#[derive(Debug)]
pub struct AlterClusterRenamePlan {
    /// ID of the cluster being renamed.
    pub id: ClusterId,
    /// Current name of the cluster.
    pub name: String,
    /// New name for the cluster.
    pub to_name: String,
}
1243
/// Plan describing the renaming of a cluster replica.
#[derive(Debug)]
pub struct AlterClusterReplicaRenamePlan {
    /// ID of the cluster that contains the replica.
    pub cluster_id: ClusterId,
    /// ID of the replica being renamed.
    pub replica_id: ReplicaId,
    /// Current qualified name of the replica.
    pub name: QualifiedReplica,
    /// New name for the replica.
    pub to_name: String,
}
1251
/// Plan describing the renaming of a catalog item.
#[derive(Debug)]
pub struct AlterItemRenamePlan {
    /// ID of the catalog item being renamed.
    pub id: CatalogItemId,
    /// Current full name of the item.
    pub current_full_name: FullItemName,
    /// New name for the item.
    pub to_name: String,
    /// The type of the item being renamed.
    pub object_type: ObjectType,
}
1259
/// Plan describing the renaming of a schema.
#[derive(Debug)]
pub struct AlterSchemaRenamePlan {
    /// Database and schema specifiers identifying the schema being renamed.
    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// New name for the schema.
    pub new_schema_name: String,
}
1265
/// Plan describing the swapping of two schemas' names.
#[derive(Debug)]
pub struct AlterSchemaSwapPlan {
    /// Database and schema specifiers identifying the first schema.
    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of the first schema.
    pub schema_a_name: String,
    /// Database and schema specifiers identifying the second schema.
    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of the second schema.
    pub schema_b_name: String,
    /// Temporary name (presumably used as the intermediate name while swapping — confirm
    /// in the sequencer).
    pub name_temp: String,
}
1274
/// Plan describing the swapping of two clusters' names.
#[derive(Debug)]
pub struct AlterClusterSwapPlan {
    /// ID of the first cluster.
    pub id_a: ClusterId,
    /// ID of the second cluster.
    pub id_b: ClusterId,
    /// Name of the first cluster.
    pub name_a: String,
    /// Name of the second cluster.
    pub name_b: String,
    /// Temporary name (presumably used as the intermediate name while swapping — confirm
    /// in the sequencer).
    pub name_temp: String,
}
1283
/// Plan describing a change to the contents of an existing secret.
#[derive(Debug)]
pub struct AlterSecretPlan {
    /// ID of the secret being altered.
    pub id: CatalogItemId,
    /// Expression that evaluates to the secret's new contents.
    pub secret_as: MirScalarExpr,
}
1289
/// Plan describing the assignment of a system variable.
#[derive(Debug)]
pub struct AlterSystemSetPlan {
    /// Name of the system variable to set.
    pub name: String,
    /// The value to assign, or a request to use the default.
    pub value: VariableValue,
}
1295
/// Plan describing the reset of a single system variable.
#[derive(Debug)]
pub struct AlterSystemResetPlan {
    /// Name of the system variable to reset.
    pub name: String,
}
1300
/// Plan describing the reset of all system variables. Carries no data.
#[derive(Debug)]
pub struct AlterSystemResetAllPlan {}
1303
/// Plan describing an alteration of an existing role.
#[derive(Debug)]
pub struct AlterRolePlan {
    /// ID of the role being altered.
    pub id: RoleId,
    /// Name of the role being altered.
    pub name: String,
    /// The planned alteration.
    pub option: PlannedAlterRoleOption,
}
1310
/// Plan describing a change of an object's owner.
#[derive(Debug)]
pub struct AlterOwnerPlan {
    /// ID of the object whose owner changes.
    pub id: ObjectId,
    /// The type of that object.
    pub object_type: ObjectType,
    /// ID of the role that becomes the new owner.
    pub new_owner: RoleId,
}
1317
/// Plan describing an alteration of a table that involves a single column
/// (presumably `ADD COLUMN` — confirm in the planner).
#[derive(Debug)]
pub struct AlterTablePlan {
    /// ID of the relation being altered.
    pub relation_id: CatalogItemId,
    /// Name of the column.
    pub column_name: ColumnName,
    /// Planned SQL type of the column.
    pub column_type: SqlColumnType,
    /// The column's type as written in the statement, before resolution.
    pub raw_sql_type: RawDataType,
}
1325
/// Plan describing the declaration of a named cursor over a statement.
#[derive(Debug)]
pub struct DeclarePlan {
    /// Name being declared.
    pub name: String,
    /// The unresolved statement the declaration wraps.
    pub stmt: Statement<Raw>,
    /// SQL text accompanying `stmt`.
    pub sql: String,
    /// Parameters bound to the statement.
    pub params: Params,
}
1333
/// Plan describing a fetch of rows from a named cursor.
#[derive(Debug)]
pub struct FetchPlan {
    /// Name of the cursor to fetch from.
    pub name: String,
    /// How many rows to fetch, if specified.
    pub count: Option<FetchDirection>,
    /// How long the fetch may wait for rows.
    pub timeout: ExecuteTimeout,
}
1340
/// Plan describing the closing of a named cursor.
#[derive(Debug)]
pub struct ClosePlan {
    /// Name of the cursor to close.
    pub name: String,
}
1345
/// Plan describing the preparation of a named statement.
#[derive(Debug)]
pub struct PreparePlan {
    /// Name of the prepared statement.
    pub name: String,
    /// The unresolved statement being prepared.
    pub stmt: Statement<Raw>,
    /// SQL text accompanying `stmt`.
    pub sql: String,
    /// Description of the prepared statement.
    pub desc: StatementDesc,
}
1353
/// Plan describing the execution of a previously prepared statement.
#[derive(Debug)]
pub struct ExecutePlan {
    /// Name of the prepared statement to execute.
    pub name: String,
    /// Parameters to bind for this execution.
    pub params: Params,
}
1359
/// Plan describing the deallocation of prepared statements.
#[derive(Debug)]
pub struct DeallocatePlan {
    /// Name of the prepared statement to deallocate (presumably `None` means all of them —
    /// confirm in the sequencer).
    pub name: Option<String>,
}
1364
/// Plan describing the raising of a notice at a given severity.
#[derive(Debug)]
pub struct RaisePlan {
    /// Severity of the notice to raise.
    pub severity: NoticeSeverity,
}
1369
/// Plan describing the granting of role memberships.
#[derive(Debug)]
pub struct GrantRolePlan {
    /// The roles that are gaining members.
    pub role_ids: Vec<RoleId>,
    /// The roles that will be added to `role_ids`.
    pub member_ids: Vec<RoleId>,
    /// The role that granted the membership.
    pub grantor_id: RoleId,
}
1379
/// Plan describing the revocation of role memberships.
#[derive(Debug)]
pub struct RevokeRolePlan {
    /// The roles that are losing members.
    pub role_ids: Vec<RoleId>,
    /// The roles that will be removed from `role_ids`.
    pub member_ids: Vec<RoleId>,
    /// The role that revoked the membership.
    pub grantor_id: RoleId,
}
1389
/// A single privilege change on one object, shared by the grant and revoke plans.
#[derive(Debug)]
pub struct UpdatePrivilege {
    /// The privileges being granted/revoked on an object.
    pub acl_mode: AclMode,
    /// The ID of the object receiving privileges.
    pub target_id: SystemObjectId,
    /// The role that is granting the privileges.
    pub grantor: RoleId,
}
1399
/// Plan describing the granting of privileges to a set of roles.
#[derive(Debug)]
pub struct GrantPrivilegesPlan {
    /// Description of each privilege being granted.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles that will be granted the privileges.
    pub grantees: Vec<RoleId>,
}
1407
/// Plan describing the revocation of privileges from a set of roles.
#[derive(Debug)]
pub struct RevokePrivilegesPlan {
    /// Description of each privilege being revoked.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles that will have privileges revoked.
    pub revokees: Vec<RoleId>,
}
/// Plan describing a change to default privileges.
#[derive(Debug)]
pub struct AlterDefaultPrivilegesPlan {
    /// Description of objects that match this default privilege.
    pub privilege_objects: Vec<DefaultPrivilegeObject>,
    /// The privilege to be granted/revoked from the matching objects.
    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
    /// Whether this is a grant or revoke.
    pub is_grant: bool,
}
1424
/// Plan describing the reassignment of objects owned by a set of roles to a new owner.
#[derive(Debug)]
pub struct ReassignOwnedPlan {
    /// The roles whose owned objects are being reassigned.
    pub old_roles: Vec<RoleId>,
    /// The new owner of the objects.
    pub new_role: RoleId,
    /// All object IDs to reassign.
    pub reassign_ids: Vec<ObjectId>,
}
1434
/// Plan describing the setting or clearing of a comment on an object.
#[derive(Debug)]
pub struct CommentPlan {
    /// The object that this comment is associated with.
    pub object_id: CommentObjectId,
    /// A sub-component of the object that this comment is associated with, e.g. a column.
    ///
    /// TODO(parkmycar): <https://github.com/MaterializeInc/database-issues/issues/6711>.
    pub sub_component: Option<usize>,
    /// The comment itself. If `None` that indicates we should clear the existing comment.
    pub comment: Option<String>,
}
1446
/// Where a table's data comes from.
#[derive(Clone, Debug)]
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    ///
    /// `defaults` holds the column default expressions.
    TableWrites { defaults: Vec<Expr<Aug>> },

    /// The table receives its data from the identified `DataSourceDesc`.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    DataSource {
        /// Description of the data source feeding the table.
        desc: DataSourceDesc,
        /// Timeline associated with the data source.
        timeline: Timeline,
    },
}
1459
/// The planned definition of a table.
#[derive(Clone, Debug)]
pub struct Table {
    /// The SQL text that creates the table.
    pub create_sql: String,
    /// Versioned relation description of the table.
    pub desc: VersionedRelationDesc,
    /// Whether the table is temporary.
    pub temporary: bool,
    /// Compaction window for the table, if one was specified.
    pub compaction_window: Option<CompactionWindow>,
    /// Where the table's data comes from.
    pub data_source: TableDataSource,
}
1468
/// The planned definition of a source.
#[derive(Clone, Debug)]
pub struct Source {
    /// The SQL text that creates the source.
    pub create_sql: String,
    /// Where the source's data comes from.
    pub data_source: DataSourceDesc,
    /// Relation description of the source.
    pub desc: RelationDesc,
    /// Compaction window for the source, if one was specified.
    pub compaction_window: Option<CompactionWindow>,
}
1476
/// Describes where a source (or a source-fed table) gets its data.
#[derive(Debug, Clone)]
pub enum DataSourceDesc {
    /// Receives data from an external system.
    Ingestion(SourceDesc<ReferencedConnection>),
    /// Receives data from an external system, defined using the old source syntax.
    OldSyntaxIngestion {
        /// Description of the ingestion.
        desc: SourceDesc<ReferencedConnection>,
        // If we're dealing with an old syntax ingestion the progress id will be some other collection
        // and the ingestion itself will have the data from a default external reference
        progress_subsource: CatalogItemId,
        /// Data configuration of the ingestion's own (default) export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
        /// Details of the ingestion's own (default) export.
        details: SourceExportDetails,
    },
    /// This source receives its data from the identified ingestion,
    /// specifically the output identified by `external_reference`.
    IngestionExport {
        /// ID of the ingestion this export belongs to.
        ingestion_id: CatalogItemId,
        /// Name of the upstream object this export reads.
        external_reference: UnresolvedItemName,
        /// Details of the export.
        details: SourceExportDetails,
        /// Data configuration of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Receives data from the source's reclocking/remapping operations.
    Progress,
    /// Receives data from HTTP post requests.
    Webhook {
        /// Optional validation applied to incoming requests.
        validate_using: Option<WebhookValidation>,
        /// Format in which the request body is provided.
        body_format: WebhookBodyFormat,
        /// Which request headers are exposed, and how.
        headers: WebhookHeaders,
        /// Only `Some` when created via `CREATE TABLE ... FROM WEBHOOK`.
        cluster_id: Option<StorageInstanceId>,
    },
}
1509
/// The validation applied to requests received by a webhook source.
#[derive(Clone, Debug, Serialize)]
pub struct WebhookValidation {
    /// The expression used to validate a request.
    pub expression: MirScalarExpr,
    /// Description of the source that will be created.
    pub relation_desc: RelationDesc,
    /// The column index to provide the request body and whether to provide it as bytes.
    pub bodies: Vec<(usize, bool)>,
    /// The column index to provide the request headers and whether to provide the values as bytes.
    pub headers: Vec<(usize, bool)>,
    /// Any secrets that are used in that validation.
    pub secrets: Vec<WebhookValidationSecret>,
}
1523
1524impl WebhookValidation {
1525    const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1526
1527    /// Attempt to reduce the internal [`MirScalarExpr`] into a simpler expression.
1528    ///
1529    /// The reduction happens on a separate thread, we also only wait for
1530    /// `WebhookValidation::MAX_REDUCE_TIME` before timing out and returning an error.
1531    pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1532        let WebhookValidation {
1533            expression,
1534            relation_desc,
1535            ..
1536        } = self;
1537
1538        // On a different thread, attempt to reduce the expression.
1539        let mut expression_ = expression.clone();
1540        let desc_ = relation_desc.clone();
1541        let reduce_task = mz_ore::task::spawn_blocking(
1542            || "webhook-validation-reduce",
1543            move || {
1544                expression_.reduce(&desc_.typ().column_types);
1545                expression_
1546            },
1547        );
1548
1549        match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1550            Ok(Ok(reduced_expr)) => {
1551                *expression = reduced_expr;
1552                Ok(())
1553            }
1554            Ok(Err(_)) => Err("joining task"),
1555            Err(_) => Err("timeout"),
1556        }
1557    }
1558}
1559
/// Describes which request headers a webhook source exposes as columns.
#[derive(Clone, Debug, Default, Serialize)]
pub struct WebhookHeaders {
    /// Optionally include a column named `headers` whose content is possibly filtered.
    pub header_column: Option<WebhookHeaderFilters>,
    /// The column index to provide the specific request header, and whether to provide it as bytes.
    pub mapped_headers: BTreeMap<usize, (String, bool)>,
}
1567
1568impl WebhookHeaders {
1569    /// Returns the number of columns needed to represent our headers.
1570    pub fn num_columns(&self) -> usize {
1571        let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1572        let mapped_headers = self.mapped_headers.len();
1573
1574        header_column + mapped_headers
1575    }
1576}
1577
/// Filters applied to the contents of a webhook source's `headers` column.
///
/// NOTE(review): how `block` and `allow` interact when both are non-empty is decided by
/// the consumer and is not visible here.
#[derive(Clone, Debug, Default, Serialize)]
pub struct WebhookHeaderFilters {
    /// Header names to block.
    pub block: BTreeSet<String>,
    /// Header names to allow.
    pub allow: BTreeSet<String>,
}
1583
/// The format in which a webhook source provides the request body.
///
/// Converts into the corresponding [`SqlScalarType`] via `From`.
#[derive(Copy, Clone, Debug, Serialize, Arbitrary)]
pub enum WebhookBodyFormat {
    /// The body is JSON. `array` presumably indicates that a top-level JSON array is
    /// expanded into one row per element — confirm in the webhook appender.
    Json { array: bool },
    /// The body is provided as raw bytes.
    Bytes,
    /// The body is provided as text.
    Text,
}
1590
1591impl From<WebhookBodyFormat> for SqlScalarType {
1592    fn from(value: WebhookBodyFormat) -> Self {
1593        match value {
1594            WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1595            WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1596            WebhookBodyFormat::Text => SqlScalarType::String,
1597        }
1598    }
1599}
1600
/// A secret referenced by a webhook validation expression.
#[derive(Clone, Debug, Serialize)]
pub struct WebhookValidationSecret {
    /// Identifies the secret by [`CatalogItemId`].
    pub id: CatalogItemId,
    /// Column index for the expression context that this secret was originally evaluated in.
    pub column_idx: usize,
    /// Whether or not this secret should be provided to the expression as Bytes or a String.
    pub use_bytes: bool,
}
1610
/// The planned definition of a connection.
#[derive(Clone, Debug)]
pub struct Connection {
    /// The SQL text that creates the connection.
    pub create_sql: String,
    /// The type-specific details of the connection.
    pub details: ConnectionDetails,
}
1616
/// Type-specific details of a planned connection, one variant per connection type.
///
/// Use [`ConnectionDetails::to_connection`] to obtain the storage-layer connection.
#[derive(Clone, Debug, Serialize)]
pub enum ConnectionDetails {
    /// A Kafka connection.
    Kafka(KafkaConnection<ReferencedConnection>),
    /// A CSR connection (presumably Confluent Schema Registry — confirm).
    Csr(CsrConnection<ReferencedConnection>),
    /// A PostgreSQL connection.
    Postgres(PostgresConnection<ReferencedConnection>),
    /// An SSH connection together with two keys.
    Ssh {
        /// The SSH connection itself.
        connection: SshConnection,
        /// First key (presumably the primary of a rotating pair — confirm).
        key_1: SshKey,
        /// Second key (presumably the secondary of a rotating pair — confirm).
        key_2: SshKey,
    },
    /// An AWS connection.
    Aws(AwsConnection),
    /// An AWS PrivateLink connection.
    AwsPrivatelink(AwsPrivatelinkConnection),
    /// A MySQL connection.
    MySql(MySqlConnection<ReferencedConnection>),
    /// A SQL Server connection.
    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
    /// An Iceberg catalog connection.
    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
}
1633
1634impl ConnectionDetails {
1635    pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1636        match self {
1637            ConnectionDetails::Kafka(c) => {
1638                mz_storage_types::connections::Connection::Kafka(c.clone())
1639            }
1640            ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1641            ConnectionDetails::Postgres(c) => {
1642                mz_storage_types::connections::Connection::Postgres(c.clone())
1643            }
1644            ConnectionDetails::Ssh { connection, .. } => {
1645                mz_storage_types::connections::Connection::Ssh(connection.clone())
1646            }
1647            ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1648            ConnectionDetails::AwsPrivatelink(c) => {
1649                mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1650            }
1651            ConnectionDetails::MySql(c) => {
1652                mz_storage_types::connections::Connection::MySql(c.clone())
1653            }
1654            ConnectionDetails::SqlServer(c) => {
1655                mz_storage_types::connections::Connection::SqlServer(c.clone())
1656            }
1657            ConnectionDetails::IcebergCatalog(c) => {
1658                mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1659            }
1660        }
1661    }
1662}
1663
/// A single rule of a network policy.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct NetworkPolicyRule {
    /// Name of the rule.
    pub name: String,
    /// What the rule does with matching traffic.
    pub action: NetworkPolicyRuleAction,
    /// The network address (CIDR) the rule applies to.
    pub address: PolicyAddress,
    /// The traffic direction the rule applies to.
    pub direction: NetworkPolicyRuleDirection,
}
1671
/// The action a network policy rule takes. `Allow` is currently the only option.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleAction {
    /// Allow matching traffic.
    Allow,
}
1676
1677impl std::fmt::Display for NetworkPolicyRuleAction {
1678    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1679        match self {
1680            Self::Allow => write!(f, "allow"),
1681        }
1682    }
1683}
1684impl TryFrom<&str> for NetworkPolicyRuleAction {
1685    type Error = PlanError;
1686    fn try_from(value: &str) -> Result<Self, Self::Error> {
1687        match value.to_uppercase().as_str() {
1688            "ALLOW" => Ok(Self::Allow),
1689            _ => Err(PlanError::Unstructured(
1690                "Allow is the only valid option".into(),
1691            )),
1692        }
1693    }
1694}
1695
/// The traffic direction a network policy rule applies to. `Ingress` is currently the
/// only option.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleDirection {
    /// Incoming traffic.
    Ingress,
}
1700impl std::fmt::Display for NetworkPolicyRuleDirection {
1701    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1702        match self {
1703            Self::Ingress => write!(f, "ingress"),
1704        }
1705    }
1706}
1707impl TryFrom<&str> for NetworkPolicyRuleDirection {
1708    type Error = PlanError;
1709    fn try_from(value: &str) -> Result<Self, Self::Error> {
1710        match value.to_uppercase().as_str() {
1711            "INGRESS" => Ok(Self::Ingress),
1712            _ => Err(PlanError::Unstructured(
1713                "Ingress is the only valid option".into(),
1714            )),
1715        }
1716    }
1717}
1718
/// Newtype around an [`IpNet`] used as the address of a network policy rule.
///
/// The wrapper exists so the type can carry its own string conversions and
/// serialization.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct PolicyAddress(pub IpNet);
1721impl std::fmt::Display for PolicyAddress {
1722    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1723        write!(f, "{}", &self.0.to_string())
1724    }
1725}
1726impl From<String> for PolicyAddress {
1727    fn from(value: String) -> Self {
1728        Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1729    }
1730}
1731impl TryFrom<&str> for PolicyAddress {
1732    type Error = PlanError;
1733    fn try_from(value: &str) -> Result<Self, Self::Error> {
1734        let net = IpNet::from_str(value)
1735            .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1736        Ok(Self(net))
1737    }
1738}
1739
1740impl Serialize for PolicyAddress {
1741    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1742    where
1743        S: serde::Serializer,
1744    {
1745        serializer.serialize_str(&format!("{}", &self.0))
1746    }
1747}
1748
/// SSH key material: either only a public key or a full key pair.
#[derive(Clone, Debug, Serialize)]
pub enum SshKey {
    /// Only the public key, held as a string.
    PublicOnly(String),
    /// A complete key pair.
    Both(SshKeyPair),
}
1754
1755impl SshKey {
1756    pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1757        match self {
1758            SshKey::PublicOnly(_) => None,
1759            SshKey::Both(key_pair) => Some(key_pair),
1760        }
1761    }
1762
1763    pub fn public_key(&self) -> String {
1764        match self {
1765            SshKey::PublicOnly(s) => s.into(),
1766            SshKey::Both(p) => p.ssh_public_key(),
1767        }
1768    }
1769}
1770
/// Plan-level description of a secret.
#[derive(Clone, Debug)]
pub struct Secret {
    /// SQL text for this secret (presumably parse-able and stored durably like
    /// the other `create_sql` fields in this file — TODO confirm).
    pub create_sql: String,
    /// Scalar expression for the secret's contents.
    // NOTE(review): presumably the planned `AS` expression of the statement — confirm.
    pub secret_as: MirScalarExpr,
}
1776
/// Plan-level description of a sink.
#[derive(Clone, Debug)]
pub struct Sink {
    /// Parse-able SQL that is stored durably and defines this sink.
    pub create_sql: String,
    /// Collection we read into this sink.
    pub from: GlobalId,
    /// Type of connection to the external service we sink into.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    // TODO(guswynn): this probably should just be in the `connection`.
    /// Envelope used by this sink.
    pub envelope: SinkEnvelope,
    /// Version number of this sink.
    // NOTE(review): what increments this version is not visible here — confirm.
    pub version: u64,
}
1789
/// Plan-level description of a view.
#[derive(Clone, Debug)]
pub struct View {
    /// Parse-able SQL that is stored durably and defines this view.
    pub create_sql: String,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub expr: HirRelationExpr,
    /// All of the catalog objects that are referenced by this view, according to the `expr`.
    pub dependencies: DependencyIds,
    /// Columns of this view.
    pub column_names: Vec<ColumnName>,
    /// If this view is created in the temporary schema, e.g. `CREATE TEMPORARY ...`.
    pub temporary: bool,
}
1803
/// Plan-level description of a materialized view.
#[derive(Clone, Debug)]
pub struct MaterializedView {
    /// Parse-able SQL that is stored durably and defines this materialized view.
    pub create_sql: String,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub expr: HirRelationExpr,
    /// All of the catalog objects that are referenced by this materialized view, according to the `expr`.
    pub dependencies: DependencyIds,
    /// Columns of this materialized view.
    pub column_names: Vec<ColumnName>,
    /// Cluster this materialized view will get installed on.
    pub cluster_id: ClusterId,
    /// Non-null assertions, stored as `usize` values.
    // NOTE(review): presumably column indexes asserted to be non-null — confirm.
    pub non_null_assertions: Vec<usize>,
    /// Optional compaction window for this materialized view.
    pub compaction_window: Option<CompactionWindow>,
    /// Optional refresh schedule for this materialized view.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// Optional `as_of` timestamp for this materialized view.
    pub as_of: Option<Timestamp>,
}
1821
/// Plan-level description of an index.
#[derive(Clone, Debug)]
pub struct Index {
    /// Parse-able SQL that is stored durably and defines this index.
    pub create_sql: String,
    /// Collection this index is on top of.
    pub on: GlobalId,
    /// Scalar expressions that form the index key.
    pub keys: Vec<mz_expr::MirScalarExpr>,
    /// Optional compaction window for this index.
    pub compaction_window: Option<CompactionWindow>,
    /// Cluster associated with this index.
    // NOTE(review): presumably the cluster the index is installed on — confirm.
    pub cluster_id: ClusterId,
}
1832
/// Plan-level description of a type.
#[derive(Clone, Debug)]
pub struct Type {
    /// SQL text for this type (presumably parse-able and stored durably like
    /// the other `create_sql` fields in this file — TODO confirm).
    pub create_sql: String,
    /// The catalog type definition, parameterized over `IdReference`.
    pub inner: CatalogType<IdReference>,
}
1838
/// Specifies when a `Peek` or `Subscribe` should occur.
#[derive(Deserialize, Clone, Debug, PartialEq)]
pub enum QueryWhen {
    /// The peek should occur at the latest possible timestamp that allows the
    /// peek to complete immediately.
    Immediately,
    /// The peek should occur at a timestamp that allows the peek to see all
    /// data written to tables within Materialize.
    FreshestTableWrite,
    /// The peek should occur at exactly the specified timestamp.
    AtTimestamp(Timestamp),
    /// Same as Immediately, but will also advance to at least the specified
    /// timestamp.
    AtLeastTimestamp(Timestamp),
}
1857
1858impl QueryWhen {
1859    /// Returns a timestamp to which the candidate must be advanced.
1860    pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1861        match self {
1862            QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1863            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1864        }
1865    }
1866    /// Returns whether the candidate's upper bound is constrained.
1867    /// This is only true for `AtTimestamp` since it is the only variant that
1868    /// specifies a timestamp.
1869    pub fn constrains_upper(&self) -> bool {
1870        match self {
1871            QueryWhen::AtTimestamp(_) => true,
1872            QueryWhen::AtLeastTimestamp(_)
1873            | QueryWhen::Immediately
1874            | QueryWhen::FreshestTableWrite => false,
1875        }
1876    }
1877    /// Returns whether the candidate must be advanced to the since.
1878    pub fn advance_to_since(&self) -> bool {
1879        match self {
1880            QueryWhen::Immediately
1881            | QueryWhen::AtLeastTimestamp(_)
1882            | QueryWhen::FreshestTableWrite => true,
1883            QueryWhen::AtTimestamp(_) => false,
1884        }
1885    }
1886    /// Returns whether the candidate can be advanced to the upper.
1887    pub fn can_advance_to_upper(&self) -> bool {
1888        match self {
1889            QueryWhen::Immediately => true,
1890            QueryWhen::FreshestTableWrite
1891            | QueryWhen::AtTimestamp(_)
1892            | QueryWhen::AtLeastTimestamp(_) => false,
1893        }
1894    }
1895
1896    /// Returns whether the candidate can be advanced to the timeline's timestamp.
1897    pub fn can_advance_to_timeline_ts(&self) -> bool {
1898        match self {
1899            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1900            QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1901        }
1902    }
1903    /// Returns whether the candidate must be advanced to the timeline's timestamp.
1904    pub fn must_advance_to_timeline_ts(&self) -> bool {
1905        match self {
1906            QueryWhen::FreshestTableWrite => true,
1907            QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1908                false
1909            }
1910        }
1911    }
1912    /// Returns whether the selected timestamp should be tracked within the current transaction.
1913    pub fn is_transactional(&self) -> bool {
1914        match self {
1915            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1916            QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1917        }
1918    }
1919}
1920
/// The kind of a data mutation: insert, update, or delete.
#[derive(Debug, Copy, Clone)]
pub enum MutationKind {
    Insert,
    Update,
    Delete,
}
1927
/// Data formats for a copy operation.
// NOTE(review): which formats are valid in which copy direction is not
// visible in this part of the file — confirm at the use sites.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyFormat {
    Text,
    Csv,
    Binary,
    Parquet,
}
1935
/// Timeout setting for an execution.
// NOTE(review): the exact semantics of each variant are defined by the
// consumer of this type, which is not visible here — confirm before relying
// on the descriptions below.
#[derive(Debug, Copy, Clone)]
pub enum ExecuteTimeout {
    /// No timeout value is given.
    None,
    /// A timeout expressed in (possibly fractional) seconds.
    Seconds(f64),
    /// Presumably waits a single time rather than for a duration — TODO confirm.
    WaitOnce,
}
1942
/// An option that can be set on an index.
#[derive(Clone, Debug)]
pub enum IndexOption {
    /// Configures the logical compaction window for an index.
    RetainHistory(CompactionWindow),
}
1948
/// An option that can be set on a table.
#[derive(Clone, Debug)]
pub enum TableOption {
    /// Configures the logical compaction window for a table.
    RetainHistory(CompactionWindow),
}
1954
/// The set of cluster options carried by a plan.
///
/// Every field is an `AlterOptionParameter`; the `Default` implementation
/// sets each of them to `AlterOptionParameter::Unchanged`.
#[derive(Clone, Debug)]
pub struct PlanClusterOption {
    /// Availability zones, as a list of names.
    pub availability_zones: AlterOptionParameter<Vec<String>>,
    /// Introspection debugging flag.
    pub introspection_debugging: AlterOptionParameter<bool>,
    /// Introspection interval.
    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
    /// Whether the cluster is managed.
    pub managed: AlterOptionParameter<bool>,
    /// Replicas, as `(name, config)` pairs.
    // NOTE(review): the `String` is presumably the replica name — confirm.
    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
    /// Replication factor.
    pub replication_factor: AlterOptionParameter<u32>,
    /// Size; uses `AlterOptionParameter`'s default type parameter.
    pub size: AlterOptionParameter,
    /// Cluster schedule.
    pub schedule: AlterOptionParameter<ClusterSchedule>,
    /// Workload class; the inner `Option` allows an explicit "no class" value.
    pub workload_class: AlterOptionParameter<Option<String>>,
}
1967
1968impl Default for PlanClusterOption {
1969    fn default() -> Self {
1970        Self {
1971            availability_zones: AlterOptionParameter::Unchanged,
1972            introspection_debugging: AlterOptionParameter::Unchanged,
1973            introspection_interval: AlterOptionParameter::Unchanged,
1974            managed: AlterOptionParameter::Unchanged,
1975            replicas: AlterOptionParameter::Unchanged,
1976            replication_factor: AlterOptionParameter::Unchanged,
1977            size: AlterOptionParameter::Unchanged,
1978            schedule: AlterOptionParameter::Unchanged,
1979            workload_class: AlterOptionParameter::Unchanged,
1980        }
1981    }
1982}
1983
/// Strategy attached to a cluster alteration, derived from the `wait` field of
/// a `ClusterAlterOptionExtracted`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AlterClusterPlanStrategy {
    /// No `wait` value was given.
    None,
    /// A `wait` value carrying a fixed duration.
    For(Duration),
    /// A `wait` value carrying a timeout and the action to take on timeout.
    UntilReady {
        /// Action to take when the timeout is hit.
        on_timeout: OnTimeoutAction,
        /// The timeout itself.
        timeout: Duration,
    },
}
1993
/// Action to take when a timeout is hit.
///
/// Parsed from the `ON TIMEOUT` option value (`COMMIT` or `ROLLBACK`).
/// The default is [`OnTimeoutAction::Commit`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum OnTimeoutAction {
    /// Corresponds to the `COMMIT` option value; this is the default.
    #[default]
    Commit,
    /// Corresponds to the `ROLLBACK` option value.
    Rollback,
}
2005
2006impl TryFrom<&str> for OnTimeoutAction {
2007    type Error = PlanError;
2008    fn try_from(value: &str) -> Result<Self, Self::Error> {
2009        match value.to_uppercase().as_str() {
2010            "COMMIT" => Ok(Self::Commit),
2011            "ROLLBACK" => Ok(Self::Rollback),
2012            _ => Err(PlanError::Unstructured(
2013                "Valid options are COMMIT, ROLLBACK".into(),
2014            )),
2015        }
2016    }
2017}
2018
2019impl AlterClusterPlanStrategy {
2020    pub fn is_none(&self) -> bool {
2021        matches!(self, Self::None)
2022    }
2023    pub fn is_some(&self) -> bool {
2024        !matches!(self, Self::None)
2025    }
2026}
2027
2028impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2029    type Error = PlanError;
2030
2031    fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2032        Ok(match value.wait {
2033            Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2034            Some(ClusterAlterOptionValue::UntilReady(options)) => {
2035                let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2036                Self::UntilReady {
2037                    timeout: match extracted.timeout {
2038                        Some(d) => d,
2039                        None => Err(PlanError::UntilReadyTimeoutRequired)?,
2040                    },
2041                    on_timeout: match extracted.on_timeout {
2042                        Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2043                            PlanError::InvalidOptionValue {
2044                                option_name: "ON TIMEOUT".into(),
2045                                err: Box::new(e),
2046                            }
2047                        })?,
2048                        None => OnTimeoutAction::default(),
2049                    },
2050                }
2051            }
2052            None => Self::None,
2053        })
2054    }
2055}
2056
/// A vector of values to which parameter references should be bound.
///
/// Carries the datums together with two lists of types: the types supplied
/// with the datums and the types the prepared statement expects.
#[derive(Debug, Clone)]
pub struct Params {
    /// The datums that were provided in the EXECUTE statement.
    pub datums: Row,
    /// The types of the datums provided in the EXECUTE statement.
    pub execute_types: Vec<SqlScalarType>,
    /// The types that the prepared statement expects based on its definition.
    pub expected_types: Vec<SqlScalarType>,
}
2067
2068impl Params {
2069    /// Returns a `Params` with no parameters.
2070    pub fn empty() -> Params {
2071        Params {
2072            datums: Row::pack_slice(&[]),
2073            execute_types: vec![],
2074            expected_types: vec![],
2075        }
2076    }
2077}
2078
/// Controls planning of a SQL query.
#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Copy)]
pub struct PlanContext {
    /// Wall-clock time associated with this planning context.
    pub wall_time: DateTime<Utc>,
    /// Flag for ignoring "if exists" errors; `false` unless set through
    /// `with_ignore_if_exists_errors`.
    // NOTE(review): exactly which errors this suppresses is decided by the
    // planner code that reads the flag — confirm there.
    pub ignore_if_exists_errors: bool,
}
2085
2086impl PlanContext {
2087    pub fn new(wall_time: DateTime<Utc>) -> Self {
2088        Self {
2089            wall_time,
2090            ignore_if_exists_errors: false,
2091        }
2092    }
2093
2094    /// Return a PlanContext with zero values. This should only be used when
2095    /// planning is required but unused (like in `plan_create_table()`) or in
2096    /// tests.
2097    pub fn zero() -> Self {
2098        PlanContext {
2099            wall_time: now::to_datetime(NOW_ZERO()),
2100            ignore_if_exists_errors: false,
2101        }
2102    }
2103
2104    pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2105        self.ignore_if_exists_errors = value;
2106        self
2107    }
2108}