mz_sql/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL planning.
11//!
12//! SQL planning is the process of taking the abstract syntax tree of a
13//! [`Statement`] and turning it into a [`Plan`] that the dataflow layer can
14//! execute.
15//!
16//! Statements must be purified before they can be planned. See the
17//! [`pure`](crate::pure) module for details.
18
19// Internal module layout.
20//
21// The entry point for planning is `statement::handle_statement`. That function
22// dispatches to a more specific `handle` function for the particular statement
23// type. For most statements, this `handle` function is uninteresting and short,
24// but anything involving a `SELECT` statement gets complicated. `SELECT`
25// queries wind through the functions in the `query` module, starting with
26// `plan_root_query` and fanning out based on the contents of the `SELECT`
27// statement.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{
41    CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing,
42};
43use mz_ore::now::{self, NOW_ZERO};
44use mz_pgcopy::CopyFormatParams;
45use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
46use mz_repr::explain::{ExplainConfig, ExplainFormat};
47use mz_repr::network_policy_id::NetworkPolicyId;
48use mz_repr::optimize::OptimizerFeatureOverrides;
49use mz_repr::refresh_schedule::RefreshSchedule;
50use mz_repr::role_id::RoleId;
51use mz_repr::{
52    CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, Row, SqlColumnType,
53    SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
54};
55use mz_sql_parser::ast::{
56    AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
57    RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
58    Value, WithOptionValue,
59};
60use mz_ssh_util::keys::SshKeyPair;
61use mz_storage_types::connections::aws::AwsConnection;
62use mz_storage_types::connections::inline::ReferencedConnection;
63use mz_storage_types::connections::{
64    AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
65    MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70    SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76    ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77    TransactionAccessMode,
78};
79use crate::catalog::{
80    CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81    RoleAttributesRaw,
82};
83use crate::names::{
84    Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110    AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111    WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119    AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120    PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123    StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124    resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
/// Instructions for executing a SQL query.
///
/// Each variant corresponds to one kind of executable plan; most carry a
/// dedicated `*Plan` payload struct. The `EnumKind` derive together with
/// `#[enum_kind(PlanKind)]` produces the payload-free companion enum
/// `PlanKind` that [`Plan::generated_from`] returns.
///
/// When adding a variant, [`Plan::name`] must be extended (its match is
/// exhaustive), and [`Plan::allowed_in_read_only`] should be reviewed: it uses
/// an allow-list, so new variants are rejected in read-only mode by default.
#[derive(Debug, EnumKind)]
#[enum_kind(PlanKind)]
pub enum Plan {
    CreateConnection(CreateConnectionPlan),
    CreateDatabase(CreateDatabasePlan),
    CreateSchema(CreateSchemaPlan),
    CreateRole(CreateRolePlan),
    CreateCluster(CreateClusterPlan),
    CreateClusterReplica(CreateClusterReplicaPlan),
    CreateSource(CreateSourcePlan),
    /// Several source creations at once, each bundled with the metadata
    /// needed to sequence it (see [`CreateSourcePlanBundle`]).
    CreateSources(Vec<CreateSourcePlanBundle>),
    CreateSecret(CreateSecretPlan),
    CreateSink(CreateSinkPlan),
    CreateTable(CreateTablePlan),
    CreateView(CreateViewPlan),
    CreateMaterializedView(CreateMaterializedViewPlan),
    CreateContinualTask(CreateContinualTaskPlan),
    CreateNetworkPolicy(CreateNetworkPolicyPlan),
    CreateIndex(CreateIndexPlan),
    CreateType(CreateTypePlan),
    Comment(CommentPlan),
    DiscardTemp,
    DiscardAll,
    DropObjects(DropObjectsPlan),
    DropOwned(DropOwnedPlan),
    /// Reported to users as "do nothing" by [`Plan::name`].
    EmptyQuery,
    ShowAllVariables,
    ShowCreate(ShowCreatePlan),
    ShowColumns(ShowColumnsPlan),
    ShowVariable(ShowVariablePlan),
    InspectShard(InspectShardPlan),
    SetVariable(SetVariablePlan),
    ResetVariable(ResetVariablePlan),
    SetTransaction(SetTransactionPlan),
    StartTransaction(StartTransactionPlan),
    CommitTransaction(CommitTransactionPlan),
    AbortTransaction(AbortTransactionPlan),
    Select(SelectPlan),
    Subscribe(SubscribePlan),
    CopyFrom(CopyFromPlan),
    CopyTo(CopyToPlan),
    ExplainPlan(ExplainPlanPlan),
    ExplainPushdown(ExplainPushdownPlan),
    ExplainTimestamp(ExplainTimestampPlan),
    ExplainSinkSchema(ExplainSinkSchemaPlan),
    Insert(InsertPlan),
    AlterCluster(AlterClusterPlan),
    AlterClusterSwap(AlterClusterSwapPlan),
    /// An `ALTER` that plans to nothing; many `ALTER` statement kinds list it
    /// as a possible outcome in [`Plan::generated_from`].
    AlterNoop(AlterNoopPlan),
    AlterSetCluster(AlterSetClusterPlan),
    AlterConnection(AlterConnectionPlan),
    AlterSource(AlterSourcePlan),
    AlterClusterRename(AlterClusterRenamePlan),
    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
    AlterItemRename(AlterItemRenamePlan),
    AlterSchemaRename(AlterSchemaRenamePlan),
    AlterSchemaSwap(AlterSchemaSwapPlan),
    AlterSecret(AlterSecretPlan),
    AlterSink(AlterSinkPlan),
    AlterSystemSet(AlterSystemSetPlan),
    AlterSystemReset(AlterSystemResetPlan),
    AlterSystemResetAll(AlterSystemResetAllPlan),
    AlterRole(AlterRolePlan),
    AlterOwner(AlterOwnerPlan),
    // Note: the payload type is `AlterTablePlan`, not `AlterTableAddColumnPlan`.
    AlterTableAddColumn(AlterTablePlan),
    AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
    AlterNetworkPolicy(AlterNetworkPolicyPlan),
    Declare(DeclarePlan),
    Fetch(FetchPlan),
    Close(ClosePlan),
    /// A mutation whose user-facing name depends on its `kind`
    /// (see the `MutationKind` match in [`Plan::name`]).
    ReadThenWrite(ReadThenWritePlan),
    Prepare(PreparePlan),
    Execute(ExecutePlan),
    Deallocate(DeallocatePlan),
    Raise(RaisePlan),
    GrantRole(GrantRolePlan),
    RevokeRole(RevokeRolePlan),
    GrantPrivileges(GrantPrivilegesPlan),
    RevokePrivileges(RevokePrivilegesPlan),
    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
    ReassignOwned(ReassignOwnedPlan),
    // Unlike most variants, the payload is the `SideEffectingFunc` type itself.
    SideEffectingFunc(SideEffectingFunc),
    ValidateConnection(ValidateConnectionPlan),
    AlterRetainHistory(AlterRetainHistoryPlan),
}
216
217impl Plan {
218    /// Expresses which [`StatementKind`] can generate which set of
219    /// [`PlanKind`].
220    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
221        match stmt {
222            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
223            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
224            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
225            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
226            StatementKind::AlterObjectRename => &[
227                PlanKind::AlterClusterRename,
228                PlanKind::AlterClusterReplicaRename,
229                PlanKind::AlterItemRename,
230                PlanKind::AlterSchemaRename,
231                PlanKind::AlterNoop,
232            ],
233            StatementKind::AlterObjectSwap => &[
234                PlanKind::AlterClusterSwap,
235                PlanKind::AlterSchemaSwap,
236                PlanKind::AlterNoop,
237            ],
238            StatementKind::AlterRole => &[PlanKind::AlterRole],
239            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
240            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
241            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
242            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
243            StatementKind::AlterSource => &[
244                PlanKind::AlterNoop,
245                PlanKind::AlterSource,
246                PlanKind::AlterRetainHistory,
247            ],
248            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
249            StatementKind::AlterSystemResetAll => {
250                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
251            }
252            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
253            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
254            StatementKind::AlterTableAddColumn => {
255                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
256            }
257            StatementKind::AlterMaterializedViewApplyReplacement => &[
258                PlanKind::AlterNoop,
259                PlanKind::AlterMaterializedViewApplyReplacement,
260            ],
261            StatementKind::Close => &[PlanKind::Close],
262            StatementKind::Comment => &[PlanKind::Comment],
263            StatementKind::Commit => &[PlanKind::CommitTransaction],
264            StatementKind::Copy => &[
265                PlanKind::CopyFrom,
266                PlanKind::Select,
267                PlanKind::Subscribe,
268                PlanKind::CopyTo,
269            ],
270            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
271            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
272            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
273            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
274            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
275            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
276            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
277            StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
278            StatementKind::CreateRole => &[PlanKind::CreateRole],
279            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
280            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
281            StatementKind::CreateSink => &[PlanKind::CreateSink],
282            StatementKind::CreateSource | StatementKind::CreateSubsource => {
283                &[PlanKind::CreateSource]
284            }
285            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
286            StatementKind::CreateTable => &[PlanKind::CreateTable],
287            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
288            StatementKind::CreateType => &[PlanKind::CreateType],
289            StatementKind::CreateView => &[PlanKind::CreateView],
290            StatementKind::Deallocate => &[PlanKind::Deallocate],
291            StatementKind::Declare => &[PlanKind::Declare],
292            StatementKind::Delete => &[PlanKind::ReadThenWrite],
293            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
294            StatementKind::DropObjects => &[PlanKind::DropObjects],
295            StatementKind::DropOwned => &[PlanKind::DropOwned],
296            StatementKind::Execute => &[PlanKind::Execute],
297            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
298            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
299            StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
300            StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
301            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
302            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
303            StatementKind::Fetch => &[PlanKind::Fetch],
304            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
305            StatementKind::GrantRole => &[PlanKind::GrantRole],
306            StatementKind::Insert => &[PlanKind::Insert],
307            StatementKind::Prepare => &[PlanKind::Prepare],
308            StatementKind::Raise => &[PlanKind::Raise],
309            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
310            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
311            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
312            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
313            StatementKind::Rollback => &[PlanKind::AbortTransaction],
314            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
315            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
316            StatementKind::SetVariable => &[PlanKind::SetVariable],
317            StatementKind::Show => &[
318                PlanKind::Select,
319                PlanKind::ShowVariable,
320                PlanKind::ShowCreate,
321                PlanKind::ShowColumns,
322                PlanKind::ShowAllVariables,
323                PlanKind::InspectShard,
324            ],
325            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
326            StatementKind::Subscribe => &[PlanKind::Subscribe],
327            StatementKind::Update => &[PlanKind::ReadThenWrite],
328            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
329            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
330        }
331    }
332
333    /// Returns a human readable name of the plan. Meant for use in messages sent back to a user.
334    pub fn name(&self) -> &str {
335        match self {
336            Plan::CreateConnection(_) => "create connection",
337            Plan::CreateDatabase(_) => "create database",
338            Plan::CreateSchema(_) => "create schema",
339            Plan::CreateRole(_) => "create role",
340            Plan::CreateCluster(_) => "create cluster",
341            Plan::CreateClusterReplica(_) => "create cluster replica",
342            Plan::CreateSource(_) => "create source",
343            Plan::CreateSources(_) => "create source",
344            Plan::CreateSecret(_) => "create secret",
345            Plan::CreateSink(_) => "create sink",
346            Plan::CreateTable(_) => "create table",
347            Plan::CreateView(_) => "create view",
348            Plan::CreateMaterializedView(_) => "create materialized view",
349            Plan::CreateContinualTask(_) => "create continual task",
350            Plan::CreateIndex(_) => "create index",
351            Plan::CreateType(_) => "create type",
352            Plan::CreateNetworkPolicy(_) => "create network policy",
353            Plan::Comment(_) => "comment",
354            Plan::DiscardTemp => "discard temp",
355            Plan::DiscardAll => "discard all",
356            Plan::DropObjects(plan) => match plan.object_type {
357                ObjectType::Table => "drop table",
358                ObjectType::View => "drop view",
359                ObjectType::MaterializedView => "drop materialized view",
360                ObjectType::Source => "drop source",
361                ObjectType::Sink => "drop sink",
362                ObjectType::Index => "drop index",
363                ObjectType::Type => "drop type",
364                ObjectType::Role => "drop roles",
365                ObjectType::Cluster => "drop clusters",
366                ObjectType::ClusterReplica => "drop cluster replicas",
367                ObjectType::Secret => "drop secret",
368                ObjectType::Connection => "drop connection",
369                ObjectType::Database => "drop database",
370                ObjectType::Schema => "drop schema",
371                ObjectType::Func => "drop function",
372                ObjectType::ContinualTask => "drop continual task",
373                ObjectType::NetworkPolicy => "drop network policy",
374            },
375            Plan::DropOwned(_) => "drop owned",
376            Plan::EmptyQuery => "do nothing",
377            Plan::ShowAllVariables => "show all variables",
378            Plan::ShowCreate(_) => "show create",
379            Plan::ShowColumns(_) => "show columns",
380            Plan::ShowVariable(_) => "show variable",
381            Plan::InspectShard(_) => "inspect shard",
382            Plan::SetVariable(_) => "set variable",
383            Plan::ResetVariable(_) => "reset variable",
384            Plan::SetTransaction(_) => "set transaction",
385            Plan::StartTransaction(_) => "start transaction",
386            Plan::CommitTransaction(_) => "commit",
387            Plan::AbortTransaction(_) => "abort",
388            Plan::Select(_) => "select",
389            Plan::Subscribe(_) => "subscribe",
390            Plan::CopyFrom(_) => "copy from",
391            Plan::CopyTo(_) => "copy to",
392            Plan::ExplainPlan(_) => "explain plan",
393            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
394            Plan::ExplainTimestamp(_) => "explain timestamp",
395            Plan::ExplainSinkSchema(_) => "explain schema",
396            Plan::Insert(_) => "insert",
397            Plan::AlterNoop(plan) => match plan.object_type {
398                ObjectType::Table => "alter table",
399                ObjectType::View => "alter view",
400                ObjectType::MaterializedView => "alter materialized view",
401                ObjectType::Source => "alter source",
402                ObjectType::Sink => "alter sink",
403                ObjectType::Index => "alter index",
404                ObjectType::Type => "alter type",
405                ObjectType::Role => "alter role",
406                ObjectType::Cluster => "alter cluster",
407                ObjectType::ClusterReplica => "alter cluster replica",
408                ObjectType::Secret => "alter secret",
409                ObjectType::Connection => "alter connection",
410                ObjectType::Database => "alter database",
411                ObjectType::Schema => "alter schema",
412                ObjectType::Func => "alter function",
413                ObjectType::ContinualTask => "alter continual task",
414                ObjectType::NetworkPolicy => "alter network policy",
415            },
416            Plan::AlterCluster(_) => "alter cluster",
417            Plan::AlterClusterRename(_) => "alter cluster rename",
418            Plan::AlterClusterSwap(_) => "alter cluster swap",
419            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
420            Plan::AlterSetCluster(_) => "alter set cluster",
421            Plan::AlterConnection(_) => "alter connection",
422            Plan::AlterSource(_) => "alter source",
423            Plan::AlterItemRename(_) => "rename item",
424            Plan::AlterSchemaRename(_) => "alter rename schema",
425            Plan::AlterSchemaSwap(_) => "alter swap schema",
426            Plan::AlterSecret(_) => "alter secret",
427            Plan::AlterSink(_) => "alter sink",
428            Plan::AlterSystemSet(_) => "alter system",
429            Plan::AlterSystemReset(_) => "alter system",
430            Plan::AlterSystemResetAll(_) => "alter system",
431            Plan::AlterRole(_) => "alter role",
432            Plan::AlterNetworkPolicy(_) => "alter network policy",
433            Plan::AlterOwner(plan) => match plan.object_type {
434                ObjectType::Table => "alter table owner",
435                ObjectType::View => "alter view owner",
436                ObjectType::MaterializedView => "alter materialized view owner",
437                ObjectType::Source => "alter source owner",
438                ObjectType::Sink => "alter sink owner",
439                ObjectType::Index => "alter index owner",
440                ObjectType::Type => "alter type owner",
441                ObjectType::Role => "alter role owner",
442                ObjectType::Cluster => "alter cluster owner",
443                ObjectType::ClusterReplica => "alter cluster replica owner",
444                ObjectType::Secret => "alter secret owner",
445                ObjectType::Connection => "alter connection owner",
446                ObjectType::Database => "alter database owner",
447                ObjectType::Schema => "alter schema owner",
448                ObjectType::Func => "alter function owner",
449                ObjectType::ContinualTask => "alter continual task owner",
450                ObjectType::NetworkPolicy => "alter network policy owner",
451            },
452            Plan::AlterTableAddColumn(_) => "alter table add column",
453            Plan::AlterMaterializedViewApplyReplacement(_) => {
454                "alter materialized view apply replacement"
455            }
456            Plan::Declare(_) => "declare",
457            Plan::Fetch(_) => "fetch",
458            Plan::Close(_) => "close",
459            Plan::ReadThenWrite(plan) => match plan.kind {
460                MutationKind::Insert => "insert into select",
461                MutationKind::Update => "update",
462                MutationKind::Delete => "delete",
463            },
464            Plan::Prepare(_) => "prepare",
465            Plan::Execute(_) => "execute",
466            Plan::Deallocate(_) => "deallocate",
467            Plan::Raise(_) => "raise",
468            Plan::GrantRole(_) => "grant role",
469            Plan::RevokeRole(_) => "revoke role",
470            Plan::GrantPrivileges(_) => "grant privilege",
471            Plan::RevokePrivileges(_) => "revoke privilege",
472            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
473            Plan::ReassignOwned(_) => "reassign owned",
474            Plan::SideEffectingFunc(_) => "side effecting func",
475            Plan::ValidateConnection(_) => "validate connection",
476            Plan::AlterRetainHistory(_) => "alter retain history",
477        }
478    }
479
480    /// Returns `true` iff this `Plan` is allowed to be executed in read-only
481    /// mode.
482    ///
483    /// We use an explicit allow-list, to avoid future additions automatically
484    /// falling into the `true` category.
485    pub fn allowed_in_read_only(&self) -> bool {
486        match self {
487            // These two set non-durable session variables, so are okay in
488            // read-only mode.
489            Plan::SetVariable(_) => true,
490            Plan::ResetVariable(_) => true,
491            Plan::SetTransaction(_) => true,
492            Plan::StartTransaction(_) => true,
493            Plan::CommitTransaction(_) => true,
494            Plan::AbortTransaction(_) => true,
495            Plan::Select(_) => true,
496            Plan::EmptyQuery => true,
497            Plan::ShowAllVariables => true,
498            Plan::ShowCreate(_) => true,
499            Plan::ShowColumns(_) => true,
500            Plan::ShowVariable(_) => true,
501            Plan::InspectShard(_) => true,
502            Plan::Subscribe(_) => true,
503            Plan::CopyTo(_) => true,
504            Plan::ExplainPlan(_) => true,
505            Plan::ExplainPushdown(_) => true,
506            Plan::ExplainTimestamp(_) => true,
507            Plan::ExplainSinkSchema(_) => true,
508            Plan::ValidateConnection(_) => true,
509            _ => false,
510        }
511    }
512}
513
/// Payload of [`Plan::StartTransaction`].
#[derive(Debug)]
pub struct StartTransactionPlan {
    /// Optional transaction access mode.
    // NOTE(review): presumably `None` means the statement did not specify one — confirm.
    pub access: Option<TransactionAccessMode>,
    /// Optional transaction isolation level.
    pub isolation_level: Option<TransactionIsolationLevel>,
}
519
/// Distinguishes explicit from implicit transactions.
///
/// Carried by [`CommitTransactionPlan`] and [`AbortTransactionPlan`].
#[derive(Debug)]
pub enum TransactionType {
    Explicit,
    Implicit,
}

impl TransactionType {
    /// Returns `true` only for [`TransactionType::Explicit`].
    pub fn is_explicit(&self) -> bool {
        match self {
            TransactionType::Explicit => true,
            TransactionType::Implicit => false,
        }
    }

    /// Returns `true` only for [`TransactionType::Implicit`].
    pub fn is_implicit(&self) -> bool {
        match self {
            TransactionType::Implicit => true,
            TransactionType::Explicit => false,
        }
    }
}
535
/// Payload of [`Plan::CommitTransaction`].
#[derive(Debug)]
pub struct CommitTransactionPlan {
    /// Whether the transaction being committed is explicit or implicit.
    pub transaction_type: TransactionType,
}
540
/// Payload of [`Plan::AbortTransaction`].
#[derive(Debug)]
pub struct AbortTransactionPlan {
    /// Whether the transaction being aborted is explicit or implicit.
    pub transaction_type: TransactionType,
}
545
/// Payload of [`Plan::CreateDatabase`].
#[derive(Debug)]
pub struct CreateDatabasePlan {
    /// Name of the database to create.
    pub name: String,
    // NOTE(review): presumably mirrors the SQL `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
}
551
/// Payload of [`Plan::CreateSchema`].
#[derive(Debug)]
pub struct CreateSchemaPlan {
    /// Resolved specifier of the database for the new schema.
    pub database_spec: ResolvedDatabaseSpecifier,
    /// Name of the schema to create.
    pub schema_name: String,
    // NOTE(review): presumably mirrors the SQL `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
}
558
/// Payload of [`Plan::CreateRole`].
#[derive(Debug)]
pub struct CreateRolePlan {
    /// Name of the role to create.
    pub name: String,
    /// Raw (not yet validated or processed, judging by the type name — TODO
    /// confirm) attributes for the new role.
    pub attributes: RoleAttributesRaw,
}
564
/// Payload of [`Plan::CreateCluster`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterPlan {
    /// Name of the cluster to create.
    pub name: String,
    /// Managed or unmanaged configuration of the cluster.
    pub variant: CreateClusterVariant,
    /// Optional workload class.
    pub workload_class: Option<String>,
}
571
/// The two shapes a [`CreateClusterPlan`] can take.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum CreateClusterVariant {
    /// Cluster described by a replication factor, size and related settings
    /// (see [`CreateClusterManagedPlan`]).
    Managed(CreateClusterManagedPlan),
    /// Cluster described by an explicit list of named replicas
    /// (see [`CreateClusterUnmanagedPlan`]).
    Unmanaged(CreateClusterUnmanagedPlan),
}
577
/// Payload of [`CreateClusterVariant::Unmanaged`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterUnmanagedPlan {
    /// Replicas to create, as `(name, config)` pairs.
    // NOTE(review): the `String` is presumably the replica name — confirm.
    pub replicas: Vec<(String, ReplicaConfig)>,
}
582
/// Payload of [`CreateClusterVariant::Managed`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterManagedPlan {
    /// Replication factor of the cluster.
    pub replication_factor: u32,
    /// Size of the cluster, as a string.
    // NOTE(review): presumably a named replica size; valid values are not visible here.
    pub size: String,
    /// Availability zones for the cluster; may be empty.
    pub availability_zones: Vec<String>,
    /// Compute-specific replica configuration.
    pub compute: ComputeReplicaConfig,
    /// Optimizer feature overrides for the cluster.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Scheduling policy; see [`ClusterSchedule`].
    pub schedule: ClusterSchedule,
}
592
/// Payload of [`Plan::CreateClusterReplica`].
#[derive(Debug)]
pub struct CreateClusterReplicaPlan {
    /// ID of the cluster the replica is created in.
    pub cluster_id: ClusterId,
    /// Name of the replica to create.
    pub name: String,
    /// Configuration of the replica.
    pub config: ReplicaConfig,
}
599
/// Configuration of introspection for a cluster replica.
///
/// `PartialOrd`/`Ord` are derived, so values compare field by field in
/// declaration order (`debugging` first, then `interval`); reordering the
/// fields would change comparison results.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq)]
pub struct ComputeReplicaIntrospectionConfig {
    /// Whether to introspect the introspection.
    pub debugging: bool,
    /// The interval at which to introspect.
    pub interval: Duration,
}
608
/// Compute-specific configuration of a cluster replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComputeReplicaConfig {
    /// Optional introspection configuration.
    // NOTE(review): presumably `None` disables introspection — confirm against consumers.
    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
}
613
/// Configuration of a cluster replica, as used by
/// [`CreateClusterReplicaPlan`] and [`CreateClusterUnmanagedPlan`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicaConfig {
    /// A replica identified by explicit controller addresses.
    // NOTE(review): presumably a replica whose processes are run outside the
    // orchestrator — confirm against the DDL planner.
    Unorchestrated {
        /// Storage controller addresses.
        storagectl_addrs: Vec<String>,
        /// Compute controller addresses.
        computectl_addrs: Vec<String>,
        /// Compute-specific replica configuration.
        compute: ComputeReplicaConfig,
    },
    /// A replica described by a size and optional placement.
    Orchestrated {
        /// Size of the replica, as a string.
        size: String,
        /// Optional availability zone for the replica.
        availability_zone: Option<String>,
        /// Compute-specific replica configuration.
        compute: ComputeReplicaConfig,
        // NOTE(review): presumably marks system-internal replicas — confirm.
        internal: bool,
        // NOTE(review): presumably an alternate size used for billing — confirm.
        billed_as: Option<String>,
    },
}
629
/// Scheduling policy of a cluster.
///
/// `PartialOrd`/`Ord` are derived, so variant declaration order determines
/// ordering (`Manual` sorts before `Refresh`).
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterSchedule {
    /// The system won't automatically turn the cluster On or Off.
    Manual,
    /// The cluster will be On when a REFRESH materialized view on it needs to refresh.
    /// `hydration_time_estimate` determines how long before a refresh the cluster is
    /// turned On, so that it can finish rehydrating before the refresh time.
    Refresh { hydration_time_estimate: Duration },
}
639
640impl Default for ClusterSchedule {
641    fn default() -> Self {
642        // (Has to be consistent with `impl Default for ClusterScheduleOptionValue`.)
643        ClusterSchedule::Manual
644    }
645}
646
/// Payload of [`Plan::CreateSource`]; also wrapped by
/// [`CreateSourcePlanBundle`] for [`Plan::CreateSources`].
#[derive(Debug)]
pub struct CreateSourcePlan {
    /// Qualified name of the source to create.
    pub name: QualifiedItemName,
    /// Definition of the source.
    pub source: Source,
    // NOTE(review): presumably mirrors the SQL `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
    /// Timeline of the source.
    pub timeline: Timeline,
    /// Cluster to run the source in.
    ///
    /// `None` for subsources, which run on the parent cluster.
    pub in_cluster: Option<ClusterId>,
}
656
/// A set of [`SourceReference`]s together with the time they were updated.
///
/// Used as `available_source_references` in [`CreateSourcePlanBundle`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReferences {
    /// When the references were last updated.
    // NOTE(review): unit and epoch of this timestamp are not visible here — confirm.
    pub updated_at: u64,
    /// The individual references.
    pub references: Vec<SourceReference>,
}
662
/// An available external reference for a source and, if possible to retrieve,
/// any column names it contains.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReference {
    /// Name of the external reference.
    pub name: String,
    /// Optional namespace of the external reference.
    // NOTE(review): presumably the upstream schema or database — confirm.
    pub namespace: Option<String>,
    /// Column names of the reference; empty when none could be retrieved
    /// (per the type-level doc above — TODO confirm empty is the "unknown" encoding).
    pub columns: Vec<String>,
}
671
/// A [`CreateSourcePlan`] and the metadata necessary to sequence it.
///
/// [`Plan::CreateSources`] carries a `Vec` of these.
#[derive(Debug)]
pub struct CreateSourcePlanBundle {
    /// ID of this source in the Catalog.
    pub item_id: CatalogItemId,
    /// ID used to reference this source from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// Details of the source to create.
    pub plan: CreateSourcePlan,
    /// Other catalog objects that are referenced by this source, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All the available upstream references for this source.
    /// Populated for top-level sources that can contain subsources/tables
    /// and used during sequencing to populate the appropriate catalog fields.
    pub available_source_references: Option<SourceReferences>,
}
688
/// Payload of [`Plan::CreateConnection`].
#[derive(Debug)]
pub struct CreateConnectionPlan {
    /// Qualified name of the connection to create.
    pub name: QualifiedItemName,
    // NOTE(review): presumably mirrors the SQL `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
    /// Definition of the connection.
    pub connection: Connection,
    // NOTE(review): presumably requests validating the connection on creation — confirm.
    pub validate: bool,
}
696
/// Payload of [`Plan::ValidateConnection`].
#[derive(Debug)]
pub struct ValidateConnectionPlan {
    /// ID of the connection in the Catalog.
    pub id: CatalogItemId,
    /// The connection to validate.
    ///
    /// This is the storage-layer connection type (written with its full path),
    /// parameterized by [`ReferencedConnection`].
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
}
704
/// Payload of [`Plan::CreateSecret`].
#[derive(Debug)]
pub struct CreateSecretPlan {
    /// Qualified name of the secret to create.
    pub name: QualifiedItemName,
    /// Definition of the secret.
    pub secret: Secret,
    // NOTE(review): presumably mirrors the SQL `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
}
711
/// Payload of [`Plan::CreateSink`].
#[derive(Debug)]
pub struct CreateSinkPlan {
    /// Qualified name of the sink to create.
    pub name: QualifiedItemName,
    /// Definition of the sink.
    pub sink: Sink,
    // NOTE(review): presumably whether the sink emits an initial snapshot — confirm.
    pub with_snapshot: bool,
    // NOTE(review): presumably mirrors the SQL `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
    /// Cluster to run the sink in. Unlike [`CreateSourcePlan::in_cluster`],
    /// this is not optional.
    pub in_cluster: ClusterId,
}
720
/// Payload of [`Plan::CreateTable`].
#[derive(Debug)]
pub struct CreateTablePlan {
    /// Qualified name of the table to create.
    pub name: QualifiedItemName,
    /// Definition of the table.
    pub table: Table,
    // NOTE(review): presumably mirrors the SQL `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
}
727
/// Plan for creating a view.
#[derive(Debug, Clone)]
pub struct CreateViewPlan {
    /// Qualified name of the view to create.
    pub name: QualifiedItemName,
    /// The planned view.
    pub view: View,
    /// The Catalog objects that this view is replacing, if any.
    pub replace: Option<CatalogItemId>,
    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
    pub drop_ids: Vec<CatalogItemId>,
    /// NOTE(review): presumably set by an `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
    /// True if the view contains an expression that can make the exact column list
    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
    pub ambiguous_columns: bool,
}
741
/// Plan for creating a materialized view.
#[derive(Debug, Clone)]
pub struct CreateMaterializedViewPlan {
    /// Qualified name of the materialized view to create.
    pub name: QualifiedItemName,
    /// The planned materialized view.
    pub materialized_view: MaterializedView,
    /// The Catalog objects that this materialized view is replacing, if any.
    pub replace: Option<CatalogItemId>,
    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
    pub drop_ids: Vec<CatalogItemId>,
    /// NOTE(review): presumably set by an `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
    /// True if the materialized view contains an expression that can make the exact column list
    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
    pub ambiguous_columns: bool,
}
755
/// Plan for creating a continual task (CT).
#[derive(Debug, Clone)]
pub struct CreateContinualTaskPlan {
    /// Qualified name of the continual task to create.
    pub name: QualifiedItemName,
    /// During initial creation, the `LocalId` placeholder for this CT in `continual_task.expr`.
    /// None on restart.
    pub placeholder_id: Option<mz_expr::LocalId>,
    /// Relation description carried with the plan.
    /// NOTE(review): presumably the CT's output description — confirm.
    pub desc: RelationDesc,
    /// ID of the collection we read into this continual task.
    pub input_id: GlobalId,
    /// NOTE(review): presumably whether the input's snapshot is included — confirm.
    pub with_snapshot: bool,
    /// Definition for the continual task.
    pub continual_task: MaterializedView,
}
769
/// Plan for creating a network policy.
#[derive(Debug, Clone)]
pub struct CreateNetworkPolicyPlan {
    /// Name of the network policy.
    pub name: String,
    /// Rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
}
775
/// Plan for altering an existing network policy.
#[derive(Debug, Clone)]
pub struct AlterNetworkPolicyPlan {
    /// ID of the network policy being altered.
    pub id: NetworkPolicyId,
    /// Name of the network policy.
    pub name: String,
    /// Rules carried by the plan.
    /// NOTE(review): presumably the complete replacement rule set — confirm.
    pub rules: Vec<NetworkPolicyRule>,
}
782
/// Plan for creating an index.
#[derive(Debug, Clone)]
pub struct CreateIndexPlan {
    /// Qualified name of the index to create.
    pub name: QualifiedItemName,
    /// The planned index.
    pub index: Index,
    /// NOTE(review): presumably set by an `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
}
789
/// Plan for creating a type.
#[derive(Debug)]
pub struct CreateTypePlan {
    /// Qualified name of the type to create.
    pub name: QualifiedItemName,
    /// The planned type.
    pub typ: Type,
}
795
/// Plan for dropping one or more objects.
#[derive(Debug)]
pub struct DropObjectsPlan {
    /// The IDs of only the objects directly referenced in the `DROP` statement.
    pub referenced_ids: Vec<ObjectId>,
    /// All object IDs to drop. Includes `referenced_ids` and all descendants.
    pub drop_ids: Vec<ObjectId>,
    /// The type of object that was dropped explicitly in the DROP statement. `drop_ids` may
    /// contain objects of different types due to CASCADE.
    pub object_type: ObjectType,
}
806
/// Plan for dropping the objects owned by a set of roles, together with the
/// privileges and default privileges to revoke.
#[derive(Debug)]
pub struct DropOwnedPlan {
    /// The role IDs that own the objects.
    pub role_ids: Vec<RoleId>,
    /// All object IDs to drop.
    pub drop_ids: Vec<ObjectId>,
    /// The privileges to revoke, each paired with the object it applies to.
    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
    /// The default privileges to revoke.
    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
}
818
/// Plan for showing the value of a variable.
#[derive(Debug)]
pub struct ShowVariablePlan {
    /// Name of the variable to show.
    pub name: String,
}
823
/// Plan for inspecting the shard of a storage collection.
#[derive(Debug)]
pub struct InspectShardPlan {
    /// ID of the storage collection to inspect.
    pub id: GlobalId,
}
829
/// Plan for setting a variable.
#[derive(Debug)]
pub struct SetVariablePlan {
    /// Name of the variable to set.
    pub name: String,
    /// The value to assign; see [`VariableValue`].
    pub value: VariableValue,
    /// NOTE(review): presumably distinguishes `SET LOCAL` from `SET` — confirm.
    pub local: bool,
}
836
/// The value assigned to a variable by [`SetVariablePlan`] or [`AlterSystemSetPlan`].
#[derive(Debug)]
pub enum VariableValue {
    /// NOTE(review): presumably means "use the variable's default value" — confirm.
    Default,
    /// One or more explicit values, as strings.
    Values(Vec<String>),
}
842
/// Plan for resetting a variable.
#[derive(Debug)]
pub struct ResetVariablePlan {
    /// Name of the variable to reset.
    pub name: String,
}
847
/// Plan for setting transaction modes.
#[derive(Debug)]
pub struct SetTransactionPlan {
    /// NOTE(review): presumably whether the modes apply only to the current
    /// transaction — confirm in the planner.
    pub local: bool,
    /// The transaction modes to apply.
    pub modes: Vec<TransactionMode>,
}
853
/// A plan for select statements.
#[derive(Clone, Debug)]
pub struct SelectPlan {
    /// The `SELECT` statement itself. Used for explain/notices, but not otherwise
    /// load-bearing. Boxed to save stack space. `None` when the plan was not
    /// built from a statement (e.g. via [`SelectPlan::immediate`]).
    pub select: Option<Box<SelectStatement<Aug>>>,
    /// The plan as a HIR.
    pub source: HirRelationExpr,
    /// At what time should this select happen?
    pub when: QueryWhen,
    /// Instructions how to form the result set.
    pub finishing: RowSetFinishing,
    /// For `COPY TO STDOUT`, the format to use.
    pub copy_to: Option<CopyFormat>,
}
869
870impl SelectPlan {
871    pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
872        let arity = typ.arity();
873        SelectPlan {
874            select: None,
875            source: HirRelationExpr::Constant { rows, typ },
876            when: QueryWhen::Immediately,
877            finishing: RowSetFinishing::trivial(arity),
878            copy_to: None,
879        }
880    }
881}
882
/// The output shape of a subscribe; see [`SubscribePlan::output`].
#[derive(Debug)]
pub enum SubscribeOutput {
    /// NOTE(review): presumably the plain stream of diffs — confirm.
    Diffs,
    /// Output ordered within each timestamp by `order_by`.
    WithinTimestampOrderBy {
        /// We pretend that mz_diff is prepended to the normal columns, making it index 0
        order_by: Vec<ColumnOrder>,
    },
    /// Upsert-envelope output.
    EnvelopeUpsert {
        /// Order by with just keys
        order_by_keys: Vec<ColumnOrder>,
    },
    /// Debezium-envelope output.
    EnvelopeDebezium {
        /// Order by with just keys
        order_by_keys: Vec<ColumnOrder>,
    },
}
899
/// Plan for a subscribe.
#[derive(Debug)]
pub struct SubscribePlan {
    /// What to subscribe to: a collection ID or a query.
    pub from: SubscribeFrom,
    /// NOTE(review): presumably whether the initial snapshot is emitted — confirm.
    pub with_snapshot: bool,
    /// At what time the subscribe should start.
    pub when: QueryWhen,
    /// Optional upper timestamp bound.
    /// NOTE(review): inclusive/exclusive semantics are not visible here — confirm.
    pub up_to: Option<Timestamp>,
    /// Optional `COPY` format for the output.
    pub copy_to: Option<CopyFormat>,
    /// NOTE(review): presumably whether progress messages are emitted — confirm.
    pub emit_progress: bool,
    /// The shape of the output; see [`SubscribeOutput`].
    pub output: SubscribeOutput,
}
910
/// What a subscribe reads from.
#[derive(Debug, Clone)]
pub enum SubscribeFrom {
    /// ID of the collection to subscribe to.
    Id(GlobalId),
    /// Query to subscribe to.
    Query {
        /// The query as a MIR expression.
        expr: MirRelationExpr,
        /// Relation description accompanying `expr`.
        desc: RelationDesc,
    },
}
921
922impl SubscribeFrom {
923    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
924        match self {
925            SubscribeFrom::Id(id) => BTreeSet::from([*id]),
926            SubscribeFrom::Query { expr, .. } => expr.depends_on(),
927        }
928    }
929
930    pub fn contains_temporal(&self) -> bool {
931        match self {
932            SubscribeFrom::Id(_) => false,
933            SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
934        }
935    }
936}
937
/// Plan for showing how an object was created.
#[derive(Debug)]
pub struct ShowCreatePlan {
    /// ID of the object being shown.
    pub id: ObjectId,
    /// The row carried by the plan.
    /// NOTE(review): presumably the pre-computed result row — confirm.
    pub row: Row,
}
943
/// Plan for showing the columns of a catalog item.
#[derive(Debug)]
pub struct ShowColumnsPlan {
    /// ID of the catalog item whose columns are shown.
    pub id: CatalogItemId,
    /// The select plan carried by this plan.
    /// NOTE(review): presumably the query that produces the listing — confirm.
    pub select_plan: SelectPlan,
    /// Resolved IDs associated with `select_plan`.
    pub new_resolved_ids: ResolvedIds,
}
950
/// Plan for copying data from an external source into a table.
#[derive(Debug)]
pub struct CopyFromPlan {
    /// Table we're copying into.
    pub target_id: CatalogItemId,
    /// Human-readable full name of the target table.
    pub target_name: String,
    /// Source we're copying data from.
    pub source: CopyFromSource,
    /// How input columns map to those on the destination table.
    ///
    /// TODO(cf2): Remove this field in favor of the mfp.
    pub columns: Vec<ColumnIndex>,
    /// [`RelationDesc`] describing the input data.
    pub source_desc: RelationDesc,
    /// Changes the shape of the input data to match the destination table.
    pub mfp: MapFilterProject,
    /// Format specific params for copying the input data.
    pub params: CopyFormatParams<'static>,
    /// Filter for the source files we're copying from, e.g. an S3 prefix.
    pub filter: Option<CopyFromFilter>,
}
972
/// Where a [`CopyFromPlan`] reads its data from.
#[derive(Debug)]
pub enum CopyFromSource {
    /// Copying from a file local to the user, transmitted via pgwire.
    Stdin,
    /// A remote resource, e.g. HTTP file.
    ///
    /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource.
    Url(HirScalarExpr),
    /// A file in an S3 bucket.
    AwsS3 {
        /// Expression that evaluates to the file we want to copy.
        uri: HirScalarExpr,
        /// Details for how we connect to AWS S3.
        connection: AwsConnection,
        /// ID of the connection object.
        connection_id: CatalogItemId,
    },
}
991
/// Filter applied to the source files of a [`CopyFromPlan`].
#[derive(Debug)]
pub enum CopyFromFilter {
    /// An explicit list of files.
    Files(Vec<String>),
    /// A pattern.
    /// NOTE(review): the pattern syntax is not visible here — confirm.
    Pattern(String),
}
997
/// `COPY TO S3`
///
/// (This is a completely different thing from `COPY TO STDOUT`. That is a `Plan::Select` with
/// `copy_to` set.)
#[derive(Debug, Clone)]
pub struct CopyToPlan {
    /// The select query plan whose data will be copied to destination uri.
    pub select_plan: SelectPlan,
    /// Relation description carried with the plan.
    /// NOTE(review): presumably describes the output of `select_plan` — confirm.
    pub desc: RelationDesc,
    /// The scalar expression to be resolved to get the destination uri.
    pub to: HirScalarExpr,
    /// The connection used to reach the destination.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
    /// The ID of the connection.
    pub connection_id: CatalogItemId,
    /// Format of the written data.
    pub format: S3SinkFormat,
    /// Maximum file size. NOTE(review): units presumably bytes — confirm.
    pub max_file_size: u64,
}
1015
/// Plan for explaining a plan.
#[derive(Clone, Debug)]
pub struct ExplainPlanPlan {
    /// The stage to explain.
    pub stage: ExplainStage,
    /// The output format of the explanation.
    pub format: ExplainFormat,
    /// Configuration for the explanation.
    pub config: ExplainConfig,
    /// The object to be explained.
    pub explainee: Explainee,
}
1023
/// The type of object to be explained
#[derive(Clone, Debug)]
pub enum Explainee {
    /// Lookup and explain a plan saved for an existing view.
    View(CatalogItemId),
    /// Lookup and explain a plan saved for an existing materialized view.
    MaterializedView(CatalogItemId),
    /// Lookup and explain a plan saved for an existing index.
    Index(CatalogItemId),
    /// Replan an existing view.
    ReplanView(CatalogItemId),
    /// Replan an existing materialized view.
    ReplanMaterializedView(CatalogItemId),
    /// Replan an existing index.
    ReplanIndex(CatalogItemId),
    /// A SQL statement.
    Statement(ExplaineeStatement),
}
1042
/// Explainee types that are statements.
///
/// The `EnumKind` derive additionally generates the field-less companion enum
/// `ExplaineeStatementKind` named in the `enum_kind` attribute.
#[derive(Clone, Debug, EnumKind)]
#[enum_kind(ExplaineeStatementKind)]
pub enum ExplaineeStatement {
    /// The object to be explained is a SELECT statement.
    Select {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The plan of the `SELECT` statement.
        plan: plan::SelectPlan,
        /// Relation description carried alongside `plan`.
        desc: RelationDesc,
    },
    /// The object to be explained is a CREATE VIEW.
    CreateView {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The plan of the `CREATE VIEW` statement.
        plan: plan::CreateViewPlan,
    },
    /// The object to be explained is a CREATE MATERIALIZED VIEW.
    CreateMaterializedView {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The plan of the `CREATE MATERIALIZED VIEW` statement.
        plan: plan::CreateMaterializedViewPlan,
    },
    /// The object to be explained is a CREATE INDEX.
    CreateIndex {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The plan of the `CREATE INDEX` statement.
        plan: plan::CreateIndexPlan,
    },
}
1073
1074impl ExplaineeStatement {
1075    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1076        match self {
1077            Self::Select { plan, .. } => plan.source.depends_on(),
1078            Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1079            Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1080            Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1081        }
1082    }
1083
1084    /// Statements that have their `broken` flag set are expected to cause a
1085    /// panic in the optimizer code. In this case:
1086    ///
1087    /// 1. The optimizer pipeline execution will stop, but the panic will be
1088    ///    intercepted and will not propagate to the caller. The partial
1089    ///    optimizer trace collected until this point will be available.
1090    /// 2. The optimizer trace tracing subscriber will delegate regular tracing
1091    ///    spans and events to the default subscriber.
1092    ///
1093    /// This is useful when debugging queries that cause panics.
1094    pub fn broken(&self) -> bool {
1095        match self {
1096            Self::Select { broken, .. } => *broken,
1097            Self::CreateView { broken, .. } => *broken,
1098            Self::CreateMaterializedView { broken, .. } => *broken,
1099            Self::CreateIndex { broken, .. } => *broken,
1100        }
1101    }
1102}
1103
1104impl ExplaineeStatementKind {
1105    pub fn supports(&self, stage: &ExplainStage) -> bool {
1106        use ExplainStage::*;
1107        match self {
1108            Self::Select => true,
1109            Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1110            Self::CreateMaterializedView => true,
1111            Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1112        }
1113    }
1114}
1115
1116impl std::fmt::Display for ExplaineeStatementKind {
1117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1118        match self {
1119            Self::Select => write!(f, "SELECT"),
1120            Self::CreateView => write!(f, "CREATE VIEW"),
1121            Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1122            Self::CreateIndex => write!(f, "CREATE INDEX"),
1123        }
1124    }
1125}
1126
/// Plan for explaining pushdown.
#[derive(Clone, Debug)]
pub struct ExplainPushdownPlan {
    /// The object to be explained.
    pub explainee: Explainee,
}
1131
/// Plan for explaining a timestamp.
#[derive(Clone, Debug)]
pub struct ExplainTimestampPlan {
    /// The output format of the explanation.
    pub format: ExplainFormat,
    /// The query as a HIR expression.
    pub raw_plan: HirRelationExpr,
    /// At what time the query would happen.
    pub when: QueryWhen,
}
1138
/// Plan for explaining the schema of a sink.
#[derive(Debug)]
pub struct ExplainSinkSchemaPlan {
    /// ID of the collection the sink reads from.
    pub sink_from: GlobalId,
    /// The schema, rendered as a JSON string.
    pub json_schema: String,
}
1144
/// Plan for sending a set of row updates to a catalog item.
#[derive(Debug)]
pub struct SendDiffsPlan {
    /// ID of the catalog item receiving the updates.
    pub id: CatalogItemId,
    /// The updates, each a row paired with its diff.
    pub updates: Vec<(Row, Diff)>,
    /// The kind of mutation these updates represent.
    pub kind: MutationKind,
    /// Rows to return, each paired with a non-zero count.
    /// NOTE(review): presumably the `RETURNING` output with multiplicities — confirm.
    pub returning: Vec<(Row, NonZeroUsize)>,
    /// Upper bound on the result size. NOTE(review): units presumably bytes — confirm.
    pub max_result_size: u64,
}
1153
/// Plan for inserting into a catalog item.
#[derive(Debug)]
pub struct InsertPlan {
    /// ID of the catalog item to insert into.
    pub id: CatalogItemId,
    /// Expression producing the values to insert.
    pub values: HirRelationExpr,
    /// Scalar expressions to return.
    /// NOTE(review): presumably the `RETURNING` clause — confirm.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1160
/// Plan for a mutation that first reads a selection and then writes based on it.
#[derive(Debug)]
pub struct ReadThenWritePlan {
    /// ID of the catalog item being mutated.
    pub id: CatalogItemId,
    /// Expression selecting the rows to read.
    pub selection: HirRelationExpr,
    /// Instructions how to form the result set of the read.
    pub finishing: RowSetFinishing,
    /// Assignments keyed by `usize`.
    /// NOTE(review): presumably column index → new value expression — confirm.
    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
    /// The kind of mutation to perform.
    pub kind: MutationKind,
    /// Scalar expressions to return.
    /// NOTE(review): presumably the `RETURNING` clause — confirm.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1170
/// Generated by `ALTER ... IF EXISTS` if the named object did not exist.
#[derive(Debug)]
pub struct AlterNoopPlan {
    /// Type of the object named in the `ALTER` statement.
    pub object_type: ObjectType,
}
1176
/// Plan for changing the cluster of a catalog item.
#[derive(Debug)]
pub struct AlterSetClusterPlan {
    /// ID of the catalog item being altered.
    pub id: CatalogItemId,
    /// ID of the cluster to set.
    pub set_cluster: ClusterId,
}
1182
/// Plan for altering the retained history of an object.
#[derive(Debug)]
pub struct AlterRetainHistoryPlan {
    /// ID of the catalog item being altered.
    pub id: CatalogItemId,
    /// The optional value carried by the plan.
    /// NOTE(review): presumably the user-supplied option value, `None` on reset — confirm.
    pub value: Option<Value>,
    /// The compaction window to apply.
    pub window: CompactionWindow,
    /// Type of the object being altered.
    pub object_type: ObjectType,
}
1190
/// How an `ALTER` statement treats a single option: set it to a value, reset
/// it, or leave it unchanged. The value type defaults to `String`.
#[derive(Debug, Clone)]

pub enum AlterOptionParameter<T = String> {
    /// Set the option to the contained value.
    Set(T),
    /// Reset the option.
    Reset,
    /// Leave the option as it is.
    Unchanged,
}
1198
/// The action performed by an [`AlterConnectionPlan`].
#[derive(Debug)]
pub enum AlterConnectionAction {
    /// Rotate the connection's keys.
    RotateKeys,
    /// Change the connection's options.
    AlterOptions {
        /// Options to set, keyed by option name.
        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
        /// Names of options to drop.
        drop_options: BTreeSet<ConnectionOptionName>,
        /// NOTE(review): presumably requests validating the altered connection — confirm.
        validate: bool,
    },
}
1208
/// Plan for altering a connection.
#[derive(Debug)]
pub struct AlterConnectionPlan {
    /// ID of the connection being altered.
    pub id: CatalogItemId,
    /// The alteration to perform.
    pub action: AlterConnectionAction,
}
1214
/// The action performed by an [`AlterSourcePlan`].
#[derive(Debug)]
pub enum AlterSourceAction {
    /// Add subsource exports to the source.
    AddSubsourceExports {
        /// Plan bundles for the subsources to add.
        subsources: Vec<CreateSourcePlanBundle>,
        /// Options given for the addition.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    },
    /// Refresh the source's references.
    RefreshReferences {
        /// The references carried by the plan.
        /// NOTE(review): presumably the freshly fetched upstream references — confirm.
        references: SourceReferences,
    },
}
1225
/// Plan for altering a source.
#[derive(Debug)]
pub struct AlterSourcePlan {
    /// Catalog item ID of the source being altered.
    pub item_id: CatalogItemId,
    /// Global ID of the source's ingestion.
    pub ingestion_id: GlobalId,
    /// The alteration to perform.
    pub action: AlterSourceAction,
}
1232
/// Plan for altering a sink.
#[derive(Debug, Clone)]
pub struct AlterSinkPlan {
    /// Catalog item ID of the sink being altered.
    pub item_id: CatalogItemId,
    /// Global ID of the sink.
    pub global_id: GlobalId,
    /// The sink definition carried by the plan.
    /// NOTE(review): presumably the new definition after the alteration — confirm.
    pub sink: Sink,
    /// NOTE(review): presumably whether the sink emits an initial snapshot — confirm.
    pub with_snapshot: bool,
    /// ID of the cluster associated with the sink.
    pub in_cluster: ClusterId,
}
1241
/// Plan for altering a cluster.
#[derive(Debug, Clone)]
pub struct AlterClusterPlan {
    /// ID of the cluster being altered.
    pub id: ClusterId,
    /// Name of the cluster.
    pub name: String,
    /// The cluster options carried by the plan.
    pub options: PlanClusterOption,
    /// The strategy used to apply the alteration.
    pub strategy: AlterClusterPlanStrategy,
}
1249
/// Plan for renaming a cluster.
#[derive(Debug)]
pub struct AlterClusterRenamePlan {
    /// ID of the cluster being renamed.
    pub id: ClusterId,
    /// Current name of the cluster.
    pub name: String,
    /// New name of the cluster.
    pub to_name: String,
}
1256
/// Plan for renaming a cluster replica.
#[derive(Debug)]
pub struct AlterClusterReplicaRenamePlan {
    /// ID of the cluster that contains the replica.
    pub cluster_id: ClusterId,
    /// ID of the replica being renamed.
    pub replica_id: ReplicaId,
    /// Current qualified name of the replica.
    pub name: QualifiedReplica,
    /// New name of the replica.
    pub to_name: String,
}
1264
/// Plan for renaming a catalog item.
#[derive(Debug)]
pub struct AlterItemRenamePlan {
    /// ID of the item being renamed.
    pub id: CatalogItemId,
    /// Current full name of the item.
    pub current_full_name: FullItemName,
    /// New name of the item.
    pub to_name: String,
    /// Type of the item being renamed.
    pub object_type: ObjectType,
}
1272
/// Plan for renaming a schema.
#[derive(Debug)]
pub struct AlterSchemaRenamePlan {
    /// Database and schema specifiers identifying the schema being renamed.
    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// New name of the schema.
    pub new_schema_name: String,
}
1278
/// Plan for swapping two schemas.
#[derive(Debug)]
pub struct AlterSchemaSwapPlan {
    /// Database and schema specifiers identifying schema A.
    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of schema A.
    pub schema_a_name: String,
    /// Database and schema specifiers identifying schema B.
    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of schema B.
    pub schema_b_name: String,
    /// A temporary name.
    /// NOTE(review): presumably used as the intermediate name during the swap — confirm.
    pub name_temp: String,
}
1287
/// Plan for swapping two clusters.
#[derive(Debug)]
pub struct AlterClusterSwapPlan {
    /// ID of cluster A.
    pub id_a: ClusterId,
    /// ID of cluster B.
    pub id_b: ClusterId,
    /// Name of cluster A.
    pub name_a: String,
    /// Name of cluster B.
    pub name_b: String,
    /// A temporary name.
    /// NOTE(review): presumably used as the intermediate name during the swap — confirm.
    pub name_temp: String,
}
1296
/// Plan for altering a secret.
#[derive(Debug)]
pub struct AlterSecretPlan {
    /// ID of the secret being altered.
    pub id: CatalogItemId,
    /// Scalar expression for the secret's new contents.
    pub secret_as: MirScalarExpr,
}
1302
/// Plan for setting a system variable.
#[derive(Debug)]
pub struct AlterSystemSetPlan {
    /// Name of the variable to set.
    pub name: String,
    /// The value to assign; see [`VariableValue`].
    pub value: VariableValue,
}
1308
/// Plan for resetting a system variable.
#[derive(Debug)]
pub struct AlterSystemResetPlan {
    /// Name of the variable to reset.
    pub name: String,
}
1313
/// Plan for resetting all system variables. Carries no data.
#[derive(Debug)]
pub struct AlterSystemResetAllPlan {}
1316
/// Plan for altering a role.
#[derive(Debug)]
pub struct AlterRolePlan {
    /// ID of the role being altered.
    pub id: RoleId,
    /// Name of the role.
    pub name: String,
    /// The alteration to apply to the role.
    pub option: PlannedAlterRoleOption,
}
1323
/// Plan for changing the owner of an object.
#[derive(Debug)]
pub struct AlterOwnerPlan {
    /// ID of the object whose owner changes.
    pub id: ObjectId,
    /// Type of that object.
    pub object_type: ObjectType,
    /// ID of the role that becomes the new owner.
    pub new_owner: RoleId,
}
1330
/// Plan for altering a table; carries the description of a single column.
/// NOTE(review): presumably the column being added — confirm in the planner.
#[derive(Debug)]
pub struct AlterTablePlan {
    /// ID of the relation being altered.
    pub relation_id: CatalogItemId,
    /// Name of the column.
    pub column_name: ColumnName,
    /// Planned type of the column.
    pub column_type: SqlColumnType,
    /// The column's data type as written in the SQL.
    pub raw_sql_type: RawDataType,
}
1338
/// Plan for applying a replacement to a materialized view.
#[derive(Debug)]
pub struct AlterMaterializedViewApplyReplacementPlan {
    /// ID of the materialized view being altered.
    pub id: CatalogItemId,
    /// ID of the replacement to apply.
    pub replacement_id: CatalogItemId,
}
1344
/// Plan for declaring a named statement.
/// NOTE(review): presumably a cursor (`DECLARE`) — confirm.
#[derive(Debug)]
pub struct DeclarePlan {
    /// Name being declared.
    pub name: String,
    /// The raw (unresolved) statement.
    pub stmt: Statement<Raw>,
    /// SQL text associated with `stmt`.
    pub sql: String,
    /// Parameters for the statement.
    pub params: Params,
}
1352
/// Plan for fetching from a named object.
/// NOTE(review): presumably a cursor (`FETCH`) — confirm.
#[derive(Debug)]
pub struct FetchPlan {
    /// Name to fetch from.
    pub name: String,
    /// Optional fetch direction/count.
    pub count: Option<FetchDirection>,
    /// Timeout behavior for the fetch.
    pub timeout: ExecuteTimeout,
}
1359
/// Plan for closing a named object.
/// NOTE(review): presumably a cursor (`CLOSE`) — confirm.
#[derive(Debug)]
pub struct ClosePlan {
    /// Name to close.
    pub name: String,
}
1364
/// Plan for preparing a named statement.
#[derive(Debug)]
pub struct PreparePlan {
    /// Name of the prepared statement.
    pub name: String,
    /// The raw (unresolved) statement.
    pub stmt: Statement<Raw>,
    /// SQL text associated with `stmt`.
    pub sql: String,
    /// Description of the statement.
    pub desc: StatementDesc,
}
1372
/// Plan for executing a named statement.
#[derive(Debug)]
pub struct ExecutePlan {
    /// Name of the statement to execute.
    pub name: String,
    /// Parameters to execute it with.
    pub params: Params,
}
1378
/// Plan for deallocating prepared statements.
#[derive(Debug)]
pub struct DeallocatePlan {
    /// Name of the statement to deallocate.
    /// NOTE(review): `None` presumably means "deallocate all" — confirm.
    pub name: Option<String>,
}
1383
/// Plan for raising a notice.
#[derive(Debug)]
pub struct RaisePlan {
    /// Severity of the notice to raise.
    pub severity: NoticeSeverity,
}
1388
/// Plan for granting role membership.
#[derive(Debug)]
pub struct GrantRolePlan {
    /// The roles that are gaining members.
    pub role_ids: Vec<RoleId>,
    /// The roles that will be added to `role_ids`.
    pub member_ids: Vec<RoleId>,
    /// The role that granted the membership.
    pub grantor_id: RoleId,
}
1398
/// Plan for revoking role membership.
#[derive(Debug)]
pub struct RevokeRolePlan {
    /// The roles that are losing members.
    pub role_ids: Vec<RoleId>,
    /// The roles that will be removed from `role_ids`.
    pub member_ids: Vec<RoleId>,
    /// The role that revoked the membership.
    pub grantor_id: RoleId,
}
1408
/// A single privilege change, shared by [`GrantPrivilegesPlan`] and
/// [`RevokePrivilegesPlan`].
#[derive(Debug)]
pub struct UpdatePrivilege {
    /// The privileges being granted/revoked on an object.
    pub acl_mode: AclMode,
    /// The ID of the object receiving privileges.
    pub target_id: SystemObjectId,
    /// The role that is granting the privileges.
    pub grantor: RoleId,
}
1418
/// Plan for granting privileges.
#[derive(Debug)]
pub struct GrantPrivilegesPlan {
    /// Description of each privilege being granted.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles that will be granted the privileges.
    pub grantees: Vec<RoleId>,
}
1426
/// Plan for revoking privileges.
#[derive(Debug)]
pub struct RevokePrivilegesPlan {
    /// Description of each privilege being revoked.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles that will have privileges revoked.
    pub revokees: Vec<RoleId>,
}
/// Plan for altering default privileges.
#[derive(Debug)]
pub struct AlterDefaultPrivilegesPlan {
    /// Description of objects that match this default privilege.
    pub privilege_objects: Vec<DefaultPrivilegeObject>,
    /// The privilege to be granted/revoked from the matching objects.
    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
    /// Whether this is a grant or revoke.
    pub is_grant: bool,
}
1443
/// Plan for reassigning the objects owned by a set of roles to a new owner.
#[derive(Debug)]
pub struct ReassignOwnedPlan {
    /// The roles whose owned objects are being reassigned.
    pub old_roles: Vec<RoleId>,
    /// The new owner of the objects.
    pub new_role: RoleId,
    /// All object IDs to reassign.
    pub reassign_ids: Vec<ObjectId>,
}
1453
/// Plan for setting or clearing the comment on an object.
#[derive(Debug)]
pub struct CommentPlan {
    /// The object that this comment is associated with.
    pub object_id: CommentObjectId,
    /// A sub-component of the object that this comment is associated with, e.g. a column.
    ///
    /// TODO(parkmycar): <https://github.com/MaterializeInc/database-issues/issues/6711>.
    pub sub_component: Option<usize>,
    /// The comment itself. If `None` that indicates we should clear the existing comment.
    pub comment: Option<String>,
}
1465
/// Where the data of a [`Table`] comes from.
#[derive(Clone, Debug)]
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    ///
    /// `defaults` holds default-value expressions.
    /// NOTE(review): presumably one per column, in column order — confirm.
    TableWrites { defaults: Vec<Expr<Aug>> },

    /// The table receives its data from the identified `DataSourceDesc`.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    DataSource {
        /// Describes the upstream data source.
        desc: DataSourceDesc,
        /// The timeline associated with the data source.
        timeline: Timeline,
    },
}
1478
/// A planned table.
#[derive(Clone, Debug)]
pub struct Table {
    /// The SQL text that creates the table.
    pub create_sql: String,
    /// Versioned relation description of the table.
    pub desc: VersionedRelationDesc,
    /// Whether the table is temporary.
    pub temporary: bool,
    /// Compaction window for the table, if one is set.
    pub compaction_window: Option<CompactionWindow>,
    /// Where the table's data comes from.
    pub data_source: TableDataSource,
}
1487
/// A planned source.
#[derive(Clone, Debug)]
pub struct Source {
    /// The SQL text that creates the source.
    pub create_sql: String,
    /// Where the source's data comes from.
    pub data_source: DataSourceDesc,
    /// Relation description of the source.
    pub desc: RelationDesc,
    /// Compaction window for the source, if one is set.
    pub compaction_window: Option<CompactionWindow>,
}
1495
/// Describes where a [`Source`] or data-source-backed [`Table`] receives its
/// data from.
#[derive(Debug, Clone)]
pub enum DataSourceDesc {
    /// Receives data from an external system.
    Ingestion(SourceDesc<ReferencedConnection>),
    /// Receives data from an external system, using the old ingestion syntax.
    OldSyntaxIngestion {
        /// Description of the ingestion.
        desc: SourceDesc<ReferencedConnection>,
        // If we're dealing with an old syntax ingestion the progress id will be some other collection
        // and the ingestion itself will have the data from a default external reference
        /// ID of the separate progress collection.
        progress_subsource: CatalogItemId,
        /// Data configuration of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
        /// Details of the export.
        details: SourceExportDetails,
    },
    /// This source receives its data from the identified ingestion,
    /// specifically the output identified by `external_reference`.
    IngestionExport {
        /// ID of the ingestion that produces the data.
        ingestion_id: CatalogItemId,
        /// Identifies which output of the ingestion to use.
        external_reference: UnresolvedItemName,
        /// Details of the export.
        details: SourceExportDetails,
        /// Data configuration of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Receives data from the source's reclocking/remapping operations.
    Progress,
    /// Receives data from HTTP post requests.
    Webhook {
        /// Optional validation applied to requests.
        validate_using: Option<WebhookValidation>,
        /// Format of the request body.
        body_format: WebhookBodyFormat,
        /// Which request headers to expose, and how.
        headers: WebhookHeaders,
        /// Only `Some` when created via `CREATE TABLE ... FROM WEBHOOK`.
        cluster_id: Option<StorageInstanceId>,
    },
}
1528
/// Validation applied to webhook requests: an expression together with the
/// description of the columns (body, headers, secrets) it is evaluated over.
#[derive(Clone, Debug, Serialize)]
pub struct WebhookValidation {
    /// The expression used to validate a request.
    pub expression: MirScalarExpr,
    /// Description of the source that will be created.
    pub relation_desc: RelationDesc,
    /// The column index to provide the request body and whether to provide it as bytes.
    pub bodies: Vec<(usize, bool)>,
    /// The column index to provide the request headers and whether to provide the values as bytes.
    pub headers: Vec<(usize, bool)>,
    /// Any secrets that are used in that validation.
    pub secrets: Vec<WebhookValidationSecret>,
}
1542
1543impl WebhookValidation {
1544    const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1545
1546    /// Attempt to reduce the internal [`MirScalarExpr`] into a simpler expression.
1547    ///
1548    /// The reduction happens on a separate thread, we also only wait for
1549    /// `WebhookValidation::MAX_REDUCE_TIME` before timing out and returning an error.
1550    pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1551        let WebhookValidation {
1552            expression,
1553            relation_desc,
1554            ..
1555        } = self;
1556
1557        // On a different thread, attempt to reduce the expression.
1558        let mut expression_ = expression.clone();
1559        let desc_ = relation_desc.clone();
1560        let reduce_task = mz_ore::task::spawn_blocking(
1561            || "webhook-validation-reduce",
1562            move || {
1563                expression_.reduce(&desc_.typ().column_types);
1564                expression_
1565            },
1566        );
1567
1568        match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1569            Ok(reduced_expr) => {
1570                *expression = reduced_expr;
1571                Ok(())
1572            }
1573            Err(_) => Err("timeout"),
1574        }
1575    }
1576}
1577
/// Describes which request headers a webhook source exposes as columns.
#[derive(Clone, Debug, Default, Serialize)]
pub struct WebhookHeaders {
    /// Optionally include a column named `headers` whose content is possibly filtered.
    pub header_column: Option<WebhookHeaderFilters>,
    /// The column index to provide the specific request header, and whether to provide it as bytes.
    pub mapped_headers: BTreeMap<usize, (String, bool)>,
}
1585
1586impl WebhookHeaders {
1587    /// Returns the number of columns needed to represent our headers.
1588    pub fn num_columns(&self) -> usize {
1589        let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1590        let mapped_headers = self.mapped_headers.len();
1591
1592        header_column + mapped_headers
1593    }
1594}
1595
/// Filters applied to the content of a webhook source's `headers` column.
/// NOTE(review): precedence between `block` and `allow` is not visible here — confirm.
#[derive(Clone, Debug, Default, Serialize)]
pub struct WebhookHeaderFilters {
    /// Header names in the block list.
    pub block: BTreeSet<String>,
    /// Header names in the allow list.
    pub allow: BTreeSet<String>,
}
1601
/// Format of a webhook request body. Converts into the [`SqlScalarType`] of
/// the body column via `From`.
#[derive(Copy, Clone, Debug, Serialize, Arbitrary)]
pub enum WebhookBodyFormat {
    /// JSON body.
    /// NOTE(review): `array` presumably controls handling of top-level JSON arrays — confirm.
    Json { array: bool },
    /// Raw bytes.
    Bytes,
    /// Text.
    Text,
}
1608
1609impl From<WebhookBodyFormat> for SqlScalarType {
1610    fn from(value: WebhookBodyFormat) -> Self {
1611        match value {
1612            WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1613            WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1614            WebhookBodyFormat::Text => SqlScalarType::String,
1615        }
1616    }
1617}
1618
/// A secret referenced by a [`WebhookValidation`] expression.
#[derive(Clone, Debug, Serialize)]
pub struct WebhookValidationSecret {
    /// Identifies the secret by [`CatalogItemId`].
    pub id: CatalogItemId,
    /// Column index for the expression context that this secret was originally evaluated in.
    pub column_idx: usize,
    /// Whether or not this secret should be provided to the expression as Bytes or a String.
    pub use_bytes: bool,
}
1628
/// A planned connection.
#[derive(Clone, Debug)]
pub struct Connection {
    /// The SQL text that creates the connection.
    pub create_sql: String,
    /// Type-specific details of the connection.
    pub details: ConnectionDetails,
}
1634
/// Type-specific details of a [`Connection`], one variant per connection type.
#[derive(Clone, Debug, Serialize)]
pub enum ConnectionDetails {
    /// A Kafka connection.
    Kafka(KafkaConnection<ReferencedConnection>),
    /// A CSR connection.
    Csr(CsrConnection<ReferencedConnection>),
    /// A PostgreSQL connection.
    Postgres(PostgresConnection<ReferencedConnection>),
    /// An SSH connection together with two keys.
    Ssh {
        /// The SSH connection itself.
        connection: SshConnection,
        /// The first SSH key.
        key_1: SshKey,
        /// The second SSH key.
        key_2: SshKey,
    },
    /// An AWS connection.
    Aws(AwsConnection),
    /// An AWS PrivateLink connection.
    AwsPrivatelink(AwsPrivatelinkConnection),
    /// A MySQL connection.
    MySql(MySqlConnection<ReferencedConnection>),
    /// A SQL Server connection.
    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
    /// An Iceberg catalog connection.
    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
}
1651
1652impl ConnectionDetails {
1653    pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1654        match self {
1655            ConnectionDetails::Kafka(c) => {
1656                mz_storage_types::connections::Connection::Kafka(c.clone())
1657            }
1658            ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1659            ConnectionDetails::Postgres(c) => {
1660                mz_storage_types::connections::Connection::Postgres(c.clone())
1661            }
1662            ConnectionDetails::Ssh { connection, .. } => {
1663                mz_storage_types::connections::Connection::Ssh(connection.clone())
1664            }
1665            ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1666            ConnectionDetails::AwsPrivatelink(c) => {
1667                mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1668            }
1669            ConnectionDetails::MySql(c) => {
1670                mz_storage_types::connections::Connection::MySql(c.clone())
1671            }
1672            ConnectionDetails::SqlServer(c) => {
1673                mz_storage_types::connections::Connection::SqlServer(c.clone())
1674            }
1675            ConnectionDetails::IcebergCatalog(c) => {
1676                mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1677            }
1678        }
1679    }
1680}
1681
/// A single rule of a network policy.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct NetworkPolicyRule {
    /// Name of the rule.
    pub name: String,
    /// The action the rule takes.
    pub action: NetworkPolicyRuleAction,
    /// The network address the rule applies to.
    pub address: PolicyAddress,
    /// The traffic direction the rule applies to.
    pub direction: NetworkPolicyRuleDirection,
}
1689
/// The action of a [`NetworkPolicyRule`]. `Allow` is currently the only action.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleAction {
    /// Allow the matching traffic.
    Allow,
}
1694
1695impl std::fmt::Display for NetworkPolicyRuleAction {
1696    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1697        match self {
1698            Self::Allow => write!(f, "allow"),
1699        }
1700    }
1701}
1702impl TryFrom<&str> for NetworkPolicyRuleAction {
1703    type Error = PlanError;
1704    fn try_from(value: &str) -> Result<Self, Self::Error> {
1705        match value.to_uppercase().as_str() {
1706            "ALLOW" => Ok(Self::Allow),
1707            _ => Err(PlanError::Unstructured(
1708                "Allow is the only valid option".into(),
1709            )),
1710        }
1711    }
1712}
1713
1714#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1715pub enum NetworkPolicyRuleDirection {
1716    Ingress,
1717}
1718impl std::fmt::Display for NetworkPolicyRuleDirection {
1719    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1720        match self {
1721            Self::Ingress => write!(f, "ingress"),
1722        }
1723    }
1724}
1725impl TryFrom<&str> for NetworkPolicyRuleDirection {
1726    type Error = PlanError;
1727    fn try_from(value: &str) -> Result<Self, Self::Error> {
1728        match value.to_uppercase().as_str() {
1729            "INGRESS" => Ok(Self::Ingress),
1730            _ => Err(PlanError::Unstructured(
1731                "Ingress is the only valid option".into(),
1732            )),
1733        }
1734    }
1735}
1736
1737#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1738pub struct PolicyAddress(pub IpNet);
1739impl std::fmt::Display for PolicyAddress {
1740    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1741        write!(f, "{}", &self.0.to_string())
1742    }
1743}
1744impl From<String> for PolicyAddress {
1745    fn from(value: String) -> Self {
1746        Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1747    }
1748}
1749impl TryFrom<&str> for PolicyAddress {
1750    type Error = PlanError;
1751    fn try_from(value: &str) -> Result<Self, Self::Error> {
1752        let net = IpNet::from_str(value)
1753            .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1754        Ok(Self(net))
1755    }
1756}
1757
1758impl Serialize for PolicyAddress {
1759    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1760    where
1761        S: serde::Serializer,
1762    {
1763        serializer.serialize_str(&format!("{}", &self.0))
1764    }
1765}
1766
1767#[derive(Clone, Debug, Serialize)]
1768pub enum SshKey {
1769    PublicOnly(String),
1770    Both(SshKeyPair),
1771}
1772
1773impl SshKey {
1774    pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1775        match self {
1776            SshKey::PublicOnly(_) => None,
1777            SshKey::Both(key_pair) => Some(key_pair),
1778        }
1779    }
1780
1781    pub fn public_key(&self) -> String {
1782        match self {
1783            SshKey::PublicOnly(s) => s.into(),
1784            SshKey::Both(p) => p.ssh_public_key(),
1785        }
1786    }
1787}
1788
1789#[derive(Clone, Debug)]
1790pub struct Secret {
1791    pub create_sql: String,
1792    pub secret_as: MirScalarExpr,
1793}
1794
1795#[derive(Clone, Debug)]
1796pub struct Sink {
1797    /// Parse-able SQL that is stored durably and defines this sink.
1798    pub create_sql: String,
1799    /// Collection we read into this sink.
1800    pub from: GlobalId,
1801    /// Type of connection to the external service we sink into.
1802    pub connection: StorageSinkConnection<ReferencedConnection>,
1803    // TODO(guswynn): this probably should just be in the `connection`.
1804    pub envelope: SinkEnvelope,
1805    pub version: u64,
1806}
1807
1808#[derive(Clone, Debug)]
1809pub struct View {
1810    /// Parse-able SQL that is stored durably and defines this view.
1811    pub create_sql: String,
1812    /// Unoptimized high-level expression from parsing the `create_sql`.
1813    pub expr: HirRelationExpr,
1814    /// All of the catalog objects that are referenced by this view, according to the `expr`.
1815    pub dependencies: DependencyIds,
1816    /// Columns of this view.
1817    pub column_names: Vec<ColumnName>,
1818    /// If this view is created in the temporary schema, e.g. `CREATE TEMPORARY ...`.
1819    pub temporary: bool,
1820}
1821
1822#[derive(Clone, Debug)]
1823pub struct MaterializedView {
1824    /// Parse-able SQL that is stored durably and defines this materialized view.
1825    pub create_sql: String,
1826    /// Unoptimized high-level expression from parsing the `create_sql`.
1827    pub expr: HirRelationExpr,
1828    /// All of the catalog objects that are referenced by this materialized view, according to the `expr`.
1829    pub dependencies: DependencyIds,
1830    /// Columns of this view.
1831    pub column_names: Vec<ColumnName>,
1832    pub replacement_target: Option<CatalogItemId>,
1833    /// Cluster this materialized view will get installed on.
1834    pub cluster_id: ClusterId,
1835    pub non_null_assertions: Vec<usize>,
1836    pub compaction_window: Option<CompactionWindow>,
1837    pub refresh_schedule: Option<RefreshSchedule>,
1838    pub as_of: Option<Timestamp>,
1839}
1840
1841#[derive(Clone, Debug)]
1842pub struct Index {
1843    /// Parse-able SQL that is stored durably and defines this index.
1844    pub create_sql: String,
1845    /// Collection this index is on top of.
1846    pub on: GlobalId,
1847    pub keys: Vec<mz_expr::MirScalarExpr>,
1848    pub compaction_window: Option<CompactionWindow>,
1849    pub cluster_id: ClusterId,
1850}
1851
1852#[derive(Clone, Debug)]
1853pub struct Type {
1854    pub create_sql: String,
1855    pub inner: CatalogType<IdReference>,
1856}
1857
1858/// Specifies when a `Peek` or `Subscribe` should occur.
1859#[derive(Deserialize, Clone, Debug, PartialEq)]
1860pub enum QueryWhen {
1861    /// The peek should occur at the latest possible timestamp that allows the
1862    /// peek to complete immediately.
1863    Immediately,
1864    /// The peek should occur at a timestamp that allows the peek to see all
1865    /// data written to tables within Materialize.
1866    FreshestTableWrite,
1867    /// The peek should occur at the timestamp described by the specified
1868    /// expression.
1869    ///
1870    /// The expression may have any type.
1871    AtTimestamp(Timestamp),
1872    /// Same as Immediately, but will also advance to at least the specified
1873    /// expression.
1874    AtLeastTimestamp(Timestamp),
1875}
1876
1877impl QueryWhen {
1878    /// Returns a timestamp to which the candidate must be advanced.
1879    pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1880        match self {
1881            QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1882            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1883        }
1884    }
1885    /// Returns whether the candidate's upper bound is constrained.
1886    /// This is only true for `AtTimestamp` since it is the only variant that
1887    /// specifies a timestamp.
1888    pub fn constrains_upper(&self) -> bool {
1889        match self {
1890            QueryWhen::AtTimestamp(_) => true,
1891            QueryWhen::AtLeastTimestamp(_)
1892            | QueryWhen::Immediately
1893            | QueryWhen::FreshestTableWrite => false,
1894        }
1895    }
1896    /// Returns whether the candidate must be advanced to the since.
1897    pub fn advance_to_since(&self) -> bool {
1898        match self {
1899            QueryWhen::Immediately
1900            | QueryWhen::AtLeastTimestamp(_)
1901            | QueryWhen::FreshestTableWrite => true,
1902            QueryWhen::AtTimestamp(_) => false,
1903        }
1904    }
1905    /// Returns whether the candidate can be advanced to the upper.
1906    pub fn can_advance_to_upper(&self) -> bool {
1907        match self {
1908            QueryWhen::Immediately => true,
1909            QueryWhen::FreshestTableWrite
1910            | QueryWhen::AtTimestamp(_)
1911            | QueryWhen::AtLeastTimestamp(_) => false,
1912        }
1913    }
1914
1915    /// Returns whether the candidate can be advanced to the timeline's timestamp.
1916    pub fn can_advance_to_timeline_ts(&self) -> bool {
1917        match self {
1918            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1919            QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1920        }
1921    }
1922    /// Returns whether the candidate must be advanced to the timeline's timestamp.
1923    pub fn must_advance_to_timeline_ts(&self) -> bool {
1924        match self {
1925            QueryWhen::FreshestTableWrite => true,
1926            QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1927                false
1928            }
1929        }
1930    }
1931    /// Returns whether the selected timestamp should be tracked within the current transaction.
1932    pub fn is_transactional(&self) -> bool {
1933        match self {
1934            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1935            QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1936        }
1937    }
1938}
1939
/// The kind of a mutation: insert, update, or delete.
#[derive(Debug, Copy, Clone)]
pub enum MutationKind {
    /// An insert.
    Insert,
    /// An update.
    Update,
    /// A delete.
    Delete,
}
1946
/// A copy format.
///
/// The derived ordering follows declaration order, so the variants must not
/// be rearranged without checking users of `Ord`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyFormat {
    /// Text format.
    Text,
    /// CSV format.
    Csv,
    /// Binary format.
    Binary,
    /// Parquet format.
    Parquet,
}
1954
/// A timeout setting for an execution.
#[derive(Debug, Copy, Clone)]
pub enum ExecuteTimeout {
    /// The `None` setting.
    // NOTE(review): presumably "no timeout" — confirm with the consumer.
    None,
    /// A number of seconds, as a float.
    Seconds(f64),
    /// The `WaitOnce` setting.
    // NOTE(review): semantics are not visible from this file — confirm.
    WaitOnce,
}
1961
1962#[derive(Clone, Debug)]
1963pub enum IndexOption {
1964    /// Configures the logical compaction window for an index.
1965    RetainHistory(CompactionWindow),
1966}
1967
1968#[derive(Clone, Debug)]
1969pub enum TableOption {
1970    /// Configures the logical compaction window for a table.
1971    RetainHistory(CompactionWindow),
1972}
1973
1974#[derive(Clone, Debug)]
1975pub struct PlanClusterOption {
1976    pub availability_zones: AlterOptionParameter<Vec<String>>,
1977    pub introspection_debugging: AlterOptionParameter<bool>,
1978    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
1979    pub managed: AlterOptionParameter<bool>,
1980    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
1981    pub replication_factor: AlterOptionParameter<u32>,
1982    pub size: AlterOptionParameter,
1983    pub schedule: AlterOptionParameter<ClusterSchedule>,
1984    pub workload_class: AlterOptionParameter<Option<String>>,
1985}
1986
1987impl Default for PlanClusterOption {
1988    fn default() -> Self {
1989        Self {
1990            availability_zones: AlterOptionParameter::Unchanged,
1991            introspection_debugging: AlterOptionParameter::Unchanged,
1992            introspection_interval: AlterOptionParameter::Unchanged,
1993            managed: AlterOptionParameter::Unchanged,
1994            replicas: AlterOptionParameter::Unchanged,
1995            replication_factor: AlterOptionParameter::Unchanged,
1996            size: AlterOptionParameter::Unchanged,
1997            schedule: AlterOptionParameter::Unchanged,
1998            workload_class: AlterOptionParameter::Unchanged,
1999        }
2000    }
2001}
2002
2003#[derive(Clone, Debug, PartialEq, Eq)]
2004pub enum AlterClusterPlanStrategy {
2005    None,
2006    For(Duration),
2007    UntilReady {
2008        on_timeout: OnTimeoutAction,
2009        timeout: Duration,
2010    },
2011}
2012
/// Action to take when an `UntilReady` strategy times out.
///
/// Parsed from the strings `COMMIT` and `ROLLBACK` by the `TryFrom<&str>`
/// impl; the `Default` impl yields `Commit`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OnTimeoutAction {
    /// Parsed from `COMMIT`.
    Commit,
    /// Parsed from `ROLLBACK`.
    Rollback,
}
2018
2019impl Default for OnTimeoutAction {
2020    fn default() -> Self {
2021        Self::Commit
2022    }
2023}
2024
2025impl TryFrom<&str> for OnTimeoutAction {
2026    type Error = PlanError;
2027    fn try_from(value: &str) -> Result<Self, Self::Error> {
2028        match value.to_uppercase().as_str() {
2029            "COMMIT" => Ok(Self::Commit),
2030            "ROLLBACK" => Ok(Self::Rollback),
2031            _ => Err(PlanError::Unstructured(
2032                "Valid options are COMMIT, ROLLBACK".into(),
2033            )),
2034        }
2035    }
2036}
2037
2038impl AlterClusterPlanStrategy {
2039    pub fn is_none(&self) -> bool {
2040        matches!(self, Self::None)
2041    }
2042    pub fn is_some(&self) -> bool {
2043        !matches!(self, Self::None)
2044    }
2045}
2046
2047impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2048    type Error = PlanError;
2049
2050    fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2051        Ok(match value.wait {
2052            Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2053            Some(ClusterAlterOptionValue::UntilReady(options)) => {
2054                let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2055                Self::UntilReady {
2056                    timeout: match extracted.timeout {
2057                        Some(d) => d,
2058                        None => Err(PlanError::UntilReadyTimeoutRequired)?,
2059                    },
2060                    on_timeout: match extracted.on_timeout {
2061                        Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2062                            PlanError::InvalidOptionValue {
2063                                option_name: "ON TIMEOUT".into(),
2064                                err: Box::new(e),
2065                            }
2066                        })?,
2067                        None => OnTimeoutAction::default(),
2068                    },
2069                }
2070            }
2071            None => Self::None,
2072        })
2073    }
2074}
2075
2076/// A vector of values to which parameter references should be bound.
2077#[derive(Debug, Clone)]
2078pub struct Params {
2079    /// The datums that were provided in the EXECUTE statement.
2080    pub datums: Row,
2081    /// The types of the datums provided in the EXECUTE statement.
2082    pub execute_types: Vec<SqlScalarType>,
2083    /// The types that the prepared statement expects based on its definition.
2084    pub expected_types: Vec<SqlScalarType>,
2085}
2086
2087impl Params {
2088    /// Returns a `Params` with no parameters.
2089    pub fn empty() -> Params {
2090        Params {
2091            datums: Row::pack_slice(&[]),
2092            execute_types: vec![],
2093            expected_types: vec![],
2094        }
2095    }
2096}
2097
2098/// Controls planning of a SQL query.
2099#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Copy)]
2100pub struct PlanContext {
2101    pub wall_time: DateTime<Utc>,
2102    pub ignore_if_exists_errors: bool,
2103}
2104
2105impl PlanContext {
2106    pub fn new(wall_time: DateTime<Utc>) -> Self {
2107        Self {
2108            wall_time,
2109            ignore_if_exists_errors: false,
2110        }
2111    }
2112
2113    /// Return a PlanContext with zero values. This should only be used when
2114    /// planning is required but unused (like in `plan_create_table()`) or in
2115    /// tests.
2116    pub fn zero() -> Self {
2117        PlanContext {
2118            wall_time: now::to_datetime(NOW_ZERO()),
2119            ignore_if_exists_errors: false,
2120        }
2121    }
2122
2123    pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2124        self.ignore_if_exists_errors = value;
2125        self
2126    }
2127}