mz_sql/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL planning.
11//!
12//! SQL planning is the process of taking the abstract syntax tree of a
13//! [`Statement`] and turning it into a [`Plan`] that the dataflow layer can
14//! execute.
15//!
16//! Statements must be purified before they can be planned. See the
17//! [`pure`](crate::pure) module for details.
18
19// Internal module layout.
20//
21// The entry point for planning is `statement::handle_statement`. That function
22// dispatches to a more specific `handle` function for the particular statement
23// type. For most statements, this `handle` function is uninteresting and short,
24// but anything involving a `SELECT` statement gets complicated. `SELECT`
25// queries wind through the functions in the `query` module, starting with
26// `plan_root_query` and fanning out based on the contents of the `SELECT`
27// statement.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{
41    CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing,
42};
43use mz_ore::now::{self, NOW_ZERO};
44use mz_pgcopy::CopyFormatParams;
45use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
46use mz_repr::explain::{ExplainConfig, ExplainFormat};
47use mz_repr::network_policy_id::NetworkPolicyId;
48use mz_repr::optimize::OptimizerFeatureOverrides;
49use mz_repr::refresh_schedule::RefreshSchedule;
50use mz_repr::role_id::RoleId;
51use mz_repr::{
52    CatalogItemId, ColumnIndex, ColumnName, ColumnType, Diff, GlobalId, RelationDesc, Row,
53    ScalarType, Timestamp, VersionedRelationDesc,
54};
55use mz_sql_parser::ast::{
56    AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
57    RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
58    Value, WithOptionValue,
59};
60use mz_ssh_util::keys::SshKeyPair;
61use mz_storage_types::connections::aws::AwsConnection;
62use mz_storage_types::connections::inline::ReferencedConnection;
63use mz_storage_types::connections::{
64    AwsPrivatelinkConnection, CsrConnection, KafkaConnection, MySqlConnection, PostgresConnection,
65    SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70    SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76    ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77    TransactionAccessMode,
78};
79use crate::catalog::{
80    CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81    RoleAttributes,
82};
83use crate::names::{
84    Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110    AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111    WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119    AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120    PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123    StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124    resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
/// Instructions for executing a SQL query.
///
/// Each variant names one kind of plan. Most variants carry a dedicated
/// payload struct; `DiscardTemp`, `DiscardAll`, `EmptyQuery`, and
/// `ShowAllVariables` carry no data.
///
/// The `EnumKind` derive, configured via `#[enum_kind(PlanKind)]`, provides the
/// companion [`PlanKind`] enum that [`Plan::generated_from`] uses to refer to
/// variants without their payloads.
#[derive(Debug, EnumKind)]
#[enum_kind(PlanKind)]
pub enum Plan {
    CreateConnection(CreateConnectionPlan),
    CreateDatabase(CreateDatabasePlan),
    CreateSchema(CreateSchemaPlan),
    CreateRole(CreateRolePlan),
    CreateCluster(CreateClusterPlan),
    CreateClusterReplica(CreateClusterReplicaPlan),
    CreateSource(CreateSourcePlan),
    /// Several sources at once, each bundled with the metadata needed to
    /// sequence it.
    CreateSources(Vec<CreateSourcePlanBundle>),
    CreateSecret(CreateSecretPlan),
    CreateSink(CreateSinkPlan),
    CreateTable(CreateTablePlan),
    CreateView(CreateViewPlan),
    CreateMaterializedView(CreateMaterializedViewPlan),
    CreateContinualTask(CreateContinualTaskPlan),
    CreateNetworkPolicy(CreateNetworkPolicyPlan),
    CreateIndex(CreateIndexPlan),
    CreateType(CreateTypePlan),
    Comment(CommentPlan),
    DiscardTemp,
    DiscardAll,
    DropObjects(DropObjectsPlan),
    DropOwned(DropOwnedPlan),
    EmptyQuery,
    ShowAllVariables,
    ShowCreate(ShowCreatePlan),
    ShowColumns(ShowColumnsPlan),
    ShowVariable(ShowVariablePlan),
    InspectShard(InspectShardPlan),
    SetVariable(SetVariablePlan),
    ResetVariable(ResetVariablePlan),
    SetTransaction(SetTransactionPlan),
    StartTransaction(StartTransactionPlan),
    CommitTransaction(CommitTransactionPlan),
    AbortTransaction(AbortTransactionPlan),
    Select(SelectPlan),
    Subscribe(SubscribePlan),
    CopyFrom(CopyFromPlan),
    CopyTo(CopyToPlan),
    ExplainPlan(ExplainPlanPlan),
    ExplainPushdown(ExplainPushdownPlan),
    ExplainTimestamp(ExplainTimestampPlan),
    ExplainSinkSchema(ExplainSinkSchemaPlan),
    Insert(InsertPlan),
    AlterCluster(AlterClusterPlan),
    AlterClusterSwap(AlterClusterSwapPlan),
    AlterNoop(AlterNoopPlan),
    AlterSetCluster(AlterSetClusterPlan),
    AlterConnection(AlterConnectionPlan),
    AlterSource(AlterSourcePlan),
    AlterClusterRename(AlterClusterRenamePlan),
    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
    AlterItemRename(AlterItemRenamePlan),
    AlterSchemaRename(AlterSchemaRenamePlan),
    AlterSchemaSwap(AlterSchemaSwapPlan),
    AlterSecret(AlterSecretPlan),
    AlterSink(AlterSinkPlan),
    AlterSystemSet(AlterSystemSetPlan),
    AlterSystemReset(AlterSystemResetPlan),
    AlterSystemResetAll(AlterSystemResetAllPlan),
    AlterRole(AlterRolePlan),
    AlterOwner(AlterOwnerPlan),
    // Note that the payload type is `AlterTablePlan`, not
    // `AlterTableAddColumnPlan`.
    AlterTableAddColumn(AlterTablePlan),
    AlterNetworkPolicy(AlterNetworkPolicyPlan),
    Declare(DeclarePlan),
    Fetch(FetchPlan),
    Close(ClosePlan),
    ReadThenWrite(ReadThenWritePlan),
    Prepare(PreparePlan),
    Execute(ExecutePlan),
    Deallocate(DeallocatePlan),
    Raise(RaisePlan),
    GrantRole(GrantRolePlan),
    RevokeRole(RevokeRolePlan),
    GrantPrivileges(GrantPrivilegesPlan),
    RevokePrivileges(RevokePrivilegesPlan),
    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
    ReassignOwned(ReassignOwnedPlan),
    SideEffectingFunc(SideEffectingFunc),
    ValidateConnection(ValidateConnectionPlan),
    AlterRetainHistory(AlterRetainHistoryPlan),
}
215
216impl Plan {
    /// Expresses which [`StatementKind`] can generate which set of
    /// [`PlanKind`].
    ///
    /// The returned slice is a `'static` table entry listing every plan kind
    /// associated with `stmt`; a statement kind may map to more than one plan
    /// kind (for example `Copy` or `Show`).
    ///
    /// The match has no wildcard arm, so adding a `StatementKind` variant
    /// without extending this table is a compile error.
    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
        match stmt {
            // Many `ALTER` statements also list `PlanKind::AlterNoop`.
            // NOTE(review): presumably for statements that turn out to have
            // nothing to do (e.g. `IF EXISTS` on a missing object) — confirm
            // against the statement planners.
            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
            StatementKind::AlterObjectRename => &[
                PlanKind::AlterClusterRename,
                PlanKind::AlterClusterReplicaRename,
                PlanKind::AlterItemRename,
                PlanKind::AlterSchemaRename,
                PlanKind::AlterNoop,
            ],
            StatementKind::AlterObjectSwap => &[
                PlanKind::AlterClusterSwap,
                PlanKind::AlterSchemaSwap,
                PlanKind::AlterNoop,
            ],
            StatementKind::AlterRole => &[PlanKind::AlterRole],
            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
            StatementKind::AlterSource => &[
                PlanKind::AlterNoop,
                PlanKind::AlterSource,
                PlanKind::AlterRetainHistory,
            ],
            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
            StatementKind::AlterSystemResetAll => {
                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
            }
            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
            StatementKind::AlterTableAddColumn => {
                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
            }
            StatementKind::Close => &[PlanKind::Close],
            StatementKind::Comment => &[PlanKind::Comment],
            StatementKind::Commit => &[PlanKind::CommitTransaction],
            StatementKind::Copy => &[
                PlanKind::CopyFrom,
                PlanKind::Select,
                PlanKind::Subscribe,
                PlanKind::CopyTo,
            ],
            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
            StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
            StatementKind::CreateRole => &[PlanKind::CreateRole],
            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
            StatementKind::CreateSink => &[PlanKind::CreateSink],
            // Sources and subsources share a single plan kind.
            StatementKind::CreateSource | StatementKind::CreateSubsource => {
                &[PlanKind::CreateSource]
            }
            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
            StatementKind::CreateTable => &[PlanKind::CreateTable],
            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
            StatementKind::CreateType => &[PlanKind::CreateType],
            StatementKind::CreateView => &[PlanKind::CreateView],
            StatementKind::Deallocate => &[PlanKind::Deallocate],
            StatementKind::Declare => &[PlanKind::Declare],
            // `Delete` and `Update` both map to `ReadThenWrite`.
            StatementKind::Delete => &[PlanKind::ReadThenWrite],
            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
            StatementKind::DropObjects => &[PlanKind::DropObjects],
            StatementKind::DropOwned => &[PlanKind::DropOwned],
            StatementKind::Execute => &[PlanKind::Execute],
            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
            StatementKind::Fetch => &[PlanKind::Fetch],
            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
            StatementKind::GrantRole => &[PlanKind::GrantRole],
            StatementKind::Insert => &[PlanKind::Insert],
            StatementKind::Prepare => &[PlanKind::Prepare],
            StatementKind::Raise => &[PlanKind::Raise],
            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
            StatementKind::Rollback => &[PlanKind::AbortTransaction],
            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
            StatementKind::SetVariable => &[PlanKind::SetVariable],
            StatementKind::Show => &[
                PlanKind::Select,
                PlanKind::ShowVariable,
                PlanKind::ShowCreate,
                PlanKind::ShowColumns,
                PlanKind::ShowAllVariables,
                PlanKind::InspectShard,
            ],
            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
            StatementKind::Subscribe => &[PlanKind::Subscribe],
            StatementKind::Update => &[PlanKind::ReadThenWrite],
            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
        }
    }
325
    /// Returns a human readable name of the plan. Meant for use in messages sent back to a user.
    ///
    /// Every arm yields a string literal. For `DropObjects`, `AlterNoop`, and
    /// `AlterOwner` the name additionally depends on the payload's
    /// `object_type`; for `ReadThenWrite` it depends on the payload's `kind`.
    ///
    /// Names are not unique per variant: `CreateSource` and `CreateSources`
    /// both report "create source", and the three `AlterSystem*` variants all
    /// report "alter system".
    pub fn name(&self) -> &str {
        match self {
            Plan::CreateConnection(_) => "create connection",
            Plan::CreateDatabase(_) => "create database",
            Plan::CreateSchema(_) => "create schema",
            Plan::CreateRole(_) => "create role",
            Plan::CreateCluster(_) => "create cluster",
            Plan::CreateClusterReplica(_) => "create cluster replica",
            Plan::CreateSource(_) => "create source",
            Plan::CreateSources(_) => "create source",
            Plan::CreateSecret(_) => "create secret",
            Plan::CreateSink(_) => "create sink",
            Plan::CreateTable(_) => "create table",
            Plan::CreateView(_) => "create view",
            Plan::CreateMaterializedView(_) => "create materialized view",
            Plan::CreateContinualTask(_) => "create continual task",
            Plan::CreateIndex(_) => "create index",
            Plan::CreateType(_) => "create type",
            Plan::CreateNetworkPolicy(_) => "create network policy",
            Plan::Comment(_) => "comment",
            Plan::DiscardTemp => "discard temp",
            Plan::DiscardAll => "discard all",
            // Note: roles, clusters, and cluster replicas use a plural noun
            // here, unlike the other object types.
            Plan::DropObjects(plan) => match plan.object_type {
                ObjectType::Table => "drop table",
                ObjectType::View => "drop view",
                ObjectType::MaterializedView => "drop materialized view",
                ObjectType::Source => "drop source",
                ObjectType::Sink => "drop sink",
                ObjectType::Index => "drop index",
                ObjectType::Type => "drop type",
                ObjectType::Role => "drop roles",
                ObjectType::Cluster => "drop clusters",
                ObjectType::ClusterReplica => "drop cluster replicas",
                ObjectType::Secret => "drop secret",
                ObjectType::Connection => "drop connection",
                ObjectType::Database => "drop database",
                ObjectType::Schema => "drop schema",
                ObjectType::Func => "drop function",
                ObjectType::ContinualTask => "drop continual task",
                ObjectType::NetworkPolicy => "drop network policy",
            },
            Plan::DropOwned(_) => "drop owned",
            Plan::EmptyQuery => "do nothing",
            Plan::ShowAllVariables => "show all variables",
            Plan::ShowCreate(_) => "show create",
            Plan::ShowColumns(_) => "show columns",
            Plan::ShowVariable(_) => "show variable",
            Plan::InspectShard(_) => "inspect shard",
            Plan::SetVariable(_) => "set variable",
            Plan::ResetVariable(_) => "reset variable",
            Plan::SetTransaction(_) => "set transaction",
            Plan::StartTransaction(_) => "start transaction",
            Plan::CommitTransaction(_) => "commit",
            Plan::AbortTransaction(_) => "abort",
            Plan::Select(_) => "select",
            Plan::Subscribe(_) => "subscribe",
            Plan::CopyFrom(_) => "copy from",
            Plan::CopyTo(_) => "copy to",
            Plan::ExplainPlan(_) => "explain plan",
            // The only upper-case name in this table.
            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
            Plan::ExplainTimestamp(_) => "explain timestamp",
            Plan::ExplainSinkSchema(_) => "explain schema",
            Plan::Insert(_) => "insert",
            Plan::AlterNoop(plan) => match plan.object_type {
                ObjectType::Table => "alter table",
                ObjectType::View => "alter view",
                ObjectType::MaterializedView => "alter materialized view",
                ObjectType::Source => "alter source",
                ObjectType::Sink => "alter sink",
                ObjectType::Index => "alter index",
                ObjectType::Type => "alter type",
                ObjectType::Role => "alter role",
                ObjectType::Cluster => "alter cluster",
                ObjectType::ClusterReplica => "alter cluster replica",
                ObjectType::Secret => "alter secret",
                ObjectType::Connection => "alter connection",
                ObjectType::Database => "alter database",
                ObjectType::Schema => "alter schema",
                ObjectType::Func => "alter function",
                ObjectType::ContinualTask => "alter continual task",
                ObjectType::NetworkPolicy => "alter network policy",
            },
            Plan::AlterCluster(_) => "alter cluster",
            Plan::AlterClusterRename(_) => "alter cluster rename",
            Plan::AlterClusterSwap(_) => "alter cluster swap",
            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
            Plan::AlterSetCluster(_) => "alter set cluster",
            Plan::AlterConnection(_) => "alter connection",
            Plan::AlterSource(_) => "alter source",
            Plan::AlterItemRename(_) => "rename item",
            Plan::AlterSchemaRename(_) => "alter rename schema",
            Plan::AlterSchemaSwap(_) => "alter swap schema",
            Plan::AlterSecret(_) => "alter secret",
            Plan::AlterSink(_) => "alter sink",
            Plan::AlterSystemSet(_) => "alter system",
            Plan::AlterSystemReset(_) => "alter system",
            Plan::AlterSystemResetAll(_) => "alter system",
            Plan::AlterRole(_) => "alter role",
            Plan::AlterNetworkPolicy(_) => "alter network policy",
            Plan::AlterOwner(plan) => match plan.object_type {
                ObjectType::Table => "alter table owner",
                ObjectType::View => "alter view owner",
                ObjectType::MaterializedView => "alter materialized view owner",
                ObjectType::Source => "alter source owner",
                ObjectType::Sink => "alter sink owner",
                ObjectType::Index => "alter index owner",
                ObjectType::Type => "alter type owner",
                ObjectType::Role => "alter role owner",
                ObjectType::Cluster => "alter cluster owner",
                ObjectType::ClusterReplica => "alter cluster replica owner",
                ObjectType::Secret => "alter secret owner",
                ObjectType::Connection => "alter connection owner",
                ObjectType::Database => "alter database owner",
                ObjectType::Schema => "alter schema owner",
                ObjectType::Func => "alter function owner",
                ObjectType::ContinualTask => "alter continual task owner",
                ObjectType::NetworkPolicy => "alter network policy owner",
            },
            Plan::AlterTableAddColumn(_) => "alter table add column",
            Plan::Declare(_) => "declare",
            Plan::Fetch(_) => "fetch",
            Plan::Close(_) => "close",
            Plan::ReadThenWrite(plan) => match plan.kind {
                MutationKind::Insert => "insert into select",
                MutationKind::Update => "update",
                MutationKind::Delete => "delete",
            },
            Plan::Prepare(_) => "prepare",
            Plan::Execute(_) => "execute",
            Plan::Deallocate(_) => "deallocate",
            Plan::Raise(_) => "raise",
            Plan::GrantRole(_) => "grant role",
            Plan::RevokeRole(_) => "revoke role",
            Plan::GrantPrivileges(_) => "grant privilege",
            Plan::RevokePrivileges(_) => "revoke privilege",
            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
            Plan::ReassignOwned(_) => "reassign owned",
            Plan::SideEffectingFunc(_) => "side effecting func",
            Plan::ValidateConnection(_) => "validate connection",
            Plan::AlterRetainHistory(_) => "alter retain history",
        }
    }
469
470    /// Returns `true` iff this `Plan` is allowed to be executed in read-only
471    /// mode.
472    ///
473    /// We use an explicit allow-list, to avoid future additions automatically
474    /// falling into the `true` category.
475    pub fn allowed_in_read_only(&self) -> bool {
476        match self {
477            // These two set non-durable session variables, so are okay in
478            // read-only mode.
479            Plan::SetVariable(_) => true,
480            Plan::ResetVariable(_) => true,
481            Plan::SetTransaction(_) => true,
482            Plan::StartTransaction(_) => true,
483            Plan::CommitTransaction(_) => true,
484            Plan::AbortTransaction(_) => true,
485            Plan::Select(_) => true,
486            Plan::EmptyQuery => true,
487            Plan::ShowAllVariables => true,
488            Plan::ShowCreate(_) => true,
489            Plan::ShowColumns(_) => true,
490            Plan::ShowVariable(_) => true,
491            Plan::InspectShard(_) => true,
492            Plan::Subscribe(_) => true,
493            Plan::CopyTo(_) => true,
494            Plan::ExplainPlan(_) => true,
495            Plan::ExplainPushdown(_) => true,
496            Plan::ExplainTimestamp(_) => true,
497            Plan::ExplainSinkSchema(_) => true,
498            Plan::ValidateConnection(_) => true,
499            _ => false,
500        }
501    }
502}
503
/// Payload of [`Plan::StartTransaction`].
#[derive(Debug)]
pub struct StartTransactionPlan {
    /// Optional transaction access mode.
    pub access: Option<TransactionAccessMode>,
    /// Optional transaction isolation level.
    pub isolation_level: Option<TransactionIsolationLevel>,
}
509
/// Distinguishes the two kinds of transaction a commit or abort plan can
/// refer to.
// NOTE(review): presumably `Explicit` means the user opened the transaction
// themselves and `Implicit` means it was opened on their behalf — confirm
// against the callers that construct these plans.
#[derive(Debug)]
pub enum TransactionType {
    Explicit,
    Implicit,
}

impl TransactionType {
    /// Returns `true` exactly when `self` is [`TransactionType::Explicit`].
    pub fn is_explicit(&self) -> bool {
        match self {
            Self::Explicit => true,
            Self::Implicit => false,
        }
    }

    /// Returns `true` exactly when `self` is [`TransactionType::Implicit`].
    pub fn is_implicit(&self) -> bool {
        match self {
            Self::Implicit => true,
            Self::Explicit => false,
        }
    }
}
525
/// Payload of [`Plan::CommitTransaction`].
#[derive(Debug)]
pub struct CommitTransactionPlan {
    /// Whether the transaction being committed is explicit or implicit.
    pub transaction_type: TransactionType,
}
530
/// Payload of [`Plan::AbortTransaction`].
#[derive(Debug)]
pub struct AbortTransactionPlan {
    /// Whether the transaction being aborted is explicit or implicit.
    pub transaction_type: TransactionType,
}
535
/// Payload of [`Plan::CreateDatabase`].
#[derive(Debug)]
pub struct CreateDatabasePlan {
    pub name: String,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause on the
    // statement — confirm against the planner.
    pub if_not_exists: bool,
}
541
/// Payload of [`Plan::CreateSchema`].
#[derive(Debug)]
pub struct CreateSchemaPlan {
    /// Resolved specifier of the database associated with the schema.
    pub database_spec: ResolvedDatabaseSpecifier,
    pub schema_name: String,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause on the
    // statement — confirm against the planner.
    pub if_not_exists: bool,
}
548
/// Payload of [`Plan::CreateRole`].
#[derive(Debug)]
pub struct CreateRolePlan {
    pub name: String,
    /// Attributes for the role, as a catalog [`RoleAttributes`] value.
    pub attributes: RoleAttributes,
}
554
/// Payload of [`Plan::CreateCluster`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterPlan {
    pub name: String,
    /// Managed or unmanaged configuration; see [`CreateClusterVariant`].
    pub variant: CreateClusterVariant,
    pub workload_class: Option<String>,
}
561
/// The two shapes the `variant` of a [`CreateClusterPlan`] can take.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum CreateClusterVariant {
    /// Cluster described by settings such as a replication factor and size
    /// rather than by individual replicas.
    Managed(CreateClusterManagedPlan),
    /// Cluster described by an explicit list of replicas.
    Unmanaged(CreateClusterUnmanagedPlan),
}
567
/// Configuration carried by [`CreateClusterVariant::Unmanaged`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterUnmanagedPlan {
    // NOTE(review): presumably `(replica name, replica config)` pairs —
    // confirm against the planner.
    pub replicas: Vec<(String, ReplicaConfig)>,
}
572
/// Configuration carried by [`CreateClusterVariant::Managed`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterManagedPlan {
    pub replication_factor: u32,
    pub size: String,
    pub availability_zones: Vec<String>,
    /// Compute-specific replica settings (currently introspection only).
    pub compute: ComputeReplicaConfig,
    pub disk: bool,
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Whether and when the system turns the cluster On or Off automatically;
    /// see [`ClusterSchedule`].
    pub schedule: ClusterSchedule,
}
583
/// Payload of [`Plan::CreateClusterReplica`].
#[derive(Debug)]
pub struct CreateClusterReplicaPlan {
    // NOTE(review): presumably the cluster the replica is added to — confirm.
    pub cluster_id: ClusterId,
    pub name: String,
    pub config: ReplicaConfig,
}
590
/// Configuration of introspection for a cluster replica.
///
/// Carried as the optional `introspection` field of [`ComputeReplicaConfig`].
// Field order matters: the derived `PartialOrd`/`Ord` compare `debugging`
// before `interval`.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq)]
pub struct ComputeReplicaIntrospectionConfig {
    /// Whether to introspect the introspection.
    pub debugging: bool,
    /// The interval at which to introspect.
    pub interval: Duration,
}
599
/// Compute-specific replica configuration, embedded in [`ReplicaConfig`] and
/// [`CreateClusterManagedPlan`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComputeReplicaConfig {
    /// Introspection settings, if any.
    // NOTE(review): presumably `None` disables introspection — confirm.
    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
}
604
/// Configuration of a single cluster replica, as used by
/// [`CreateClusterReplicaPlan`] and [`CreateClusterUnmanagedPlan`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicaConfig {
    /// A replica described by explicit address lists and a worker count.
    Unorchestrated {
        storagectl_addrs: Vec<String>,
        storage_addrs: Vec<String>,
        computectl_addrs: Vec<String>,
        compute_addrs: Vec<String>,
        workers: usize,
        compute: ComputeReplicaConfig,
    },
    /// A replica described by a size and an optional availability zone
    /// instead of explicit addresses.
    Orchestrated {
        size: String,
        availability_zone: Option<String>,
        compute: ComputeReplicaConfig,
        disk: bool,
        internal: bool,
        billed_as: Option<String>,
    },
}
624
/// Policy for whether the system automatically turns a cluster On or Off.
// Variant order matters: the derived `PartialOrd`/`Ord` follow declaration
// order.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterSchedule {
    /// The system won't automatically turn the cluster On or Off.
    Manual,
    /// The cluster will be On when a REFRESH materialized view on it needs to refresh.
    /// `hydration_time_estimate` determines how long before a refresh the cluster is
    /// turned On, so that it can finish rehydrating before the refresh time.
    Refresh { hydration_time_estimate: Duration },
}
634
635impl Default for ClusterSchedule {
636    fn default() -> Self {
637        // (Has to be consistent with `impl Default for ClusterScheduleOptionValue`.)
638        ClusterSchedule::Manual
639    }
640}
641
/// Payload of [`Plan::CreateSource`]; also embedded in
/// [`CreateSourcePlanBundle`].
#[derive(Debug)]
pub struct CreateSourcePlan {
    pub name: QualifiedItemName,
    pub source: Source,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause on the
    // statement — confirm against the planner.
    pub if_not_exists: bool,
    pub timeline: Timeline,
    /// `None` for subsources, which run on the parent cluster.
    pub in_cluster: Option<ClusterId>,
}
651
/// A list of [`SourceReference`]s together with an `updated_at` value.
///
/// Used as the `available_source_references` of a [`CreateSourcePlanBundle`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReferences {
    // NOTE(review): units and epoch are not visible here — presumably a
    // timestamp of when the references were last fetched; confirm.
    pub updated_at: u64,
    pub references: Vec<SourceReference>,
}
657
/// An available external reference for a source and, if possible to retrieve,
/// any column names it contains.
///
/// Collected in the `references` list of [`SourceReferences`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReference {
    pub name: String,
    pub namespace: Option<String>,
    // NOTE(review): presumably empty when column names could not be
    // retrieved — confirm against the code that populates this.
    pub columns: Vec<String>,
}
666
/// A [`CreateSourcePlan`] and the metadata necessary to sequence it.
///
/// This is the element type of [`Plan::CreateSources`].
#[derive(Debug)]
pub struct CreateSourcePlanBundle {
    /// ID of this source in the Catalog.
    pub item_id: CatalogItemId,
    /// ID used to reference this source from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// Details of the source to create.
    pub plan: CreateSourcePlan,
    /// Other catalog objects that are referenced by this source, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All the available upstream references for this source.
    /// Populated for top-level sources that can contain subsources/tables
    /// and used during sequencing to populate the appropriate catalog fields.
    pub available_source_references: Option<SourceReferences>,
}
683
/// Payload of [`Plan::CreateConnection`].
#[derive(Debug)]
pub struct CreateConnectionPlan {
    pub name: QualifiedItemName,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause on the
    // statement — confirm against the planner.
    pub if_not_exists: bool,
    pub connection: Connection,
    // NOTE(review): presumably requests validating the connection as part of
    // creating it — confirm against the sequencer.
    pub validate: bool,
}
691
/// Payload of [`Plan::ValidateConnection`].
#[derive(Debug)]
pub struct ValidateConnectionPlan {
    /// ID of the connection in the Catalog.
    pub id: CatalogItemId,
    /// The connection to validate.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
}
699
/// Payload of [`Plan::CreateSecret`].
#[derive(Debug)]
pub struct CreateSecretPlan {
    pub name: QualifiedItemName,
    pub secret: Secret,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause on the
    // statement — confirm against the planner.
    pub if_not_exists: bool,
}
706
/// Payload of [`Plan::CreateSink`].
#[derive(Debug)]
pub struct CreateSinkPlan {
    pub name: QualifiedItemName,
    pub sink: Sink,
    pub with_snapshot: bool,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause on the
    // statement — confirm against the planner.
    pub if_not_exists: bool,
    /// Cluster for the sink. Unlike `CreateSourcePlan::in_cluster`, this is
    /// not optional.
    pub in_cluster: ClusterId,
}
715
/// Payload of [`Plan::CreateTable`].
#[derive(Debug)]
pub struct CreateTablePlan {
    pub name: QualifiedItemName,
    pub table: Table,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause on the
    // statement — confirm against the planner.
    pub if_not_exists: bool,
}
722
/// Payload of [`Plan::CreateView`].
#[derive(Debug, Clone)]
pub struct CreateViewPlan {
    pub name: QualifiedItemName,
    pub view: View,
    /// The Catalog object that this view is replacing, if any.
    pub replace: Option<CatalogItemId>,
    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
    pub drop_ids: Vec<CatalogItemId>,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause on the
    // statement — confirm against the planner.
    pub if_not_exists: bool,
    /// True if the view contains an expression that can make the exact column list
    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
    pub ambiguous_columns: bool,
}
736
/// Plan for creating a materialized view.
#[derive(Debug, Clone)]
pub struct CreateMaterializedViewPlan {
    /// Qualified name of the materialized view item.
    pub name: QualifiedItemName,
    /// The materialized view definition.
    pub materialized_view: MaterializedView,
    /// The Catalog object that this materialized view is replacing, if any.
    pub replace: Option<CatalogItemId>,
    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
    pub drop_ids: Vec<CatalogItemId>,
    /// NOTE(review): presumably true when `IF NOT EXISTS` was specified — confirm against the
    /// planner.
    pub if_not_exists: bool,
    /// True if the materialized view contains an expression that can make the exact column list
    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
    pub ambiguous_columns: bool,
}
750
/// Plan for creating a continual task (CT).
#[derive(Debug, Clone)]
pub struct CreateContinualTaskPlan {
    /// Qualified name of the continual task item.
    pub name: QualifiedItemName,
    /// During initial creation, the `LocalId` placeholder for this CT in `continual_task.expr`.
    /// None on restart.
    pub placeholder_id: Option<mz_expr::LocalId>,
    /// Relation description for the continual task.
    /// NOTE(review): presumably describes the task's output columns — confirm.
    pub desc: RelationDesc,
    /// ID of the collection we read into this continual task.
    pub input_id: GlobalId,
    /// NOTE(review): presumably whether the task also processes a snapshot of its input —
    /// confirm against the CT implementation.
    pub with_snapshot: bool,
    /// Definition for the continual task.
    pub continual_task: MaterializedView,
}
764
/// Plan for creating a network policy.
#[derive(Debug, Clone)]
pub struct CreateNetworkPolicyPlan {
    /// Name of the network policy.
    pub name: String,
    /// The rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
}
770
/// Plan for altering an existing network policy.
#[derive(Debug, Clone)]
pub struct AlterNetworkPolicyPlan {
    /// ID of the network policy being altered.
    pub id: NetworkPolicyId,
    /// Name of the network policy.
    pub name: String,
    /// The policy's rules.
    /// NOTE(review): presumably the full replacement rule set rather than a delta — confirm.
    pub rules: Vec<NetworkPolicyRule>,
}
777
/// Plan for creating an index.
#[derive(Debug, Clone)]
pub struct CreateIndexPlan {
    /// Qualified name of the index item.
    pub name: QualifiedItemName,
    /// The index definition.
    pub index: Index,
    /// NOTE(review): presumably true when `IF NOT EXISTS` was specified — confirm against the
    /// planner.
    pub if_not_exists: bool,
}
784
/// Plan for creating a type.
#[derive(Debug)]
pub struct CreateTypePlan {
    /// Qualified name of the type item.
    pub name: QualifiedItemName,
    /// The type definition.
    pub typ: Type,
}
790
/// Plan for dropping one or more objects.
#[derive(Debug)]
pub struct DropObjectsPlan {
    /// The IDs of only the objects directly referenced in the `DROP` statement.
    pub referenced_ids: Vec<ObjectId>,
    /// All object IDs to drop. Includes `referenced_ids` and all descendants.
    pub drop_ids: Vec<ObjectId>,
    /// The type of object that was dropped explicitly in the DROP statement. `drop_ids` may
    /// contain objects of different types due to CASCADE.
    pub object_type: ObjectType,
}
801
/// Plan for dropping the objects owned by a set of roles, together with the privileges to revoke.
#[derive(Debug)]
pub struct DropOwnedPlan {
    /// The role IDs that own the objects.
    pub role_ids: Vec<RoleId>,
    /// All object IDs to drop.
    pub drop_ids: Vec<ObjectId>,
    /// The privileges to revoke.
    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
    /// The default privileges to revoke.
    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
}
813
/// Plan for showing the value of a single variable.
#[derive(Debug)]
pub struct ShowVariablePlan {
    /// Name of the variable to show.
    pub name: String,
}
818
/// Plan for inspecting the shard behind a storage collection.
#[derive(Debug)]
pub struct InspectShardPlan {
    /// ID of the storage collection to inspect.
    pub id: GlobalId,
}
824
/// Plan for setting a variable.
#[derive(Debug)]
pub struct SetVariablePlan {
    /// Name of the variable to set.
    pub name: String,
    /// The value to assign, or a request to use the default.
    pub value: VariableValue,
    /// NOTE(review): presumably true for `SET LOCAL`, i.e. the change is scoped to the current
    /// transaction — confirm against the session variable handling.
    pub local: bool,
}
831
/// The value side of a variable assignment.
#[derive(Debug)]
pub enum VariableValue {
    /// Use the variable's default value.
    Default,
    /// Explicit values, kept as strings.
    /// NOTE(review): presumably more than one entry only for list-valued variables — confirm.
    Values(Vec<String>),
}
837
/// Plan for resetting a variable.
#[derive(Debug)]
pub struct ResetVariablePlan {
    /// Name of the variable to reset.
    pub name: String,
}
842
/// Plan for setting transaction modes.
#[derive(Debug)]
pub struct SetTransactionPlan {
    /// NOTE(review): presumably true when the modes apply only to the current transaction rather
    /// than the whole session — confirm against the sequencer.
    pub local: bool,
    /// The transaction modes to apply.
    pub modes: Vec<TransactionMode>,
}
848
/// A plan for select statements.
#[derive(Clone, Debug)]
pub struct SelectPlan {
    /// The `SELECT` statement itself. Used for explain/notices, but not otherwise
    /// load-bearing. Boxed to save stack space.
    pub select: Option<Box<SelectStatement<Aug>>>,
    /// The query to evaluate, as a HIR expression ([`HirRelationExpr`]).
    pub source: HirRelationExpr,
    /// At what time should this select happen?
    pub when: QueryWhen,
    /// Instructions how to form the result set.
    pub finishing: RowSetFinishing,
    /// For `COPY TO`, the format to use.
    pub copy_to: Option<CopyFormat>,
}
864
/// The output shape requested for a subscribe.
///
/// NOTE(review): the variants presumably correspond to the plain output, `WITHIN TIMESTAMP
/// ORDER BY`, `ENVELOPE UPSERT` and `ENVELOPE DEBEZIUM` forms of `SUBSCRIBE` — confirm against
/// the planner.
#[derive(Debug)]
pub enum SubscribeOutput {
    /// No ordering columns are carried.
    Diffs,
    /// Output with an explicit ordering.
    WithinTimestampOrderBy {
        /// We pretend that mz_diff is prepended to the normal columns, making it index 0
        order_by: Vec<ColumnOrder>,
    },
    /// Upsert-envelope output.
    EnvelopeUpsert {
        /// Order by with just keys
        order_by_keys: Vec<ColumnOrder>,
    },
    /// Debezium-envelope output.
    EnvelopeDebezium {
        /// Order by with just keys
        order_by_keys: Vec<ColumnOrder>,
    },
}
881
/// Plan for a subscribe.
#[derive(Debug)]
pub struct SubscribePlan {
    /// What to subscribe to: an existing collection or a query.
    pub from: SubscribeFrom,
    /// NOTE(review): presumably whether an initial snapshot is emitted before updates — confirm.
    pub with_snapshot: bool,
    /// The [`QueryWhen`] for this subscribe.
    /// NOTE(review): presumably controls the time at which the subscribe starts — confirm.
    pub when: QueryWhen,
    /// NOTE(review): presumably an expression evaluating to the timestamp at which the subscribe
    /// stops — confirm.
    pub up_to: Option<MirScalarExpr>,
    /// NOTE(review): presumably the `COPY TO` format, mirroring `SelectPlan::copy_to` — confirm.
    pub copy_to: Option<CopyFormat>,
    /// NOTE(review): presumably whether progress messages are emitted — confirm.
    pub emit_progress: bool,
    /// The requested output shape.
    pub output: SubscribeOutput,
}
892
/// The thing being subscribed to: either an existing collection or an ad-hoc query.
#[derive(Debug, Clone)]
pub enum SubscribeFrom {
    /// ID of the collection to subscribe to.
    Id(GlobalId),
    /// Query to subscribe to.
    Query {
        /// The query, as a MIR expression.
        expr: MirRelationExpr,
        /// Relation description accompanying `expr`.
        desc: RelationDesc,
    },
}
903
904impl SubscribeFrom {
905    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
906        match self {
907            SubscribeFrom::Id(id) => BTreeSet::from([*id]),
908            SubscribeFrom::Query { expr, .. } => expr.depends_on(),
909        }
910    }
911
912    pub fn contains_temporal(&self) -> bool {
913        match self {
914            SubscribeFrom::Id(_) => false,
915            SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
916        }
917    }
918}
919
/// Plan for showing how an object was created.
#[derive(Debug)]
pub struct ShowCreatePlan {
    /// ID of the object being shown.
    pub id: ObjectId,
    /// NOTE(review): presumably the already-computed result row returned to the client —
    /// confirm against the sequencer.
    pub row: Row,
}
925
/// Plan for showing the columns of a catalog item.
#[derive(Debug)]
pub struct ShowColumnsPlan {
    /// ID of the item whose columns are shown.
    pub id: CatalogItemId,
    /// The select plan associated with this statement.
    /// NOTE(review): presumably the query that produces the column listing — confirm.
    pub select_plan: SelectPlan,
    /// NOTE(review): presumably the IDs resolved while planning `select_plan` — confirm.
    pub new_resolved_ids: ResolvedIds,
}
932
/// Plan for copying data from an external source into a table.
#[derive(Debug)]
pub struct CopyFromPlan {
    /// Table we're copying into.
    pub id: CatalogItemId,
    /// Source we're copying data from.
    pub source: CopyFromSource,
    /// How input columns map to those on the destination table.
    ///
    /// TODO(cf2): Remove this field in favor of the mfp.
    pub columns: Vec<ColumnIndex>,
    /// [`RelationDesc`] describing the input data.
    pub source_desc: RelationDesc,
    /// Changes the shape of the input data to match the destination table.
    pub mfp: MapFilterProject,
    /// Format specific params for copying the input data.
    pub params: CopyFormatParams<'static>,
    /// Filter for the source files we're copying from, e.g. an S3 prefix.
    pub filter: Option<CopyFromFilter>,
}
952
/// Where the input data of a [`CopyFromPlan`] comes from.
#[derive(Debug)]
pub enum CopyFromSource {
    /// Copying from a file local to the user, transmitted via pgwire.
    Stdin,
    /// A remote resource, e.g. HTTP file.
    ///
    /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource.
    Url(HirScalarExpr),
    /// A file in an S3 bucket.
    AwsS3 {
        /// Expression that evaluates to the file we want to copy.
        uri: HirScalarExpr,
        /// Details for how we connect to AWS S3.
        connection: AwsConnection,
        /// ID of the connection object.
        connection_id: CatalogItemId,
    },
}
971
/// Filter applied to the source files of a [`CopyFromPlan`].
#[derive(Debug)]
pub enum CopyFromFilter {
    /// An explicit list of files.
    Files(Vec<String>),
    /// A pattern.
    /// NOTE(review): the pattern syntax (glob, regex, ...) is not visible here — confirm.
    Pattern(String),
}
977
/// Plan for copying the result of a query to an external destination.
#[derive(Debug, Clone)]
pub struct CopyToPlan {
    /// The select query plan whose data will be copied to destination uri.
    pub select_plan: SelectPlan,
    /// Relation description for the copied data.
    pub desc: RelationDesc,
    /// The scalar expression to be resolved to get the destination uri.
    pub to: HirScalarExpr,
    /// The connection used to reach the destination.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
    /// The ID of the connection.
    pub connection_id: CatalogItemId,
    /// Output format for the written data.
    pub format: S3SinkFormat,
    /// Maximum file size.
    /// NOTE(review): presumably in bytes — confirm.
    pub max_file_size: u64,
}
991
/// Plan for explaining the plan of an [`Explainee`].
#[derive(Clone, Debug)]
pub struct ExplainPlanPlan {
    /// The stage to explain.
    pub stage: ExplainStage,
    /// Output format of the explanation.
    pub format: ExplainFormat,
    /// Configuration for the explanation.
    pub config: ExplainConfig,
    /// The object or statement being explained.
    pub explainee: Explainee,
}
999
/// The type of object to be explained
#[derive(Clone, Debug)]
pub enum Explainee {
    /// Lookup and explain a plan saved for an existing view.
    View(CatalogItemId),
    /// Lookup and explain a plan saved for an existing materialized view.
    MaterializedView(CatalogItemId),
    /// Lookup and explain a plan saved for an existing index.
    Index(CatalogItemId),
    /// Replan an existing view.
    ReplanView(CatalogItemId),
    /// Replan an existing materialized view.
    ReplanMaterializedView(CatalogItemId),
    /// Replan an existing index.
    ReplanIndex(CatalogItemId),
    /// A SQL statement.
    Statement(ExplaineeStatement),
}
1018
/// Explainee types that are statements.
///
/// The `enum_kind` attribute names the companion kind enum `ExplaineeStatementKind`.
#[derive(Clone, Debug, EnumKind)]
#[enum_kind(ExplaineeStatementKind)]
pub enum ExplaineeStatement {
    /// The object to be explained is a SELECT statement.
    Select {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The planned `SELECT`.
        plan: plan::SelectPlan,
        /// Relation description accompanying `plan`.
        desc: RelationDesc,
    },
    /// The object to be explained is a CREATE VIEW.
    CreateView {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The planned `CREATE VIEW`.
        plan: plan::CreateViewPlan,
    },
    /// The object to be explained is a CREATE MATERIALIZED VIEW.
    CreateMaterializedView {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The planned `CREATE MATERIALIZED VIEW`.
        plan: plan::CreateMaterializedViewPlan,
    },
    /// The object to be explained is a CREATE INDEX.
    CreateIndex {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The planned `CREATE INDEX`.
        plan: plan::CreateIndexPlan,
    },
}
1049
1050impl ExplaineeStatement {
1051    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1052        match self {
1053            Self::Select { plan, .. } => plan.source.depends_on(),
1054            Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1055            Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1056            Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1057        }
1058    }
1059
1060    /// Statements that have their `broken` flag set are expected to cause a
1061    /// panic in the optimizer code. In this case:
1062    ///
1063    /// 1. The optimizer pipeline execution will stop, but the panic will be
1064    ///    intercepted and will not propagate to the caller. The partial
1065    ///    optimizer trace collected until this point will be available.
1066    /// 2. The optimizer trace tracing subscriber will delegate regular tracing
1067    ///    spans and events to the default subscriber.
1068    ///
1069    /// This is useful when debugging queries that cause panics.
1070    pub fn broken(&self) -> bool {
1071        match self {
1072            Self::Select { broken, .. } => *broken,
1073            Self::CreateView { broken, .. } => *broken,
1074            Self::CreateMaterializedView { broken, .. } => *broken,
1075            Self::CreateIndex { broken, .. } => *broken,
1076        }
1077    }
1078}
1079
1080impl ExplaineeStatementKind {
1081    pub fn supports(&self, stage: &ExplainStage) -> bool {
1082        use ExplainStage::*;
1083        match self {
1084            Self::Select => true,
1085            Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1086            Self::CreateMaterializedView => true,
1087            Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1088        }
1089    }
1090}
1091
1092impl std::fmt::Display for ExplaineeStatementKind {
1093    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1094        match self {
1095            Self::Select => write!(f, "SELECT"),
1096            Self::CreateView => write!(f, "CREATE VIEW"),
1097            Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1098            Self::CreateIndex => write!(f, "CREATE INDEX"),
1099        }
1100    }
1101}
1102
/// Plan for explaining pushdown for an [`Explainee`].
#[derive(Clone, Debug)]
pub struct ExplainPushdownPlan {
    /// The object or statement being explained.
    pub explainee: Explainee,
}
1107
/// Plan for explaining the timestamp of a query.
#[derive(Clone, Debug)]
pub struct ExplainTimestampPlan {
    /// Output format of the explanation.
    pub format: ExplainFormat,
    /// The query, as a HIR expression.
    pub raw_plan: HirRelationExpr,
    /// The [`QueryWhen`] of the query.
    pub when: QueryWhen,
}
1114
/// Plan for explaining the schema of a sink.
#[derive(Debug)]
pub struct ExplainSinkSchemaPlan {
    /// ID of the collection the sink reads from.
    pub sink_from: GlobalId,
    /// The schema rendered as a JSON string.
    /// NOTE(review): presumably computed during planning and returned verbatim — confirm.
    pub json_schema: String,
}
1120
/// Plan for sending a precomputed set of updates to a catalog item.
#[derive(Debug)]
pub struct SendDiffsPlan {
    /// ID of the item receiving the updates.
    pub id: CatalogItemId,
    /// The updates, as `(row, diff)` pairs.
    pub updates: Vec<(Row, Diff)>,
    /// The kind of mutation these updates represent.
    pub kind: MutationKind,
    /// Rows to hand back to the client, each paired with a non-zero count.
    /// NOTE(review): presumably the `RETURNING` output with row multiplicities — confirm.
    pub returning: Vec<(Row, NonZeroUsize)>,
    /// NOTE(review): presumably an upper bound, in bytes, on the size of the returned result —
    /// confirm.
    pub max_result_size: u64,
}
1129
/// Plan for inserting rows into a catalog item.
#[derive(Debug)]
pub struct InsertPlan {
    /// ID of the item being inserted into.
    pub id: CatalogItemId,
    /// Expression producing the rows to insert.
    pub values: HirRelationExpr,
    /// NOTE(review): presumably the `RETURNING` expressions — confirm.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1136
/// Plan for a mutation that first reads rows and then writes changes based on them.
#[derive(Debug)]
pub struct ReadThenWritePlan {
    /// ID of the item being mutated.
    pub id: CatalogItemId,
    /// Expression selecting the rows to operate on.
    pub selection: HirRelationExpr,
    /// Finishing instructions applied to the selected rows.
    pub finishing: RowSetFinishing,
    /// Expressions keyed by `usize`.
    /// NOTE(review): presumably column index → new value for updates — confirm.
    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
    /// The kind of mutation to perform.
    pub kind: MutationKind,
    /// NOTE(review): presumably the `RETURNING` expressions — confirm.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1146
/// Generated by `ALTER ... IF EXISTS` if the named object did not exist.
#[derive(Debug)]
pub struct AlterNoopPlan {
    /// The type of object named in the statement.
    pub object_type: ObjectType,
}
1152
/// Plan for changing the cluster of a catalog item.
#[derive(Debug)]
pub struct AlterSetClusterPlan {
    /// ID of the item being altered.
    pub id: CatalogItemId,
    /// ID of the cluster to set.
    pub set_cluster: ClusterId,
}
1158
/// Plan for altering the retain-history setting of a catalog item.
#[derive(Debug)]
pub struct AlterRetainHistoryPlan {
    /// ID of the item being altered.
    pub id: CatalogItemId,
    /// NOTE(review): presumably the option value as written in the statement, `None` when the
    /// option is reset — confirm.
    pub value: Option<Value>,
    /// The compaction window to apply.
    pub window: CompactionWindow,
    /// The type of the object being altered.
    pub object_type: ObjectType,
}
1166
/// How a single option is affected by an `ALTER` statement.
///
/// The payload type defaults to `String`.
#[derive(Debug, Clone)]
pub enum AlterOptionParameter<T = String> {
    /// Set the option to the given value.
    Set(T),
    /// Reset the option.
    Reset,
    /// Leave the option as it is.
    Unchanged,
}
1174
/// The action performed by an [`AlterConnectionPlan`].
#[derive(Debug)]
pub enum AlterConnectionAction {
    /// Rotate the connection's keys.
    RotateKeys,
    /// Change the connection's options.
    AlterOptions {
        /// Options to set, keyed by option name.
        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
        /// Names of the options to drop.
        drop_options: BTreeSet<ConnectionOptionName>,
        /// NOTE(review): presumably whether the altered connection should be validated —
        /// confirm.
        validate: bool,
    },
}
1184
/// Plan for altering a connection.
#[derive(Debug)]
pub struct AlterConnectionPlan {
    /// ID of the connection being altered.
    pub id: CatalogItemId,
    /// What to do to the connection.
    pub action: AlterConnectionAction,
}
1190
/// The action performed by an [`AlterSourcePlan`].
#[derive(Debug)]
pub enum AlterSourceAction {
    /// Add subsource exports to the source.
    AddSubsourceExports {
        /// Plans for the subsources to create.
        subsources: Vec<CreateSourcePlanBundle>,
        /// Options given alongside the added subsources.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    },
    /// Refresh the source's references.
    RefreshReferences {
        /// The references to record for the source.
        references: SourceReferences,
    },
}
1201
/// Plan for altering a source.
#[derive(Debug)]
pub struct AlterSourcePlan {
    /// Catalog item ID of the source being altered.
    pub item_id: CatalogItemId,
    /// Global ID of the source's ingestion.
    pub ingestion_id: GlobalId,
    /// What to do to the source.
    pub action: AlterSourceAction,
}
1208
/// Plan for altering a sink.
#[derive(Debug, Clone)]
pub struct AlterSinkPlan {
    /// Catalog item ID of the sink being altered.
    pub item_id: CatalogItemId,
    /// Global ID of the sink being altered.
    pub global_id: GlobalId,
    /// The sink definition.
    /// NOTE(review): presumably the new definition that replaces the existing one — confirm.
    pub sink: Sink,
    /// NOTE(review): presumably whether the sink should emit a snapshot of existing data —
    /// confirm.
    pub with_snapshot: bool,
    /// ID of the cluster associated with this sink.
    pub in_cluster: ClusterId,
}
1217
/// Plan for altering a cluster.
#[derive(Debug, Clone)]
pub struct AlterClusterPlan {
    /// ID of the cluster being altered.
    pub id: ClusterId,
    /// Name of the cluster.
    pub name: String,
    /// The cluster options carried by this plan.
    pub options: PlanClusterOption,
    /// The strategy to use when applying the alteration.
    pub strategy: AlterClusterPlanStrategy,
}
1225
/// Plan for renaming a cluster.
#[derive(Debug)]
pub struct AlterClusterRenamePlan {
    /// ID of the cluster being renamed.
    pub id: ClusterId,
    /// Current name of the cluster.
    pub name: String,
    /// New name for the cluster.
    pub to_name: String,
}
1232
/// Plan for renaming a cluster replica.
#[derive(Debug)]
pub struct AlterClusterReplicaRenamePlan {
    /// ID of the cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// ID of the replica being renamed.
    pub replica_id: ReplicaId,
    /// Current qualified name of the replica.
    pub name: QualifiedReplica,
    /// New name for the replica.
    pub to_name: String,
}
1240
/// Plan for renaming a catalog item.
#[derive(Debug)]
pub struct AlterItemRenamePlan {
    /// ID of the item being renamed.
    pub id: CatalogItemId,
    /// The item's current full name.
    pub current_full_name: FullItemName,
    /// New name for the item.
    pub to_name: String,
    /// The type of the item being renamed.
    pub object_type: ObjectType,
}
1248
/// Plan for renaming a schema.
#[derive(Debug)]
pub struct AlterSchemaRenamePlan {
    /// Database and schema specifiers identifying the schema being renamed.
    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// New name for the schema.
    pub new_schema_name: String,
}
1254
/// Plan for swapping two schemas.
#[derive(Debug)]
pub struct AlterSchemaSwapPlan {
    /// Database and schema specifiers identifying schema A.
    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of schema A.
    pub schema_a_name: String,
    /// Database and schema specifiers identifying schema B.
    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of schema B.
    pub schema_b_name: String,
    /// NOTE(review): presumably a temporary name used as the intermediate step of the swap —
    /// confirm against the sequencer.
    pub name_temp: String,
}
1263
/// Plan for swapping two clusters.
#[derive(Debug)]
pub struct AlterClusterSwapPlan {
    /// ID of cluster A.
    pub id_a: ClusterId,
    /// ID of cluster B.
    pub id_b: ClusterId,
    /// Name of cluster A.
    pub name_a: String,
    /// Name of cluster B.
    pub name_b: String,
    /// NOTE(review): presumably a temporary name used as the intermediate step of the swap —
    /// confirm against the sequencer.
    pub name_temp: String,
}
1272
/// Plan for altering a secret.
#[derive(Debug)]
pub struct AlterSecretPlan {
    /// ID of the secret being altered.
    pub id: CatalogItemId,
    /// Expression for the secret's new contents.
    pub secret_as: MirScalarExpr,
}
1278
/// Plan for setting a system variable.
#[derive(Debug)]
pub struct AlterSystemSetPlan {
    /// Name of the variable to set.
    pub name: String,
    /// The value to assign, or a request to use the default.
    pub value: VariableValue,
}
1284
/// Plan for resetting a single system variable.
#[derive(Debug)]
pub struct AlterSystemResetPlan {
    /// Name of the variable to reset.
    pub name: String,
}
1289
/// Plan for resetting all system variables. Carries no data.
#[derive(Debug)]
pub struct AlterSystemResetAllPlan {}
1292
/// Plan for altering a role.
#[derive(Debug)]
pub struct AlterRolePlan {
    /// ID of the role being altered.
    pub id: RoleId,
    /// Name of the role.
    pub name: String,
    /// The planned change to apply to the role.
    pub option: PlannedAlterRoleOption,
}
1299
/// Plan for changing the owner of an object.
#[derive(Debug)]
pub struct AlterOwnerPlan {
    /// ID of the object whose owner changes.
    pub id: ObjectId,
    /// The type of that object.
    pub object_type: ObjectType,
    /// ID of the role that becomes the owner.
    pub new_owner: RoleId,
}
1306
/// Plan for altering a table; it carries a single column's name and type.
///
/// NOTE(review): presumably this describes a column being added — confirm against the
/// sequencer.
#[derive(Debug)]
pub struct AlterTablePlan {
    /// ID of the relation being altered.
    pub relation_id: CatalogItemId,
    /// Name of the column.
    pub column_name: ColumnName,
    /// Type of the column.
    pub column_type: ColumnType,
    /// The column's type as a [`RawDataType`].
    pub raw_sql_type: RawDataType,
}
1314
/// Plan for declaring a named statement.
///
/// NOTE(review): presumably the name identifies a cursor — confirm.
#[derive(Debug)]
pub struct DeclarePlan {
    /// Name being declared.
    pub name: String,
    /// The raw (unresolved) statement.
    pub stmt: Statement<Raw>,
    /// SQL text.
    /// NOTE(review): presumably the text `stmt` was parsed from — confirm.
    pub sql: String,
    /// Parameters accompanying the statement.
    pub params: Params,
}
1322
/// Plan for fetching rows by name.
///
/// NOTE(review): presumably `name` identifies a cursor — confirm.
#[derive(Debug)]
pub struct FetchPlan {
    /// Name to fetch from.
    pub name: String,
    /// Optional fetch direction/count.
    pub count: Option<FetchDirection>,
    /// Timeout behaviour for the fetch.
    pub timeout: ExecuteTimeout,
}
1329
/// Plan for closing something by name.
///
/// NOTE(review): presumably `name` identifies a cursor — confirm.
#[derive(Debug)]
pub struct ClosePlan {
    /// Name to close.
    pub name: String,
}
1334
/// Plan for preparing a named statement.
#[derive(Debug)]
pub struct PreparePlan {
    /// Name of the prepared statement.
    pub name: String,
    /// The raw (unresolved) statement.
    pub stmt: Statement<Raw>,
    /// SQL text.
    /// NOTE(review): presumably the text `stmt` was parsed from — confirm.
    pub sql: String,
    /// Description of the statement.
    pub desc: StatementDesc,
}
1342
/// Plan for executing a named statement with parameters.
#[derive(Debug)]
pub struct ExecutePlan {
    /// Name of the statement to execute.
    pub name: String,
    /// Parameters to execute it with.
    pub params: Params,
}
1348
/// Plan for deallocating prepared statements.
#[derive(Debug)]
pub struct DeallocatePlan {
    /// Name of the statement to deallocate.
    /// NOTE(review): presumably `None` means "deallocate all" — confirm.
    pub name: Option<String>,
}
1353
/// Plan for raising a notice.
#[derive(Debug)]
pub struct RaisePlan {
    /// Severity of the notice to raise.
    pub severity: NoticeSeverity,
}
1358
/// Plan for granting role membership.
#[derive(Debug)]
pub struct GrantRolePlan {
    /// The roles that are gaining members.
    pub role_ids: Vec<RoleId>,
    /// The roles that will be added to each role in `role_ids`.
    pub member_ids: Vec<RoleId>,
    /// The role that granted the membership.
    pub grantor_id: RoleId,
}
1368
/// Plan for revoking role membership.
#[derive(Debug)]
pub struct RevokeRolePlan {
    /// The roles that are losing members.
    pub role_ids: Vec<RoleId>,
    /// The roles that will be removed from each role in `role_ids`.
    pub member_ids: Vec<RoleId>,
    /// The role that revoked the membership.
    pub grantor_id: RoleId,
}
1378
/// A single privilege change, shared by [`GrantPrivilegesPlan`] and [`RevokePrivilegesPlan`].
#[derive(Debug)]
pub struct UpdatePrivilege {
    /// The privileges being granted/revoked on an object.
    pub acl_mode: AclMode,
    /// The ID of the object receiving privileges.
    pub target_id: SystemObjectId,
    /// The role that is granting the privileges.
    pub grantor: RoleId,
}
1388
/// Plan for granting privileges.
#[derive(Debug)]
pub struct GrantPrivilegesPlan {
    /// Description of each privilege being granted.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles that will be granted the privileges.
    pub grantees: Vec<RoleId>,
}
1396
/// Plan for revoking privileges.
#[derive(Debug)]
pub struct RevokePrivilegesPlan {
    /// Description of each privilege being revoked.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles that will have privileges revoked.
    pub revokees: Vec<RoleId>,
}
/// Plan for altering default privileges.
#[derive(Debug)]
pub struct AlterDefaultPrivilegesPlan {
    /// Description of objects that match this default privilege.
    pub privilege_objects: Vec<DefaultPrivilegeObject>,
    /// The privilege to be granted/revoked from the matching objects.
    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
    /// Whether this is a grant or revoke.
    pub is_grant: bool,
}
1413
/// Plan for reassigning the objects owned by a set of roles to a new owner.
#[derive(Debug)]
pub struct ReassignOwnedPlan {
    /// The roles whose owned objects are being reassigned.
    pub old_roles: Vec<RoleId>,
    /// The new owner of the objects.
    pub new_role: RoleId,
    /// All object IDs to reassign.
    pub reassign_ids: Vec<ObjectId>,
}
1423
/// Plan for setting or clearing the comment on an object.
#[derive(Debug)]
pub struct CommentPlan {
    /// The object that this comment is associated with.
    pub object_id: CommentObjectId,
    /// A sub-component of the object that this comment is associated with, e.g. a column.
    ///
    /// TODO(parkmycar): <https://github.com/MaterializeInc/database-issues/issues/6711>.
    pub sub_component: Option<usize>,
    /// The comment itself. If `None` that indicates we should clear the existing comment.
    pub comment: Option<String>,
}
1435
/// Where a [`Table`] gets its data from.
#[derive(Clone, Debug)]
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    ///
    /// `defaults` holds one expression per entry.
    /// NOTE(review): presumably one default-value expression per column — confirm.
    TableWrites { defaults: Vec<Expr<Aug>> },

    /// The table receives its data from the identified `DataSourceDesc`.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    DataSource {
        /// Description of the data source feeding the table.
        desc: DataSourceDesc,
        /// The timeline associated with the data source.
        timeline: Timeline,
    },
}
1448
/// Definition of a table, as carried by [`CreateTablePlan`].
#[derive(Clone, Debug)]
pub struct Table {
    /// SQL text defining this table.
    /// NOTE(review): presumably parse-able and stored durably, like `Sink::create_sql` —
    /// confirm.
    pub create_sql: String,
    /// Versioned relation description of the table.
    pub desc: VersionedRelationDesc,
    /// Whether the table is temporary.
    pub temporary: bool,
    /// Optional compaction window for the table.
    pub compaction_window: Option<CompactionWindow>,
    /// Where the table's data comes from.
    pub data_source: TableDataSource,
}
1457
/// Definition of a source.
#[derive(Clone, Debug)]
pub struct Source {
    /// SQL text defining this source.
    /// NOTE(review): presumably parse-able and stored durably, like `Sink::create_sql` —
    /// confirm.
    pub create_sql: String,
    /// Where the source's data comes from.
    pub data_source: DataSourceDesc,
    /// Relation description of the source.
    pub desc: RelationDesc,
    /// Optional compaction window for the source.
    pub compaction_window: Option<CompactionWindow>,
}
1465
/// Describes where a source (or a source-fed table) receives its data from.
#[derive(Debug, Clone)]
pub enum DataSourceDesc {
    /// Receives data from an external system.
    Ingestion(Ingestion),
    /// This source receives its data from the identified ingestion,
    /// specifically the output identified by `external_reference`.
    IngestionExport {
        /// ID of the ingestion this export belongs to.
        ingestion_id: CatalogItemId,
        /// Name identifying which output of the ingestion is exported.
        external_reference: UnresolvedItemName,
        /// Details of the export.
        details: SourceExportDetails,
        /// Data configuration of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Receives data from the source's reclocking/remapping operations.
    Progress,
    /// Receives data from HTTP post requests.
    Webhook {
        /// Optional validation applied to incoming requests.
        validate_using: Option<WebhookValidation>,
        /// Format of the request body.
        body_format: WebhookBodyFormat,
        /// Which request headers are exposed, and how.
        headers: WebhookHeaders,
        /// Only `Some` when created via `CREATE TABLE ... FROM WEBHOOK`.
        cluster_id: Option<StorageInstanceId>,
    },
}
1489
/// An ingestion: the payload of [`DataSourceDesc::Ingestion`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Ingestion {
    /// Description of the source being ingested.
    pub desc: SourceDesc<ReferencedConnection>,
    /// ID of the catalog item that is this ingestion's progress subsource.
    pub progress_subsource: CatalogItemId,
}
1495
/// Describes how incoming webhook requests are validated.
#[derive(Clone, Debug, Serialize)]
pub struct WebhookValidation {
    /// The expression used to validate a request.
    pub expression: MirScalarExpr,
    /// Description of the source that will be created.
    pub relation_desc: RelationDesc,
    /// The column index to provide the request body and whether to provide it as bytes.
    pub bodies: Vec<(usize, bool)>,
    /// The column index to provide the request headers and whether to provide the values as bytes.
    pub headers: Vec<(usize, bool)>,
    /// Any secrets that are used in that validation.
    pub secrets: Vec<WebhookValidationSecret>,
}
1509
1510impl WebhookValidation {
1511    const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1512
1513    /// Attempt to reduce the internal [`MirScalarExpr`] into a simpler expression.
1514    ///
1515    /// The reduction happens on a separate thread, we also only wait for
1516    /// `WebhookValidation::MAX_REDUCE_TIME` before timing out and returning an error.
1517    pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1518        let WebhookValidation {
1519            expression,
1520            relation_desc,
1521            ..
1522        } = self;
1523
1524        // On a different thread, attempt to reduce the expression.
1525        let mut expression_ = expression.clone();
1526        let desc_ = relation_desc.clone();
1527        let reduce_task = mz_ore::task::spawn_blocking(
1528            || "webhook-validation-reduce",
1529            move || {
1530                expression_.reduce(&desc_.typ().column_types);
1531                expression_
1532            },
1533        );
1534
1535        match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1536            Ok(Ok(reduced_expr)) => {
1537                *expression = reduced_expr;
1538                Ok(())
1539            }
1540            Ok(Err(_)) => Err("joining task"),
1541            Err(_) => Err("timeout"),
1542        }
1543    }
1544}
1545
/// Describes which request headers a webhook source exposes as columns.
#[derive(Clone, Debug, Default, Serialize)]
pub struct WebhookHeaders {
    /// Optionally include a column named `headers` whose content is possibly filtered.
    pub header_column: Option<WebhookHeaderFilters>,
    /// The column index to provide the specific request header, and whether to provide it as bytes.
    pub mapped_headers: BTreeMap<usize, (String, bool)>,
}
1553
1554impl WebhookHeaders {
1555    /// Returns the number of columns needed to represent our headers.
1556    pub fn num_columns(&self) -> usize {
1557        let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1558        let mapped_headers = self.mapped_headers.len();
1559
1560        header_column + mapped_headers
1561    }
1562}
1563
/// Filters applied to the `headers` column of a webhook source.
///
/// NOTE(review): how `block` and `allow` interact when both are non-empty is not visible here
/// — confirm against the webhook request handling.
#[derive(Clone, Debug, Default, Serialize)]
pub struct WebhookHeaderFilters {
    /// Header names on the block list.
    pub block: BTreeSet<String>,
    /// Header names on the allow list.
    pub allow: BTreeSet<String>,
}
1569
/// Format of a webhook request body; it converts into the `ScalarType` of the body column.
#[derive(Copy, Clone, Debug, Serialize, Arbitrary)]
pub enum WebhookBodyFormat {
    /// JSON body (maps to `ScalarType::Jsonb`).
    /// NOTE(review): `array` presumably means a top-level JSON array is expanded into one row
    /// per element — confirm.
    Json { array: bool },
    /// Raw bytes (maps to `ScalarType::Bytes`).
    Bytes,
    /// Text (maps to `ScalarType::String`).
    Text,
}
1576
1577impl From<WebhookBodyFormat> for ScalarType {
1578    fn from(value: WebhookBodyFormat) -> Self {
1579        match value {
1580            WebhookBodyFormat::Json { .. } => ScalarType::Jsonb,
1581            WebhookBodyFormat::Bytes => ScalarType::Bytes,
1582            WebhookBodyFormat::Text => ScalarType::String,
1583        }
1584    }
1585}
1586
/// A secret referenced by a [`WebhookValidation`] expression.
#[derive(Clone, Debug, Serialize)]
pub struct WebhookValidationSecret {
    /// Identifies the secret by [`CatalogItemId`].
    pub id: CatalogItemId,
    /// Column index for the expression context that this secret was originally evaluated in.
    pub column_idx: usize,
    /// Whether or not this secret should be provided to the expression as Bytes or a String.
    pub use_bytes: bool,
}
1596
/// Definition of a connection, as carried by [`CreateConnectionPlan`].
#[derive(Clone, Debug)]
pub struct Connection {
    /// SQL text defining this connection.
    /// NOTE(review): presumably parse-able and stored durably, like `Sink::create_sql` —
    /// confirm.
    pub create_sql: String,
    /// The type-specific details of the connection.
    pub details: ConnectionDetails,
}
1602
/// Type-specific details of a [`Connection`], one variant per supported connection type.
#[derive(Clone, Debug, Serialize)]
pub enum ConnectionDetails {
    /// A Kafka connection.
    Kafka(KafkaConnection<ReferencedConnection>),
    /// A CSR connection.
    /// NOTE(review): presumably Confluent Schema Registry — confirm.
    Csr(CsrConnection<ReferencedConnection>),
    /// A PostgreSQL connection.
    Postgres(PostgresConnection<ReferencedConnection>),
    /// An SSH connection together with two keys.
    /// NOTE(review): presumably two keys are kept to support key rotation — confirm.
    Ssh {
        connection: SshConnection,
        key_1: SshKey,
        key_2: SshKey,
    },
    /// An AWS connection.
    Aws(AwsConnection),
    /// An AWS PrivateLink connection.
    AwsPrivatelink(AwsPrivatelinkConnection),
    /// A MySQL connection.
    MySql(MySqlConnection<ReferencedConnection>),
    /// A SQL Server connection.
    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
}
1618
1619impl ConnectionDetails {
1620    pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1621        match self {
1622            ConnectionDetails::Kafka(c) => {
1623                mz_storage_types::connections::Connection::Kafka(c.clone())
1624            }
1625            ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1626            ConnectionDetails::Postgres(c) => {
1627                mz_storage_types::connections::Connection::Postgres(c.clone())
1628            }
1629            ConnectionDetails::Ssh { connection, .. } => {
1630                mz_storage_types::connections::Connection::Ssh(connection.clone())
1631            }
1632            ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1633            ConnectionDetails::AwsPrivatelink(c) => {
1634                mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1635            }
1636            ConnectionDetails::MySql(c) => {
1637                mz_storage_types::connections::Connection::MySql(c.clone())
1638            }
1639            ConnectionDetails::SqlServer(c) => {
1640                mz_storage_types::connections::Connection::SqlServer(c.clone())
1641            }
1642        }
1643    }
1644}
1645
/// A single rule of a network policy.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct NetworkPolicyRule {
    /// Name of the rule.
    pub name: String,
    /// What the rule does with matching traffic.
    pub action: NetworkPolicyRuleAction,
    /// The network (IP address plus prefix) the rule applies to.
    pub address: PolicyAddress,
    /// The traffic direction the rule applies to.
    pub direction: NetworkPolicyRuleDirection,
}
1653
/// The action of a [`NetworkPolicyRule`]. `Allow` is currently the only action.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleAction {
    /// Allow matching traffic.
    Allow,
}
1658
1659impl std::fmt::Display for NetworkPolicyRuleAction {
1660    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1661        match self {
1662            Self::Allow => write!(f, "allow"),
1663        }
1664    }
1665}
1666impl TryFrom<&str> for NetworkPolicyRuleAction {
1667    type Error = PlanError;
1668    fn try_from(value: &str) -> Result<Self, Self::Error> {
1669        match value.to_uppercase().as_str() {
1670            "ALLOW" => Ok(Self::Allow),
1671            _ => Err(PlanError::Unstructured(
1672                "Allow is the only valid option".into(),
1673            )),
1674        }
1675    }
1676}
1677
/// The traffic direction of a [`NetworkPolicyRule`]. `Ingress` is currently the only direction.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleDirection {
    /// Incoming traffic.
    Ingress,
}
1682impl std::fmt::Display for NetworkPolicyRuleDirection {
1683    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1684        match self {
1685            Self::Ingress => write!(f, "ingress"),
1686        }
1687    }
1688}
1689impl TryFrom<&str> for NetworkPolicyRuleDirection {
1690    type Error = PlanError;
1691    fn try_from(value: &str) -> Result<Self, Self::Error> {
1692        match value.to_uppercase().as_str() {
1693            "INGRESS" => Ok(Self::Ingress),
1694            _ => Err(PlanError::Unstructured(
1695                "Ingress is the only valid option".into(),
1696            )),
1697        }
1698    }
1699}
1700
/// The address of a [`NetworkPolicyRule`]: a newtype around an [`IpNet`].
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct PolicyAddress(pub IpNet);
1703impl std::fmt::Display for PolicyAddress {
1704    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1705        write!(f, "{}", &self.0.to_string())
1706    }
1707}
1708impl From<String> for PolicyAddress {
1709    fn from(value: String) -> Self {
1710        Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1711    }
1712}
1713impl TryFrom<&str> for PolicyAddress {
1714    type Error = PlanError;
1715    fn try_from(value: &str) -> Result<Self, Self::Error> {
1716        let net = IpNet::from_str(value)
1717            .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1718        Ok(Self(net))
1719    }
1720}
1721
1722impl Serialize for PolicyAddress {
1723    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1724    where
1725        S: serde::Serializer,
1726    {
1727        serializer.serialize_str(&format!("{}", &self.0))
1728    }
1729}
1730
1731#[derive(Clone, Debug, Serialize)]
1732pub enum SshKey {
1733    PublicOnly(String),
1734    Both(SshKeyPair),
1735}
1736
1737impl SshKey {
1738    pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1739        match self {
1740            SshKey::PublicOnly(_) => None,
1741            SshKey::Both(key_pair) => Some(key_pair),
1742        }
1743    }
1744
1745    pub fn public_key(&self) -> String {
1746        match self {
1747            SshKey::PublicOnly(s) => s.into(),
1748            SshKey::Both(p) => p.ssh_public_key(),
1749        }
1750    }
1751}
1752
1753#[derive(Clone, Debug)]
1754pub struct Secret {
1755    pub create_sql: String,
1756    pub secret_as: MirScalarExpr,
1757}
1758
1759#[derive(Clone, Debug)]
1760pub struct Sink {
1761    /// Parse-able SQL that is stored durably and defines this sink.
1762    pub create_sql: String,
1763    /// Collection we read into this sink.
1764    pub from: GlobalId,
1765    /// Type of connection to the external service we sink into.
1766    pub connection: StorageSinkConnection<ReferencedConnection>,
1767    // TODO(guswynn): this probably should just be in the `connection`.
1768    pub envelope: SinkEnvelope,
1769    pub version: u64,
1770}
1771
1772#[derive(Clone, Debug)]
1773pub struct View {
1774    /// Parse-able SQL that is stored durably and defines this view.
1775    pub create_sql: String,
1776    /// Unoptimized high-level expression from parsing the `create_sql`.
1777    pub expr: HirRelationExpr,
1778    /// All of the catalog objects that are referenced by this view, according to the `expr`.
1779    pub dependencies: DependencyIds,
1780    /// Columns of this view.
1781    pub column_names: Vec<ColumnName>,
1782    /// If this view is created in the temporary schema, e.g. `CREATE TEMPORARY ...`.
1783    pub temporary: bool,
1784}
1785
1786#[derive(Clone, Debug)]
1787pub struct MaterializedView {
1788    /// Parse-able SQL that is stored durably and defines this materialized view.
1789    pub create_sql: String,
1790    /// Unoptimized high-level expression from parsing the `create_sql`.
1791    pub expr: HirRelationExpr,
1792    /// All of the catalog objects that are referenced by this materialized view, according to the `expr`.
1793    pub dependencies: DependencyIds,
1794    /// Columns of this view.
1795    pub column_names: Vec<ColumnName>,
1796    /// Cluster this materialized view will get installed on.
1797    pub cluster_id: ClusterId,
1798    pub non_null_assertions: Vec<usize>,
1799    pub compaction_window: Option<CompactionWindow>,
1800    pub refresh_schedule: Option<RefreshSchedule>,
1801    pub as_of: Option<Timestamp>,
1802}
1803
1804#[derive(Clone, Debug)]
1805pub struct Index {
1806    /// Parse-able SQL that is stored durably and defines this index.
1807    pub create_sql: String,
1808    /// Collection this index is on top of.
1809    pub on: GlobalId,
1810    pub keys: Vec<mz_expr::MirScalarExpr>,
1811    pub compaction_window: Option<CompactionWindow>,
1812    pub cluster_id: ClusterId,
1813}
1814
1815#[derive(Clone, Debug)]
1816pub struct Type {
1817    pub create_sql: String,
1818    pub inner: CatalogType<IdReference>,
1819}
1820
1821/// Specifies when a `Peek` or `Subscribe` should occur.
1822#[derive(Deserialize, Clone, Debug, PartialEq)]
1823pub enum QueryWhen {
1824    /// The peek should occur at the latest possible timestamp that allows the
1825    /// peek to complete immediately.
1826    Immediately,
1827    /// The peek should occur at a timestamp that allows the peek to see all
1828    /// data written to tables within Materialize.
1829    FreshestTableWrite,
1830    /// The peek should occur at the timestamp described by the specified
1831    /// expression.
1832    ///
1833    /// The expression may have any type.
1834    AtTimestamp(MirScalarExpr),
1835    /// Same as Immediately, but will also advance to at least the specified
1836    /// expression.
1837    AtLeastTimestamp(MirScalarExpr),
1838}
1839
1840impl QueryWhen {
1841    /// Returns a timestamp to which the candidate must be advanced.
1842    pub fn advance_to_timestamp(&self) -> Option<MirScalarExpr> {
1843        match self {
1844            QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1845            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1846        }
1847    }
1848    /// Returns whether the candidate's upper bound is constrained.
1849    /// This is only true for `AtTimestamp` since it is the only variant that
1850    /// specifies a timestamp.
1851    pub fn constrains_upper(&self) -> bool {
1852        match self {
1853            QueryWhen::AtTimestamp(_) => true,
1854            QueryWhen::AtLeastTimestamp(_)
1855            | QueryWhen::Immediately
1856            | QueryWhen::FreshestTableWrite => false,
1857        }
1858    }
1859    /// Returns whether the candidate must be advanced to the since.
1860    pub fn advance_to_since(&self) -> bool {
1861        match self {
1862            QueryWhen::Immediately
1863            | QueryWhen::AtLeastTimestamp(_)
1864            | QueryWhen::FreshestTableWrite => true,
1865            QueryWhen::AtTimestamp(_) => false,
1866        }
1867    }
1868    /// Returns whether the candidate can be advanced to the upper.
1869    pub fn can_advance_to_upper(&self) -> bool {
1870        match self {
1871            QueryWhen::Immediately => true,
1872            QueryWhen::FreshestTableWrite
1873            | QueryWhen::AtTimestamp(_)
1874            | QueryWhen::AtLeastTimestamp(_) => false,
1875        }
1876    }
1877
1878    /// Returns whether the candidate can be advanced to the timeline's timestamp.
1879    pub fn can_advance_to_timeline_ts(&self) -> bool {
1880        match self {
1881            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1882            QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1883        }
1884    }
1885    /// Returns whether the candidate must be advanced to the timeline's timestamp.
1886    pub fn must_advance_to_timeline_ts(&self) -> bool {
1887        match self {
1888            QueryWhen::FreshestTableWrite => true,
1889            QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1890                false
1891            }
1892        }
1893    }
1894    /// Returns whether the selected timestamp should be tracked within the current transaction.
1895    pub fn is_transactional(&self) -> bool {
1896        match self {
1897            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1898            QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1899        }
1900    }
1901}
1902
/// The kind of a data mutation: insert, update or delete.
#[derive(Clone, Copy, Debug)]
pub enum MutationKind {
    Insert,
    Update,
    Delete,
}
1909
/// The data formats a copy operation can use.
///
/// The derived ordering follows declaration order, so variants must not be
/// reordered casually.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyFormat {
    Text,
    Csv,
    Binary,
    Parquet,
}
1917
/// A timeout setting for execution.
// NOTE(review): the exact meaning of `WaitOnce` is not visible here — confirm
// at the use sites before documenting it further.
#[derive(Clone, Copy, Debug)]
pub enum ExecuteTimeout {
    None,
    /// A timeout expressed in (possibly fractional) seconds.
    Seconds(f64),
    WaitOnce,
}
1924
1925#[derive(Clone, Debug)]
1926pub enum IndexOption {
1927    /// Configures the logical compaction window for an index.
1928    RetainHistory(CompactionWindow),
1929}
1930
1931#[derive(Clone, Debug)]
1932pub enum TableOption {
1933    /// Configures the logical compaction window for a table.
1934    RetainHistory(CompactionWindow),
1935}
1936
1937#[derive(Clone, Debug)]
1938pub struct PlanClusterOption {
1939    pub availability_zones: AlterOptionParameter<Vec<String>>,
1940    pub introspection_debugging: AlterOptionParameter<bool>,
1941    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
1942    pub managed: AlterOptionParameter<bool>,
1943    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
1944    pub replication_factor: AlterOptionParameter<u32>,
1945    pub size: AlterOptionParameter,
1946    pub disk: AlterOptionParameter<bool>,
1947    pub schedule: AlterOptionParameter<ClusterSchedule>,
1948    pub workload_class: AlterOptionParameter<Option<String>>,
1949}
1950
1951impl Default for PlanClusterOption {
1952    fn default() -> Self {
1953        Self {
1954            availability_zones: AlterOptionParameter::Unchanged,
1955            introspection_debugging: AlterOptionParameter::Unchanged,
1956            introspection_interval: AlterOptionParameter::Unchanged,
1957            managed: AlterOptionParameter::Unchanged,
1958            replicas: AlterOptionParameter::Unchanged,
1959            replication_factor: AlterOptionParameter::Unchanged,
1960            size: AlterOptionParameter::Unchanged,
1961            disk: AlterOptionParameter::Unchanged,
1962            schedule: AlterOptionParameter::Unchanged,
1963            workload_class: AlterOptionParameter::Unchanged,
1964        }
1965    }
1966}
1967
1968#[derive(Clone, Debug, PartialEq, Eq)]
1969pub enum AlterClusterPlanStrategy {
1970    None,
1971    For(Duration),
1972    UntilReady {
1973        on_timeout: OnTimeoutAction,
1974        timeout: Duration,
1975    },
1976}
1977
/// The action recorded in `AlterClusterPlanStrategy::UntilReady::on_timeout`.
///
/// The default is [`OnTimeoutAction::Commit`]. `Default` is derived with the
/// `#[default]` marker rather than hand-written, so the default variant is
/// visible at the declaration.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum OnTimeoutAction {
    #[default]
    Commit,
    Rollback,
}
1989
1990impl TryFrom<&str> for OnTimeoutAction {
1991    type Error = PlanError;
1992    fn try_from(value: &str) -> Result<Self, Self::Error> {
1993        match value.to_uppercase().as_str() {
1994            "COMMIT" => Ok(Self::Commit),
1995            "ROLLBACK" => Ok(Self::Rollback),
1996            _ => Err(PlanError::Unstructured(
1997                "Valid options are COMMIT, ROLLBACK".into(),
1998            )),
1999        }
2000    }
2001}
2002
2003impl AlterClusterPlanStrategy {
2004    pub fn is_none(&self) -> bool {
2005        matches!(self, Self::None)
2006    }
2007    pub fn is_some(&self) -> bool {
2008        !matches!(self, Self::None)
2009    }
2010}
2011
2012impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2013    type Error = PlanError;
2014
2015    fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2016        Ok(match value.wait {
2017            Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2018            Some(ClusterAlterOptionValue::UntilReady(options)) => {
2019                let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2020                Self::UntilReady {
2021                    timeout: match extracted.timeout {
2022                        Some(d) => d,
2023                        None => Err(PlanError::UntilReadyTimeoutRequired)?,
2024                    },
2025                    on_timeout: match extracted.on_timeout {
2026                        Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2027                            PlanError::InvalidOptionValue {
2028                                option_name: "ON TIMEOUT".into(),
2029                                err: Box::new(e),
2030                            }
2031                        })?,
2032                        None => OnTimeoutAction::default(),
2033                    },
2034                }
2035            }
2036            None => Self::None,
2037        })
2038    }
2039}
2040
2041/// A vector of values to which parameter references should be bound.
2042#[derive(Debug, Clone)]
2043pub struct Params {
2044    pub datums: Row,
2045    pub types: Vec<ScalarType>,
2046}
2047
2048impl Params {
2049    /// Returns a `Params` with no parameters.
2050    pub fn empty() -> Params {
2051        Params {
2052            datums: Row::pack_slice(&[]),
2053            types: vec![],
2054        }
2055    }
2056}
2057
2058/// Controls planning of a SQL query.
2059#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Copy)]
2060pub struct PlanContext {
2061    pub wall_time: DateTime<Utc>,
2062    pub ignore_if_exists_errors: bool,
2063}
2064
2065impl PlanContext {
2066    pub fn new(wall_time: DateTime<Utc>) -> Self {
2067        Self {
2068            wall_time,
2069            ignore_if_exists_errors: false,
2070        }
2071    }
2072
2073    /// Return a PlanContext with zero values. This should only be used when
2074    /// planning is required but unused (like in `plan_create_table()`) or in
2075    /// tests.
2076    pub fn zero() -> Self {
2077        PlanContext {
2078            wall_time: now::to_datetime(NOW_ZERO()),
2079            ignore_if_exists_errors: false,
2080        }
2081    }
2082
2083    pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2084        self.ignore_if_exists_errors = value;
2085        self
2086    }
2087}