1use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{
41 CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing,
42};
43use mz_ore::now::{self, NOW_ZERO};
44use mz_pgcopy::CopyFormatParams;
45use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
46use mz_repr::explain::{ExplainConfig, ExplainFormat};
47use mz_repr::network_policy_id::NetworkPolicyId;
48use mz_repr::optimize::OptimizerFeatureOverrides;
49use mz_repr::refresh_schedule::RefreshSchedule;
50use mz_repr::role_id::RoleId;
51use mz_repr::{
52 CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, Row, SqlColumnType,
53 SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
54};
55use mz_sql_parser::ast::{
56 AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
57 RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
58 Value, WithOptionValue,
59};
60use mz_ssh_util::keys::SshKeyPair;
61use mz_storage_types::connections::aws::AwsConnection;
62use mz_storage_types::connections::inline::ReferencedConnection;
63use mz_storage_types::connections::{
64 AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
65 MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70 SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76 ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77 TransactionAccessMode,
78};
79use crate::catalog::{
80 CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81 RoleAttributesRaw,
82};
83use crate::names::{
84 Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110 AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111 WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119 AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120 PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123 StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124 resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
130#[derive(Debug, EnumKind)]
132#[enum_kind(PlanKind)]
133pub enum Plan {
134 CreateConnection(CreateConnectionPlan),
135 CreateDatabase(CreateDatabasePlan),
136 CreateSchema(CreateSchemaPlan),
137 CreateRole(CreateRolePlan),
138 CreateCluster(CreateClusterPlan),
139 CreateClusterReplica(CreateClusterReplicaPlan),
140 CreateSource(CreateSourcePlan),
141 CreateSources(Vec<CreateSourcePlanBundle>),
142 CreateSecret(CreateSecretPlan),
143 CreateSink(CreateSinkPlan),
144 CreateTable(CreateTablePlan),
145 CreateView(CreateViewPlan),
146 CreateMaterializedView(CreateMaterializedViewPlan),
147 CreateContinualTask(CreateContinualTaskPlan),
148 CreateNetworkPolicy(CreateNetworkPolicyPlan),
149 CreateIndex(CreateIndexPlan),
150 CreateType(CreateTypePlan),
151 Comment(CommentPlan),
152 DiscardTemp,
153 DiscardAll,
154 DropObjects(DropObjectsPlan),
155 DropOwned(DropOwnedPlan),
156 EmptyQuery,
157 ShowAllVariables,
158 ShowCreate(ShowCreatePlan),
159 ShowColumns(ShowColumnsPlan),
160 ShowVariable(ShowVariablePlan),
161 InspectShard(InspectShardPlan),
162 SetVariable(SetVariablePlan),
163 ResetVariable(ResetVariablePlan),
164 SetTransaction(SetTransactionPlan),
165 StartTransaction(StartTransactionPlan),
166 CommitTransaction(CommitTransactionPlan),
167 AbortTransaction(AbortTransactionPlan),
168 Select(SelectPlan),
169 Subscribe(SubscribePlan),
170 CopyFrom(CopyFromPlan),
171 CopyTo(CopyToPlan),
172 ExplainPlan(ExplainPlanPlan),
173 ExplainPushdown(ExplainPushdownPlan),
174 ExplainTimestamp(ExplainTimestampPlan),
175 ExplainSinkSchema(ExplainSinkSchemaPlan),
176 Insert(InsertPlan),
177 AlterCluster(AlterClusterPlan),
178 AlterClusterSwap(AlterClusterSwapPlan),
179 AlterNoop(AlterNoopPlan),
180 AlterSetCluster(AlterSetClusterPlan),
181 AlterConnection(AlterConnectionPlan),
182 AlterSource(AlterSourcePlan),
183 AlterClusterRename(AlterClusterRenamePlan),
184 AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
185 AlterItemRename(AlterItemRenamePlan),
186 AlterSchemaRename(AlterSchemaRenamePlan),
187 AlterSchemaSwap(AlterSchemaSwapPlan),
188 AlterSecret(AlterSecretPlan),
189 AlterSink(AlterSinkPlan),
190 AlterSystemSet(AlterSystemSetPlan),
191 AlterSystemReset(AlterSystemResetPlan),
192 AlterSystemResetAll(AlterSystemResetAllPlan),
193 AlterRole(AlterRolePlan),
194 AlterOwner(AlterOwnerPlan),
195 AlterTableAddColumn(AlterTablePlan),
196 AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
197 AlterNetworkPolicy(AlterNetworkPolicyPlan),
198 Declare(DeclarePlan),
199 Fetch(FetchPlan),
200 Close(ClosePlan),
201 ReadThenWrite(ReadThenWritePlan),
202 Prepare(PreparePlan),
203 Execute(ExecutePlan),
204 Deallocate(DeallocatePlan),
205 Raise(RaisePlan),
206 GrantRole(GrantRolePlan),
207 RevokeRole(RevokeRolePlan),
208 GrantPrivileges(GrantPrivilegesPlan),
209 RevokePrivileges(RevokePrivilegesPlan),
210 AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
211 ReassignOwned(ReassignOwnedPlan),
212 SideEffectingFunc(SideEffectingFunc),
213 ValidateConnection(ValidateConnectionPlan),
214 AlterRetainHistory(AlterRetainHistoryPlan),
215}
216
217impl Plan {
218 pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
221 match stmt {
222 StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
223 StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
224 StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
225 StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
226 StatementKind::AlterObjectRename => &[
227 PlanKind::AlterClusterRename,
228 PlanKind::AlterClusterReplicaRename,
229 PlanKind::AlterItemRename,
230 PlanKind::AlterSchemaRename,
231 PlanKind::AlterNoop,
232 ],
233 StatementKind::AlterObjectSwap => &[
234 PlanKind::AlterClusterSwap,
235 PlanKind::AlterSchemaSwap,
236 PlanKind::AlterNoop,
237 ],
238 StatementKind::AlterRole => &[PlanKind::AlterRole],
239 StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
240 StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
241 StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
242 StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
243 StatementKind::AlterSource => &[
244 PlanKind::AlterNoop,
245 PlanKind::AlterSource,
246 PlanKind::AlterRetainHistory,
247 ],
248 StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
249 StatementKind::AlterSystemResetAll => {
250 &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
251 }
252 StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
253 StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
254 StatementKind::AlterTableAddColumn => {
255 &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
256 }
257 StatementKind::AlterMaterializedViewApplyReplacement => &[
258 PlanKind::AlterNoop,
259 PlanKind::AlterMaterializedViewApplyReplacement,
260 ],
261 StatementKind::Close => &[PlanKind::Close],
262 StatementKind::Comment => &[PlanKind::Comment],
263 StatementKind::Commit => &[PlanKind::CommitTransaction],
264 StatementKind::Copy => &[
265 PlanKind::CopyFrom,
266 PlanKind::Select,
267 PlanKind::Subscribe,
268 PlanKind::CopyTo,
269 ],
270 StatementKind::CreateCluster => &[PlanKind::CreateCluster],
271 StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
272 StatementKind::CreateConnection => &[PlanKind::CreateConnection],
273 StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
274 StatementKind::CreateIndex => &[PlanKind::CreateIndex],
275 StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
276 StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
277 StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
278 StatementKind::CreateRole => &[PlanKind::CreateRole],
279 StatementKind::CreateSchema => &[PlanKind::CreateSchema],
280 StatementKind::CreateSecret => &[PlanKind::CreateSecret],
281 StatementKind::CreateSink => &[PlanKind::CreateSink],
282 StatementKind::CreateSource | StatementKind::CreateSubsource => {
283 &[PlanKind::CreateSource]
284 }
285 StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
286 StatementKind::CreateTable => &[PlanKind::CreateTable],
287 StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
288 StatementKind::CreateType => &[PlanKind::CreateType],
289 StatementKind::CreateView => &[PlanKind::CreateView],
290 StatementKind::Deallocate => &[PlanKind::Deallocate],
291 StatementKind::Declare => &[PlanKind::Declare],
292 StatementKind::Delete => &[PlanKind::ReadThenWrite],
293 StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
294 StatementKind::DropObjects => &[PlanKind::DropObjects],
295 StatementKind::DropOwned => &[PlanKind::DropOwned],
296 StatementKind::Execute => &[PlanKind::Execute],
297 StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
298 StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
299 StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
300 StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
301 StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
302 StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
303 StatementKind::Fetch => &[PlanKind::Fetch],
304 StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
305 StatementKind::GrantRole => &[PlanKind::GrantRole],
306 StatementKind::Insert => &[PlanKind::Insert],
307 StatementKind::Prepare => &[PlanKind::Prepare],
308 StatementKind::Raise => &[PlanKind::Raise],
309 StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
310 StatementKind::ResetVariable => &[PlanKind::ResetVariable],
311 StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
312 StatementKind::RevokeRole => &[PlanKind::RevokeRole],
313 StatementKind::Rollback => &[PlanKind::AbortTransaction],
314 StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
315 StatementKind::SetTransaction => &[PlanKind::SetTransaction],
316 StatementKind::SetVariable => &[PlanKind::SetVariable],
317 StatementKind::Show => &[
318 PlanKind::Select,
319 PlanKind::ShowVariable,
320 PlanKind::ShowCreate,
321 PlanKind::ShowColumns,
322 PlanKind::ShowAllVariables,
323 PlanKind::InspectShard,
324 ],
325 StatementKind::StartTransaction => &[PlanKind::StartTransaction],
326 StatementKind::Subscribe => &[PlanKind::Subscribe],
327 StatementKind::Update => &[PlanKind::ReadThenWrite],
328 StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
329 StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
330 }
331 }
332
333 pub fn name(&self) -> &str {
335 match self {
336 Plan::CreateConnection(_) => "create connection",
337 Plan::CreateDatabase(_) => "create database",
338 Plan::CreateSchema(_) => "create schema",
339 Plan::CreateRole(_) => "create role",
340 Plan::CreateCluster(_) => "create cluster",
341 Plan::CreateClusterReplica(_) => "create cluster replica",
342 Plan::CreateSource(_) => "create source",
343 Plan::CreateSources(_) => "create source",
344 Plan::CreateSecret(_) => "create secret",
345 Plan::CreateSink(_) => "create sink",
346 Plan::CreateTable(_) => "create table",
347 Plan::CreateView(_) => "create view",
348 Plan::CreateMaterializedView(_) => "create materialized view",
349 Plan::CreateContinualTask(_) => "create continual task",
350 Plan::CreateIndex(_) => "create index",
351 Plan::CreateType(_) => "create type",
352 Plan::CreateNetworkPolicy(_) => "create network policy",
353 Plan::Comment(_) => "comment",
354 Plan::DiscardTemp => "discard temp",
355 Plan::DiscardAll => "discard all",
356 Plan::DropObjects(plan) => match plan.object_type {
357 ObjectType::Table => "drop table",
358 ObjectType::View => "drop view",
359 ObjectType::MaterializedView => "drop materialized view",
360 ObjectType::Source => "drop source",
361 ObjectType::Sink => "drop sink",
362 ObjectType::Index => "drop index",
363 ObjectType::Type => "drop type",
364 ObjectType::Role => "drop roles",
365 ObjectType::Cluster => "drop clusters",
366 ObjectType::ClusterReplica => "drop cluster replicas",
367 ObjectType::Secret => "drop secret",
368 ObjectType::Connection => "drop connection",
369 ObjectType::Database => "drop database",
370 ObjectType::Schema => "drop schema",
371 ObjectType::Func => "drop function",
372 ObjectType::ContinualTask => "drop continual task",
373 ObjectType::NetworkPolicy => "drop network policy",
374 },
375 Plan::DropOwned(_) => "drop owned",
376 Plan::EmptyQuery => "do nothing",
377 Plan::ShowAllVariables => "show all variables",
378 Plan::ShowCreate(_) => "show create",
379 Plan::ShowColumns(_) => "show columns",
380 Plan::ShowVariable(_) => "show variable",
381 Plan::InspectShard(_) => "inspect shard",
382 Plan::SetVariable(_) => "set variable",
383 Plan::ResetVariable(_) => "reset variable",
384 Plan::SetTransaction(_) => "set transaction",
385 Plan::StartTransaction(_) => "start transaction",
386 Plan::CommitTransaction(_) => "commit",
387 Plan::AbortTransaction(_) => "abort",
388 Plan::Select(_) => "select",
389 Plan::Subscribe(_) => "subscribe",
390 Plan::CopyFrom(_) => "copy from",
391 Plan::CopyTo(_) => "copy to",
392 Plan::ExplainPlan(_) => "explain plan",
393 Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
394 Plan::ExplainTimestamp(_) => "explain timestamp",
395 Plan::ExplainSinkSchema(_) => "explain schema",
396 Plan::Insert(_) => "insert",
397 Plan::AlterNoop(plan) => match plan.object_type {
398 ObjectType::Table => "alter table",
399 ObjectType::View => "alter view",
400 ObjectType::MaterializedView => "alter materialized view",
401 ObjectType::Source => "alter source",
402 ObjectType::Sink => "alter sink",
403 ObjectType::Index => "alter index",
404 ObjectType::Type => "alter type",
405 ObjectType::Role => "alter role",
406 ObjectType::Cluster => "alter cluster",
407 ObjectType::ClusterReplica => "alter cluster replica",
408 ObjectType::Secret => "alter secret",
409 ObjectType::Connection => "alter connection",
410 ObjectType::Database => "alter database",
411 ObjectType::Schema => "alter schema",
412 ObjectType::Func => "alter function",
413 ObjectType::ContinualTask => "alter continual task",
414 ObjectType::NetworkPolicy => "alter network policy",
415 },
416 Plan::AlterCluster(_) => "alter cluster",
417 Plan::AlterClusterRename(_) => "alter cluster rename",
418 Plan::AlterClusterSwap(_) => "alter cluster swap",
419 Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
420 Plan::AlterSetCluster(_) => "alter set cluster",
421 Plan::AlterConnection(_) => "alter connection",
422 Plan::AlterSource(_) => "alter source",
423 Plan::AlterItemRename(_) => "rename item",
424 Plan::AlterSchemaRename(_) => "alter rename schema",
425 Plan::AlterSchemaSwap(_) => "alter swap schema",
426 Plan::AlterSecret(_) => "alter secret",
427 Plan::AlterSink(_) => "alter sink",
428 Plan::AlterSystemSet(_) => "alter system",
429 Plan::AlterSystemReset(_) => "alter system",
430 Plan::AlterSystemResetAll(_) => "alter system",
431 Plan::AlterRole(_) => "alter role",
432 Plan::AlterNetworkPolicy(_) => "alter network policy",
433 Plan::AlterOwner(plan) => match plan.object_type {
434 ObjectType::Table => "alter table owner",
435 ObjectType::View => "alter view owner",
436 ObjectType::MaterializedView => "alter materialized view owner",
437 ObjectType::Source => "alter source owner",
438 ObjectType::Sink => "alter sink owner",
439 ObjectType::Index => "alter index owner",
440 ObjectType::Type => "alter type owner",
441 ObjectType::Role => "alter role owner",
442 ObjectType::Cluster => "alter cluster owner",
443 ObjectType::ClusterReplica => "alter cluster replica owner",
444 ObjectType::Secret => "alter secret owner",
445 ObjectType::Connection => "alter connection owner",
446 ObjectType::Database => "alter database owner",
447 ObjectType::Schema => "alter schema owner",
448 ObjectType::Func => "alter function owner",
449 ObjectType::ContinualTask => "alter continual task owner",
450 ObjectType::NetworkPolicy => "alter network policy owner",
451 },
452 Plan::AlterTableAddColumn(_) => "alter table add column",
453 Plan::AlterMaterializedViewApplyReplacement(_) => {
454 "alter materialized view apply replacement"
455 }
456 Plan::Declare(_) => "declare",
457 Plan::Fetch(_) => "fetch",
458 Plan::Close(_) => "close",
459 Plan::ReadThenWrite(plan) => match plan.kind {
460 MutationKind::Insert => "insert into select",
461 MutationKind::Update => "update",
462 MutationKind::Delete => "delete",
463 },
464 Plan::Prepare(_) => "prepare",
465 Plan::Execute(_) => "execute",
466 Plan::Deallocate(_) => "deallocate",
467 Plan::Raise(_) => "raise",
468 Plan::GrantRole(_) => "grant role",
469 Plan::RevokeRole(_) => "revoke role",
470 Plan::GrantPrivileges(_) => "grant privilege",
471 Plan::RevokePrivileges(_) => "revoke privilege",
472 Plan::AlterDefaultPrivileges(_) => "alter default privileges",
473 Plan::ReassignOwned(_) => "reassign owned",
474 Plan::SideEffectingFunc(_) => "side effecting func",
475 Plan::ValidateConnection(_) => "validate connection",
476 Plan::AlterRetainHistory(_) => "alter retain history",
477 }
478 }
479
480 pub fn allowed_in_read_only(&self) -> bool {
486 match self {
487 Plan::SetVariable(_) => true,
490 Plan::ResetVariable(_) => true,
491 Plan::SetTransaction(_) => true,
492 Plan::StartTransaction(_) => true,
493 Plan::CommitTransaction(_) => true,
494 Plan::AbortTransaction(_) => true,
495 Plan::Select(_) => true,
496 Plan::EmptyQuery => true,
497 Plan::ShowAllVariables => true,
498 Plan::ShowCreate(_) => true,
499 Plan::ShowColumns(_) => true,
500 Plan::ShowVariable(_) => true,
501 Plan::InspectShard(_) => true,
502 Plan::Subscribe(_) => true,
503 Plan::CopyTo(_) => true,
504 Plan::ExplainPlan(_) => true,
505 Plan::ExplainPushdown(_) => true,
506 Plan::ExplainTimestamp(_) => true,
507 Plan::ExplainSinkSchema(_) => true,
508 Plan::ValidateConnection(_) => true,
509 _ => false,
510 }
511 }
512}
513
514#[derive(Debug)]
515pub struct StartTransactionPlan {
516 pub access: Option<TransactionAccessMode>,
517 pub isolation_level: Option<TransactionIsolationLevel>,
518}
519
520#[derive(Debug)]
521pub enum TransactionType {
522 Explicit,
523 Implicit,
524}
525
526impl TransactionType {
527 pub fn is_explicit(&self) -> bool {
528 matches!(self, TransactionType::Explicit)
529 }
530
531 pub fn is_implicit(&self) -> bool {
532 matches!(self, TransactionType::Implicit)
533 }
534}
535
536#[derive(Debug)]
537pub struct CommitTransactionPlan {
538 pub transaction_type: TransactionType,
539}
540
541#[derive(Debug)]
542pub struct AbortTransactionPlan {
543 pub transaction_type: TransactionType,
544}
545
546#[derive(Debug)]
547pub struct CreateDatabasePlan {
548 pub name: String,
549 pub if_not_exists: bool,
550}
551
552#[derive(Debug)]
553pub struct CreateSchemaPlan {
554 pub database_spec: ResolvedDatabaseSpecifier,
555 pub schema_name: String,
556 pub if_not_exists: bool,
557}
558
559#[derive(Debug)]
560pub struct CreateRolePlan {
561 pub name: String,
562 pub attributes: RoleAttributesRaw,
563}
564
565#[derive(Debug, PartialEq, Eq, Clone)]
566pub struct CreateClusterPlan {
567 pub name: String,
568 pub variant: CreateClusterVariant,
569 pub workload_class: Option<String>,
570}
571
572#[derive(Debug, PartialEq, Eq, Clone)]
573pub enum CreateClusterVariant {
574 Managed(CreateClusterManagedPlan),
575 Unmanaged(CreateClusterUnmanagedPlan),
576}
577
578#[derive(Debug, PartialEq, Eq, Clone)]
579pub struct CreateClusterUnmanagedPlan {
580 pub replicas: Vec<(String, ReplicaConfig)>,
581}
582
583#[derive(Debug, PartialEq, Eq, Clone)]
584pub struct CreateClusterManagedPlan {
585 pub replication_factor: u32,
586 pub size: String,
587 pub availability_zones: Vec<String>,
588 pub compute: ComputeReplicaConfig,
589 pub optimizer_feature_overrides: OptimizerFeatureOverrides,
590 pub schedule: ClusterSchedule,
591}
592
593#[derive(Debug)]
594pub struct CreateClusterReplicaPlan {
595 pub cluster_id: ClusterId,
596 pub name: String,
597 pub config: ReplicaConfig,
598}
599
600#[derive(
602 Clone,
603 Copy,
604 Debug,
605 Serialize,
606 Deserialize,
607 PartialOrd,
608 Ord,
609 PartialEq,
610 Eq
611)]
612pub struct ComputeReplicaIntrospectionConfig {
613 pub debugging: bool,
615 pub interval: Duration,
617}
618
619#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
620pub struct ComputeReplicaConfig {
621 pub introspection: Option<ComputeReplicaIntrospectionConfig>,
622}
623
624#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
625pub enum ReplicaConfig {
626 Unorchestrated {
627 storagectl_addrs: Vec<String>,
628 computectl_addrs: Vec<String>,
629 compute: ComputeReplicaConfig,
630 },
631 Orchestrated {
632 size: String,
633 availability_zone: Option<String>,
634 compute: ComputeReplicaConfig,
635 internal: bool,
636 billed_as: Option<String>,
637 },
638}
639
640#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
641pub enum ClusterSchedule {
642 Manual,
644 Refresh { hydration_time_estimate: Duration },
648}
649
650impl Default for ClusterSchedule {
651 fn default() -> Self {
652 ClusterSchedule::Manual
654 }
655}
656
657#[derive(Debug)]
658pub struct CreateSourcePlan {
659 pub name: QualifiedItemName,
660 pub source: Source,
661 pub if_not_exists: bool,
662 pub timeline: Timeline,
663 pub in_cluster: Option<ClusterId>,
665}
666
667#[derive(Clone, Debug, PartialEq, Eq)]
668pub struct SourceReferences {
669 pub updated_at: u64,
670 pub references: Vec<SourceReference>,
671}
672
673#[derive(Clone, Debug, PartialEq, Eq)]
676pub struct SourceReference {
677 pub name: String,
678 pub namespace: Option<String>,
679 pub columns: Vec<String>,
680}
681
682#[derive(Debug)]
684pub struct CreateSourcePlanBundle {
685 pub item_id: CatalogItemId,
687 pub global_id: GlobalId,
689 pub plan: CreateSourcePlan,
691 pub resolved_ids: ResolvedIds,
693 pub available_source_references: Option<SourceReferences>,
697}
698
699#[derive(Debug)]
700pub struct CreateConnectionPlan {
701 pub name: QualifiedItemName,
702 pub if_not_exists: bool,
703 pub connection: Connection,
704 pub validate: bool,
705}
706
707#[derive(Debug)]
708pub struct ValidateConnectionPlan {
709 pub id: CatalogItemId,
711 pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
713}
714
715#[derive(Debug)]
716pub struct CreateSecretPlan {
717 pub name: QualifiedItemName,
718 pub secret: Secret,
719 pub if_not_exists: bool,
720}
721
722#[derive(Debug)]
723pub struct CreateSinkPlan {
724 pub name: QualifiedItemName,
725 pub sink: Sink,
726 pub with_snapshot: bool,
727 pub if_not_exists: bool,
728 pub in_cluster: ClusterId,
729}
730
731#[derive(Debug)]
732pub struct CreateTablePlan {
733 pub name: QualifiedItemName,
734 pub table: Table,
735 pub if_not_exists: bool,
736}
737
738#[derive(Debug, Clone)]
739pub struct CreateViewPlan {
740 pub name: QualifiedItemName,
741 pub view: View,
742 pub replace: Option<CatalogItemId>,
744 pub drop_ids: Vec<CatalogItemId>,
746 pub if_not_exists: bool,
747 pub ambiguous_columns: bool,
750}
751
752#[derive(Debug, Clone)]
753pub struct CreateMaterializedViewPlan {
754 pub name: QualifiedItemName,
755 pub materialized_view: MaterializedView,
756 pub replace: Option<CatalogItemId>,
758 pub drop_ids: Vec<CatalogItemId>,
760 pub if_not_exists: bool,
761 pub ambiguous_columns: bool,
764}
765
766#[derive(Debug, Clone)]
767pub struct CreateContinualTaskPlan {
768 pub name: QualifiedItemName,
769 pub placeholder_id: Option<mz_expr::LocalId>,
772 pub desc: RelationDesc,
773 pub input_id: GlobalId,
775 pub with_snapshot: bool,
776 pub continual_task: MaterializedView,
778}
779
780#[derive(Debug, Clone)]
781pub struct CreateNetworkPolicyPlan {
782 pub name: String,
783 pub rules: Vec<NetworkPolicyRule>,
784}
785
786#[derive(Debug, Clone)]
787pub struct AlterNetworkPolicyPlan {
788 pub id: NetworkPolicyId,
789 pub name: String,
790 pub rules: Vec<NetworkPolicyRule>,
791}
792
793#[derive(Debug, Clone)]
794pub struct CreateIndexPlan {
795 pub name: QualifiedItemName,
796 pub index: Index,
797 pub if_not_exists: bool,
798}
799
800#[derive(Debug)]
801pub struct CreateTypePlan {
802 pub name: QualifiedItemName,
803 pub typ: Type,
804}
805
806#[derive(Debug)]
807pub struct DropObjectsPlan {
808 pub referenced_ids: Vec<ObjectId>,
810 pub drop_ids: Vec<ObjectId>,
812 pub object_type: ObjectType,
815}
816
817#[derive(Debug)]
818pub struct DropOwnedPlan {
819 pub role_ids: Vec<RoleId>,
821 pub drop_ids: Vec<ObjectId>,
823 pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
825 pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
827}
828
829#[derive(Debug)]
830pub struct ShowVariablePlan {
831 pub name: String,
832}
833
834#[derive(Debug)]
835pub struct InspectShardPlan {
836 pub id: GlobalId,
838}
839
840#[derive(Debug)]
841pub struct SetVariablePlan {
842 pub name: String,
843 pub value: VariableValue,
844 pub local: bool,
845}
846
847#[derive(Debug)]
848pub enum VariableValue {
849 Default,
850 Values(Vec<String>),
851}
852
853#[derive(Debug)]
854pub struct ResetVariablePlan {
855 pub name: String,
856}
857
858#[derive(Debug)]
859pub struct SetTransactionPlan {
860 pub local: bool,
861 pub modes: Vec<TransactionMode>,
862}
863
864#[derive(Clone, Debug)]
866pub struct SelectPlan {
867 pub select: Option<Box<SelectStatement<Aug>>>,
870 pub source: HirRelationExpr,
872 pub when: QueryWhen,
874 pub finishing: RowSetFinishing,
876 pub copy_to: Option<CopyFormat>,
878}
879
880impl SelectPlan {
881 pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
882 let arity = typ.arity();
883 SelectPlan {
884 select: None,
885 source: HirRelationExpr::Constant { rows, typ },
886 when: QueryWhen::Immediately,
887 finishing: RowSetFinishing::trivial(arity),
888 copy_to: None,
889 }
890 }
891}
892
893#[derive(Debug, Clone)]
894pub enum SubscribeOutput {
895 Diffs,
896 WithinTimestampOrderBy {
897 order_by: Vec<ColumnOrder>,
899 },
900 EnvelopeUpsert {
901 order_by_keys: Vec<ColumnOrder>,
903 },
904 EnvelopeDebezium {
905 order_by_keys: Vec<ColumnOrder>,
907 },
908}
909
910#[derive(Debug, Clone)]
911pub struct SubscribePlan {
912 pub from: SubscribeFrom,
913 pub with_snapshot: bool,
914 pub when: QueryWhen,
915 pub up_to: Option<Timestamp>,
916 pub copy_to: Option<CopyFormat>,
917 pub emit_progress: bool,
918 pub output: SubscribeOutput,
919}
920
921#[derive(Debug, Clone)]
922pub enum SubscribeFrom {
923 Id(GlobalId),
925 Query {
927 expr: MirRelationExpr,
928 desc: RelationDesc,
929 },
930}
931
932impl SubscribeFrom {
933 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
934 match self {
935 SubscribeFrom::Id(id) => BTreeSet::from([*id]),
936 SubscribeFrom::Query { expr, .. } => expr.depends_on(),
937 }
938 }
939
940 pub fn contains_temporal(&self) -> bool {
941 match self {
942 SubscribeFrom::Id(_) => false,
943 SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
944 }
945 }
946}
947
948#[derive(Debug)]
949pub struct ShowCreatePlan {
950 pub id: ObjectId,
951 pub row: Row,
952}
953
954#[derive(Debug)]
955pub struct ShowColumnsPlan {
956 pub id: CatalogItemId,
957 pub select_plan: SelectPlan,
958 pub new_resolved_ids: ResolvedIds,
959}
960
961#[derive(Debug)]
962pub struct CopyFromPlan {
963 pub target_id: CatalogItemId,
965 pub target_name: String,
967 pub source: CopyFromSource,
969 pub columns: Vec<ColumnIndex>,
973 pub source_desc: RelationDesc,
975 pub mfp: MapFilterProject,
977 pub params: CopyFormatParams<'static>,
979 pub filter: Option<CopyFromFilter>,
981}
982
983#[derive(Debug)]
984pub enum CopyFromSource {
985 Stdin,
987 Url(HirScalarExpr),
991 AwsS3 {
993 uri: HirScalarExpr,
995 connection: AwsConnection,
997 connection_id: CatalogItemId,
999 },
1000}
1001
1002#[derive(Debug)]
1003pub enum CopyFromFilter {
1004 Files(Vec<String>),
1005 Pattern(String),
1006}
1007
1008#[derive(Debug, Clone)]
1013pub struct CopyToPlan {
1014 pub select_plan: SelectPlan,
1016 pub desc: RelationDesc,
1017 pub to: HirScalarExpr,
1019 pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
1020 pub connection_id: CatalogItemId,
1022 pub format: S3SinkFormat,
1023 pub max_file_size: u64,
1024}
1025
1026#[derive(Clone, Debug)]
1027pub struct ExplainPlanPlan {
1028 pub stage: ExplainStage,
1029 pub format: ExplainFormat,
1030 pub config: ExplainConfig,
1031 pub explainee: Explainee,
1032}
1033
1034#[derive(Clone, Debug)]
1036pub enum Explainee {
1037 View(CatalogItemId),
1039 MaterializedView(CatalogItemId),
1041 Index(CatalogItemId),
1043 ReplanView(CatalogItemId),
1045 ReplanMaterializedView(CatalogItemId),
1047 ReplanIndex(CatalogItemId),
1049 Statement(ExplaineeStatement),
1051}
1052
1053#[derive(Clone, Debug, EnumKind)]
1055#[enum_kind(ExplaineeStatementKind)]
1056pub enum ExplaineeStatement {
1057 Select {
1059 broken: bool,
1061 plan: plan::SelectPlan,
1062 desc: RelationDesc,
1063 },
1064 CreateView {
1066 broken: bool,
1068 plan: plan::CreateViewPlan,
1069 },
1070 CreateMaterializedView {
1072 broken: bool,
1074 plan: plan::CreateMaterializedViewPlan,
1075 },
1076 CreateIndex {
1078 broken: bool,
1080 plan: plan::CreateIndexPlan,
1081 },
1082 Subscribe {
1084 broken: bool,
1086 plan: plan::SubscribePlan,
1087 },
1088}
1089
1090impl ExplaineeStatement {
1091 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1092 match self {
1093 Self::Select { plan, .. } => plan.source.depends_on(),
1094 Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1095 Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1096 Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1097 Self::Subscribe { plan, .. } => plan.from.depends_on(),
1098 }
1099 }
1100
1101 pub fn broken(&self) -> bool {
1112 match self {
1113 Self::Select { broken, .. } => *broken,
1114 Self::CreateView { broken, .. } => *broken,
1115 Self::CreateMaterializedView { broken, .. } => *broken,
1116 Self::CreateIndex { broken, .. } => *broken,
1117 Self::Subscribe { broken, .. } => *broken,
1118 }
1119 }
1120}
1121
1122impl ExplaineeStatementKind {
1123 pub fn supports(&self, stage: &ExplainStage) -> bool {
1124 use ExplainStage::*;
1125 match self {
1126 Self::Select => true,
1127 Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1128 Self::CreateMaterializedView => true,
1129 Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1130 Self::Subscribe => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1133 }
1134 }
1135}
1136
1137impl std::fmt::Display for ExplaineeStatementKind {
1138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1139 match self {
1140 Self::Select => write!(f, "SELECT"),
1141 Self::CreateView => write!(f, "CREATE VIEW"),
1142 Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1143 Self::CreateIndex => write!(f, "CREATE INDEX"),
1144 Self::Subscribe => write!(f, "SUBSCRIBE"),
1145 }
1146 }
1147}
1148
1149#[derive(Clone, Debug)]
1150pub struct ExplainPushdownPlan {
1151 pub explainee: Explainee,
1152}
1153
1154#[derive(Clone, Debug)]
1155pub struct ExplainTimestampPlan {
1156 pub format: ExplainFormat,
1157 pub raw_plan: HirRelationExpr,
1158 pub when: QueryWhen,
1159}
1160
1161#[derive(Debug)]
1162pub struct ExplainSinkSchemaPlan {
1163 pub sink_from: GlobalId,
1164 pub json_schema: String,
1165}
1166
1167#[derive(Debug)]
1168pub struct SendDiffsPlan {
1169 pub id: CatalogItemId,
1170 pub updates: Vec<(Row, Diff)>,
1171 pub kind: MutationKind,
1172 pub returning: Vec<(Row, NonZeroUsize)>,
1173 pub max_result_size: u64,
1174}
1175
1176#[derive(Debug)]
1177pub struct InsertPlan {
1178 pub id: CatalogItemId,
1179 pub values: HirRelationExpr,
1180 pub returning: Vec<mz_expr::MirScalarExpr>,
1181}
1182
1183#[derive(Debug)]
1184pub struct ReadThenWritePlan {
1185 pub id: CatalogItemId,
1186 pub selection: HirRelationExpr,
1187 pub finishing: RowSetFinishing,
1188 pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
1189 pub kind: MutationKind,
1190 pub returning: Vec<mz_expr::MirScalarExpr>,
1191}
1192
1193#[derive(Debug)]
1195pub struct AlterNoopPlan {
1196 pub object_type: ObjectType,
1197}
1198
1199#[derive(Debug)]
1200pub struct AlterSetClusterPlan {
1201 pub id: CatalogItemId,
1202 pub set_cluster: ClusterId,
1203}
1204
1205#[derive(Debug)]
1206pub struct AlterRetainHistoryPlan {
1207 pub id: CatalogItemId,
1208 pub value: Option<Value>,
1209 pub window: CompactionWindow,
1210 pub object_type: ObjectType,
1211}
1212
1213#[derive(Debug, Clone)]
1214
1215pub enum AlterOptionParameter<T = String> {
1216 Set(T),
1217 Reset,
1218 Unchanged,
1219}
1220
1221#[derive(Debug)]
1222pub enum AlterConnectionAction {
1223 RotateKeys,
1224 AlterOptions {
1225 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
1226 drop_options: BTreeSet<ConnectionOptionName>,
1227 validate: bool,
1228 },
1229}
1230
1231#[derive(Debug)]
1232pub struct AlterConnectionPlan {
1233 pub id: CatalogItemId,
1234 pub action: AlterConnectionAction,
1235}
1236
1237#[derive(Debug)]
1238pub enum AlterSourceAction {
1239 AddSubsourceExports {
1240 subsources: Vec<CreateSourcePlanBundle>,
1241 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1242 },
1243 RefreshReferences {
1244 references: SourceReferences,
1245 },
1246}
1247
1248#[derive(Debug)]
1249pub struct AlterSourcePlan {
1250 pub item_id: CatalogItemId,
1251 pub ingestion_id: GlobalId,
1252 pub action: AlterSourceAction,
1253}
1254
1255#[derive(Debug, Clone)]
1256pub struct AlterSinkPlan {
1257 pub item_id: CatalogItemId,
1258 pub global_id: GlobalId,
1259 pub sink: Sink,
1260 pub with_snapshot: bool,
1261 pub in_cluster: ClusterId,
1262}
1263
1264#[derive(Debug, Clone)]
1265pub struct AlterClusterPlan {
1266 pub id: ClusterId,
1267 pub name: String,
1268 pub options: PlanClusterOption,
1269 pub strategy: AlterClusterPlanStrategy,
1270}
1271
1272#[derive(Debug)]
1273pub struct AlterClusterRenamePlan {
1274 pub id: ClusterId,
1275 pub name: String,
1276 pub to_name: String,
1277}
1278
1279#[derive(Debug)]
1280pub struct AlterClusterReplicaRenamePlan {
1281 pub cluster_id: ClusterId,
1282 pub replica_id: ReplicaId,
1283 pub name: QualifiedReplica,
1284 pub to_name: String,
1285}
1286
1287#[derive(Debug)]
1288pub struct AlterItemRenamePlan {
1289 pub id: CatalogItemId,
1290 pub current_full_name: FullItemName,
1291 pub to_name: String,
1292 pub object_type: ObjectType,
1293}
1294
1295#[derive(Debug)]
1296pub struct AlterSchemaRenamePlan {
1297 pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1298 pub new_schema_name: String,
1299}
1300
1301#[derive(Debug)]
1302pub struct AlterSchemaSwapPlan {
1303 pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1304 pub schema_a_name: String,
1305 pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1306 pub schema_b_name: String,
1307 pub name_temp: String,
1308}
1309
1310#[derive(Debug)]
1311pub struct AlterClusterSwapPlan {
1312 pub id_a: ClusterId,
1313 pub id_b: ClusterId,
1314 pub name_a: String,
1315 pub name_b: String,
1316 pub name_temp: String,
1317}
1318
1319#[derive(Debug)]
1320pub struct AlterSecretPlan {
1321 pub id: CatalogItemId,
1322 pub secret_as: MirScalarExpr,
1323}
1324
1325#[derive(Debug)]
1326pub struct AlterSystemSetPlan {
1327 pub name: String,
1328 pub value: VariableValue,
1329}
1330
1331#[derive(Debug)]
1332pub struct AlterSystemResetPlan {
1333 pub name: String,
1334}
1335
1336#[derive(Debug)]
1337pub struct AlterSystemResetAllPlan {}
1338
1339#[derive(Debug)]
1340pub struct AlterRolePlan {
1341 pub id: RoleId,
1342 pub name: String,
1343 pub option: PlannedAlterRoleOption,
1344}
1345
1346#[derive(Debug)]
1347pub struct AlterOwnerPlan {
1348 pub id: ObjectId,
1349 pub object_type: ObjectType,
1350 pub new_owner: RoleId,
1351}
1352
1353#[derive(Debug)]
1354pub struct AlterTablePlan {
1355 pub relation_id: CatalogItemId,
1356 pub column_name: ColumnName,
1357 pub column_type: SqlColumnType,
1358 pub raw_sql_type: RawDataType,
1359}
1360
1361#[derive(Debug, Clone)]
1362pub struct AlterMaterializedViewApplyReplacementPlan {
1363 pub id: CatalogItemId,
1364 pub replacement_id: CatalogItemId,
1365}
1366
1367#[derive(Debug)]
1368pub struct DeclarePlan {
1369 pub name: String,
1370 pub stmt: Statement<Raw>,
1371 pub sql: String,
1372 pub params: Params,
1373}
1374
1375#[derive(Debug)]
1376pub struct FetchPlan {
1377 pub name: String,
1378 pub count: Option<FetchDirection>,
1379 pub timeout: ExecuteTimeout,
1380}
1381
1382#[derive(Debug)]
1383pub struct ClosePlan {
1384 pub name: String,
1385}
1386
1387#[derive(Debug)]
1388pub struct PreparePlan {
1389 pub name: String,
1390 pub stmt: Statement<Raw>,
1391 pub sql: String,
1392 pub desc: StatementDesc,
1393}
1394
1395#[derive(Debug)]
1396pub struct ExecutePlan {
1397 pub name: String,
1398 pub params: Params,
1399}
1400
1401#[derive(Debug)]
1402pub struct DeallocatePlan {
1403 pub name: Option<String>,
1404}
1405
1406#[derive(Debug)]
1407pub struct RaisePlan {
1408 pub severity: NoticeSeverity,
1409}
1410
1411#[derive(Debug)]
1412pub struct GrantRolePlan {
1413 pub role_ids: Vec<RoleId>,
1415 pub member_ids: Vec<RoleId>,
1417 pub grantor_id: RoleId,
1419}
1420
1421#[derive(Debug)]
1422pub struct RevokeRolePlan {
1423 pub role_ids: Vec<RoleId>,
1425 pub member_ids: Vec<RoleId>,
1427 pub grantor_id: RoleId,
1429}
1430
1431#[derive(Debug)]
1432pub struct UpdatePrivilege {
1433 pub acl_mode: AclMode,
1435 pub target_id: SystemObjectId,
1437 pub grantor: RoleId,
1439}
1440
1441#[derive(Debug)]
1442pub struct GrantPrivilegesPlan {
1443 pub update_privileges: Vec<UpdatePrivilege>,
1445 pub grantees: Vec<RoleId>,
1447}
1448
1449#[derive(Debug)]
1450pub struct RevokePrivilegesPlan {
1451 pub update_privileges: Vec<UpdatePrivilege>,
1453 pub revokees: Vec<RoleId>,
1455}
1456#[derive(Debug)]
1457pub struct AlterDefaultPrivilegesPlan {
1458 pub privilege_objects: Vec<DefaultPrivilegeObject>,
1460 pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
1462 pub is_grant: bool,
1464}
1465
1466#[derive(Debug)]
1467pub struct ReassignOwnedPlan {
1468 pub old_roles: Vec<RoleId>,
1470 pub new_role: RoleId,
1472 pub reassign_ids: Vec<ObjectId>,
1474}
1475
1476#[derive(Debug)]
1477pub struct CommentPlan {
1478 pub object_id: CommentObjectId,
1480 pub sub_component: Option<usize>,
1484 pub comment: Option<String>,
1486}
1487
1488#[derive(Clone, Debug)]
1489pub enum TableDataSource {
1490 TableWrites { defaults: Vec<Expr<Aug>> },
1492
1493 DataSource {
1496 desc: DataSourceDesc,
1497 timeline: Timeline,
1498 },
1499}
1500
1501#[derive(Clone, Debug)]
1502pub struct Table {
1503 pub create_sql: String,
1504 pub desc: VersionedRelationDesc,
1505 pub temporary: bool,
1506 pub compaction_window: Option<CompactionWindow>,
1507 pub data_source: TableDataSource,
1508}
1509
1510#[derive(Clone, Debug)]
1511pub struct Source {
1512 pub create_sql: String,
1513 pub data_source: DataSourceDesc,
1514 pub desc: RelationDesc,
1515 pub compaction_window: Option<CompactionWindow>,
1516}
1517
1518#[derive(Debug, Clone)]
1519pub enum DataSourceDesc {
1520 Ingestion(SourceDesc<ReferencedConnection>),
1522 OldSyntaxIngestion {
1524 desc: SourceDesc<ReferencedConnection>,
1525 progress_subsource: CatalogItemId,
1528 data_config: SourceExportDataConfig<ReferencedConnection>,
1529 details: SourceExportDetails,
1530 },
1531 IngestionExport {
1534 ingestion_id: CatalogItemId,
1535 external_reference: UnresolvedItemName,
1536 details: SourceExportDetails,
1537 data_config: SourceExportDataConfig<ReferencedConnection>,
1538 },
1539 Progress,
1541 Webhook {
1543 validate_using: Option<WebhookValidation>,
1544 body_format: WebhookBodyFormat,
1545 headers: WebhookHeaders,
1546 cluster_id: Option<StorageInstanceId>,
1548 },
1549}
1550
1551#[derive(Clone, Debug, Serialize)]
1552pub struct WebhookValidation {
1553 pub expression: MirScalarExpr,
1555 pub relation_desc: RelationDesc,
1557 pub bodies: Vec<(usize, bool)>,
1559 pub headers: Vec<(usize, bool)>,
1561 pub secrets: Vec<WebhookValidationSecret>,
1563}
1564
1565impl WebhookValidation {
1566 const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1567
1568 pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1573 let WebhookValidation {
1574 expression,
1575 relation_desc,
1576 ..
1577 } = self;
1578
1579 let mut expression_ = expression.clone();
1581 let desc_ = relation_desc.clone();
1582 let reduce_task = mz_ore::task::spawn_blocking(
1583 || "webhook-validation-reduce",
1584 move || {
1585 expression_.reduce(&desc_.typ().column_types);
1586 expression_
1587 },
1588 );
1589
1590 match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1591 Ok(reduced_expr) => {
1592 *expression = reduced_expr;
1593 Ok(())
1594 }
1595 Err(_) => Err("timeout"),
1596 }
1597 }
1598}
1599
1600#[derive(Clone, Debug, Default, Serialize)]
1601pub struct WebhookHeaders {
1602 pub header_column: Option<WebhookHeaderFilters>,
1604 pub mapped_headers: BTreeMap<usize, (String, bool)>,
1606}
1607
1608impl WebhookHeaders {
1609 pub fn num_columns(&self) -> usize {
1611 let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1612 let mapped_headers = self.mapped_headers.len();
1613
1614 header_column + mapped_headers
1615 }
1616}
1617
1618#[derive(Clone, Debug, Default, Serialize)]
1619pub struct WebhookHeaderFilters {
1620 pub block: BTreeSet<String>,
1621 pub allow: BTreeSet<String>,
1622}
1623
1624#[derive(Copy, Clone, Debug, Serialize, Arbitrary)]
1625pub enum WebhookBodyFormat {
1626 Json { array: bool },
1627 Bytes,
1628 Text,
1629}
1630
1631impl From<WebhookBodyFormat> for SqlScalarType {
1632 fn from(value: WebhookBodyFormat) -> Self {
1633 match value {
1634 WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1635 WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1636 WebhookBodyFormat::Text => SqlScalarType::String,
1637 }
1638 }
1639}
1640
1641#[derive(Clone, Debug, Serialize)]
1642pub struct WebhookValidationSecret {
1643 pub id: CatalogItemId,
1645 pub column_idx: usize,
1647 pub use_bytes: bool,
1649}
1650
1651#[derive(Clone, Debug)]
1652pub struct Connection {
1653 pub create_sql: String,
1654 pub details: ConnectionDetails,
1655}
1656
1657#[derive(Clone, Debug, Serialize)]
1658pub enum ConnectionDetails {
1659 Kafka(KafkaConnection<ReferencedConnection>),
1660 Csr(CsrConnection<ReferencedConnection>),
1661 Postgres(PostgresConnection<ReferencedConnection>),
1662 Ssh {
1663 connection: SshConnection,
1664 key_1: SshKey,
1665 key_2: SshKey,
1666 },
1667 Aws(AwsConnection),
1668 AwsPrivatelink(AwsPrivatelinkConnection),
1669 MySql(MySqlConnection<ReferencedConnection>),
1670 SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
1671 IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
1672}
1673
1674impl ConnectionDetails {
1675 pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1676 match self {
1677 ConnectionDetails::Kafka(c) => {
1678 mz_storage_types::connections::Connection::Kafka(c.clone())
1679 }
1680 ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1681 ConnectionDetails::Postgres(c) => {
1682 mz_storage_types::connections::Connection::Postgres(c.clone())
1683 }
1684 ConnectionDetails::Ssh { connection, .. } => {
1685 mz_storage_types::connections::Connection::Ssh(connection.clone())
1686 }
1687 ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1688 ConnectionDetails::AwsPrivatelink(c) => {
1689 mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1690 }
1691 ConnectionDetails::MySql(c) => {
1692 mz_storage_types::connections::Connection::MySql(c.clone())
1693 }
1694 ConnectionDetails::SqlServer(c) => {
1695 mz_storage_types::connections::Connection::SqlServer(c.clone())
1696 }
1697 ConnectionDetails::IcebergCatalog(c) => {
1698 mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1699 }
1700 }
1701 }
1702}
1703
1704#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1705pub struct NetworkPolicyRule {
1706 pub name: String,
1707 pub action: NetworkPolicyRuleAction,
1708 pub address: PolicyAddress,
1709 pub direction: NetworkPolicyRuleDirection,
1710}
1711
1712#[derive(
1713 Debug,
1714 Clone,
1715 Serialize,
1716 Deserialize,
1717 PartialEq,
1718 Eq,
1719 Ord,
1720 PartialOrd,
1721 Hash
1722)]
1723pub enum NetworkPolicyRuleAction {
1724 Allow,
1725}
1726
1727impl std::fmt::Display for NetworkPolicyRuleAction {
1728 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1729 match self {
1730 Self::Allow => write!(f, "allow"),
1731 }
1732 }
1733}
1734impl TryFrom<&str> for NetworkPolicyRuleAction {
1735 type Error = PlanError;
1736 fn try_from(value: &str) -> Result<Self, Self::Error> {
1737 match value.to_uppercase().as_str() {
1738 "ALLOW" => Ok(Self::Allow),
1739 _ => Err(PlanError::Unstructured(
1740 "Allow is the only valid option".into(),
1741 )),
1742 }
1743 }
1744}
1745
1746#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1747pub enum NetworkPolicyRuleDirection {
1748 Ingress,
1749}
1750impl std::fmt::Display for NetworkPolicyRuleDirection {
1751 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1752 match self {
1753 Self::Ingress => write!(f, "ingress"),
1754 }
1755 }
1756}
1757impl TryFrom<&str> for NetworkPolicyRuleDirection {
1758 type Error = PlanError;
1759 fn try_from(value: &str) -> Result<Self, Self::Error> {
1760 match value.to_uppercase().as_str() {
1761 "INGRESS" => Ok(Self::Ingress),
1762 _ => Err(PlanError::Unstructured(
1763 "Ingress is the only valid option".into(),
1764 )),
1765 }
1766 }
1767}
1768
1769#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1770pub struct PolicyAddress(pub IpNet);
1771impl std::fmt::Display for PolicyAddress {
1772 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1773 write!(f, "{}", &self.0.to_string())
1774 }
1775}
1776impl From<String> for PolicyAddress {
1777 fn from(value: String) -> Self {
1778 Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1779 }
1780}
1781impl TryFrom<&str> for PolicyAddress {
1782 type Error = PlanError;
1783 fn try_from(value: &str) -> Result<Self, Self::Error> {
1784 let net = IpNet::from_str(value)
1785 .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1786 Ok(Self(net))
1787 }
1788}
1789
1790impl Serialize for PolicyAddress {
1791 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1792 where
1793 S: serde::Serializer,
1794 {
1795 serializer.serialize_str(&format!("{}", &self.0))
1796 }
1797}
1798
1799#[derive(Clone, Debug, Serialize)]
1800pub enum SshKey {
1801 PublicOnly(String),
1802 Both(SshKeyPair),
1803}
1804
1805impl SshKey {
1806 pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1807 match self {
1808 SshKey::PublicOnly(_) => None,
1809 SshKey::Both(key_pair) => Some(key_pair),
1810 }
1811 }
1812
1813 pub fn public_key(&self) -> String {
1814 match self {
1815 SshKey::PublicOnly(s) => s.into(),
1816 SshKey::Both(p) => p.ssh_public_key(),
1817 }
1818 }
1819}
1820
1821#[derive(Clone, Debug)]
1822pub struct Secret {
1823 pub create_sql: String,
1824 pub secret_as: MirScalarExpr,
1825}
1826
1827#[derive(Clone, Debug)]
1828pub struct Sink {
1829 pub create_sql: String,
1831 pub from: GlobalId,
1833 pub connection: StorageSinkConnection<ReferencedConnection>,
1835 pub envelope: SinkEnvelope,
1837 pub version: u64,
1838 pub commit_interval: Option<Duration>,
1839}
1840
1841#[derive(Clone, Debug)]
1842pub struct View {
1843 pub create_sql: String,
1845 pub expr: HirRelationExpr,
1847 pub dependencies: DependencyIds,
1849 pub column_names: Vec<ColumnName>,
1851 pub temporary: bool,
1853}
1854
1855#[derive(Clone, Debug)]
1856pub struct MaterializedView {
1857 pub create_sql: String,
1859 pub expr: HirRelationExpr,
1861 pub dependencies: DependencyIds,
1863 pub column_names: Vec<ColumnName>,
1865 pub replacement_target: Option<CatalogItemId>,
1866 pub cluster_id: ClusterId,
1868 pub non_null_assertions: Vec<usize>,
1869 pub compaction_window: Option<CompactionWindow>,
1870 pub refresh_schedule: Option<RefreshSchedule>,
1871 pub as_of: Option<Timestamp>,
1872}
1873
1874#[derive(Clone, Debug)]
1875pub struct Index {
1876 pub create_sql: String,
1878 pub on: GlobalId,
1880 pub keys: Vec<mz_expr::MirScalarExpr>,
1881 pub compaction_window: Option<CompactionWindow>,
1882 pub cluster_id: ClusterId,
1883}
1884
1885#[derive(Clone, Debug)]
1886pub struct Type {
1887 pub create_sql: String,
1888 pub inner: CatalogType<IdReference>,
1889}
1890
1891#[derive(Deserialize, Clone, Debug, PartialEq)]
1893pub enum QueryWhen {
1894 Immediately,
1897 FreshestTableWrite,
1900 AtTimestamp(Timestamp),
1905 AtLeastTimestamp(Timestamp),
1908}
1909
1910impl QueryWhen {
1911 pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1913 match self {
1914 QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1915 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1916 }
1917 }
1918 pub fn constrains_upper(&self) -> bool {
1922 match self {
1923 QueryWhen::AtTimestamp(_) => true,
1924 QueryWhen::AtLeastTimestamp(_)
1925 | QueryWhen::Immediately
1926 | QueryWhen::FreshestTableWrite => false,
1927 }
1928 }
1929 pub fn advance_to_since(&self) -> bool {
1931 match self {
1932 QueryWhen::Immediately
1933 | QueryWhen::AtLeastTimestamp(_)
1934 | QueryWhen::FreshestTableWrite => true,
1935 QueryWhen::AtTimestamp(_) => false,
1936 }
1937 }
1938 pub fn can_advance_to_upper(&self) -> bool {
1940 match self {
1941 QueryWhen::Immediately => true,
1942 QueryWhen::FreshestTableWrite
1943 | QueryWhen::AtTimestamp(_)
1944 | QueryWhen::AtLeastTimestamp(_) => false,
1945 }
1946 }
1947
1948 pub fn can_advance_to_timeline_ts(&self) -> bool {
1950 match self {
1951 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1952 QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1953 }
1954 }
1955 pub fn must_advance_to_timeline_ts(&self) -> bool {
1957 match self {
1958 QueryWhen::FreshestTableWrite => true,
1959 QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1960 false
1961 }
1962 }
1963 }
1964 pub fn is_transactional(&self) -> bool {
1966 match self {
1967 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1968 QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1969 }
1970 }
1971}
1972
1973#[derive(Debug, Copy, Clone)]
1974pub enum MutationKind {
1975 Insert,
1976 Update,
1977 Delete,
1978}
1979
1980#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
1981pub enum CopyFormat {
1982 Text,
1983 Csv,
1984 Binary,
1985 Parquet,
1986}
1987
1988#[derive(Debug, Copy, Clone)]
1989pub enum ExecuteTimeout {
1990 None,
1991 Seconds(f64),
1992 WaitOnce,
1993}
1994
1995#[derive(Clone, Debug)]
1996pub enum IndexOption {
1997 RetainHistory(CompactionWindow),
1999}
2000
2001#[derive(Clone, Debug)]
2002pub enum TableOption {
2003 RetainHistory(CompactionWindow),
2005}
2006
2007#[derive(Clone, Debug)]
2008pub struct PlanClusterOption {
2009 pub availability_zones: AlterOptionParameter<Vec<String>>,
2010 pub introspection_debugging: AlterOptionParameter<bool>,
2011 pub introspection_interval: AlterOptionParameter<OptionalDuration>,
2012 pub managed: AlterOptionParameter<bool>,
2013 pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
2014 pub replication_factor: AlterOptionParameter<u32>,
2015 pub size: AlterOptionParameter,
2016 pub schedule: AlterOptionParameter<ClusterSchedule>,
2017 pub workload_class: AlterOptionParameter<Option<String>>,
2018}
2019
2020impl Default for PlanClusterOption {
2021 fn default() -> Self {
2022 Self {
2023 availability_zones: AlterOptionParameter::Unchanged,
2024 introspection_debugging: AlterOptionParameter::Unchanged,
2025 introspection_interval: AlterOptionParameter::Unchanged,
2026 managed: AlterOptionParameter::Unchanged,
2027 replicas: AlterOptionParameter::Unchanged,
2028 replication_factor: AlterOptionParameter::Unchanged,
2029 size: AlterOptionParameter::Unchanged,
2030 schedule: AlterOptionParameter::Unchanged,
2031 workload_class: AlterOptionParameter::Unchanged,
2032 }
2033 }
2034}
2035
2036#[derive(Clone, Debug, PartialEq, Eq)]
2037pub enum AlterClusterPlanStrategy {
2038 None,
2039 For(Duration),
2040 UntilReady {
2041 on_timeout: OnTimeoutAction,
2042 timeout: Duration,
2043 },
2044}
2045
2046#[derive(Clone, Debug, PartialEq, Eq)]
2047pub enum OnTimeoutAction {
2048 Commit,
2049 Rollback,
2050}
2051
2052impl Default for OnTimeoutAction {
2053 fn default() -> Self {
2054 Self::Commit
2055 }
2056}
2057
2058impl TryFrom<&str> for OnTimeoutAction {
2059 type Error = PlanError;
2060 fn try_from(value: &str) -> Result<Self, Self::Error> {
2061 match value.to_uppercase().as_str() {
2062 "COMMIT" => Ok(Self::Commit),
2063 "ROLLBACK" => Ok(Self::Rollback),
2064 _ => Err(PlanError::Unstructured(
2065 "Valid options are COMMIT, ROLLBACK".into(),
2066 )),
2067 }
2068 }
2069}
2070
2071impl AlterClusterPlanStrategy {
2072 pub fn is_none(&self) -> bool {
2073 matches!(self, Self::None)
2074 }
2075 pub fn is_some(&self) -> bool {
2076 !matches!(self, Self::None)
2077 }
2078}
2079
2080impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2081 type Error = PlanError;
2082
2083 fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2084 Ok(match value.wait {
2085 Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2086 Some(ClusterAlterOptionValue::UntilReady(options)) => {
2087 let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2088 Self::UntilReady {
2089 timeout: match extracted.timeout {
2090 Some(d) => d,
2091 None => Err(PlanError::UntilReadyTimeoutRequired)?,
2092 },
2093 on_timeout: match extracted.on_timeout {
2094 Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2095 PlanError::InvalidOptionValue {
2096 option_name: "ON TIMEOUT".into(),
2097 err: Box::new(e),
2098 }
2099 })?,
2100 None => OnTimeoutAction::default(),
2101 },
2102 }
2103 }
2104 None => Self::None,
2105 })
2106 }
2107}
2108
2109#[derive(Debug, Clone)]
2111pub struct Params {
2112 pub datums: Row,
2114 pub execute_types: Vec<SqlScalarType>,
2116 pub expected_types: Vec<SqlScalarType>,
2118}
2119
2120impl Params {
2121 pub fn empty() -> Params {
2123 Params {
2124 datums: Row::pack_slice(&[]),
2125 execute_types: vec![],
2126 expected_types: vec![],
2127 }
2128 }
2129}
2130
2131#[derive(
2133 Ord,
2134 PartialOrd,
2135 Clone,
2136 Debug,
2137 Eq,
2138 PartialEq,
2139 Serialize,
2140 Deserialize,
2141 Hash,
2142 Copy
2143)]
2144pub struct PlanContext {
2145 pub wall_time: DateTime<Utc>,
2146 pub ignore_if_exists_errors: bool,
2147}
2148
2149impl PlanContext {
2150 pub fn new(wall_time: DateTime<Utc>) -> Self {
2151 Self {
2152 wall_time,
2153 ignore_if_exists_errors: false,
2154 }
2155 }
2156
2157 pub fn zero() -> Self {
2161 PlanContext {
2162 wall_time: now::to_datetime(NOW_ZERO()),
2163 ignore_if_exists_errors: false,
2164 }
2165 }
2166
2167 pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2168 self.ignore_if_exists_errors = value;
2169 self
2170 }
2171}