1use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{CollectionPlan, ColumnOrder, MapFilterProject, MirScalarExpr, RowSetFinishing};
41use mz_ore::now::{self, NOW_ZERO};
42use mz_pgcopy::CopyFormatParams;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
44use mz_repr::explain::{ExplainConfig, ExplainFormat};
45use mz_repr::network_policy_id::NetworkPolicyId;
46use mz_repr::optimize::OptimizerFeatureOverrides;
47use mz_repr::refresh_schedule::RefreshSchedule;
48use mz_repr::role_id::RoleId;
49use mz_repr::{
50 CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, ReprColumnType, Row,
51 SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
52};
53use mz_sql_parser::ast::{
54 AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
55 RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
56 Value, WithOptionValue,
57};
58use mz_ssh_util::keys::SshKeyPair;
59use mz_storage_types::connections::aws::AwsConnection;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::connections::{
62 AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
63 MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
64};
65use mz_storage_types::instances::StorageInstanceId;
66use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
67use mz_storage_types::sources::{
68 SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
69};
70use proptest_derive::Arbitrary;
71use serde::{Deserialize, Serialize};
72
73use crate::ast::{
74 ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
75 TransactionAccessMode,
76};
77use crate::catalog::{
78 CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
79 RoleAttributesRaw,
80};
81use crate::names::{
82 Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
83 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
84};
85
86pub(crate) mod error;
87pub(crate) mod explain;
88pub(crate) mod hir;
89pub(crate) mod literal;
90pub(crate) mod lowering;
91pub(crate) mod notice;
92pub(crate) mod plan_utils;
93pub(crate) mod query;
94pub(crate) mod scope;
95pub(crate) mod side_effecting_func;
96pub(crate) mod statement;
97pub(crate) mod transform_ast;
98pub(crate) mod transform_hir;
99pub(crate) mod typeconv;
100pub(crate) mod with_options;
101
102use crate::plan;
103use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
104use crate::plan::with_options::OptionalDuration;
105pub use error::PlanError;
106pub use explain::normalize_subqueries;
107pub use hir::{
108 AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
109 WindowExprType,
110};
111pub use lowering::Config as HirToMirConfig;
112pub use notice::PlanNotice;
113pub use query::{ExprContext, QueryContext, QueryLifetime};
114pub use scope::Scope;
115pub use side_effecting_func::SideEffectingFunc;
116pub use statement::ddl::{
117 AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
118 PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
119};
120pub use statement::{
121 StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
122 resolve_cluster_for_materialized_view,
123};
124
125use self::statement::ddl::ClusterAlterOptionExtracted;
126use self::with_options::TryFromValue;
127
/// A planned statement: one variant per kind of operation, most of them
/// carrying a dedicated `*Plan` payload struct.
///
/// The `EnumKind` derive together with `#[enum_kind(PlanKind)]` provides the
/// companion `PlanKind` enum whose payload-free variants are used by
/// [`Plan::generated_from`].
#[derive(Debug, EnumKind)]
#[enum_kind(PlanKind)]
pub enum Plan {
    CreateConnection(CreateConnectionPlan),
    CreateDatabase(CreateDatabasePlan),
    CreateSchema(CreateSchemaPlan),
    CreateRole(CreateRolePlan),
    CreateCluster(CreateClusterPlan),
    CreateClusterReplica(CreateClusterReplicaPlan),
    CreateSource(CreateSourcePlan),
    CreateSources(Vec<CreateSourcePlanBundle>),
    CreateSecret(CreateSecretPlan),
    CreateSink(CreateSinkPlan),
    CreateTable(CreateTablePlan),
    CreateView(CreateViewPlan),
    CreateMaterializedView(CreateMaterializedViewPlan),
    CreateContinualTask(CreateContinualTaskPlan),
    CreateNetworkPolicy(CreateNetworkPolicyPlan),
    CreateIndex(CreateIndexPlan),
    CreateType(CreateTypePlan),
    Comment(CommentPlan),
    DiscardTemp,
    DiscardAll,
    DropObjects(DropObjectsPlan),
    DropOwned(DropOwnedPlan),
    EmptyQuery,
    ShowAllVariables,
    ShowCreate(ShowCreatePlan),
    ShowColumns(ShowColumnsPlan),
    ShowVariable(ShowVariablePlan),
    InspectShard(InspectShardPlan),
    SetVariable(SetVariablePlan),
    ResetVariable(ResetVariablePlan),
    SetTransaction(SetTransactionPlan),
    StartTransaction(StartTransactionPlan),
    CommitTransaction(CommitTransactionPlan),
    AbortTransaction(AbortTransactionPlan),
    Select(SelectPlan),
    Subscribe(SubscribePlan),
    CopyFrom(CopyFromPlan),
    CopyTo(CopyToPlan),
    ExplainPlan(ExplainPlanPlan),
    ExplainPushdown(ExplainPushdownPlan),
    ExplainTimestamp(ExplainTimestampPlan),
    ExplainSinkSchema(ExplainSinkSchemaPlan),
    Insert(InsertPlan),
    AlterCluster(AlterClusterPlan),
    AlterClusterSwap(AlterClusterSwapPlan),
    AlterNoop(AlterNoopPlan),
    AlterSetCluster(AlterSetClusterPlan),
    AlterConnection(AlterConnectionPlan),
    AlterSource(AlterSourcePlan),
    AlterClusterRename(AlterClusterRenamePlan),
    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
    AlterItemRename(AlterItemRenamePlan),
    AlterSchemaRename(AlterSchemaRenamePlan),
    AlterSchemaSwap(AlterSchemaSwapPlan),
    AlterSecret(AlterSecretPlan),
    AlterSink(AlterSinkPlan),
    AlterSystemSet(AlterSystemSetPlan),
    AlterSystemReset(AlterSystemResetPlan),
    AlterSystemResetAll(AlterSystemResetAllPlan),
    AlterRole(AlterRolePlan),
    AlterOwner(AlterOwnerPlan),
    AlterTableAddColumn(AlterTablePlan),
    AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
    AlterNetworkPolicy(AlterNetworkPolicyPlan),
    Declare(DeclarePlan),
    Fetch(FetchPlan),
    Close(ClosePlan),
    ReadThenWrite(ReadThenWritePlan),
    Prepare(PreparePlan),
    Execute(ExecutePlan),
    Deallocate(DeallocatePlan),
    Raise(RaisePlan),
    GrantRole(GrantRolePlan),
    RevokeRole(RevokeRolePlan),
    GrantPrivileges(GrantPrivilegesPlan),
    RevokePrivileges(RevokePrivilegesPlan),
    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
    ReassignOwned(ReassignOwnedPlan),
    SideEffectingFunc(SideEffectingFunc),
    ValidateConnection(ValidateConnectionPlan),
    AlterRetainHistory(AlterRetainHistoryPlan),
    AlterSourceTimestampInterval(AlterSourceTimestampIntervalPlan),
}
215
216impl Plan {
217 pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
220 match stmt {
221 StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
222 StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
223 StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
224 StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
225 StatementKind::AlterObjectRename => &[
226 PlanKind::AlterClusterRename,
227 PlanKind::AlterClusterReplicaRename,
228 PlanKind::AlterItemRename,
229 PlanKind::AlterSchemaRename,
230 PlanKind::AlterNoop,
231 ],
232 StatementKind::AlterObjectSwap => &[
233 PlanKind::AlterClusterSwap,
234 PlanKind::AlterSchemaSwap,
235 PlanKind::AlterNoop,
236 ],
237 StatementKind::AlterRole => &[PlanKind::AlterRole],
238 StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
239 StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
240 StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
241 StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
242 StatementKind::AlterSource => &[
243 PlanKind::AlterNoop,
244 PlanKind::AlterSource,
245 PlanKind::AlterRetainHistory,
246 PlanKind::AlterSourceTimestampInterval,
247 ],
248 StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
249 StatementKind::AlterSystemResetAll => {
250 &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
251 }
252 StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
253 StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
254 StatementKind::AlterTableAddColumn => {
255 &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
256 }
257 StatementKind::AlterMaterializedViewApplyReplacement => &[
258 PlanKind::AlterNoop,
259 PlanKind::AlterMaterializedViewApplyReplacement,
260 ],
261 StatementKind::Close => &[PlanKind::Close],
262 StatementKind::Comment => &[PlanKind::Comment],
263 StatementKind::Commit => &[PlanKind::CommitTransaction],
264 StatementKind::Copy => &[
265 PlanKind::CopyFrom,
266 PlanKind::Select,
267 PlanKind::Subscribe,
268 PlanKind::CopyTo,
269 ],
270 StatementKind::CreateCluster => &[PlanKind::CreateCluster],
271 StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
272 StatementKind::CreateConnection => &[PlanKind::CreateConnection],
273 StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
274 StatementKind::CreateIndex => &[PlanKind::CreateIndex],
275 StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
276 StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
277 StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
278 StatementKind::CreateRole => &[PlanKind::CreateRole],
279 StatementKind::CreateSchema => &[PlanKind::CreateSchema],
280 StatementKind::CreateSecret => &[PlanKind::CreateSecret],
281 StatementKind::CreateSink => &[PlanKind::CreateSink],
282 StatementKind::CreateSource | StatementKind::CreateSubsource => {
283 &[PlanKind::CreateSource]
284 }
285 StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
286 StatementKind::CreateTable => &[PlanKind::CreateTable],
287 StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
288 StatementKind::CreateType => &[PlanKind::CreateType],
289 StatementKind::CreateView => &[PlanKind::CreateView],
290 StatementKind::Deallocate => &[PlanKind::Deallocate],
291 StatementKind::Declare => &[PlanKind::Declare],
292 StatementKind::Delete => &[PlanKind::ReadThenWrite],
293 StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
294 StatementKind::DropObjects => &[PlanKind::DropObjects],
295 StatementKind::DropOwned => &[PlanKind::DropOwned],
296 StatementKind::Execute => &[PlanKind::Execute],
297 StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
298 StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
299 StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
300 StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
301 StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
302 StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
303 StatementKind::Fetch => &[PlanKind::Fetch],
304 StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
305 StatementKind::GrantRole => &[PlanKind::GrantRole],
306 StatementKind::Insert => &[PlanKind::Insert],
307 StatementKind::Prepare => &[PlanKind::Prepare],
308 StatementKind::Raise => &[PlanKind::Raise],
309 StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
310 StatementKind::ResetVariable => &[PlanKind::ResetVariable],
311 StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
312 StatementKind::RevokeRole => &[PlanKind::RevokeRole],
313 StatementKind::Rollback => &[PlanKind::AbortTransaction],
314 StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
315 StatementKind::SetTransaction => &[PlanKind::SetTransaction],
316 StatementKind::SetVariable => &[PlanKind::SetVariable],
317 StatementKind::Show => &[
318 PlanKind::Select,
319 PlanKind::ShowVariable,
320 PlanKind::ShowCreate,
321 PlanKind::ShowColumns,
322 PlanKind::ShowAllVariables,
323 PlanKind::InspectShard,
324 ],
325 StatementKind::StartTransaction => &[PlanKind::StartTransaction],
326 StatementKind::Subscribe => &[PlanKind::Subscribe],
327 StatementKind::Update => &[PlanKind::ReadThenWrite],
328 StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
329 StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
330 }
331 }
332
333 pub fn name(&self) -> &str {
335 match self {
336 Plan::CreateConnection(_) => "create connection",
337 Plan::CreateDatabase(_) => "create database",
338 Plan::CreateSchema(_) => "create schema",
339 Plan::CreateRole(_) => "create role",
340 Plan::CreateCluster(_) => "create cluster",
341 Plan::CreateClusterReplica(_) => "create cluster replica",
342 Plan::CreateSource(_) => "create source",
343 Plan::CreateSources(_) => "create source",
344 Plan::CreateSecret(_) => "create secret",
345 Plan::CreateSink(_) => "create sink",
346 Plan::CreateTable(_) => "create table",
347 Plan::CreateView(_) => "create view",
348 Plan::CreateMaterializedView(_) => "create materialized view",
349 Plan::CreateContinualTask(_) => "create continual task",
350 Plan::CreateIndex(_) => "create index",
351 Plan::CreateType(_) => "create type",
352 Plan::CreateNetworkPolicy(_) => "create network policy",
353 Plan::Comment(_) => "comment",
354 Plan::DiscardTemp => "discard temp",
355 Plan::DiscardAll => "discard all",
356 Plan::DropObjects(plan) => match plan.object_type {
357 ObjectType::Table => "drop table",
358 ObjectType::View => "drop view",
359 ObjectType::MaterializedView => "drop materialized view",
360 ObjectType::Source => "drop source",
361 ObjectType::Sink => "drop sink",
362 ObjectType::Index => "drop index",
363 ObjectType::Type => "drop type",
364 ObjectType::Role => "drop roles",
365 ObjectType::Cluster => "drop clusters",
366 ObjectType::ClusterReplica => "drop cluster replicas",
367 ObjectType::Secret => "drop secret",
368 ObjectType::Connection => "drop connection",
369 ObjectType::Database => "drop database",
370 ObjectType::Schema => "drop schema",
371 ObjectType::Func => "drop function",
372 ObjectType::ContinualTask => "drop continual task",
373 ObjectType::NetworkPolicy => "drop network policy",
374 },
375 Plan::DropOwned(_) => "drop owned",
376 Plan::EmptyQuery => "do nothing",
377 Plan::ShowAllVariables => "show all variables",
378 Plan::ShowCreate(_) => "show create",
379 Plan::ShowColumns(_) => "show columns",
380 Plan::ShowVariable(_) => "show variable",
381 Plan::InspectShard(_) => "inspect shard",
382 Plan::SetVariable(_) => "set variable",
383 Plan::ResetVariable(_) => "reset variable",
384 Plan::SetTransaction(_) => "set transaction",
385 Plan::StartTransaction(_) => "start transaction",
386 Plan::CommitTransaction(_) => "commit",
387 Plan::AbortTransaction(_) => "abort",
388 Plan::Select(_) => "select",
389 Plan::Subscribe(_) => "subscribe",
390 Plan::CopyFrom(_) => "copy from",
391 Plan::CopyTo(_) => "copy to",
392 Plan::ExplainPlan(_) => "explain plan",
393 Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
394 Plan::ExplainTimestamp(_) => "explain timestamp",
395 Plan::ExplainSinkSchema(_) => "explain schema",
396 Plan::Insert(_) => "insert",
397 Plan::AlterNoop(plan) => match plan.object_type {
398 ObjectType::Table => "alter table",
399 ObjectType::View => "alter view",
400 ObjectType::MaterializedView => "alter materialized view",
401 ObjectType::Source => "alter source",
402 ObjectType::Sink => "alter sink",
403 ObjectType::Index => "alter index",
404 ObjectType::Type => "alter type",
405 ObjectType::Role => "alter role",
406 ObjectType::Cluster => "alter cluster",
407 ObjectType::ClusterReplica => "alter cluster replica",
408 ObjectType::Secret => "alter secret",
409 ObjectType::Connection => "alter connection",
410 ObjectType::Database => "alter database",
411 ObjectType::Schema => "alter schema",
412 ObjectType::Func => "alter function",
413 ObjectType::ContinualTask => "alter continual task",
414 ObjectType::NetworkPolicy => "alter network policy",
415 },
416 Plan::AlterCluster(_) => "alter cluster",
417 Plan::AlterClusterRename(_) => "alter cluster rename",
418 Plan::AlterClusterSwap(_) => "alter cluster swap",
419 Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
420 Plan::AlterSetCluster(_) => "alter set cluster",
421 Plan::AlterConnection(_) => "alter connection",
422 Plan::AlterSource(_) => "alter source",
423 Plan::AlterItemRename(_) => "rename item",
424 Plan::AlterSchemaRename(_) => "alter rename schema",
425 Plan::AlterSchemaSwap(_) => "alter swap schema",
426 Plan::AlterSecret(_) => "alter secret",
427 Plan::AlterSink(_) => "alter sink",
428 Plan::AlterSystemSet(_) => "alter system",
429 Plan::AlterSystemReset(_) => "alter system",
430 Plan::AlterSystemResetAll(_) => "alter system",
431 Plan::AlterRole(_) => "alter role",
432 Plan::AlterNetworkPolicy(_) => "alter network policy",
433 Plan::AlterOwner(plan) => match plan.object_type {
434 ObjectType::Table => "alter table owner",
435 ObjectType::View => "alter view owner",
436 ObjectType::MaterializedView => "alter materialized view owner",
437 ObjectType::Source => "alter source owner",
438 ObjectType::Sink => "alter sink owner",
439 ObjectType::Index => "alter index owner",
440 ObjectType::Type => "alter type owner",
441 ObjectType::Role => "alter role owner",
442 ObjectType::Cluster => "alter cluster owner",
443 ObjectType::ClusterReplica => "alter cluster replica owner",
444 ObjectType::Secret => "alter secret owner",
445 ObjectType::Connection => "alter connection owner",
446 ObjectType::Database => "alter database owner",
447 ObjectType::Schema => "alter schema owner",
448 ObjectType::Func => "alter function owner",
449 ObjectType::ContinualTask => "alter continual task owner",
450 ObjectType::NetworkPolicy => "alter network policy owner",
451 },
452 Plan::AlterTableAddColumn(_) => "alter table add column",
453 Plan::AlterMaterializedViewApplyReplacement(_) => {
454 "alter materialized view apply replacement"
455 }
456 Plan::Declare(_) => "declare",
457 Plan::Fetch(_) => "fetch",
458 Plan::Close(_) => "close",
459 Plan::ReadThenWrite(plan) => match plan.kind {
460 MutationKind::Insert => "insert into select",
461 MutationKind::Update => "update",
462 MutationKind::Delete => "delete",
463 },
464 Plan::Prepare(_) => "prepare",
465 Plan::Execute(_) => "execute",
466 Plan::Deallocate(_) => "deallocate",
467 Plan::Raise(_) => "raise",
468 Plan::GrantRole(_) => "grant role",
469 Plan::RevokeRole(_) => "revoke role",
470 Plan::GrantPrivileges(_) => "grant privilege",
471 Plan::RevokePrivileges(_) => "revoke privilege",
472 Plan::AlterDefaultPrivileges(_) => "alter default privileges",
473 Plan::ReassignOwned(_) => "reassign owned",
474 Plan::SideEffectingFunc(_) => "side effecting func",
475 Plan::ValidateConnection(_) => "validate connection",
476 Plan::AlterRetainHistory(_) => "alter retain history",
477 Plan::AlterSourceTimestampInterval(_) => "alter source timestamp interval",
478 }
479 }
480
481 pub fn allowed_in_read_only(&self) -> bool {
487 match self {
488 Plan::SetVariable(_) => true,
491 Plan::ResetVariable(_) => true,
492 Plan::SetTransaction(_) => true,
493 Plan::StartTransaction(_) => true,
494 Plan::CommitTransaction(_) => true,
495 Plan::AbortTransaction(_) => true,
496 Plan::Select(_) => true,
497 Plan::EmptyQuery => true,
498 Plan::ShowAllVariables => true,
499 Plan::ShowCreate(_) => true,
500 Plan::ShowColumns(_) => true,
501 Plan::ShowVariable(_) => true,
502 Plan::InspectShard(_) => true,
503 Plan::Subscribe(_) => true,
504 Plan::CopyTo(_) => true,
505 Plan::ExplainPlan(_) => true,
506 Plan::ExplainPushdown(_) => true,
507 Plan::ExplainTimestamp(_) => true,
508 Plan::ExplainSinkSchema(_) => true,
509 Plan::ValidateConnection(_) => true,
510 _ => false,
511 }
512 }
513}
514
/// Payload of [`Plan::StartTransaction`].
#[derive(Debug)]
pub struct StartTransactionPlan {
    /// Access mode for the transaction; `None` when none is specified.
    pub access: Option<TransactionAccessMode>,
    /// Isolation level for the transaction; `None` when none is specified.
    pub isolation_level: Option<TransactionIsolationLevel>,
}
520
/// The kind of transaction recorded on [`CommitTransactionPlan`] and
/// [`AbortTransactionPlan`].
#[derive(Debug)]
pub enum TransactionType {
    Explicit,
    Implicit,
}

impl TransactionType {
    /// Returns `true` only for [`TransactionType::Explicit`].
    pub fn is_explicit(&self) -> bool {
        match self {
            Self::Explicit => true,
            Self::Implicit => false,
        }
    }

    /// Returns `true` only for [`TransactionType::Implicit`].
    pub fn is_implicit(&self) -> bool {
        match self {
            Self::Implicit => true,
            Self::Explicit => false,
        }
    }
}
536
/// Payload of [`Plan::CommitTransaction`].
#[derive(Debug)]
pub struct CommitTransactionPlan {
    /// Kind of the transaction being committed.
    pub transaction_type: TransactionType,
}
541
/// Payload of [`Plan::AbortTransaction`].
#[derive(Debug)]
pub struct AbortTransactionPlan {
    /// Kind of the transaction being aborted.
    pub transaction_type: TransactionType,
}
546
/// Payload of [`Plan::CreateDatabase`].
#[derive(Debug)]
pub struct CreateDatabasePlan {
    /// Name of the database.
    pub name: String,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
}
552
/// Payload of [`Plan::CreateSchema`].
#[derive(Debug)]
pub struct CreateSchemaPlan {
    /// Resolved specifier of the database for the schema.
    pub database_spec: ResolvedDatabaseSpecifier,
    /// Name of the schema.
    pub schema_name: String,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
}
559
/// Payload of [`Plan::CreateRole`].
#[derive(Debug)]
pub struct CreateRolePlan {
    /// Name of the role.
    pub name: String,
    /// Raw role attributes for the role.
    pub attributes: RoleAttributesRaw,
}
565
/// Payload of [`Plan::CreateCluster`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterPlan {
    /// Name of the cluster.
    pub name: String,
    /// Managed or unmanaged configuration of the cluster.
    pub variant: CreateClusterVariant,
    /// Optional workload class.
    // NOTE(review): the meaning of a workload class is not visible here — confirm.
    pub workload_class: Option<String>,
}
572
/// The two shapes a [`CreateClusterPlan`] can take.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum CreateClusterVariant {
    /// Cluster described by a replication factor, size and related settings.
    Managed(CreateClusterManagedPlan),
    /// Cluster described by an explicit list of named replicas.
    Unmanaged(CreateClusterUnmanagedPlan),
}
578
/// Configuration carried by [`CreateClusterVariant::Unmanaged`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterUnmanagedPlan {
    /// `(replica name, replica configuration)` pairs.
    pub replicas: Vec<(String, ReplicaConfig)>,
}
583
/// Configuration carried by [`CreateClusterVariant::Managed`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterManagedPlan {
    /// Replication factor of the cluster.
    pub replication_factor: u32,
    /// Size of the cluster, as a string.
    pub size: String,
    /// Availability zones for the cluster.
    pub availability_zones: Vec<String>,
    /// Compute-specific replica configuration.
    pub compute: ComputeReplicaConfig,
    /// Optimizer feature overrides attached to the cluster.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Scheduling policy of the cluster.
    pub schedule: ClusterSchedule,
}
593
/// Payload of [`Plan::CreateClusterReplica`].
#[derive(Debug)]
pub struct CreateClusterReplicaPlan {
    /// ID of the cluster for the replica.
    pub cluster_id: ClusterId,
    /// Name of the replica.
    pub name: String,
    /// Configuration of the replica.
    pub config: ReplicaConfig,
}
600
/// Introspection settings of a compute replica; see
/// [`ComputeReplicaConfig::introspection`].
#[derive(
    Clone,
    Copy,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    Ord,
    PartialEq,
    Eq
)]
pub struct ComputeReplicaIntrospectionConfig {
    // NOTE(review): presumably toggles debugging-level introspection — confirm.
    pub debugging: bool,
    // NOTE(review): presumably the period between introspection updates — confirm.
    pub interval: Duration,
}
619
/// Compute-specific configuration of a replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComputeReplicaConfig {
    /// Introspection settings.
    // NOTE(review): `None` presumably means introspection is disabled — confirm.
    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
}
624
/// Configuration of a cluster replica, in one of two forms.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicaConfig {
    /// Replica described by explicit controller addresses.
    Unorchestrated {
        /// Storage controller addresses.
        storagectl_addrs: Vec<String>,
        /// Compute controller addresses.
        computectl_addrs: Vec<String>,
        /// Compute-specific configuration.
        compute: ComputeReplicaConfig,
    },
    /// Replica described by a size and optional placement.
    Orchestrated {
        /// Size of the replica, as a string.
        size: String,
        /// Availability zone, if one is specified.
        availability_zone: Option<String>,
        /// Compute-specific configuration.
        compute: ComputeReplicaConfig,
        // NOTE(review): presumably marks replicas created for internal use — confirm.
        internal: bool,
        // NOTE(review): presumably an alternative size used for billing — confirm.
        billed_as: Option<String>,
    },
}
640
641#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
642pub enum ClusterSchedule {
643 Manual,
645 Refresh { hydration_time_estimate: Duration },
649}
650
651impl Default for ClusterSchedule {
652 fn default() -> Self {
653 ClusterSchedule::Manual
655 }
656}
657
/// Payload of [`Plan::CreateSource`]; also embedded in
/// [`CreateSourcePlanBundle`].
#[derive(Debug)]
pub struct CreateSourcePlan {
    /// Qualified name of the source.
    pub name: QualifiedItemName,
    /// Definition of the source.
    pub source: Source,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
    /// Timeline of the source.
    pub timeline: Timeline,
    /// Cluster for the source, if any.
    // NOTE(review): which sources legitimately have `None` here is not visible — confirm.
    pub in_cluster: Option<ClusterId>,
}
667
/// A set of [`SourceReference`]s together with an update marker.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReferences {
    // NOTE(review): presumably a timestamp of when the references were
    // fetched; unit and epoch are not visible here — confirm.
    pub updated_at: u64,
    /// The individual references.
    pub references: Vec<SourceReference>,
}
673
/// One entry of [`SourceReferences::references`].
// NOTE(review): presumably describes an upstream object (e.g. a table) that a
// source can reference — confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReference {
    /// Name of the referenced object.
    pub name: String,
    /// Namespace of the referenced object, if it has one.
    pub namespace: Option<String>,
    /// Column names of the referenced object.
    pub columns: Vec<String>,
}
682
/// A [`CreateSourcePlan`] bundled with IDs and resolution metadata; a list of
/// these is the payload of [`Plan::CreateSources`].
#[derive(Debug)]
pub struct CreateSourcePlanBundle {
    /// Catalog item ID for the source.
    pub item_id: CatalogItemId,
    /// Global ID for the source.
    pub global_id: GlobalId,
    /// The plan itself.
    pub plan: CreateSourcePlan,
    /// Resolved IDs associated with the plan.
    pub resolved_ids: ResolvedIds,
    /// Source references that are available, if known.
    // NOTE(review): when this is `Some` vs. `None` is not visible here — confirm.
    pub available_source_references: Option<SourceReferences>,
}
699
/// Payload of [`Plan::CreateConnection`].
#[derive(Debug)]
pub struct CreateConnectionPlan {
    /// Qualified name of the connection.
    pub name: QualifiedItemName,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
    /// Definition of the connection.
    pub connection: Connection,
    // NOTE(review): presumably requests validation of the connection on
    // creation — confirm.
    pub validate: bool,
}
707
/// Payload of [`Plan::ValidateConnection`].
#[derive(Debug)]
pub struct ValidateConnectionPlan {
    /// Catalog item ID of the connection.
    pub id: CatalogItemId,
    /// The storage-layer connection, parameterized by [`ReferencedConnection`].
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
}
715
/// Payload of [`Plan::CreateSecret`].
#[derive(Debug)]
pub struct CreateSecretPlan {
    /// Qualified name of the secret.
    pub name: QualifiedItemName,
    /// Definition of the secret.
    pub secret: Secret,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
}
722
/// Payload of [`Plan::CreateSink`].
#[derive(Debug)]
pub struct CreateSinkPlan {
    /// Qualified name of the sink.
    pub name: QualifiedItemName,
    /// Definition of the sink.
    pub sink: Sink,
    // NOTE(review): presumably whether the sink emits an initial snapshot — confirm.
    pub with_snapshot: bool,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
    /// Cluster for the sink.
    pub in_cluster: ClusterId,
}
731
/// Payload of [`Plan::CreateTable`].
#[derive(Debug)]
pub struct CreateTablePlan {
    /// Qualified name of the table.
    pub name: QualifiedItemName,
    /// Definition of the table.
    pub table: Table,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
}
738
/// Payload of [`Plan::CreateView`]; also embedded in
/// [`ExplaineeStatement::CreateView`].
#[derive(Debug, Clone)]
pub struct CreateViewPlan {
    /// Qualified name of the view.
    pub name: QualifiedItemName,
    /// Definition of the view.
    pub view: View,
    /// ID of an existing item to replace, if any.
    pub replace: Option<CatalogItemId>,
    /// IDs of items to drop.
    // NOTE(review): presumably only relevant together with `replace` — confirm.
    pub drop_ids: Vec<CatalogItemId>,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
    // NOTE(review): presumably set when the view's column names are
    // ambiguous — confirm how consumers react to it.
    pub ambiguous_columns: bool,
}
752
/// Payload of [`Plan::CreateMaterializedView`]; also embedded in
/// [`ExplaineeStatement::CreateMaterializedView`].
#[derive(Debug, Clone)]
pub struct CreateMaterializedViewPlan {
    /// Qualified name of the materialized view.
    pub name: QualifiedItemName,
    /// Definition of the materialized view.
    pub materialized_view: MaterializedView,
    /// ID of an existing item to replace, if any.
    pub replace: Option<CatalogItemId>,
    /// IDs of items to drop.
    // NOTE(review): presumably only relevant together with `replace` — confirm.
    pub drop_ids: Vec<CatalogItemId>,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
    // NOTE(review): presumably set when the view's column names are
    // ambiguous — confirm how consumers react to it.
    pub ambiguous_columns: bool,
}
766
/// Payload of [`Plan::CreateContinualTask`].
#[derive(Debug, Clone)]
pub struct CreateContinualTaskPlan {
    /// Qualified name of the continual task.
    pub name: QualifiedItemName,
    /// Optional local ID used as a placeholder.
    // NOTE(review): presumably stands in for the task's own output inside its
    // definition until a real ID exists — confirm.
    pub placeholder_id: Option<mz_expr::LocalId>,
    /// Relation description of the task.
    pub desc: RelationDesc,
    /// Global ID of the task's input.
    pub input_id: GlobalId,
    // NOTE(review): presumably whether the input's snapshot is processed — confirm.
    pub with_snapshot: bool,
    /// The task's definition, expressed as a [`MaterializedView`].
    pub continual_task: MaterializedView,
}
780
/// Payload of [`Plan::CreateNetworkPolicy`].
#[derive(Debug, Clone)]
pub struct CreateNetworkPolicyPlan {
    /// Name of the network policy.
    pub name: String,
    /// Rules of the network policy.
    pub rules: Vec<NetworkPolicyRule>,
}
786
/// Payload of [`Plan::AlterNetworkPolicy`].
#[derive(Debug, Clone)]
pub struct AlterNetworkPolicyPlan {
    /// ID of the network policy.
    pub id: NetworkPolicyId,
    /// Name of the network policy.
    pub name: String,
    /// Rules of the network policy.
    // NOTE(review): presumably the complete new rule set rather than a delta — confirm.
    pub rules: Vec<NetworkPolicyRule>,
}
793
/// Payload of [`Plan::CreateIndex`]; also embedded in
/// [`ExplaineeStatement::CreateIndex`].
#[derive(Debug, Clone)]
pub struct CreateIndexPlan {
    /// Qualified name of the index.
    pub name: QualifiedItemName,
    /// Definition of the index.
    pub index: Index,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
}
800
/// Payload of [`Plan::CreateType`].
#[derive(Debug)]
pub struct CreateTypePlan {
    /// Qualified name of the type.
    pub name: QualifiedItemName,
    /// Definition of the type.
    pub typ: Type,
}
806
/// Payload of [`Plan::DropObjects`].
#[derive(Debug)]
pub struct DropObjectsPlan {
    /// IDs of the referenced objects.
    // NOTE(review): presumably only the objects named directly in the
    // statement — confirm.
    pub referenced_ids: Vec<ObjectId>,
    /// IDs of the objects to drop.
    // NOTE(review): presumably a superset of `referenced_ids` that also
    // includes dependents — confirm.
    pub drop_ids: Vec<ObjectId>,
    /// Object type of the drop; selects the text returned by [`Plan::name`].
    pub object_type: ObjectType,
}
817
/// Payload of [`Plan::DropOwned`].
#[derive(Debug)]
pub struct DropOwnedPlan {
    /// IDs of the roles concerned.
    pub role_ids: Vec<RoleId>,
    /// IDs of the objects to drop.
    pub drop_ids: Vec<ObjectId>,
    /// Privileges to revoke, each paired with the object it applies to.
    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
    /// Default privileges to revoke.
    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
}
829
/// Payload of [`Plan::ShowVariable`].
#[derive(Debug)]
pub struct ShowVariablePlan {
    /// Name of the variable.
    pub name: String,
}
834
/// Payload of [`Plan::InspectShard`].
#[derive(Debug)]
pub struct InspectShardPlan {
    /// Global ID identifying what to inspect.
    // NOTE(review): presumably the collection whose shard is inspected — confirm.
    pub id: GlobalId,
}
840
/// Payload of [`Plan::SetVariable`].
#[derive(Debug)]
pub struct SetVariablePlan {
    /// Name of the variable.
    pub name: String,
    /// Value to assign.
    pub value: VariableValue,
    // NOTE(review): presumably mirrors `SET LOCAL` (transaction-scoped) — confirm.
    pub local: bool,
}
847
/// Value carried by [`SetVariablePlan::value`].
#[derive(Debug)]
pub enum VariableValue {
    /// No explicit values.
    // NOTE(review): presumably requests the variable's default — confirm.
    Default,
    /// Explicit values, as strings.
    Values(Vec<String>),
}
853
/// Payload of [`Plan::ResetVariable`].
#[derive(Debug)]
pub struct ResetVariablePlan {
    /// Name of the variable.
    pub name: String,
}
858
/// Payload of [`Plan::SetTransaction`].
#[derive(Debug)]
pub struct SetTransactionPlan {
    // NOTE(review): presumably distinguishes current-transaction scope from
    // session scope — confirm.
    pub local: bool,
    /// Transaction modes to apply.
    pub modes: Vec<TransactionMode>,
}
864
/// Payload of [`Plan::Select`]; also embedded in [`ShowColumnsPlan`],
/// [`CopyToPlan`] and [`ExplaineeStatement::Select`].
#[derive(Clone, Debug)]
pub struct SelectPlan {
    /// The originating `SELECT` statement, if any; [`SelectPlan::immediate`]
    /// sets it to `None`.
    pub select: Option<Box<SelectStatement<Aug>>>,
    /// The query as a HIR relation expression.
    pub source: HirRelationExpr,
    /// When the query is to be evaluated.
    pub when: QueryWhen,
    /// Finishing applied to the result rows.
    pub finishing: RowSetFinishing,
    /// Copy format, if any.
    // NOTE(review): presumably set only when the select serves a `COPY` — confirm.
    pub copy_to: Option<CopyFormat>,
}
880
881impl SelectPlan {
882 pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
883 let arity = typ.arity();
884 SelectPlan {
885 select: None,
886 source: HirRelationExpr::Constant { rows, typ },
887 when: QueryWhen::Immediately,
888 finishing: RowSetFinishing::trivial(arity),
889 copy_to: None,
890 }
891 }
892}
893
/// Output shape of a subscribe; see [`SubscribePlan::output`].
#[derive(Debug, Clone)]
pub enum SubscribeOutput {
    /// Output without any attached column ordering.
    Diffs,
    /// Output carrying an ordering.
    // NOTE(review): presumably orders rows within each timestamp — confirm.
    WithinTimestampOrderBy {
        /// The ordering; not returned by [`SubscribeOutput::row_order`].
        order_by: Vec<ColumnOrder>,
    },
    /// Upsert-envelope output.
    EnvelopeUpsert {
        /// Ordering on the key columns; returned by
        /// [`SubscribeOutput::row_order`].
        order_by_keys: Vec<ColumnOrder>,
    },
    /// Debezium-envelope output.
    EnvelopeDebezium {
        /// Ordering on the key columns; returned by
        /// [`SubscribeOutput::row_order`].
        order_by_keys: Vec<ColumnOrder>,
    },
}
910
911impl SubscribeOutput {
912 pub fn row_order(&self) -> &[ColumnOrder] {
913 match self {
914 SubscribeOutput::Diffs => &[],
915 SubscribeOutput::WithinTimestampOrderBy { .. } => &[],
917 SubscribeOutput::EnvelopeUpsert { order_by_keys } => order_by_keys,
918 SubscribeOutput::EnvelopeDebezium { order_by_keys } => order_by_keys,
919 }
920 }
921}
922
/// Payload of [`Plan::Subscribe`]; also embedded in
/// [`ExplaineeStatement::Subscribe`].
#[derive(Debug, Clone)]
pub struct SubscribePlan {
    /// What is being subscribed to.
    pub from: SubscribeFrom,
    // NOTE(review): presumably whether an initial snapshot is emitted — confirm.
    pub with_snapshot: bool,
    /// When the subscribe is to be evaluated.
    pub when: QueryWhen,
    /// Optional upper timestamp bound.
    // NOTE(review): whether the bound is inclusive is not visible here — confirm.
    pub up_to: Option<Timestamp>,
    /// Copy format, if any.
    pub copy_to: Option<CopyFormat>,
    // NOTE(review): presumably whether progress messages are emitted — confirm.
    pub emit_progress: bool,
    /// Output shape.
    pub output: SubscribeOutput,
}
933
/// Target of a subscribe; see [`SubscribePlan::from`].
#[derive(Debug, Clone)]
pub enum SubscribeFrom {
    /// An existing collection, identified by its global ID.
    Id(GlobalId),
    /// An ad-hoc query.
    Query {
        /// The query as a HIR relation expression.
        expr: HirRelationExpr,
        /// Relation description accompanying `expr`.
        desc: RelationDesc,
    },
}
944
945impl SubscribeFrom {
946 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
947 match self {
948 SubscribeFrom::Id(id) => BTreeSet::from([*id]),
949 SubscribeFrom::Query { expr, .. } => expr.depends_on(),
950 }
951 }
952
953 pub fn contains_temporal(&self) -> bool {
954 match self {
955 SubscribeFrom::Id(_) => false,
956 SubscribeFrom::Query { expr, .. } => expr
957 .contains_temporal()
958 .expect("Unexpected error in `visit_scalars` call"),
959 }
960 }
961}
962
/// Payload of [`Plan::ShowCreate`].
#[derive(Debug)]
pub struct ShowCreatePlan {
    /// ID of the object.
    pub id: ObjectId,
    /// A row associated with the object.
    // NOTE(review): presumably the pre-rendered result row — confirm.
    pub row: Row,
}
968
/// Payload of [`Plan::ShowColumns`].
#[derive(Debug)]
pub struct ShowColumnsPlan {
    /// Catalog item ID of the item.
    pub id: CatalogItemId,
    /// Select plan associated with this plan.
    // NOTE(review): presumably the query that produces the column listing — confirm.
    pub select_plan: SelectPlan,
    /// Resolved IDs accompanying `select_plan`.
    pub new_resolved_ids: ResolvedIds,
}
975
/// Payload of [`Plan::CopyFrom`].
#[derive(Debug)]
pub struct CopyFromPlan {
    /// Catalog item ID of the copy target.
    pub target_id: CatalogItemId,
    /// Name of the copy target, as a string.
    pub target_name: String,
    /// Where the data is copied from.
    pub source: CopyFromSource,
    /// Column indexes involved in the copy.
    // NOTE(review): presumably maps input columns to target columns — confirm.
    pub columns: Vec<ColumnIndex>,
    /// Relation description of the source data.
    pub source_desc: RelationDesc,
    /// Map/filter/project applied as part of the copy.
    // NOTE(review): presumably reshapes input rows to the target's shape — confirm.
    pub mfp: MapFilterProject,
    /// Format parameters for the input data.
    pub params: CopyFormatParams<'static>,
    /// Optional filter on the source.
    pub filter: Option<CopyFromFilter>,
}
997
/// Origin of the data for a [`CopyFromPlan`].
#[derive(Debug)]
pub enum CopyFromSource {
    /// Standard input.
    Stdin,
    /// A URL, given as a scalar expression.
    Url(HirScalarExpr),
    /// An AWS S3 location.
    AwsS3 {
        /// The URI, given as a scalar expression.
        uri: HirScalarExpr,
        /// The AWS connection to use.
        connection: AwsConnection,
        /// Catalog item ID of that connection.
        connection_id: CatalogItemId,
    },
}
1016
/// Filter carried by [`CopyFromPlan::filter`].
#[derive(Debug)]
pub enum CopyFromFilter {
    /// An explicit list of files.
    Files(Vec<String>),
    /// A pattern.
    // NOTE(review): the pattern syntax (glob, regex, ...) is not visible here — confirm.
    Pattern(String),
}
1022
/// Payload of [`Plan::CopyTo`].
#[derive(Debug, Clone)]
pub struct CopyToPlan {
    /// Select plan producing the data to copy.
    pub select_plan: SelectPlan,
    /// Relation description of the copied data.
    pub desc: RelationDesc,
    /// Destination, given as a scalar expression.
    pub to: HirScalarExpr,
    /// The storage-layer connection, parameterized by [`ReferencedConnection`].
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
    /// Catalog item ID of the connection.
    pub connection_id: CatalogItemId,
    /// Output format.
    pub format: S3SinkFormat,
    /// Maximum size of a single output file.
    // NOTE(review): presumably in bytes — confirm.
    pub max_file_size: u64,
}
1040
/// Payload of [`Plan::ExplainPlan`].
#[derive(Clone, Debug)]
pub struct ExplainPlanPlan {
    /// Stage to explain.
    pub stage: ExplainStage,
    /// Output format of the explanation.
    pub format: ExplainFormat,
    /// Configuration of the explanation.
    pub config: ExplainConfig,
    /// What is being explained.
    pub explainee: Explainee,
}
1048
/// The subject of an explain; see [`ExplainPlanPlan::explainee`] and
/// [`ExplainPushdownPlan::explainee`].
// NOTE(review): the plain variants presumably explain the plan already stored
// for the item, while the `Replan*` variants plan the item again — confirm.
#[derive(Clone, Debug)]
pub enum Explainee {
    /// A view, by catalog item ID.
    View(CatalogItemId),
    /// A materialized view, by catalog item ID.
    MaterializedView(CatalogItemId),
    /// An index, by catalog item ID.
    Index(CatalogItemId),
    /// A view, by catalog item ID (replan form).
    ReplanView(CatalogItemId),
    /// A materialized view, by catalog item ID (replan form).
    ReplanMaterializedView(CatalogItemId),
    /// An index, by catalog item ID (replan form).
    ReplanIndex(CatalogItemId),
    /// A statement.
    Statement(ExplaineeStatement),
}
1067
/// A statement that can be explained; see [`Explainee::Statement`].
///
/// The `EnumKind` derive together with `#[enum_kind(ExplaineeStatementKind)]`
/// provides the companion `ExplaineeStatementKind` enum.
///
/// Every variant carries a `broken` flag, exposed through
/// [`ExplaineeStatement::broken`].
// NOTE(review): what a set `broken` flag causes downstream is not visible
// here — confirm with the explain/optimizer code.
#[derive(Clone, Debug, EnumKind)]
#[enum_kind(ExplaineeStatementKind)]
pub enum ExplaineeStatement {
    /// A `SELECT` statement.
    Select {
        /// The `broken` flag.
        broken: bool,
        /// The select plan.
        plan: plan::SelectPlan,
        /// Relation description accompanying `plan`.
        desc: RelationDesc,
    },
    /// A `CREATE VIEW` statement.
    CreateView {
        /// The `broken` flag.
        broken: bool,
        /// The create-view plan.
        plan: plan::CreateViewPlan,
    },
    /// A `CREATE MATERIALIZED VIEW` statement.
    CreateMaterializedView {
        /// The `broken` flag.
        broken: bool,
        /// The create-materialized-view plan.
        plan: plan::CreateMaterializedViewPlan,
    },
    /// A `CREATE INDEX` statement.
    CreateIndex {
        /// The `broken` flag.
        broken: bool,
        /// The create-index plan.
        plan: plan::CreateIndexPlan,
    },
    /// A `SUBSCRIBE` statement.
    Subscribe {
        /// The `broken` flag.
        broken: bool,
        /// The subscribe plan.
        plan: plan::SubscribePlan,
    },
}
1104
1105impl ExplaineeStatement {
1106 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1107 match self {
1108 Self::Select { plan, .. } => plan.source.depends_on(),
1109 Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1110 Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1111 Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1112 Self::Subscribe { plan, .. } => plan.from.depends_on(),
1113 }
1114 }
1115
1116 pub fn broken(&self) -> bool {
1127 match self {
1128 Self::Select { broken, .. } => *broken,
1129 Self::CreateView { broken, .. } => *broken,
1130 Self::CreateMaterializedView { broken, .. } => *broken,
1131 Self::CreateIndex { broken, .. } => *broken,
1132 Self::Subscribe { broken, .. } => *broken,
1133 }
1134 }
1135}
1136
1137impl ExplaineeStatementKind {
1138 pub fn supports(&self, stage: &ExplainStage) -> bool {
1139 use ExplainStage::*;
1140 match self {
1141 Self::Select => true,
1142 Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1143 Self::CreateMaterializedView => true,
1144 Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1145 Self::Subscribe => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1148 }
1149 }
1150}
1151
1152impl std::fmt::Display for ExplaineeStatementKind {
1153 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1154 match self {
1155 Self::Select => write!(f, "SELECT"),
1156 Self::CreateView => write!(f, "CREATE VIEW"),
1157 Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1158 Self::CreateIndex => write!(f, "CREATE INDEX"),
1159 Self::Subscribe => write!(f, "SUBSCRIBE"),
1160 }
1161 }
1162}
1163
1164#[derive(Clone, Debug)]
1165pub struct ExplainPushdownPlan {
1166 pub explainee: Explainee,
1167}
1168
1169#[derive(Clone, Debug)]
1170pub struct ExplainTimestampPlan {
1171 pub format: ExplainFormat,
1172 pub raw_plan: HirRelationExpr,
1173 pub when: QueryWhen,
1174}
1175
1176#[derive(Debug)]
1177pub struct ExplainSinkSchemaPlan {
1178 pub sink_from: GlobalId,
1179 pub json_schema: String,
1180}
1181
1182#[derive(Debug)]
1183pub struct SendDiffsPlan {
1184 pub id: CatalogItemId,
1185 pub updates: Vec<(Row, Diff)>,
1186 pub kind: MutationKind,
1187 pub returning: Vec<(Row, NonZeroUsize)>,
1188 pub max_result_size: u64,
1189}
1190
1191#[derive(Debug)]
1192pub struct InsertPlan {
1193 pub id: CatalogItemId,
1194 pub values: HirRelationExpr,
1195 pub returning: Vec<mz_expr::MirScalarExpr>,
1196}
1197
1198#[derive(Debug)]
1199pub struct ReadThenWritePlan {
1200 pub id: CatalogItemId,
1201 pub selection: HirRelationExpr,
1202 pub finishing: RowSetFinishing,
1203 pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
1204 pub kind: MutationKind,
1205 pub returning: Vec<mz_expr::MirScalarExpr>,
1206}
1207
1208#[derive(Debug)]
1210pub struct AlterNoopPlan {
1211 pub object_type: ObjectType,
1212}
1213
1214#[derive(Debug)]
1215pub struct AlterSetClusterPlan {
1216 pub id: CatalogItemId,
1217 pub set_cluster: ClusterId,
1218}
1219
1220#[derive(Debug)]
1221pub struct AlterRetainHistoryPlan {
1222 pub id: CatalogItemId,
1223 pub value: Option<Value>,
1224 pub window: CompactionWindow,
1225 pub object_type: ObjectType,
1226}
1227
1228#[derive(Debug)]
1229pub struct AlterSourceTimestampIntervalPlan {
1230 pub id: CatalogItemId,
1231 pub value: Option<Value>,
1232 pub interval: Duration,
1233}
1234
/// The requested change to a single alterable option.
///
/// The payload type defaults to `String`.
#[derive(Clone, Debug)]
pub enum AlterOptionParameter<T = String> {
    /// Set the option to the given value.
    Set(T),
    /// Reset the option.
    Reset,
    /// Leave the option as it is.
    Unchanged,
}
1242
1243#[derive(Debug)]
1244pub enum AlterConnectionAction {
1245 RotateKeys,
1246 AlterOptions {
1247 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
1248 drop_options: BTreeSet<ConnectionOptionName>,
1249 validate: bool,
1250 },
1251}
1252
1253#[derive(Debug)]
1254pub struct AlterConnectionPlan {
1255 pub id: CatalogItemId,
1256 pub action: AlterConnectionAction,
1257}
1258
1259#[derive(Debug)]
1260pub enum AlterSourceAction {
1261 AddSubsourceExports {
1262 subsources: Vec<CreateSourcePlanBundle>,
1263 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1264 },
1265 RefreshReferences {
1266 references: SourceReferences,
1267 },
1268}
1269
1270#[derive(Debug)]
1271pub struct AlterSourcePlan {
1272 pub item_id: CatalogItemId,
1273 pub ingestion_id: GlobalId,
1274 pub action: AlterSourceAction,
1275}
1276
1277#[derive(Debug, Clone)]
1278pub struct AlterSinkPlan {
1279 pub item_id: CatalogItemId,
1280 pub global_id: GlobalId,
1281 pub sink: Sink,
1282 pub with_snapshot: bool,
1283 pub in_cluster: ClusterId,
1284}
1285
1286#[derive(Debug, Clone)]
1287pub struct AlterClusterPlan {
1288 pub id: ClusterId,
1289 pub name: String,
1290 pub options: PlanClusterOption,
1291 pub strategy: AlterClusterPlanStrategy,
1292}
1293
1294#[derive(Debug)]
1295pub struct AlterClusterRenamePlan {
1296 pub id: ClusterId,
1297 pub name: String,
1298 pub to_name: String,
1299}
1300
1301#[derive(Debug)]
1302pub struct AlterClusterReplicaRenamePlan {
1303 pub cluster_id: ClusterId,
1304 pub replica_id: ReplicaId,
1305 pub name: QualifiedReplica,
1306 pub to_name: String,
1307}
1308
1309#[derive(Debug)]
1310pub struct AlterItemRenamePlan {
1311 pub id: CatalogItemId,
1312 pub current_full_name: FullItemName,
1313 pub to_name: String,
1314 pub object_type: ObjectType,
1315}
1316
1317#[derive(Debug)]
1318pub struct AlterSchemaRenamePlan {
1319 pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1320 pub new_schema_name: String,
1321}
1322
1323#[derive(Debug)]
1324pub struct AlterSchemaSwapPlan {
1325 pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1326 pub schema_a_name: String,
1327 pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1328 pub schema_b_name: String,
1329 pub name_temp: String,
1330}
1331
1332#[derive(Debug)]
1333pub struct AlterClusterSwapPlan {
1334 pub id_a: ClusterId,
1335 pub id_b: ClusterId,
1336 pub name_a: String,
1337 pub name_b: String,
1338 pub name_temp: String,
1339}
1340
1341#[derive(Debug)]
1342pub struct AlterSecretPlan {
1343 pub id: CatalogItemId,
1344 pub secret_as: MirScalarExpr,
1345}
1346
1347#[derive(Debug)]
1348pub struct AlterSystemSetPlan {
1349 pub name: String,
1350 pub value: VariableValue,
1351}
1352
/// Plan data for resetting a single system variable.
#[derive(Debug)]
pub struct AlterSystemResetPlan {
    /// Name of the variable.
    pub name: String,
}
1357
/// Plan data for resetting all system variables; it carries no fields.
#[derive(Debug)]
pub struct AlterSystemResetAllPlan {}
1360
1361#[derive(Debug)]
1362pub struct AlterRolePlan {
1363 pub id: RoleId,
1364 pub name: String,
1365 pub option: PlannedAlterRoleOption,
1366}
1367
1368#[derive(Debug)]
1369pub struct AlterOwnerPlan {
1370 pub id: ObjectId,
1371 pub object_type: ObjectType,
1372 pub new_owner: RoleId,
1373}
1374
1375#[derive(Debug)]
1376pub struct AlterTablePlan {
1377 pub relation_id: CatalogItemId,
1378 pub column_name: ColumnName,
1379 pub column_type: SqlColumnType,
1380 pub raw_sql_type: RawDataType,
1381}
1382
1383#[derive(Debug, Clone)]
1384pub struct AlterMaterializedViewApplyReplacementPlan {
1385 pub id: CatalogItemId,
1386 pub replacement_id: CatalogItemId,
1387}
1388
1389#[derive(Debug)]
1390pub struct DeclarePlan {
1391 pub name: String,
1392 pub stmt: Statement<Raw>,
1393 pub sql: String,
1394 pub params: Params,
1395}
1396
1397#[derive(Debug)]
1398pub struct FetchPlan {
1399 pub name: String,
1400 pub count: Option<FetchDirection>,
1401 pub timeout: ExecuteTimeout,
1402}
1403
/// Plan data for closing a named object.
// NOTE(review): presumably the name identifies a cursor — confirm.
#[derive(Debug)]
pub struct ClosePlan {
    /// The name to close.
    pub name: String,
}
1408
1409#[derive(Debug)]
1410pub struct PreparePlan {
1411 pub name: String,
1412 pub stmt: Statement<Raw>,
1413 pub sql: String,
1414 pub desc: StatementDesc,
1415}
1416
1417#[derive(Debug)]
1418pub struct ExecutePlan {
1419 pub name: String,
1420 pub params: Params,
1421}
1422
/// Plan data for deallocating a named statement.
#[derive(Debug)]
pub struct DeallocatePlan {
    /// Name of the statement, if one was given.
    // NOTE(review): `None` presumably means "deallocate all" — confirm.
    pub name: Option<String>,
}
1427
1428#[derive(Debug)]
1429pub struct RaisePlan {
1430 pub severity: NoticeSeverity,
1431}
1432
1433#[derive(Debug)]
1434pub struct GrantRolePlan {
1435 pub role_ids: Vec<RoleId>,
1437 pub member_ids: Vec<RoleId>,
1439 pub grantor_id: RoleId,
1441}
1442
1443#[derive(Debug)]
1444pub struct RevokeRolePlan {
1445 pub role_ids: Vec<RoleId>,
1447 pub member_ids: Vec<RoleId>,
1449 pub grantor_id: RoleId,
1451}
1452
1453#[derive(Debug)]
1454pub struct UpdatePrivilege {
1455 pub acl_mode: AclMode,
1457 pub target_id: SystemObjectId,
1459 pub grantor: RoleId,
1461}
1462
1463#[derive(Debug)]
1464pub struct GrantPrivilegesPlan {
1465 pub update_privileges: Vec<UpdatePrivilege>,
1467 pub grantees: Vec<RoleId>,
1469}
1470
1471#[derive(Debug)]
1472pub struct RevokePrivilegesPlan {
1473 pub update_privileges: Vec<UpdatePrivilege>,
1475 pub revokees: Vec<RoleId>,
1477}
1478#[derive(Debug)]
1479pub struct AlterDefaultPrivilegesPlan {
1480 pub privilege_objects: Vec<DefaultPrivilegeObject>,
1482 pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
1484 pub is_grant: bool,
1486}
1487
1488#[derive(Debug)]
1489pub struct ReassignOwnedPlan {
1490 pub old_roles: Vec<RoleId>,
1492 pub new_role: RoleId,
1494 pub reassign_ids: Vec<ObjectId>,
1496}
1497
1498#[derive(Debug)]
1499pub struct CommentPlan {
1500 pub object_id: CommentObjectId,
1502 pub sub_component: Option<usize>,
1506 pub comment: Option<String>,
1508}
1509
1510#[derive(Clone, Debug)]
1511pub enum TableDataSource {
1512 TableWrites { defaults: Vec<Expr<Aug>> },
1514
1515 DataSource {
1518 desc: DataSourceDesc,
1519 timeline: Timeline,
1520 },
1521}
1522
1523#[derive(Clone, Debug)]
1524pub struct Table {
1525 pub create_sql: String,
1526 pub desc: VersionedRelationDesc,
1527 pub temporary: bool,
1528 pub compaction_window: Option<CompactionWindow>,
1529 pub data_source: TableDataSource,
1530}
1531
1532#[derive(Clone, Debug)]
1533pub struct Source {
1534 pub create_sql: String,
1535 pub data_source: DataSourceDesc,
1536 pub desc: RelationDesc,
1537 pub compaction_window: Option<CompactionWindow>,
1538}
1539
1540#[derive(Debug, Clone)]
1541pub enum DataSourceDesc {
1542 Ingestion(SourceDesc<ReferencedConnection>),
1544 OldSyntaxIngestion {
1546 desc: SourceDesc<ReferencedConnection>,
1547 progress_subsource: CatalogItemId,
1550 data_config: SourceExportDataConfig<ReferencedConnection>,
1551 details: SourceExportDetails,
1552 },
1553 IngestionExport {
1556 ingestion_id: CatalogItemId,
1557 external_reference: UnresolvedItemName,
1558 details: SourceExportDetails,
1559 data_config: SourceExportDataConfig<ReferencedConnection>,
1560 },
1561 Progress,
1563 Webhook {
1565 validate_using: Option<WebhookValidation>,
1566 body_format: WebhookBodyFormat,
1567 headers: WebhookHeaders,
1568 cluster_id: Option<StorageInstanceId>,
1570 },
1571}
1572
1573#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
1574pub struct WebhookValidation {
1575 pub expression: MirScalarExpr,
1577 pub relation_desc: RelationDesc,
1579 pub bodies: Vec<(usize, bool)>,
1581 pub headers: Vec<(usize, bool)>,
1583 pub secrets: Vec<WebhookValidationSecret>,
1585}
1586
1587impl WebhookValidation {
1588 const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1589
1590 pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1595 let WebhookValidation {
1596 expression,
1597 relation_desc,
1598 ..
1599 } = self;
1600
1601 let mut expression_ = expression.clone();
1603 let desc_ = relation_desc.clone();
1604 let reduce_task = mz_ore::task::spawn_blocking(
1605 || "webhook-validation-reduce",
1606 move || {
1607 let repr_col_types: Vec<ReprColumnType> = desc_
1608 .typ()
1609 .column_types
1610 .iter()
1611 .map(ReprColumnType::from)
1612 .collect();
1613 expression_.reduce(&repr_col_types);
1614 expression_
1615 },
1616 );
1617
1618 match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1619 Ok(reduced_expr) => {
1620 *expression = reduced_expr;
1621 Ok(())
1622 }
1623 Err(_) => Err("timeout"),
1624 }
1625 }
1626}
1627
1628#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
1629pub struct WebhookHeaders {
1630 pub header_column: Option<WebhookHeaderFilters>,
1632 pub mapped_headers: BTreeMap<usize, (String, bool)>,
1634}
1635
1636impl WebhookHeaders {
1637 pub fn num_columns(&self) -> usize {
1639 let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1640 let mapped_headers = self.mapped_headers.len();
1641
1642 header_column + mapped_headers
1643 }
1644}
1645
1646#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
1647pub struct WebhookHeaderFilters {
1648 pub block: BTreeSet<String>,
1649 pub allow: BTreeSet<String>,
1650}
1651
1652#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Arbitrary)]
1653pub enum WebhookBodyFormat {
1654 Json { array: bool },
1655 Bytes,
1656 Text,
1657}
1658
1659impl From<WebhookBodyFormat> for SqlScalarType {
1660 fn from(value: WebhookBodyFormat) -> Self {
1661 match value {
1662 WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1663 WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1664 WebhookBodyFormat::Text => SqlScalarType::String,
1665 }
1666 }
1667}
1668
1669#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
1670pub struct WebhookValidationSecret {
1671 pub id: CatalogItemId,
1673 pub column_idx: usize,
1675 pub use_bytes: bool,
1677}
1678
1679#[derive(Clone, Debug)]
1680pub struct Connection {
1681 pub create_sql: String,
1682 pub details: ConnectionDetails,
1683}
1684
1685#[derive(Clone, Debug, Serialize)]
1686pub enum ConnectionDetails {
1687 Kafka(KafkaConnection<ReferencedConnection>),
1688 Csr(CsrConnection<ReferencedConnection>),
1689 Postgres(PostgresConnection<ReferencedConnection>),
1690 Ssh {
1691 connection: SshConnection,
1692 key_1: SshKey,
1693 key_2: SshKey,
1694 },
1695 Aws(AwsConnection),
1696 AwsPrivatelink(AwsPrivatelinkConnection),
1697 MySql(MySqlConnection<ReferencedConnection>),
1698 SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
1699 IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
1700}
1701
1702impl ConnectionDetails {
1703 pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1704 match self {
1705 ConnectionDetails::Kafka(c) => {
1706 mz_storage_types::connections::Connection::Kafka(c.clone())
1707 }
1708 ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1709 ConnectionDetails::Postgres(c) => {
1710 mz_storage_types::connections::Connection::Postgres(c.clone())
1711 }
1712 ConnectionDetails::Ssh { connection, .. } => {
1713 mz_storage_types::connections::Connection::Ssh(connection.clone())
1714 }
1715 ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1716 ConnectionDetails::AwsPrivatelink(c) => {
1717 mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1718 }
1719 ConnectionDetails::MySql(c) => {
1720 mz_storage_types::connections::Connection::MySql(c.clone())
1721 }
1722 ConnectionDetails::SqlServer(c) => {
1723 mz_storage_types::connections::Connection::SqlServer(c.clone())
1724 }
1725 ConnectionDetails::IcebergCatalog(c) => {
1726 mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1727 }
1728 }
1729 }
1730}
1731
1732#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1733pub struct NetworkPolicyRule {
1734 pub name: String,
1735 pub action: NetworkPolicyRuleAction,
1736 pub address: PolicyAddress,
1737 pub direction: NetworkPolicyRuleDirection,
1738}
1739
1740#[derive(
1741 Debug,
1742 Clone,
1743 Serialize,
1744 Deserialize,
1745 PartialEq,
1746 Eq,
1747 Ord,
1748 PartialOrd,
1749 Hash
1750)]
1751pub enum NetworkPolicyRuleAction {
1752 Allow,
1753}
1754
1755impl std::fmt::Display for NetworkPolicyRuleAction {
1756 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1757 match self {
1758 Self::Allow => write!(f, "allow"),
1759 }
1760 }
1761}
1762impl TryFrom<&str> for NetworkPolicyRuleAction {
1763 type Error = PlanError;
1764 fn try_from(value: &str) -> Result<Self, Self::Error> {
1765 match value.to_uppercase().as_str() {
1766 "ALLOW" => Ok(Self::Allow),
1767 _ => Err(PlanError::Unstructured(
1768 "Allow is the only valid option".into(),
1769 )),
1770 }
1771 }
1772}
1773
1774#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1775pub enum NetworkPolicyRuleDirection {
1776 Ingress,
1777}
1778impl std::fmt::Display for NetworkPolicyRuleDirection {
1779 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1780 match self {
1781 Self::Ingress => write!(f, "ingress"),
1782 }
1783 }
1784}
1785impl TryFrom<&str> for NetworkPolicyRuleDirection {
1786 type Error = PlanError;
1787 fn try_from(value: &str) -> Result<Self, Self::Error> {
1788 match value.to_uppercase().as_str() {
1789 "INGRESS" => Ok(Self::Ingress),
1790 _ => Err(PlanError::Unstructured(
1791 "Ingress is the only valid option".into(),
1792 )),
1793 }
1794 }
1795}
1796
1797#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1798pub struct PolicyAddress(pub IpNet);
1799impl std::fmt::Display for PolicyAddress {
1800 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1801 write!(f, "{}", &self.0.to_string())
1802 }
1803}
1804impl From<String> for PolicyAddress {
1805 fn from(value: String) -> Self {
1806 Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1807 }
1808}
1809impl TryFrom<&str> for PolicyAddress {
1810 type Error = PlanError;
1811 fn try_from(value: &str) -> Result<Self, Self::Error> {
1812 let net = IpNet::from_str(value)
1813 .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1814 Ok(Self(net))
1815 }
1816}
1817
1818impl Serialize for PolicyAddress {
1819 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1820 where
1821 S: serde::Serializer,
1822 {
1823 serializer.serialize_str(&format!("{}", &self.0))
1824 }
1825}
1826
1827#[derive(Clone, Debug, Serialize)]
1828pub enum SshKey {
1829 PublicOnly(String),
1830 Both(SshKeyPair),
1831}
1832
1833impl SshKey {
1834 pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1835 match self {
1836 SshKey::PublicOnly(_) => None,
1837 SshKey::Both(key_pair) => Some(key_pair),
1838 }
1839 }
1840
1841 pub fn public_key(&self) -> String {
1842 match self {
1843 SshKey::PublicOnly(s) => s.into(),
1844 SshKey::Both(p) => p.ssh_public_key(),
1845 }
1846 }
1847}
1848
1849#[derive(Clone, Debug)]
1850pub struct Secret {
1851 pub create_sql: String,
1852 pub secret_as: MirScalarExpr,
1853}
1854
1855#[derive(Clone, Debug)]
1856pub struct Sink {
1857 pub create_sql: String,
1859 pub from: GlobalId,
1861 pub connection: StorageSinkConnection<ReferencedConnection>,
1863 pub envelope: SinkEnvelope,
1865 pub version: u64,
1866 pub commit_interval: Option<Duration>,
1867}
1868
1869#[derive(Clone, Debug)]
1870pub struct View {
1871 pub create_sql: String,
1873 pub expr: HirRelationExpr,
1875 pub dependencies: DependencyIds,
1877 pub column_names: Vec<ColumnName>,
1879 pub temporary: bool,
1881}
1882
1883#[derive(Clone, Debug)]
1884pub struct MaterializedView {
1885 pub create_sql: String,
1887 pub expr: HirRelationExpr,
1889 pub dependencies: DependencyIds,
1891 pub column_names: Vec<ColumnName>,
1893 pub replacement_target: Option<CatalogItemId>,
1894 pub cluster_id: ClusterId,
1896 pub target_replica: Option<ReplicaId>,
1898 pub non_null_assertions: Vec<usize>,
1899 pub compaction_window: Option<CompactionWindow>,
1900 pub refresh_schedule: Option<RefreshSchedule>,
1901 pub as_of: Option<Timestamp>,
1902}
1903
1904#[derive(Clone, Debug)]
1905pub struct Index {
1906 pub create_sql: String,
1908 pub on: GlobalId,
1910 pub keys: Vec<mz_expr::MirScalarExpr>,
1911 pub compaction_window: Option<CompactionWindow>,
1912 pub cluster_id: ClusterId,
1913}
1914
1915#[derive(Clone, Debug)]
1916pub struct Type {
1917 pub create_sql: String,
1918 pub inner: CatalogType<IdReference>,
1919}
1920
1921#[derive(Deserialize, Clone, Debug, PartialEq)]
1923pub enum QueryWhen {
1924 Immediately,
1927 FreshestTableWrite,
1930 AtTimestamp(Timestamp),
1935 AtLeastTimestamp(Timestamp),
1938}
1939
1940impl QueryWhen {
1941 pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1943 match self {
1944 QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1945 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1946 }
1947 }
1948 pub fn constrains_upper(&self) -> bool {
1952 match self {
1953 QueryWhen::AtTimestamp(_) => true,
1954 QueryWhen::AtLeastTimestamp(_)
1955 | QueryWhen::Immediately
1956 | QueryWhen::FreshestTableWrite => false,
1957 }
1958 }
1959 pub fn advance_to_since(&self) -> bool {
1961 match self {
1962 QueryWhen::Immediately
1963 | QueryWhen::AtLeastTimestamp(_)
1964 | QueryWhen::FreshestTableWrite => true,
1965 QueryWhen::AtTimestamp(_) => false,
1966 }
1967 }
1968 pub fn can_advance_to_upper(&self) -> bool {
1970 match self {
1971 QueryWhen::Immediately => true,
1972 QueryWhen::FreshestTableWrite
1973 | QueryWhen::AtTimestamp(_)
1974 | QueryWhen::AtLeastTimestamp(_) => false,
1975 }
1976 }
1977
1978 pub fn can_advance_to_timeline_ts(&self) -> bool {
1980 match self {
1981 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1982 QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1983 }
1984 }
1985 pub fn must_advance_to_timeline_ts(&self) -> bool {
1987 match self {
1988 QueryWhen::FreshestTableWrite => true,
1989 QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1990 false
1991 }
1992 }
1993 }
1994 pub fn is_transactional(&self) -> bool {
1996 match self {
1997 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1998 QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1999 }
2000 }
2001}
2002
/// The kind of data mutation a plan performs.
#[derive(Clone, Copy, Debug)]
pub enum MutationKind {
    /// Rows are inserted.
    Insert,
    /// Rows are updated.
    Update,
    /// Rows are deleted.
    Delete,
}
2009
/// Data formats usable with `COPY`.
///
/// The derived ordering follows declaration order, so variants must not be
/// reordered.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyFormat {
    /// Text format.
    Text,
    /// CSV format.
    Csv,
    /// Binary format.
    Binary,
    /// Parquet format.
    Parquet,
}
2017
/// Timeout behaviour for an execution.
#[derive(Clone, Copy, Debug)]
pub enum ExecuteTimeout {
    /// No timeout.
    None,
    /// A timeout in seconds.
    Seconds(f64),
    /// Wait a single time.
    // NOTE(review): exact semantics are not visible in this file — confirm.
    WaitOnce,
}
2024
2025#[derive(Clone, Debug)]
2026pub enum IndexOption {
2027 RetainHistory(CompactionWindow),
2029}
2030
2031#[derive(Clone, Debug)]
2032pub enum TableOption {
2033 RetainHistory(CompactionWindow),
2035}
2036
2037#[derive(Clone, Debug)]
2038pub struct PlanClusterOption {
2039 pub availability_zones: AlterOptionParameter<Vec<String>>,
2040 pub introspection_debugging: AlterOptionParameter<bool>,
2041 pub introspection_interval: AlterOptionParameter<OptionalDuration>,
2042 pub managed: AlterOptionParameter<bool>,
2043 pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
2044 pub replication_factor: AlterOptionParameter<u32>,
2045 pub size: AlterOptionParameter,
2046 pub schedule: AlterOptionParameter<ClusterSchedule>,
2047 pub workload_class: AlterOptionParameter<Option<String>>,
2048}
2049
2050impl Default for PlanClusterOption {
2051 fn default() -> Self {
2052 Self {
2053 availability_zones: AlterOptionParameter::Unchanged,
2054 introspection_debugging: AlterOptionParameter::Unchanged,
2055 introspection_interval: AlterOptionParameter::Unchanged,
2056 managed: AlterOptionParameter::Unchanged,
2057 replicas: AlterOptionParameter::Unchanged,
2058 replication_factor: AlterOptionParameter::Unchanged,
2059 size: AlterOptionParameter::Unchanged,
2060 schedule: AlterOptionParameter::Unchanged,
2061 workload_class: AlterOptionParameter::Unchanged,
2062 }
2063 }
2064}
2065
2066#[derive(Clone, Debug, PartialEq, Eq)]
2067pub enum AlterClusterPlanStrategy {
2068 None,
2069 For(Duration),
2070 UntilReady {
2071 on_timeout: OnTimeoutAction,
2072 timeout: Duration,
2073 },
2074}
2075
/// What to do when an until-ready wait times out.
///
/// The default is [`OnTimeoutAction::Commit`], expressed through
/// `#[derive(Default)]` and `#[default]` rather than a hand-written impl.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum OnTimeoutAction {
    /// Commit on timeout.
    #[default]
    Commit,
    /// Roll back on timeout.
    Rollback,
}
2087
2088impl TryFrom<&str> for OnTimeoutAction {
2089 type Error = PlanError;
2090 fn try_from(value: &str) -> Result<Self, Self::Error> {
2091 match value.to_uppercase().as_str() {
2092 "COMMIT" => Ok(Self::Commit),
2093 "ROLLBACK" => Ok(Self::Rollback),
2094 _ => Err(PlanError::Unstructured(
2095 "Valid options are COMMIT, ROLLBACK".into(),
2096 )),
2097 }
2098 }
2099}
2100
2101impl AlterClusterPlanStrategy {
2102 pub fn is_none(&self) -> bool {
2103 matches!(self, Self::None)
2104 }
2105 pub fn is_some(&self) -> bool {
2106 !matches!(self, Self::None)
2107 }
2108}
2109
2110impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2111 type Error = PlanError;
2112
2113 fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2114 Ok(match value.wait {
2115 Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2116 Some(ClusterAlterOptionValue::UntilReady(options)) => {
2117 let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2118 Self::UntilReady {
2119 timeout: match extracted.timeout {
2120 Some(d) => d,
2121 None => Err(PlanError::UntilReadyTimeoutRequired)?,
2122 },
2123 on_timeout: match extracted.on_timeout {
2124 Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2125 PlanError::InvalidOptionValue {
2126 option_name: "ON TIMEOUT".into(),
2127 err: Box::new(e),
2128 }
2129 })?,
2130 None => OnTimeoutAction::default(),
2131 },
2132 }
2133 }
2134 None => Self::None,
2135 })
2136 }
2137}
2138
2139#[derive(Debug, Clone)]
2141pub struct Params {
2142 pub datums: Row,
2144 pub execute_types: Vec<SqlScalarType>,
2146 pub expected_types: Vec<SqlScalarType>,
2148}
2149
2150impl Params {
2151 pub fn empty() -> Params {
2153 Params {
2154 datums: Row::pack_slice(&[]),
2155 execute_types: vec![],
2156 expected_types: vec![],
2157 }
2158 }
2159}
2160
2161#[derive(
2163 Ord,
2164 PartialOrd,
2165 Clone,
2166 Debug,
2167 Eq,
2168 PartialEq,
2169 Serialize,
2170 Deserialize,
2171 Hash,
2172 Copy
2173)]
2174pub struct PlanContext {
2175 pub wall_time: DateTime<Utc>,
2176 pub ignore_if_exists_errors: bool,
2177}
2178
2179impl PlanContext {
2180 pub fn new(wall_time: DateTime<Utc>) -> Self {
2181 Self {
2182 wall_time,
2183 ignore_if_exists_errors: false,
2184 }
2185 }
2186
2187 pub fn zero() -> Self {
2191 PlanContext {
2192 wall_time: now::to_datetime(NOW_ZERO()),
2193 ignore_if_exists_errors: false,
2194 }
2195 }
2196
2197 pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2198 self.ignore_if_exists_errors = value;
2199 self
2200 }
2201}