1use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{CollectionPlan, ColumnOrder, MapFilterProject, MirScalarExpr, RowSetFinishing};
41use mz_ore::now::{self, NOW_ZERO};
42use mz_pgcopy::CopyFormatParams;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
44use mz_repr::explain::{ExplainConfig, ExplainFormat};
45use mz_repr::network_policy_id::NetworkPolicyId;
46use mz_repr::optimize::OptimizerFeatureOverrides;
47use mz_repr::refresh_schedule::RefreshSchedule;
48use mz_repr::role_id::RoleId;
49use mz_repr::{
50 CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, ReprColumnType, Row,
51 SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
52};
53use mz_sql_parser::ast::{
54 AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
55 RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
56 Value, WithOptionValue,
57};
58use mz_ssh_util::keys::SshKeyPair;
59use mz_storage_types::connections::aws::AwsConnection;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::connections::{
62 AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
63 MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
64};
65use mz_storage_types::instances::StorageInstanceId;
66use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
67use mz_storage_types::sources::{
68 SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
69};
70use proptest_derive::Arbitrary;
71use serde::{Deserialize, Serialize};
72
73use crate::ast::{
74 ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
75 TransactionAccessMode,
76};
77use crate::catalog::{
78 CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
79 RoleAttributesRaw,
80};
81use crate::names::{
82 Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
83 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
84};
85
86pub(crate) mod error;
87pub(crate) mod explain;
88pub(crate) mod hir;
89pub(crate) mod literal;
90pub(crate) mod lowering;
91pub(crate) mod notice;
92pub(crate) mod plan_utils;
93pub(crate) mod query;
94pub(crate) mod scope;
95pub(crate) mod side_effecting_func;
96pub(crate) mod statement;
97pub(crate) mod transform_ast;
98pub(crate) mod transform_hir;
99pub(crate) mod typeconv;
100pub(crate) mod with_options;
101
102use crate::plan;
103use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
104use crate::plan::with_options::OptionalDuration;
105pub use error::PlanError;
106pub use explain::normalize_subqueries;
107pub use hir::{
108 AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
109 WindowExprType,
110};
111pub use lowering::Config as HirToMirConfig;
112pub use notice::PlanNotice;
113pub use query::{ExprContext, QueryContext, QueryLifetime};
114pub use scope::Scope;
115pub use side_effecting_func::SideEffectingFunc;
116pub use statement::ddl::{
117 AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
118 PlannedAlterRoleOption, PlannedRoleAttributes, PlannedRoleVariable,
119 SqlServerConfigOptionExtracted,
120};
121pub use statement::{
122 StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
123 resolve_cluster_for_materialized_view,
124};
125
126use self::statement::ddl::ClusterAlterOptionExtracted;
127use self::with_options::TryFromValue;
128
/// A planned statement: one variant per kind of plan, most of them wrapping a
/// dedicated `*Plan` payload type.
///
/// The `EnumKind` derive together with `#[enum_kind(PlanKind)]` generates a
/// companion `PlanKind` enum with the same variant names but no payloads;
/// `Plan::generated_from` uses it to describe which plans a statement kind
/// may produce.
#[derive(Debug, EnumKind)]
#[enum_kind(PlanKind)]
pub enum Plan {
    CreateConnection(CreateConnectionPlan),
    CreateDatabase(CreateDatabasePlan),
    CreateSchema(CreateSchemaPlan),
    CreateRole(CreateRolePlan),
    CreateCluster(CreateClusterPlan),
    CreateClusterReplica(CreateClusterReplicaPlan),
    CreateSource(CreateSourcePlan),
    // Several sources planned together; `Plan::name` reports it as
    // "create source", the same as the single-source variant.
    CreateSources(Vec<CreateSourcePlanBundle>),
    CreateSecret(CreateSecretPlan),
    CreateSink(CreateSinkPlan),
    CreateTable(CreateTablePlan),
    CreateView(CreateViewPlan),
    CreateMaterializedView(CreateMaterializedViewPlan),
    CreateContinualTask(CreateContinualTaskPlan),
    CreateNetworkPolicy(CreateNetworkPolicyPlan),
    CreateIndex(CreateIndexPlan),
    CreateType(CreateTypePlan),
    Comment(CommentPlan),
    DiscardTemp,
    DiscardAll,
    DropObjects(DropObjectsPlan),
    DropOwned(DropOwnedPlan),
    EmptyQuery,
    ShowAllVariables,
    ShowCreate(ShowCreatePlan),
    ShowColumns(ShowColumnsPlan),
    ShowVariable(ShowVariablePlan),
    InspectShard(InspectShardPlan),
    SetVariable(SetVariablePlan),
    ResetVariable(ResetVariablePlan),
    SetTransaction(SetTransactionPlan),
    StartTransaction(StartTransactionPlan),
    CommitTransaction(CommitTransactionPlan),
    AbortTransaction(AbortTransactionPlan),
    Select(SelectPlan),
    Subscribe(SubscribePlan),
    CopyFrom(CopyFromPlan),
    CopyTo(CopyToPlan),
    ExplainPlan(ExplainPlanPlan),
    ExplainPushdown(ExplainPushdownPlan),
    ExplainTimestamp(ExplainTimestampPlan),
    ExplainSinkSchema(ExplainSinkSchemaPlan),
    Insert(InsertPlan),
    AlterCluster(AlterClusterPlan),
    AlterClusterSwap(AlterClusterSwapPlan),
    AlterNoop(AlterNoopPlan),
    AlterSetCluster(AlterSetClusterPlan),
    AlterConnection(AlterConnectionPlan),
    AlterSource(AlterSourcePlan),
    AlterClusterRename(AlterClusterRenamePlan),
    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
    AlterItemRename(AlterItemRenamePlan),
    AlterSchemaRename(AlterSchemaRenamePlan),
    AlterSchemaSwap(AlterSchemaSwapPlan),
    AlterSecret(AlterSecretPlan),
    AlterSink(AlterSinkPlan),
    AlterSystemSet(AlterSystemSetPlan),
    AlterSystemReset(AlterSystemResetPlan),
    AlterSystemResetAll(AlterSystemResetAllPlan),
    AlterRole(AlterRolePlan),
    AlterOwner(AlterOwnerPlan),
    // Note: the payload type is `AlterTablePlan`, not `AlterTableAddColumnPlan`.
    AlterTableAddColumn(AlterTablePlan),
    AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
    AlterNetworkPolicy(AlterNetworkPolicyPlan),
    Declare(DeclarePlan),
    Fetch(FetchPlan),
    Close(ClosePlan),
    ReadThenWrite(ReadThenWritePlan),
    Prepare(PreparePlan),
    Execute(ExecutePlan),
    Deallocate(DeallocatePlan),
    Raise(RaisePlan),
    GrantRole(GrantRolePlan),
    RevokeRole(RevokeRolePlan),
    GrantPrivileges(GrantPrivilegesPlan),
    RevokePrivileges(RevokePrivilegesPlan),
    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
    ReassignOwned(ReassignOwnedPlan),
    // Wraps the `SideEffectingFunc` type directly; there is no `*Plan` wrapper.
    SideEffectingFunc(SideEffectingFunc),
    ValidateConnection(ValidateConnectionPlan),
    AlterRetainHistory(AlterRetainHistoryPlan),
    AlterSourceTimestampInterval(AlterSourceTimestampIntervalPlan),
}
216
217impl Plan {
218 pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
221 match stmt {
222 StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
223 StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
224 StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
225 StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
226 StatementKind::AlterObjectRename => &[
227 PlanKind::AlterClusterRename,
228 PlanKind::AlterClusterReplicaRename,
229 PlanKind::AlterItemRename,
230 PlanKind::AlterSchemaRename,
231 PlanKind::AlterNoop,
232 ],
233 StatementKind::AlterObjectSwap => &[
234 PlanKind::AlterClusterSwap,
235 PlanKind::AlterSchemaSwap,
236 PlanKind::AlterNoop,
237 ],
238 StatementKind::AlterRole => &[PlanKind::AlterRole],
239 StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
240 StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
241 StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
242 StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
243 StatementKind::AlterSource => &[
244 PlanKind::AlterNoop,
245 PlanKind::AlterSource,
246 PlanKind::AlterRetainHistory,
247 PlanKind::AlterSourceTimestampInterval,
248 ],
249 StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
250 StatementKind::AlterSystemResetAll => {
251 &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
252 }
253 StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
254 StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
255 StatementKind::AlterTableAddColumn => {
256 &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
257 }
258 StatementKind::AlterMaterializedViewApplyReplacement => &[
259 PlanKind::AlterNoop,
260 PlanKind::AlterMaterializedViewApplyReplacement,
261 ],
262 StatementKind::Close => &[PlanKind::Close],
263 StatementKind::Comment => &[PlanKind::Comment],
264 StatementKind::Commit => &[PlanKind::CommitTransaction],
265 StatementKind::Copy => &[
266 PlanKind::CopyFrom,
267 PlanKind::Select,
268 PlanKind::Subscribe,
269 PlanKind::CopyTo,
270 ],
271 StatementKind::CreateCluster => &[PlanKind::CreateCluster],
272 StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
273 StatementKind::CreateConnection => &[PlanKind::CreateConnection],
274 StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
275 StatementKind::CreateIndex => &[PlanKind::CreateIndex],
276 StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
277 StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
278 StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
279 StatementKind::CreateRole => &[PlanKind::CreateRole],
280 StatementKind::CreateSchema => &[PlanKind::CreateSchema],
281 StatementKind::CreateSecret => &[PlanKind::CreateSecret],
282 StatementKind::CreateSink => &[PlanKind::CreateSink],
283 StatementKind::CreateSource | StatementKind::CreateSubsource => {
284 &[PlanKind::CreateSource]
285 }
286 StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
287 StatementKind::CreateTable => &[PlanKind::CreateTable],
288 StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
289 StatementKind::CreateType => &[PlanKind::CreateType],
290 StatementKind::CreateView => &[PlanKind::CreateView],
291 StatementKind::Deallocate => &[PlanKind::Deallocate],
292 StatementKind::Declare => &[PlanKind::Declare],
293 StatementKind::Delete => &[PlanKind::ReadThenWrite],
294 StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
295 StatementKind::DropObjects => &[PlanKind::DropObjects],
296 StatementKind::DropOwned => &[PlanKind::DropOwned],
297 StatementKind::Execute => &[PlanKind::Execute],
298 StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
299 StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
300 StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
301 StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
302 StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
303 StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
304 StatementKind::Fetch => &[PlanKind::Fetch],
305 StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
306 StatementKind::GrantRole => &[PlanKind::GrantRole],
307 StatementKind::Insert => &[PlanKind::Insert],
308 StatementKind::Prepare => &[PlanKind::Prepare],
309 StatementKind::Raise => &[PlanKind::Raise],
310 StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
311 StatementKind::ResetVariable => &[PlanKind::ResetVariable],
312 StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
313 StatementKind::RevokeRole => &[PlanKind::RevokeRole],
314 StatementKind::Rollback => &[PlanKind::AbortTransaction],
315 StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
316 StatementKind::SetTransaction => &[PlanKind::SetTransaction],
317 StatementKind::SetVariable => &[PlanKind::SetVariable],
318 StatementKind::Show => &[
319 PlanKind::Select,
320 PlanKind::ShowVariable,
321 PlanKind::ShowCreate,
322 PlanKind::ShowColumns,
323 PlanKind::ShowAllVariables,
324 PlanKind::InspectShard,
325 ],
326 StatementKind::StartTransaction => &[PlanKind::StartTransaction],
327 StatementKind::Subscribe => &[PlanKind::Subscribe],
328 StatementKind::Update => &[PlanKind::ReadThenWrite],
329 StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
330 StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
331 }
332 }
333
334 pub fn name(&self) -> &str {
336 match self {
337 Plan::CreateConnection(_) => "create connection",
338 Plan::CreateDatabase(_) => "create database",
339 Plan::CreateSchema(_) => "create schema",
340 Plan::CreateRole(_) => "create role",
341 Plan::CreateCluster(_) => "create cluster",
342 Plan::CreateClusterReplica(_) => "create cluster replica",
343 Plan::CreateSource(_) => "create source",
344 Plan::CreateSources(_) => "create source",
345 Plan::CreateSecret(_) => "create secret",
346 Plan::CreateSink(_) => "create sink",
347 Plan::CreateTable(_) => "create table",
348 Plan::CreateView(_) => "create view",
349 Plan::CreateMaterializedView(_) => "create materialized view",
350 Plan::CreateContinualTask(_) => "create continual task",
351 Plan::CreateIndex(_) => "create index",
352 Plan::CreateType(_) => "create type",
353 Plan::CreateNetworkPolicy(_) => "create network policy",
354 Plan::Comment(_) => "comment",
355 Plan::DiscardTemp => "discard temp",
356 Plan::DiscardAll => "discard all",
357 Plan::DropObjects(plan) => match plan.object_type {
358 ObjectType::Table => "drop table",
359 ObjectType::View => "drop view",
360 ObjectType::MaterializedView => "drop materialized view",
361 ObjectType::Source => "drop source",
362 ObjectType::Sink => "drop sink",
363 ObjectType::Index => "drop index",
364 ObjectType::Type => "drop type",
365 ObjectType::Role => "drop roles",
366 ObjectType::Cluster => "drop clusters",
367 ObjectType::ClusterReplica => "drop cluster replicas",
368 ObjectType::Secret => "drop secret",
369 ObjectType::Connection => "drop connection",
370 ObjectType::Database => "drop database",
371 ObjectType::Schema => "drop schema",
372 ObjectType::Func => "drop function",
373 ObjectType::ContinualTask => "drop continual task",
374 ObjectType::NetworkPolicy => "drop network policy",
375 },
376 Plan::DropOwned(_) => "drop owned",
377 Plan::EmptyQuery => "do nothing",
378 Plan::ShowAllVariables => "show all variables",
379 Plan::ShowCreate(_) => "show create",
380 Plan::ShowColumns(_) => "show columns",
381 Plan::ShowVariable(_) => "show variable",
382 Plan::InspectShard(_) => "inspect shard",
383 Plan::SetVariable(_) => "set variable",
384 Plan::ResetVariable(_) => "reset variable",
385 Plan::SetTransaction(_) => "set transaction",
386 Plan::StartTransaction(_) => "start transaction",
387 Plan::CommitTransaction(_) => "commit",
388 Plan::AbortTransaction(_) => "abort",
389 Plan::Select(_) => "select",
390 Plan::Subscribe(_) => "subscribe",
391 Plan::CopyFrom(_) => "copy from",
392 Plan::CopyTo(_) => "copy to",
393 Plan::ExplainPlan(_) => "explain plan",
394 Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
395 Plan::ExplainTimestamp(_) => "explain timestamp",
396 Plan::ExplainSinkSchema(_) => "explain schema",
397 Plan::Insert(_) => "insert",
398 Plan::AlterNoop(plan) => match plan.object_type {
399 ObjectType::Table => "alter table",
400 ObjectType::View => "alter view",
401 ObjectType::MaterializedView => "alter materialized view",
402 ObjectType::Source => "alter source",
403 ObjectType::Sink => "alter sink",
404 ObjectType::Index => "alter index",
405 ObjectType::Type => "alter type",
406 ObjectType::Role => "alter role",
407 ObjectType::Cluster => "alter cluster",
408 ObjectType::ClusterReplica => "alter cluster replica",
409 ObjectType::Secret => "alter secret",
410 ObjectType::Connection => "alter connection",
411 ObjectType::Database => "alter database",
412 ObjectType::Schema => "alter schema",
413 ObjectType::Func => "alter function",
414 ObjectType::ContinualTask => "alter continual task",
415 ObjectType::NetworkPolicy => "alter network policy",
416 },
417 Plan::AlterCluster(_) => "alter cluster",
418 Plan::AlterClusterRename(_) => "alter cluster rename",
419 Plan::AlterClusterSwap(_) => "alter cluster swap",
420 Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
421 Plan::AlterSetCluster(_) => "alter set cluster",
422 Plan::AlterConnection(_) => "alter connection",
423 Plan::AlterSource(_) => "alter source",
424 Plan::AlterItemRename(_) => "rename item",
425 Plan::AlterSchemaRename(_) => "alter rename schema",
426 Plan::AlterSchemaSwap(_) => "alter swap schema",
427 Plan::AlterSecret(_) => "alter secret",
428 Plan::AlterSink(_) => "alter sink",
429 Plan::AlterSystemSet(_) => "alter system",
430 Plan::AlterSystemReset(_) => "alter system",
431 Plan::AlterSystemResetAll(_) => "alter system",
432 Plan::AlterRole(_) => "alter role",
433 Plan::AlterNetworkPolicy(_) => "alter network policy",
434 Plan::AlterOwner(plan) => match plan.object_type {
435 ObjectType::Table => "alter table owner",
436 ObjectType::View => "alter view owner",
437 ObjectType::MaterializedView => "alter materialized view owner",
438 ObjectType::Source => "alter source owner",
439 ObjectType::Sink => "alter sink owner",
440 ObjectType::Index => "alter index owner",
441 ObjectType::Type => "alter type owner",
442 ObjectType::Role => "alter role owner",
443 ObjectType::Cluster => "alter cluster owner",
444 ObjectType::ClusterReplica => "alter cluster replica owner",
445 ObjectType::Secret => "alter secret owner",
446 ObjectType::Connection => "alter connection owner",
447 ObjectType::Database => "alter database owner",
448 ObjectType::Schema => "alter schema owner",
449 ObjectType::Func => "alter function owner",
450 ObjectType::ContinualTask => "alter continual task owner",
451 ObjectType::NetworkPolicy => "alter network policy owner",
452 },
453 Plan::AlterTableAddColumn(_) => "alter table add column",
454 Plan::AlterMaterializedViewApplyReplacement(_) => {
455 "alter materialized view apply replacement"
456 }
457 Plan::Declare(_) => "declare",
458 Plan::Fetch(_) => "fetch",
459 Plan::Close(_) => "close",
460 Plan::ReadThenWrite(plan) => match plan.kind {
461 MutationKind::Insert => "insert into select",
462 MutationKind::Update => "update",
463 MutationKind::Delete => "delete",
464 },
465 Plan::Prepare(_) => "prepare",
466 Plan::Execute(_) => "execute",
467 Plan::Deallocate(_) => "deallocate",
468 Plan::Raise(_) => "raise",
469 Plan::GrantRole(_) => "grant role",
470 Plan::RevokeRole(_) => "revoke role",
471 Plan::GrantPrivileges(_) => "grant privilege",
472 Plan::RevokePrivileges(_) => "revoke privilege",
473 Plan::AlterDefaultPrivileges(_) => "alter default privileges",
474 Plan::ReassignOwned(_) => "reassign owned",
475 Plan::SideEffectingFunc(_) => "side effecting func",
476 Plan::ValidateConnection(_) => "validate connection",
477 Plan::AlterRetainHistory(_) => "alter retain history",
478 Plan::AlterSourceTimestampInterval(_) => "alter source timestamp interval",
479 }
480 }
481
482 pub fn allowed_in_read_only(&self) -> bool {
488 match self {
489 Plan::SetVariable(_) => true,
492 Plan::ResetVariable(_) => true,
493 Plan::SetTransaction(_) => true,
494 Plan::StartTransaction(_) => true,
495 Plan::CommitTransaction(_) => true,
496 Plan::AbortTransaction(_) => true,
497 Plan::Select(_) => true,
498 Plan::EmptyQuery => true,
499 Plan::ShowAllVariables => true,
500 Plan::ShowCreate(_) => true,
501 Plan::ShowColumns(_) => true,
502 Plan::ShowVariable(_) => true,
503 Plan::InspectShard(_) => true,
504 Plan::Subscribe(_) => true,
505 Plan::CopyTo(_) => true,
506 Plan::ExplainPlan(_) => true,
507 Plan::ExplainPushdown(_) => true,
508 Plan::ExplainTimestamp(_) => true,
509 Plan::ExplainSinkSchema(_) => true,
510 Plan::ValidateConnection(_) => true,
511 _ => false,
512 }
513 }
514}
515
/// Payload of `Plan::StartTransaction`.
///
/// Both settings are optional. NOTE(review): what `None` falls back to is
/// decided outside this file — presumably a session default; confirm.
#[derive(Debug)]
pub struct StartTransactionPlan {
    /// Requested access mode, if one was given.
    pub access: Option<TransactionAccessMode>,
    /// Requested isolation level, if one was given.
    pub isolation_level: Option<TransactionIsolationLevel>,
}
521
/// Distinguishes the two flavours of transaction a commit or abort plan can
/// refer to.
#[derive(Debug)]
pub enum TransactionType {
    Explicit,
    Implicit,
}

impl TransactionType {
    /// Returns `true` for [`TransactionType::Explicit`].
    pub fn is_explicit(&self) -> bool {
        match self {
            Self::Explicit => true,
            Self::Implicit => false,
        }
    }

    /// Returns `true` for [`TransactionType::Implicit`].
    pub fn is_implicit(&self) -> bool {
        match self {
            Self::Implicit => true,
            Self::Explicit => false,
        }
    }
}
537
/// Payload of `Plan::CommitTransaction` (named "commit" by `Plan::name`).
#[derive(Debug)]
pub struct CommitTransactionPlan {
    /// Whether the transaction being committed is explicit or implicit.
    pub transaction_type: TransactionType,
}
542
/// Payload of `Plan::AbortTransaction` (named "abort" by `Plan::name`;
/// produced for `StatementKind::Rollback`).
#[derive(Debug)]
pub struct AbortTransactionPlan {
    /// Whether the transaction being aborted is explicit or implicit.
    pub transaction_type: TransactionType,
}
547
/// Payload of `Plan::CreateDatabase`.
#[derive(Debug)]
pub struct CreateDatabasePlan {
    /// Name of the database to create.
    pub name: String,
    /// Presumably mirrors an `IF NOT EXISTS` clause on the statement; how it
    /// is honoured is decided by the consumer of the plan — TODO confirm.
    pub if_not_exists: bool,
}
553
/// Payload of `Plan::CreateSchema`.
#[derive(Debug)]
pub struct CreateSchemaPlan {
    /// The (already resolved) database the schema belongs to.
    pub database_spec: ResolvedDatabaseSpecifier,
    /// Name of the schema to create.
    pub schema_name: String,
    /// Presumably mirrors an `IF NOT EXISTS` clause on the statement —
    /// TODO confirm against the planner.
    pub if_not_exists: bool,
}
560
/// Payload of `Plan::CreateRole`.
#[derive(Debug)]
pub struct CreateRolePlan {
    /// Name of the role to create.
    pub name: String,
    /// Attributes of the new role, in their raw catalog form.
    pub attributes: RoleAttributesRaw,
}
566
/// Payload of `Plan::CreateCluster`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterPlan {
    /// Name of the cluster to create.
    pub name: String,
    /// Whether the cluster is managed or unmanaged, with the matching
    /// configuration.
    pub variant: CreateClusterVariant,
    /// Optional workload class label. NOTE(review): its semantics are not
    /// visible in this file.
    pub workload_class: Option<String>,
}
573
/// The two shapes a `CreateClusterPlan` can take.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum CreateClusterVariant {
    /// Cluster described by a replication factor, size and related settings
    /// (see `CreateClusterManagedPlan`).
    Managed(CreateClusterManagedPlan),
    /// Cluster described by an explicit list of named replicas
    /// (see `CreateClusterUnmanagedPlan`).
    Unmanaged(CreateClusterUnmanagedPlan),
}
579
/// Configuration of an unmanaged cluster: the replicas are listed explicitly.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterUnmanagedPlan {
    /// `(replica name, replica configuration)` pairs.
    pub replicas: Vec<(String, ReplicaConfig)>,
}
584
/// Configuration of a managed cluster.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterManagedPlan {
    /// Number of replicas requested for the cluster.
    pub replication_factor: u32,
    /// Replica size, as a name. NOTE(review): the set of valid size names is
    /// defined elsewhere.
    pub size: String,
    /// Availability zones for the cluster. NOTE(review): whether an empty
    /// list means "no constraint" is not visible here.
    pub availability_zones: Vec<String>,
    /// Compute-specific replica configuration.
    pub compute: ComputeReplicaConfig,
    /// Per-cluster overrides of optimizer feature flags.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// The cluster's schedule (see `ClusterSchedule`).
    pub schedule: ClusterSchedule,
}
594
/// Payload of `Plan::CreateClusterReplica`.
#[derive(Debug)]
pub struct CreateClusterReplicaPlan {
    /// The cluster the new replica is added to.
    pub cluster_id: ClusterId,
    /// Name of the new replica.
    pub name: String,
    /// Configuration of the new replica.
    pub config: ReplicaConfig,
}
601
/// Introspection settings of a compute replica.
#[derive(
    Clone,
    Copy,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    Ord,
    PartialEq,
    Eq
)]
pub struct ComputeReplicaIntrospectionConfig {
    /// Debugging switch for introspection. NOTE(review): its exact effect is
    /// implemented elsewhere.
    pub debugging: bool,
    /// Introspection interval — presumably how often introspection data is
    /// produced; confirm against the consumer.
    pub interval: Duration,
}
620
/// Compute-specific configuration of a replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComputeReplicaConfig {
    /// Introspection settings; `None` presumably disables introspection —
    /// TODO confirm.
    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
}
625
/// Configuration of a single cluster replica.
///
/// A replica is either described by explicit controller addresses
/// (`Unorchestrated`) or by a size and placement (`Orchestrated`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicaConfig {
    Unorchestrated {
        /// Addresses for the storage controller connection.
        storagectl_addrs: Vec<String>,
        /// Addresses for the compute controller connection.
        computectl_addrs: Vec<String>,
        /// Compute-specific configuration.
        compute: ComputeReplicaConfig,
    },
    Orchestrated {
        /// Replica size, as a name.
        size: String,
        /// Availability zone to place the replica in, if one was chosen.
        availability_zone: Option<String>,
        /// Compute-specific configuration.
        compute: ComputeReplicaConfig,
        /// NOTE(review): presumably marks replicas created for internal
        /// purposes — confirm against the planner.
        internal: bool,
        /// NOTE(review): presumably an alternative size name used for
        /// billing — confirm against the planner.
        billed_as: Option<String>,
    },
}
641
/// Schedule of a managed cluster (see `CreateClusterManagedPlan::schedule`).
///
/// The `Default` implementation in this file yields `Manual`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterSchedule {
    /// NOTE(review): presumably means the system does not turn the cluster
    /// on or off by itself — confirm.
    Manual,
    /// NOTE(review): presumably ties the cluster's on/off state to refreshes
    /// of objects on it, with `hydration_time_estimate` as lead time —
    /// confirm against the scheduling code.
    Refresh { hydration_time_estimate: Duration },
}
651
652impl Default for ClusterSchedule {
653 fn default() -> Self {
654 ClusterSchedule::Manual
656 }
657}
658
/// Payload of `Plan::CreateSource`; also embedded in
/// `CreateSourcePlanBundle` for `Plan::CreateSources`.
#[derive(Debug)]
pub struct CreateSourcePlan {
    /// Fully qualified name of the source.
    pub name: QualifiedItemName,
    /// Description of the source itself.
    pub source: Source,
    /// Presumably mirrors an `IF NOT EXISTS` clause — TODO confirm.
    pub if_not_exists: bool,
    /// Timeline the source belongs to.
    pub timeline: Timeline,
    /// Cluster the source runs in, if any. NOTE(review): the meaning of
    /// `None` is not visible in this file.
    pub in_cluster: Option<ClusterId>,
}
668
/// A collection of `SourceReference`s together with the time they were last
/// updated.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReferences {
    /// When the references were last updated. NOTE(review): unit and epoch
    /// are not visible here (presumably milliseconds since the Unix epoch).
    pub updated_at: u64,
    /// The references themselves.
    pub references: Vec<SourceReference>,
}
674
/// A single named reference of a source, with its column names.
///
/// NOTE(review): presumably describes an upstream object (e.g. a table) that
/// the source could ingest — confirm against the code that builds it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReference {
    /// Name of the referenced object.
    pub name: String,
    /// Namespace of the referenced object, if it has one.
    pub namespace: Option<String>,
    /// Names of the object's columns.
    pub columns: Vec<String>,
}
683
/// One element of `Plan::CreateSources`: a `CreateSourcePlan` bundled with
/// the identifiers and resolution metadata that go with it.
#[derive(Debug)]
pub struct CreateSourcePlanBundle {
    /// Catalog item ID for the source.
    pub item_id: CatalogItemId,
    /// Global ID for the source.
    pub global_id: GlobalId,
    /// The plan for creating the source.
    pub plan: CreateSourcePlan,
    /// IDs that were resolved while planning the source.
    pub resolved_ids: ResolvedIds,
    /// References available on the source, when known. NOTE(review): when
    /// this is `Some` vs. `None` is decided by the planner, not shown here.
    pub available_source_references: Option<SourceReferences>,
}
700
/// Payload of `Plan::CreateConnection`.
#[derive(Debug)]
pub struct CreateConnectionPlan {
    /// Fully qualified name of the connection.
    pub name: QualifiedItemName,
    /// Presumably mirrors an `IF NOT EXISTS` clause — TODO confirm.
    pub if_not_exists: bool,
    /// Description of the connection itself.
    pub connection: Connection,
    /// Whether the connection should be validated. NOTE(review): when and
    /// how validation happens is implemented by the plan's consumer.
    pub validate: bool,
}
708
/// Payload of `Plan::ValidateConnection`.
#[derive(Debug)]
pub struct ValidateConnectionPlan {
    /// Catalog item ID of the connection to validate.
    pub id: CatalogItemId,
    /// The connection to validate, in its storage-layer representation with
    /// nested connections kept as references.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
}
716
/// Payload of `Plan::CreateSecret`.
#[derive(Debug)]
pub struct CreateSecretPlan {
    /// Fully qualified name of the secret.
    pub name: QualifiedItemName,
    /// Description of the secret itself.
    pub secret: Secret,
    /// Presumably mirrors an `IF NOT EXISTS` clause — TODO confirm.
    pub if_not_exists: bool,
}
723
/// Payload of `Plan::CreateSink`.
#[derive(Debug)]
pub struct CreateSinkPlan {
    /// Fully qualified name of the sink.
    pub name: QualifiedItemName,
    /// Description of the sink itself.
    pub sink: Sink,
    /// NOTE(review): presumably whether the sink emits an initial snapshot
    /// of its input — confirm.
    pub with_snapshot: bool,
    /// Presumably mirrors an `IF NOT EXISTS` clause — TODO confirm.
    pub if_not_exists: bool,
    /// Cluster the sink runs in (mandatory, unlike for sources).
    pub in_cluster: ClusterId,
}
732
/// Payload of `Plan::CreateTable`.
///
/// Per `Plan::generated_from`, this plan kind can result from
/// `CreateTable`, `CreateTableFromSource` and `CreateWebhookSource`
/// statements.
#[derive(Debug)]
pub struct CreateTablePlan {
    /// Fully qualified name of the table.
    pub name: QualifiedItemName,
    /// Description of the table itself.
    pub table: Table,
    /// Presumably mirrors an `IF NOT EXISTS` clause — TODO confirm.
    pub if_not_exists: bool,
}
739
/// Payload of `Plan::CreateView`; also used by
/// `ExplaineeStatement::CreateView`.
#[derive(Debug, Clone)]
pub struct CreateViewPlan {
    /// Fully qualified name of the view.
    pub name: QualifiedItemName,
    /// Description of the view itself (its `expr` provides the view's
    /// dependencies, see `ExplaineeStatement::depends_on`).
    pub view: View,
    /// Catalog item this view replaces, if any.
    pub replace: Option<CatalogItemId>,
    /// Catalog items to drop as part of creating the view. NOTE(review):
    /// presumably `replace` plus its dependents — confirm.
    pub drop_ids: Vec<CatalogItemId>,
    /// Presumably mirrors an `IF NOT EXISTS` clause — TODO confirm.
    pub if_not_exists: bool,
    /// NOTE(review): presumably set when the view's exact column list can be
    /// ambiguous (e.g. `SELECT *`) — confirm against the planner.
    pub ambiguous_columns: bool,
}
753
/// Payload of `Plan::CreateMaterializedView`; also used by
/// `ExplaineeStatement::CreateMaterializedView`.
#[derive(Debug, Clone)]
pub struct CreateMaterializedViewPlan {
    /// Fully qualified name of the materialized view.
    pub name: QualifiedItemName,
    /// Description of the materialized view itself.
    pub materialized_view: MaterializedView,
    /// Catalog item this materialized view replaces, if any.
    pub replace: Option<CatalogItemId>,
    /// Catalog items to drop as part of the creation. NOTE(review):
    /// presumably `replace` plus its dependents — confirm.
    pub drop_ids: Vec<CatalogItemId>,
    /// Presumably mirrors an `IF NOT EXISTS` clause — TODO confirm.
    pub if_not_exists: bool,
    /// NOTE(review): presumably set when the exact column list can be
    /// ambiguous (e.g. `SELECT *`) — confirm against the planner.
    pub ambiguous_columns: bool,
}
767
/// Payload of `Plan::CreateContinualTask`.
#[derive(Debug, Clone)]
pub struct CreateContinualTaskPlan {
    /// Fully qualified name of the continual task.
    pub name: QualifiedItemName,
    /// Local placeholder ID, if one is used. NOTE(review): presumably stands
    /// in for the task's own output inside its definition — confirm.
    pub placeholder_id: Option<mz_expr::LocalId>,
    /// Relation description of the task's output.
    pub desc: RelationDesc,
    /// Global ID of the task's input collection.
    pub input_id: GlobalId,
    /// NOTE(review): presumably whether the task starts from a snapshot of
    /// its input — confirm.
    pub with_snapshot: bool,
    /// The task's definition, represented with the `MaterializedView` type.
    pub continual_task: MaterializedView,
}
781
/// Payload of `Plan::CreateNetworkPolicy`.
#[derive(Debug, Clone)]
pub struct CreateNetworkPolicyPlan {
    /// Name of the network policy.
    pub name: String,
    /// Rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
}
787
/// Payload of `Plan::AlterNetworkPolicy`.
#[derive(Debug, Clone)]
pub struct AlterNetworkPolicyPlan {
    /// ID of the existing network policy being altered.
    pub id: NetworkPolicyId,
    /// Name of the network policy.
    pub name: String,
    /// Rules for the policy. NOTE(review): presumably the complete new rule
    /// set rather than a delta — confirm.
    pub rules: Vec<NetworkPolicyRule>,
}
794
/// Payload of `Plan::CreateIndex`; also used by
/// `ExplaineeStatement::CreateIndex`.
#[derive(Debug, Clone)]
pub struct CreateIndexPlan {
    /// Fully qualified name of the index.
    pub name: QualifiedItemName,
    /// Description of the index itself (its `on` field is the indexed
    /// collection, see `ExplaineeStatement::depends_on`).
    pub index: Index,
    /// Presumably mirrors an `IF NOT EXISTS` clause — TODO confirm.
    pub if_not_exists: bool,
}
801
/// Payload of `Plan::CreateType`.
#[derive(Debug)]
pub struct CreateTypePlan {
    /// Fully qualified name of the type.
    pub name: QualifiedItemName,
    /// Description of the type itself.
    pub typ: Type,
}
807
/// Payload of `Plan::DropObjects`.
#[derive(Debug)]
pub struct DropObjectsPlan {
    /// NOTE(review): presumably the objects named directly in the statement
    /// — confirm.
    pub referenced_ids: Vec<ObjectId>,
    /// NOTE(review): presumably every object to drop, i.e. `referenced_ids`
    /// plus dependents — confirm.
    pub drop_ids: Vec<ObjectId>,
    /// Object type of the drop; `Plan::name` uses it to pick the plan's name
    /// (e.g. "drop table").
    pub object_type: ObjectType,
}
818
/// Payload of `Plan::DropOwned`.
#[derive(Debug)]
pub struct DropOwnedPlan {
    /// Roles whose owned objects are affected.
    pub role_ids: Vec<RoleId>,
    /// Objects to drop.
    pub drop_ids: Vec<ObjectId>,
    /// Privileges to revoke, as `(object, ACL item)` pairs.
    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
    /// Default privileges to revoke, as `(object, ACL item)` pairs.
    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
}
830
/// Payload of `Plan::ShowVariable`.
#[derive(Debug)]
pub struct ShowVariablePlan {
    /// Name of the variable to show.
    pub name: String,
}
835
/// Payload of `Plan::InspectShard`.
#[derive(Debug)]
pub struct InspectShardPlan {
    /// Global ID of the collection to inspect. NOTE(review): presumably the
    /// collection whose backing shard is inspected — confirm.
    pub id: GlobalId,
}
841
/// Payload of `Plan::SetVariable`.
#[derive(Debug)]
pub struct SetVariablePlan {
    /// Name of the variable to set.
    pub name: String,
    /// New value: either the default or explicit values.
    pub value: VariableValue,
    /// NOTE(review): presumably `true` when the setting is scoped to the
    /// current transaction (`SET LOCAL`) — confirm.
    pub local: bool,
}
848
/// The value assigned by a `SetVariablePlan`.
#[derive(Debug)]
pub enum VariableValue {
    /// Use the variable's default value.
    Default,
    /// Use the given values, each in string form.
    Values(Vec<String>),
}
854
/// Payload of `Plan::ResetVariable`.
#[derive(Debug)]
pub struct ResetVariablePlan {
    /// Name of the variable to reset.
    pub name: String,
}
859
/// Payload of `Plan::SetTransaction`.
#[derive(Debug)]
pub struct SetTransactionPlan {
    /// NOTE(review): presumably `true` when the modes apply only to the
    /// current transaction rather than to the session — confirm.
    pub local: bool,
    /// Transaction modes to apply.
    pub modes: Vec<TransactionMode>,
}
865
/// Payload of `Plan::Select`; also embedded in `ShowColumnsPlan`,
/// `CopyToPlan` and `ExplaineeStatement::Select`.
#[derive(Clone, Debug)]
pub struct SelectPlan {
    /// The originating `SELECT` statement, if any. `SelectPlan::immediate`
    /// builds plans with `None` here.
    pub select: Option<Box<SelectStatement<Aug>>>,
    /// The query as a HIR relation expression.
    pub source: HirRelationExpr,
    /// When the query should be evaluated.
    pub when: QueryWhen,
    /// How to finish the computed rows into the final result set.
    pub finishing: RowSetFinishing,
    /// Output format when the result is copied out, if any. NOTE(review):
    /// presumably set for `COPY ... TO STDOUT` — confirm.
    pub copy_to: Option<CopyFormat>,
}
881
882impl SelectPlan {
883 pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
884 let arity = typ.arity();
885 SelectPlan {
886 select: None,
887 source: HirRelationExpr::Constant { rows, typ },
888 when: QueryWhen::Immediately,
889 finishing: RowSetFinishing::trivial(arity),
890 copy_to: None,
891 }
892 }
893}
894
/// The output shape requested for a `SUBSCRIBE` (see `SubscribePlan::output`).
#[derive(Debug, Clone)]
pub enum SubscribeOutput {
    /// Plain diffs, with no additional ordering.
    Diffs,
    /// Rows ordered within each timestamp.
    WithinTimestampOrderBy {
        /// The requested ordering. Note that `row_order` does *not* return
        /// this ordering; it returns an empty slice for this variant.
        order_by: Vec<ColumnOrder>,
    },
    /// Upsert-envelope output.
    EnvelopeUpsert {
        /// Ordering over the key columns; returned by `row_order`.
        order_by_keys: Vec<ColumnOrder>,
    },
    /// Debezium-envelope output.
    EnvelopeDebezium {
        /// Ordering over the key columns; returned by `row_order`.
        order_by_keys: Vec<ColumnOrder>,
    },
}
911
912impl SubscribeOutput {
913 pub fn row_order(&self) -> &[ColumnOrder] {
914 match self {
915 SubscribeOutput::Diffs => &[],
916 SubscribeOutput::WithinTimestampOrderBy { .. } => &[],
918 SubscribeOutput::EnvelopeUpsert { order_by_keys } => order_by_keys,
919 SubscribeOutput::EnvelopeDebezium { order_by_keys } => order_by_keys,
920 }
921 }
922}
923
/// Payload of `Plan::Subscribe`; also used by `ExplaineeStatement::Subscribe`.
#[derive(Debug, Clone)]
pub struct SubscribePlan {
    /// What is being subscribed to: an existing collection or a query.
    pub from: SubscribeFrom,
    /// NOTE(review): presumably whether an initial snapshot is emitted
    /// before updates — confirm.
    pub with_snapshot: bool,
    /// When the subscription should start.
    pub when: QueryWhen,
    /// Optional upper timestamp bound. NOTE(review): whether the bound is
    /// inclusive is not visible here.
    pub up_to: Option<Timestamp>,
    /// Output format when the result is copied out, if any.
    pub copy_to: Option<CopyFormat>,
    /// NOTE(review): presumably whether progress messages are emitted —
    /// confirm.
    pub emit_progress: bool,
    /// Requested output shape.
    pub output: SubscribeOutput,
}
934
/// The thing a `SUBSCRIBE` reads from.
#[derive(Debug, Clone)]
pub enum SubscribeFrom {
    /// An existing collection, identified by its global ID.
    Id(GlobalId),
    /// An ad-hoc query.
    Query {
        /// The query as a HIR relation expression.
        expr: HirRelationExpr,
        /// Relation description of the query's output.
        desc: RelationDesc,
    },
}
945
946impl SubscribeFrom {
947 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
948 match self {
949 SubscribeFrom::Id(id) => BTreeSet::from([*id]),
950 SubscribeFrom::Query { expr, .. } => expr.depends_on(),
951 }
952 }
953
954 pub fn contains_temporal(&self) -> bool {
955 match self {
956 SubscribeFrom::Id(_) => false,
957 SubscribeFrom::Query { expr, .. } => expr
958 .contains_temporal()
959 .expect("Unexpected error in `visit_scalars` call"),
960 }
961 }
962}
963
/// Payload of `Plan::ShowCreate`.
#[derive(Debug)]
pub struct ShowCreatePlan {
    /// The object being shown.
    pub id: ObjectId,
    /// The precomputed result row. NOTE(review): its column layout is
    /// defined by the planner, not visible here.
    pub row: Row,
}
969
/// Payload of `Plan::ShowColumns`.
#[derive(Debug)]
pub struct ShowColumnsPlan {
    /// Catalog item whose columns are shown.
    pub id: CatalogItemId,
    /// The `SELECT` plan that produces the output.
    pub select_plan: SelectPlan,
    /// IDs resolved for `select_plan`. NOTE(review): presumably replaces the
    /// resolved IDs of the original statement — confirm.
    pub new_resolved_ids: ResolvedIds,
}
976
/// Payload of `Plan::CopyFrom`.
#[derive(Debug)]
pub struct CopyFromPlan {
    /// Catalog item ID of the table being copied into.
    pub target_id: CatalogItemId,
    /// Name of the target, as a plain string. NOTE(review): presumably for
    /// messages only — confirm.
    pub target_name: String,
    /// Where the data comes from.
    pub source: CopyFromSource,
    /// Target columns the incoming data maps to.
    pub columns: Vec<ColumnIndex>,
    /// Relation description of the incoming data.
    pub source_desc: RelationDesc,
    /// Map/filter/project to apply. NOTE(review): presumably transforms
    /// incoming rows into the target's shape — confirm.
    pub mfp: MapFilterProject,
    /// Format parameters for decoding the input.
    pub params: CopyFormatParams<'static>,
    /// Optional filter over the input objects (see `CopyFromFilter`).
    pub filter: Option<CopyFromFilter>,
}
998
/// Where a `COPY ... FROM` reads its data from.
#[derive(Debug)]
pub enum CopyFromSource {
    /// Standard input of the client connection.
    Stdin,
    /// A URL, given as a scalar expression still to be evaluated.
    Url(HirScalarExpr),
    /// An AWS S3 location accessed through an AWS connection.
    AwsS3 {
        /// The S3 URI, given as a scalar expression still to be evaluated.
        uri: HirScalarExpr,
        /// The AWS connection to use.
        connection: AwsConnection,
        /// Catalog item ID of that connection.
        connection_id: CatalogItemId,
    },
}
1017
/// Restricts which input objects a `COPY ... FROM` reads.
#[derive(Debug)]
pub enum CopyFromFilter {
    /// An explicit list of files.
    Files(Vec<String>),
    /// A pattern to match. NOTE(review): the pattern syntax (glob vs. regex)
    /// is not visible here.
    Pattern(String),
}
1023
/// Payload of `Plan::CopyTo`.
///
/// NOTE(review): the `S3SinkFormat` field suggests the destination is S3;
/// confirm whether other destinations are handled by this plan.
#[derive(Debug, Clone)]
pub struct CopyToPlan {
    /// The query whose result is copied out.
    pub select_plan: SelectPlan,
    /// Relation description of the query's output.
    pub desc: RelationDesc,
    /// Destination, given as a scalar expression still to be evaluated.
    pub to: HirScalarExpr,
    /// The connection used to reach the destination.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
    /// Catalog item ID of that connection.
    pub connection_id: CatalogItemId,
    /// Output file format.
    pub format: S3SinkFormat,
    /// Maximum size of a single output file. NOTE(review): presumably in
    /// bytes — confirm.
    pub max_file_size: u64,
}
1041
/// Payload of `Plan::ExplainPlan`.
#[derive(Clone, Debug)]
pub struct ExplainPlanPlan {
    /// Which stage of the plan to explain.
    pub stage: ExplainStage,
    /// Output format of the explanation.
    pub format: ExplainFormat,
    /// Additional explain configuration.
    pub config: ExplainConfig,
    /// The thing being explained.
    pub explainee: Explainee,
}
1049
/// The subject of an `EXPLAIN`: an existing catalog item or a statement.
///
/// NOTE(review): the `Replan*` variants presumably request re-planning the
/// item rather than explaining its stored plan — confirm.
#[derive(Clone, Debug)]
pub enum Explainee {
    /// An existing view.
    View(CatalogItemId),
    /// An existing materialized view.
    MaterializedView(CatalogItemId),
    /// An existing index.
    Index(CatalogItemId),
    /// An existing view, replanned.
    ReplanView(CatalogItemId),
    /// An existing materialized view, replanned.
    ReplanMaterializedView(CatalogItemId),
    /// An existing index, replanned.
    ReplanIndex(CatalogItemId),
    /// A statement that is explained instead of executed.
    Statement(ExplaineeStatement),
}
1068
/// A statement that can be the subject of an `EXPLAIN`, together with its
/// plan.
///
/// The `EnumKind` derive with `#[enum_kind(ExplaineeStatementKind)]`
/// generates the payload-free `ExplaineeStatementKind` enum. Every variant
/// carries a `broken` flag, exposed uniformly through
/// `ExplaineeStatement::broken`. NOTE(review): the meaning of the flag is
/// defined by its consumers and is not visible in this chunk.
#[derive(Clone, Debug, EnumKind)]
#[enum_kind(ExplaineeStatementKind)]
pub enum ExplaineeStatement {
    /// A `SELECT` statement.
    Select {
        /// The broken flag for this statement.
        broken: bool,
        /// The statement's plan.
        plan: plan::SelectPlan,
        /// Relation description of the query's output.
        desc: RelationDesc,
    },
    /// A `CREATE VIEW` statement.
    CreateView {
        /// The broken flag for this statement.
        broken: bool,
        /// The statement's plan.
        plan: plan::CreateViewPlan,
    },
    /// A `CREATE MATERIALIZED VIEW` statement.
    CreateMaterializedView {
        /// The broken flag for this statement.
        broken: bool,
        /// The statement's plan.
        plan: plan::CreateMaterializedViewPlan,
    },
    /// A `CREATE INDEX` statement.
    CreateIndex {
        /// The broken flag for this statement.
        broken: bool,
        /// The statement's plan.
        plan: plan::CreateIndexPlan,
    },
    /// A `SUBSCRIBE` statement.
    Subscribe {
        /// The broken flag for this statement.
        broken: bool,
        /// The statement's plan.
        plan: plan::SubscribePlan,
    },
}
1105
1106impl ExplaineeStatement {
1107 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1108 match self {
1109 Self::Select { plan, .. } => plan.source.depends_on(),
1110 Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1111 Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1112 Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1113 Self::Subscribe { plan, .. } => plan.from.depends_on(),
1114 }
1115 }
1116
1117 pub fn broken(&self) -> bool {
1128 match self {
1129 Self::Select { broken, .. } => *broken,
1130 Self::CreateView { broken, .. } => *broken,
1131 Self::CreateMaterializedView { broken, .. } => *broken,
1132 Self::CreateIndex { broken, .. } => *broken,
1133 Self::Subscribe { broken, .. } => *broken,
1134 }
1135 }
1136}
1137
1138impl ExplaineeStatementKind {
1139 pub fn supports(&self, stage: &ExplainStage) -> bool {
1140 use ExplainStage::*;
1141 match self {
1142 Self::Select => true,
1143 Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1144 Self::CreateMaterializedView => true,
1145 Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1146 Self::Subscribe => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1149 }
1150 }
1151}
1152
1153impl std::fmt::Display for ExplaineeStatementKind {
1154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1155 match self {
1156 Self::Select => write!(f, "SELECT"),
1157 Self::CreateView => write!(f, "CREATE VIEW"),
1158 Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1159 Self::CreateIndex => write!(f, "CREATE INDEX"),
1160 Self::Subscribe => write!(f, "SUBSCRIBE"),
1161 }
1162 }
1163}
1164
#[derive(Clone, Debug)]
/// Plan data for an explain request about pushdown (per the type name); it
/// only records what is being explained.
pub struct ExplainPushdownPlan {
    /// The object or statement being explained.
    pub explainee: Explainee,
}
1169
#[derive(Clone, Debug)]
/// Plan data for explaining the timestamp of a query (per the type name).
pub struct ExplainTimestampPlan {
    /// The output format of the explanation.
    pub format: ExplainFormat,
    /// The query as a high-level relation expression.
    pub raw_plan: HirRelationExpr,
    /// The timing requirement attached to the query.
    pub when: QueryWhen,
}
1176
#[derive(Debug)]
/// Plan data for explaining the schema of a sink (per the type name).
pub struct ExplainSinkSchemaPlan {
    /// Global ID of the collection the sink reads from.
    pub sink_from: GlobalId,
    /// The schema rendered as a JSON string.
    pub json_schema: String,
}
1182
#[derive(Debug)]
/// Plan data for sending a batch of row updates to a catalog item.
pub struct SendDiffsPlan {
    /// Catalog item the updates are directed at.
    pub id: CatalogItemId,
    /// The updates, each a row paired with its `Diff`.
    pub updates: Vec<(Row, Diff)>,
    /// Which kind of mutation produced the updates.
    pub kind: MutationKind,
    /// Rows to return, each paired with a non-zero count (presumably a
    /// multiplicity; confirm).
    pub returning: Vec<(Row, NonZeroUsize)>,
    /// Limit on the result size (unit not visible here; presumably bytes).
    pub max_result_size: u64,
}
1191
#[derive(Debug)]
/// Plan data for an insert into a catalog item.
pub struct InsertPlan {
    /// Catalog item being inserted into.
    pub id: CatalogItemId,
    /// Relation expression producing the values to insert.
    pub values: HirRelationExpr,
    /// Scalar expressions for the returned values (presumably from a
    /// `RETURNING` clause; confirm).
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1198
#[derive(Debug)]
/// Plan data for a mutation that first reads a selection and then writes.
pub struct ReadThenWritePlan {
    /// Catalog item being mutated.
    pub id: CatalogItemId,
    /// Relation expression selecting the rows to operate on.
    pub selection: HirRelationExpr,
    /// Finishing applied to the selected row set.
    pub finishing: RowSetFinishing,
    /// Scalar expressions keyed by `usize` (presumably the index of the
    /// column being assigned; confirm).
    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
    /// Which kind of mutation this is.
    pub kind: MutationKind,
    /// Scalar expressions for the returned values.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1208
#[derive(Debug)]
/// Plan data for an `ALTER` that performs no change (per the type name).
pub struct AlterNoopPlan {
    /// Type of the object the statement referred to.
    pub object_type: ObjectType,
}
1214
#[derive(Debug)]
/// Plan data for changing the cluster of a catalog item.
pub struct AlterSetClusterPlan {
    /// Catalog item being altered.
    pub id: CatalogItemId,
    /// The cluster to set.
    pub set_cluster: ClusterId,
}
1220
#[derive(Debug)]
/// Plan data for altering the retain-history setting of a catalog item.
pub struct AlterRetainHistoryPlan {
    /// Catalog item being altered.
    pub id: CatalogItemId,
    /// The option value as written in the statement, if any.
    pub value: Option<Value>,
    /// The compaction window to apply.
    pub window: CompactionWindow,
    /// Type of the object being altered.
    pub object_type: ObjectType,
}
1228
#[derive(Debug)]
/// Plan data for altering the timestamp interval of a source.
pub struct AlterSourceTimestampIntervalPlan {
    /// Catalog item being altered.
    pub id: CatalogItemId,
    /// The option value as written in the statement, if any.
    pub value: Option<Value>,
    /// The interval to apply.
    pub interval: Duration,
}
1235
#[derive(Debug, Clone)]
/// The requested change to a single option; `T` is the option's value type
/// and defaults to `String`.
pub enum AlterOptionParameter<T = String> {
    /// Set the option to the given value.
    Set(T),
    /// Reset the option.
    Reset,
    /// Leave the option as it is.
    Unchanged,
}
1243
#[derive(Debug)]
/// The action an `ALTER CONNECTION` performs.
pub enum AlterConnectionAction {
    /// Rotate keys; carries no additional data.
    RotateKeys,
    /// Change the connection's options.
    AlterOptions {
        /// Options to set, keyed by option name, each with an optional value.
        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
        /// Names of options to drop.
        drop_options: BTreeSet<ConnectionOptionName>,
        /// Validation flag (presumably whether to validate the altered
        /// connection; confirm).
        validate: bool,
    },
}
1253
#[derive(Debug)]
/// Plan data for altering a connection.
pub struct AlterConnectionPlan {
    /// Catalog item of the connection being altered.
    pub id: CatalogItemId,
    /// The action to perform.
    pub action: AlterConnectionAction,
}
1259
#[derive(Debug)]
/// The action an `ALTER SOURCE` performs.
pub enum AlterSourceAction {
    /// Add subsource exports to the source.
    AddSubsourceExports {
        /// Plan bundles for the subsources to add.
        subsources: Vec<CreateSourcePlanBundle>,
        /// Options supplied with the statement.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    },
    /// Refresh the source's references.
    RefreshReferences {
        /// The references to record.
        references: SourceReferences,
    },
}
1270
#[derive(Debug)]
/// Plan data for altering a source.
pub struct AlterSourcePlan {
    /// Catalog item ID of the source.
    pub item_id: CatalogItemId,
    /// Global ID of the ingestion (presumably the one backing `item_id`;
    /// confirm).
    pub ingestion_id: GlobalId,
    /// The action to perform.
    pub action: AlterSourceAction,
}
1277
#[derive(Debug, Clone)]
/// Plan data for altering a sink.
pub struct AlterSinkPlan {
    /// Catalog item ID of the sink.
    pub item_id: CatalogItemId,
    /// Global ID of the sink.
    pub global_id: GlobalId,
    /// The sink definition carried by this plan.
    pub sink: Sink,
    /// Snapshot flag (presumably whether the sink emits a snapshot; confirm).
    pub with_snapshot: bool,
    /// The cluster associated with the sink.
    pub in_cluster: ClusterId,
}
1286
#[derive(Debug, Clone)]
/// Plan data for altering a cluster's options.
pub struct AlterClusterPlan {
    /// The cluster being altered.
    pub id: ClusterId,
    /// Name of the cluster.
    pub name: String,
    /// Per-option change requests.
    pub options: PlanClusterOption,
    /// Strategy governing how the alteration is carried out.
    pub strategy: AlterClusterPlanStrategy,
}
1294
#[derive(Debug)]
/// Plan data for renaming a cluster.
pub struct AlterClusterRenamePlan {
    /// The cluster being renamed.
    pub id: ClusterId,
    /// Name accompanying `id` (presumably the current name; confirm).
    pub name: String,
    /// The name to rename to.
    pub to_name: String,
}
1301
#[derive(Debug)]
/// Plan data for renaming a cluster replica.
pub struct AlterClusterReplicaRenamePlan {
    /// The cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The replica being renamed.
    pub replica_id: ReplicaId,
    /// Qualified replica name (presumably the current one; confirm).
    pub name: QualifiedReplica,
    /// The name to rename to.
    pub to_name: String,
}
1309
#[derive(Debug)]
/// Plan data for renaming a catalog item.
pub struct AlterItemRenamePlan {
    /// Catalog item being renamed.
    pub id: CatalogItemId,
    /// The item's current fully qualified name.
    pub current_full_name: FullItemName,
    /// The name to rename to.
    pub to_name: String,
    /// Type of the object being renamed.
    pub object_type: ObjectType,
}
1317
#[derive(Debug)]
/// Plan data for renaming a schema.
pub struct AlterSchemaRenamePlan {
    /// Database and schema specifiers identifying the current schema.
    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// The new schema name.
    pub new_schema_name: String,
}
1323
#[derive(Debug)]
/// Plan data for swapping two schemas.
pub struct AlterSchemaSwapPlan {
    /// Database and schema specifiers of schema A.
    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of schema A.
    pub schema_a_name: String,
    /// Database and schema specifiers of schema B.
    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of schema B.
    pub schema_b_name: String,
    /// A temporary name (presumably used as an intermediate during the swap;
    /// confirm).
    pub name_temp: String,
}
1332
#[derive(Debug)]
/// Plan data for swapping two clusters.
pub struct AlterClusterSwapPlan {
    /// ID of cluster A.
    pub id_a: ClusterId,
    /// ID of cluster B.
    pub id_b: ClusterId,
    /// Name of cluster A.
    pub name_a: String,
    /// Name of cluster B.
    pub name_b: String,
    /// A temporary name (presumably used as an intermediate during the swap;
    /// confirm).
    pub name_temp: String,
}
1341
#[derive(Debug)]
/// Plan data for altering a secret.
pub struct AlterSecretPlan {
    /// Catalog item of the secret being altered.
    pub id: CatalogItemId,
    /// Scalar expression for the secret's new contents.
    pub secret_as: MirScalarExpr,
}
1347
#[derive(Debug)]
/// Plan data for setting a system variable.
pub struct AlterSystemSetPlan {
    /// Name of the variable.
    pub name: String,
    /// Value to set.
    pub value: VariableValue,
}
1353
#[derive(Debug)]
/// Plan data for resetting a single system variable.
pub struct AlterSystemResetPlan {
    /// Name of the variable to reset.
    pub name: String,
}
1358
#[derive(Debug)]
/// Plan data for resetting all system variables (per the type name); it
/// carries no fields.
pub struct AlterSystemResetAllPlan {}
1361
#[derive(Debug)]
/// Plan data for altering a role.
pub struct AlterRolePlan {
    /// The role being altered.
    pub id: RoleId,
    /// Name of the role.
    pub name: String,
    /// The planned alteration.
    pub option: PlannedAlterRoleOption,
}
1368
#[derive(Debug)]
/// Plan data for changing the owner of an object.
pub struct AlterOwnerPlan {
    /// The object whose owner changes.
    pub id: ObjectId,
    /// Type of that object.
    pub object_type: ObjectType,
    /// The role that becomes the owner.
    pub new_owner: RoleId,
}
1375
#[derive(Debug)]
/// Plan data for an `ALTER TABLE` involving a single column (presumably
/// adding it; confirm against the planner).
pub struct AlterTablePlan {
    /// Catalog item of the relation being altered.
    pub relation_id: CatalogItemId,
    /// Name of the column.
    pub column_name: ColumnName,
    /// Resolved SQL type of the column.
    pub column_type: SqlColumnType,
    /// The column's data type as it appears in the AST, before resolution.
    pub raw_sql_type: RawDataType,
}
1383
#[derive(Debug, Clone)]
/// Plan data for applying a replacement to a materialized view.
pub struct AlterMaterializedViewApplyReplacementPlan {
    /// Catalog item of the materialized view being altered.
    pub id: CatalogItemId,
    /// Catalog item of the replacement.
    pub replacement_id: CatalogItemId,
}
1389
#[derive(Debug)]
/// Plan data for a `DECLARE` (per the type name): a named, unresolved
/// statement with its SQL text and parameters.
pub struct DeclarePlan {
    /// The declared name.
    pub name: String,
    /// The raw (unresolved) statement.
    pub stmt: Statement<Raw>,
    /// SQL text (presumably the text of `stmt`; confirm).
    pub sql: String,
    /// Parameters that accompany the statement.
    pub params: Params,
}
1397
#[derive(Debug)]
/// Plan data for a `FETCH` (per the type name).
pub struct FetchPlan {
    /// Name to fetch from (presumably a cursor; confirm).
    pub name: String,
    /// Optional fetch direction/count.
    pub count: Option<FetchDirection>,
    /// Timeout behavior.
    pub timeout: ExecuteTimeout,
}
1404
#[derive(Debug)]
/// Plan data for a `CLOSE` (per the type name).
pub struct ClosePlan {
    /// Name to close (presumably a cursor; confirm).
    pub name: String,
}
1409
#[derive(Debug)]
/// Plan data for a `PREPARE` (per the type name): a named, unresolved
/// statement with its SQL text and description.
pub struct PreparePlan {
    /// Name of the prepared statement.
    pub name: String,
    /// The raw (unresolved) statement.
    pub stmt: Statement<Raw>,
    /// SQL text (presumably the text of `stmt`; confirm).
    pub sql: String,
    /// Description of the statement.
    pub desc: StatementDesc,
}
1417
#[derive(Debug)]
/// Plan data for an `EXECUTE` (per the type name).
pub struct ExecutePlan {
    /// Name to execute (presumably a prepared statement; confirm).
    pub name: String,
    /// Parameters supplied for the execution.
    pub params: Params,
}
1423
#[derive(Debug)]
/// Plan data for a `DEALLOCATE` (per the type name).
pub struct DeallocatePlan {
    /// Name to deallocate; `None` presumably means "all" (confirm).
    pub name: Option<String>,
}
1428
#[derive(Debug)]
/// Plan data for a `RAISE` (per the type name).
pub struct RaisePlan {
    /// Severity of the notice.
    pub severity: NoticeSeverity,
}
1433
#[derive(Debug)]
/// Plan data for granting role membership.
pub struct GrantRolePlan {
    /// The roles being granted.
    pub role_ids: Vec<RoleId>,
    /// The roles that become members of `role_ids`.
    pub member_ids: Vec<RoleId>,
    /// The role performing the grant.
    pub grantor_id: RoleId,
}
1443
#[derive(Debug)]
/// Plan data for revoking role membership.
pub struct RevokeRolePlan {
    /// The roles being revoked.
    pub role_ids: Vec<RoleId>,
    /// The roles that lose membership of `role_ids`.
    pub member_ids: Vec<RoleId>,
    /// The grantor role recorded for the revocation.
    pub grantor_id: RoleId,
}
1453
#[derive(Debug)]
/// A single privilege change: which privileges, on what object, by whom.
pub struct UpdatePrivilege {
    /// The privileges affected.
    pub acl_mode: AclMode,
    /// The object the privileges apply to.
    pub target_id: SystemObjectId,
    /// The role recorded as grantor.
    pub grantor: RoleId,
}
1463
#[derive(Debug)]
/// Plan data for granting privileges.
pub struct GrantPrivilegesPlan {
    /// The privilege changes to apply.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles receiving the privileges.
    pub grantees: Vec<RoleId>,
}
1471
#[derive(Debug)]
/// Plan data for revoking privileges.
pub struct RevokePrivilegesPlan {
    /// The privilege changes to apply.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles losing the privileges.
    pub revokees: Vec<RoleId>,
}
#[derive(Debug)]
/// Plan data for altering default privileges.
pub struct AlterDefaultPrivilegesPlan {
    /// The default-privilege objects affected.
    pub privilege_objects: Vec<DefaultPrivilegeObject>,
    /// The ACL items to apply to those objects.
    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
    /// Grant/revoke selector (presumably `true` grants and `false` revokes;
    /// confirm).
    pub is_grant: bool,
}
1488
#[derive(Debug)]
/// Plan data for reassigning ownership of objects between roles.
pub struct ReassignOwnedPlan {
    /// The previous owner roles.
    pub old_roles: Vec<RoleId>,
    /// The role that takes ownership.
    pub new_role: RoleId,
    /// The objects to reassign.
    pub reassign_ids: Vec<ObjectId>,
}
1498
#[derive(Debug)]
/// Plan data for commenting on an object.
pub struct CommentPlan {
    /// The object the comment applies to.
    pub object_id: CommentObjectId,
    /// Optional sub-component index (presumably a column position; confirm).
    pub sub_component: Option<usize>,
    /// The comment text; `None` presumably removes an existing comment
    /// (confirm).
    pub comment: Option<String>,
}
1510
#[derive(Clone, Debug)]
/// Where a table's data comes from.
pub enum TableDataSource {
    /// The table receives direct writes; `defaults` holds one expression per
    /// entry (presumably the column defaults, in column order; confirm).
    TableWrites { defaults: Vec<Expr<Aug>> },

    /// The table is backed by a data source.
    DataSource {
        /// Description of the backing data source.
        desc: DataSourceDesc,
        /// Timeline associated with the data source.
        timeline: Timeline,
    },
}
1523
#[derive(Clone, Debug)]
/// A planned table.
pub struct Table {
    /// The SQL text that creates the table.
    pub create_sql: String,
    /// Versioned relation description of the table.
    pub desc: VersionedRelationDesc,
    /// Whether the table is temporary.
    pub temporary: bool,
    /// Optional compaction window.
    pub compaction_window: Option<CompactionWindow>,
    /// Where the table's data comes from.
    pub data_source: TableDataSource,
}
1532
#[derive(Clone, Debug)]
/// A planned source.
pub struct Source {
    /// The SQL text that creates the source.
    pub create_sql: String,
    /// Description of the source's data source.
    pub data_source: DataSourceDesc,
    /// Relation description of the source.
    pub desc: RelationDesc,
    /// Optional compaction window.
    pub compaction_window: Option<CompactionWindow>,
}
1540
#[derive(Debug, Clone)]
/// Describes the kind of data source behind a source or table.
pub enum DataSourceDesc {
    /// An ingestion described by a `SourceDesc`.
    Ingestion(SourceDesc<ReferencedConnection>),
    /// An ingestion in the older syntax (per the variant name), which also
    /// carries export information alongside the source description.
    OldSyntaxIngestion {
        /// The source description.
        desc: SourceDesc<ReferencedConnection>,
        /// Catalog item of the progress subsource.
        progress_subsource: CatalogItemId,
        /// Data configuration of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
        /// Details of the export.
        details: SourceExportDetails,
    },
    /// An export of an existing ingestion.
    IngestionExport {
        /// Catalog item of the ingestion being exported from.
        ingestion_id: CatalogItemId,
        /// Unresolved name of the external object being referenced.
        external_reference: UnresolvedItemName,
        /// Details of the export.
        details: SourceExportDetails,
        /// Data configuration of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// A progress collection; carries no additional data.
    Progress,
    /// A webhook.
    Webhook {
        /// Optional validation of incoming requests.
        validate_using: Option<WebhookValidation>,
        /// Format of the request body.
        body_format: WebhookBodyFormat,
        /// How request headers are exposed.
        headers: WebhookHeaders,
        /// Optional storage instance (cluster) ID.
        cluster_id: Option<StorageInstanceId>,
    },
}
1573
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
/// Validation attached to a webhook: an expression plus a description of the
/// columns it is evaluated over.
pub struct WebhookValidation {
    /// The validation expression.
    pub expression: MirScalarExpr,
    /// Relation description whose column types are used when reducing
    /// `expression`.
    pub relation_desc: RelationDesc,
    /// `(usize, bool)` pairs for request bodies (presumably a column index
    /// and a "provide as bytes" flag; confirm).
    pub bodies: Vec<(usize, bool)>,
    /// `(usize, bool)` pairs for request headers (see the note on `bodies`).
    pub headers: Vec<(usize, bool)>,
    /// Secrets made available to the validation expression.
    pub secrets: Vec<WebhookValidationSecret>,
}
1587
1588impl WebhookValidation {
1589 const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1590
1591 pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1596 let WebhookValidation {
1597 expression,
1598 relation_desc,
1599 ..
1600 } = self;
1601
1602 let mut expression_ = expression.clone();
1604 let desc_ = relation_desc.clone();
1605 let reduce_task = mz_ore::task::spawn_blocking(
1606 || "webhook-validation-reduce",
1607 move || {
1608 let repr_col_types: Vec<ReprColumnType> = desc_
1609 .typ()
1610 .column_types
1611 .iter()
1612 .map(ReprColumnType::from)
1613 .collect();
1614 expression_.reduce(&repr_col_types);
1615 expression_
1616 },
1617 );
1618
1619 match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1620 Ok(reduced_expr) => {
1621 *expression = reduced_expr;
1622 Ok(())
1623 }
1624 Err(_) => Err("timeout"),
1625 }
1626 }
1627}
1628
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
/// How a webhook's request headers are exposed as columns.
pub struct WebhookHeaders {
    /// When present, one column holds the headers, subject to these filters.
    pub header_column: Option<WebhookHeaderFilters>,
    /// Individually mapped headers keyed by `usize` (presumably the column
    /// index), each with a header name and a flag (presumably "as bytes";
    /// confirm).
    pub mapped_headers: BTreeMap<usize, (String, bool)>,
}
1636
1637impl WebhookHeaders {
1638 pub fn num_columns(&self) -> usize {
1640 let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1641 let mapped_headers = self.mapped_headers.len();
1642
1643 header_column + mapped_headers
1644 }
1645}
1646
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
/// Block and allow lists applied to webhook headers.
///
/// How the two sets interact is decided by consumers outside this block.
pub struct WebhookHeaderFilters {
    /// Header names on the block list.
    pub block: BTreeSet<String>,
    /// Header names on the allow list.
    pub allow: BTreeSet<String>,
}
1652
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Arbitrary)]
/// Format of a webhook request body.
pub enum WebhookBodyFormat {
    /// JSON; `array` is a flag whose handling lives outside this block
    /// (presumably whether a top-level array is expanded; confirm).
    Json { array: bool },
    /// Raw bytes.
    Bytes,
    /// Text.
    Text,
}
1659
1660impl From<WebhookBodyFormat> for SqlScalarType {
1661 fn from(value: WebhookBodyFormat) -> Self {
1662 match value {
1663 WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1664 WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1665 WebhookBodyFormat::Text => SqlScalarType::String,
1666 }
1667 }
1668}
1669
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
/// A secret referenced by a webhook validation expression.
pub struct WebhookValidationSecret {
    /// Catalog item of the secret.
    pub id: CatalogItemId,
    /// Column index associated with the secret (presumably where its value
    /// is supplied to the expression; confirm).
    pub column_idx: usize,
    /// Flag selecting a bytes representation (presumably bytes instead of
    /// text; confirm).
    pub use_bytes: bool,
}
1679
#[derive(Clone, Debug)]
/// A planned connection.
pub struct Connection {
    /// The SQL text that creates the connection.
    pub create_sql: String,
    /// Kind-specific details of the connection.
    pub details: ConnectionDetails,
}
1685
#[derive(Clone, Debug, Serialize)]
/// Kind-specific details of a connection, one variant per connection kind.
pub enum ConnectionDetails {
    /// A Kafka connection.
    Kafka(KafkaConnection<ReferencedConnection>),
    /// A CSR connection (presumably Confluent Schema Registry; confirm).
    Csr(CsrConnection<ReferencedConnection>),
    /// A PostgreSQL connection.
    Postgres(PostgresConnection<ReferencedConnection>),
    /// An SSH connection together with two keys.
    Ssh {
        /// The SSH connection itself.
        connection: SshConnection,
        /// First key.
        key_1: SshKey,
        /// Second key.
        key_2: SshKey,
    },
    /// An AWS connection.
    Aws(AwsConnection),
    /// An AWS PrivateLink connection.
    AwsPrivatelink(AwsPrivatelinkConnection),
    /// A MySQL connection.
    MySql(MySqlConnection<ReferencedConnection>),
    /// A SQL Server connection.
    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
    /// An Iceberg catalog connection.
    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
}
1702
1703impl ConnectionDetails {
1704 pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1705 match self {
1706 ConnectionDetails::Kafka(c) => {
1707 mz_storage_types::connections::Connection::Kafka(c.clone())
1708 }
1709 ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1710 ConnectionDetails::Postgres(c) => {
1711 mz_storage_types::connections::Connection::Postgres(c.clone())
1712 }
1713 ConnectionDetails::Ssh { connection, .. } => {
1714 mz_storage_types::connections::Connection::Ssh(connection.clone())
1715 }
1716 ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1717 ConnectionDetails::AwsPrivatelink(c) => {
1718 mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1719 }
1720 ConnectionDetails::MySql(c) => {
1721 mz_storage_types::connections::Connection::MySql(c.clone())
1722 }
1723 ConnectionDetails::SqlServer(c) => {
1724 mz_storage_types::connections::Connection::SqlServer(c.clone())
1725 }
1726 ConnectionDetails::IcebergCatalog(c) => {
1727 mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1728 }
1729 }
1730 }
1731}
1732
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
/// A single rule of a network policy.
pub struct NetworkPolicyRule {
    /// Name of the rule.
    pub name: String,
    /// What the rule does with matching traffic.
    pub action: NetworkPolicyRuleAction,
    /// The network (an IP network in CIDR form) the rule applies to.
    pub address: PolicyAddress,
    /// The traffic direction the rule applies to.
    pub direction: NetworkPolicyRuleDirection,
}
1740
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    Ord,
    PartialOrd,
    Hash
)]
/// The action of a network policy rule; `Allow` is the only variant.
pub enum NetworkPolicyRuleAction {
    /// Allow matching traffic.
    Allow,
}
1755
1756impl std::fmt::Display for NetworkPolicyRuleAction {
1757 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1758 match self {
1759 Self::Allow => write!(f, "allow"),
1760 }
1761 }
1762}
1763impl TryFrom<&str> for NetworkPolicyRuleAction {
1764 type Error = PlanError;
1765 fn try_from(value: &str) -> Result<Self, Self::Error> {
1766 match value.to_uppercase().as_str() {
1767 "ALLOW" => Ok(Self::Allow),
1768 _ => Err(PlanError::Unstructured(
1769 "Allow is the only valid option".into(),
1770 )),
1771 }
1772 }
1773}
1774
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
/// The traffic direction of a network policy rule; `Ingress` is the only
/// variant.
pub enum NetworkPolicyRuleDirection {
    /// Incoming traffic.
    Ingress,
}
1779impl std::fmt::Display for NetworkPolicyRuleDirection {
1780 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1781 match self {
1782 Self::Ingress => write!(f, "ingress"),
1783 }
1784 }
1785}
1786impl TryFrom<&str> for NetworkPolicyRuleDirection {
1787 type Error = PlanError;
1788 fn try_from(value: &str) -> Result<Self, Self::Error> {
1789 match value.to_uppercase().as_str() {
1790 "INGRESS" => Ok(Self::Ingress),
1791 _ => Err(PlanError::Unstructured(
1792 "Ingress is the only valid option".into(),
1793 )),
1794 }
1795 }
1796}
1797
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
/// Newtype around [`IpNet`] used as the address of a network policy rule.
pub struct PolicyAddress(pub IpNet);
1800impl std::fmt::Display for PolicyAddress {
1801 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1802 write!(f, "{}", &self.0.to_string())
1803 }
1804}
1805impl From<String> for PolicyAddress {
1806 fn from(value: String) -> Self {
1807 Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1808 }
1809}
1810impl TryFrom<&str> for PolicyAddress {
1811 type Error = PlanError;
1812 fn try_from(value: &str) -> Result<Self, Self::Error> {
1813 let net = IpNet::from_str(value)
1814 .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1815 Ok(Self(net))
1816 }
1817}
1818
1819impl Serialize for PolicyAddress {
1820 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1821 where
1822 S: serde::Serializer,
1823 {
1824 serializer.serialize_str(&format!("{}", &self.0))
1825 }
1826}
1827
#[derive(Clone, Debug, Serialize)]
/// An SSH key: either just the public part or a full key pair.
pub enum SshKey {
    /// Only the public key, as a string.
    PublicOnly(String),
    /// A complete key pair.
    Both(SshKeyPair),
}
1833
1834impl SshKey {
1835 pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1836 match self {
1837 SshKey::PublicOnly(_) => None,
1838 SshKey::Both(key_pair) => Some(key_pair),
1839 }
1840 }
1841
1842 pub fn public_key(&self) -> String {
1843 match self {
1844 SshKey::PublicOnly(s) => s.into(),
1845 SshKey::Both(p) => p.ssh_public_key(),
1846 }
1847 }
1848}
1849
#[derive(Clone, Debug)]
/// A planned secret.
pub struct Secret {
    /// The SQL text that creates the secret.
    pub create_sql: String,
    /// Scalar expression for the secret's contents.
    pub secret_as: MirScalarExpr,
}
1855
#[derive(Clone, Debug)]
/// A planned sink.
pub struct Sink {
    /// The SQL text that creates the sink.
    pub create_sql: String,
    /// Global ID of the collection the sink reads from.
    pub from: GlobalId,
    /// The storage sink connection, with referenced (not inlined)
    /// sub-connections.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// The sink's envelope.
    pub envelope: SinkEnvelope,
    /// Version number of the sink (its exact semantics are defined outside
    /// this block).
    pub version: u64,
    /// Optional commit interval.
    pub commit_interval: Option<Duration>,
}
1869
#[derive(Clone, Debug)]
/// A planned view.
pub struct View {
    /// The SQL text that creates the view.
    pub create_sql: String,
    /// The view's definition as a high-level relation expression.
    pub expr: HirRelationExpr,
    /// IDs the view depends on.
    pub dependencies: DependencyIds,
    /// Names of the view's columns.
    pub column_names: Vec<ColumnName>,
    /// Whether the view is temporary.
    pub temporary: bool,
}
1883
#[derive(Clone, Debug)]
/// A planned materialized view.
pub struct MaterializedView {
    /// The SQL text that creates the materialized view.
    pub create_sql: String,
    /// The definition as a high-level relation expression.
    pub expr: HirRelationExpr,
    /// IDs the materialized view depends on.
    pub dependencies: DependencyIds,
    /// Names of the columns.
    pub column_names: Vec<ColumnName>,
    /// Catalog item this materialized view is meant to replace, if any.
    pub replacement_target: Option<CatalogItemId>,
    /// The cluster associated with the materialized view.
    pub cluster_id: ClusterId,
    /// Optional replica to target.
    pub target_replica: Option<ReplicaId>,
    /// `usize` values for non-null assertions (presumably column indexes;
    /// confirm).
    pub non_null_assertions: Vec<usize>,
    /// Optional compaction window.
    pub compaction_window: Option<CompactionWindow>,
    /// Optional refresh schedule.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// Optional as-of timestamp.
    pub as_of: Option<Timestamp>,
}
1904
#[derive(Clone, Debug)]
/// A planned index.
pub struct Index {
    /// The SQL text that creates the index.
    pub create_sql: String,
    /// Global ID of the collection the index is built on.
    pub on: GlobalId,
    /// The key expressions of the index.
    pub keys: Vec<mz_expr::MirScalarExpr>,
    /// Optional compaction window.
    pub compaction_window: Option<CompactionWindow>,
    /// The cluster associated with the index.
    pub cluster_id: ClusterId,
}
1915
#[derive(Clone, Debug)]
/// A planned type.
pub struct Type {
    /// The SQL text that creates the type.
    pub create_sql: String,
    /// The catalog type definition, with inner types given as ID references.
    pub inner: CatalogType<IdReference>,
}
1921
#[derive(Deserialize, Clone, Debug, PartialEq)]
/// The timing requirement of a query.
///
/// The methods on this type spell out how each variant affects timestamp
/// selection.
pub enum QueryWhen {
    /// No explicit timestamp was requested.
    Immediately,
    /// The query must advance to the timeline timestamp (see
    /// `must_advance_to_timeline_ts`); per the variant name, this is meant to
    /// observe the freshest table writes.
    FreshestTableWrite,
    /// The query carries an explicit timestamp and constrains the upper bound
    /// (see `constrains_upper`).
    AtTimestamp(Timestamp),
    /// The query carries an explicit timestamp as a lower bound (per the
    /// variant name) and does not constrain the upper bound.
    AtLeastTimestamp(Timestamp),
}
1940
1941impl QueryWhen {
1942 pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1944 match self {
1945 QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1946 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1947 }
1948 }
1949 pub fn constrains_upper(&self) -> bool {
1953 match self {
1954 QueryWhen::AtTimestamp(_) => true,
1955 QueryWhen::AtLeastTimestamp(_)
1956 | QueryWhen::Immediately
1957 | QueryWhen::FreshestTableWrite => false,
1958 }
1959 }
1960 pub fn advance_to_since(&self) -> bool {
1962 match self {
1963 QueryWhen::Immediately
1964 | QueryWhen::AtLeastTimestamp(_)
1965 | QueryWhen::FreshestTableWrite => true,
1966 QueryWhen::AtTimestamp(_) => false,
1967 }
1968 }
1969 pub fn can_advance_to_upper(&self) -> bool {
1971 match self {
1972 QueryWhen::Immediately => true,
1973 QueryWhen::FreshestTableWrite
1974 | QueryWhen::AtTimestamp(_)
1975 | QueryWhen::AtLeastTimestamp(_) => false,
1976 }
1977 }
1978
1979 pub fn can_advance_to_timeline_ts(&self) -> bool {
1981 match self {
1982 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1983 QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1984 }
1985 }
1986 pub fn must_advance_to_timeline_ts(&self) -> bool {
1988 match self {
1989 QueryWhen::FreshestTableWrite => true,
1990 QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1991 false
1992 }
1993 }
1994 }
1995 pub fn is_transactional(&self) -> bool {
1997 match self {
1998 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1999 QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
2000 }
2001 }
2002}
2003
#[derive(Debug, Copy, Clone)]
/// The kind of data mutation a plan performs.
pub enum MutationKind {
    /// Rows are inserted.
    Insert,
    /// Rows are updated.
    Update,
    /// Rows are deleted.
    Delete,
}
2010
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
/// Data format of a copy operation.
///
/// The derived ordering follows declaration order.
pub enum CopyFormat {
    /// Text format.
    Text,
    /// CSV format.
    Csv,
    /// Binary format.
    Binary,
    /// Parquet format.
    Parquet,
}
2018
#[derive(Debug, Copy, Clone)]
/// Timeout behavior for an execution (used by `FetchPlan`).
pub enum ExecuteTimeout {
    /// No timeout value was given.
    None,
    /// A timeout expressed in (fractional) seconds.
    Seconds(f64),
    /// Wait once (the exact semantics are defined by consumers outside this
    /// block).
    WaitOnce,
}
2025
#[derive(Clone, Debug)]
/// An option that can be set on an index.
pub enum IndexOption {
    /// The retain-history setting, expressed as a compaction window.
    RetainHistory(CompactionWindow),
}
2031
#[derive(Clone, Debug)]
/// An option that can be set on a table.
pub enum TableOption {
    /// The retain-history setting, expressed as a compaction window.
    RetainHistory(CompactionWindow),
}
2037
#[derive(Clone, Debug)]
/// The requested change for each alterable cluster option.
///
/// Every field is an [`AlterOptionParameter`]; the `Default` implementation
/// marks all of them `Unchanged`.
pub struct PlanClusterOption {
    /// Change to the availability zones.
    pub availability_zones: AlterOptionParameter<Vec<String>>,
    /// Change to the introspection-debugging flag.
    pub introspection_debugging: AlterOptionParameter<bool>,
    /// Change to the introspection interval.
    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
    /// Change to the managed flag.
    pub managed: AlterOptionParameter<bool>,
    /// Change to the replicas, given as `(name, config)` pairs.
    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
    /// Change to the replication factor.
    pub replication_factor: AlterOptionParameter<u32>,
    /// Change to the size (a `String`, the default parameter type).
    pub size: AlterOptionParameter,
    /// Change to the schedule.
    pub schedule: AlterOptionParameter<ClusterSchedule>,
    /// Change to the workload class; the value itself is optional.
    pub workload_class: AlterOptionParameter<Option<String>>,
}
2050
2051impl Default for PlanClusterOption {
2052 fn default() -> Self {
2053 Self {
2054 availability_zones: AlterOptionParameter::Unchanged,
2055 introspection_debugging: AlterOptionParameter::Unchanged,
2056 introspection_interval: AlterOptionParameter::Unchanged,
2057 managed: AlterOptionParameter::Unchanged,
2058 replicas: AlterOptionParameter::Unchanged,
2059 replication_factor: AlterOptionParameter::Unchanged,
2060 size: AlterOptionParameter::Unchanged,
2061 schedule: AlterOptionParameter::Unchanged,
2062 workload_class: AlterOptionParameter::Unchanged,
2063 }
2064 }
2065}
2066
#[derive(Clone, Debug, PartialEq, Eq)]
/// Strategy attached to an `ALTER CLUSTER`, derived from the statement's
/// wait option (see the `TryFrom<ClusterAlterOptionExtracted>` impl).
pub enum AlterClusterPlanStrategy {
    /// No wait option was given.
    None,
    /// A wait for the given duration was requested.
    For(Duration),
    /// An until-ready wait was requested.
    UntilReady {
        /// What to do when the timeout elapses.
        on_timeout: OnTimeoutAction,
        /// The timeout; it is required when this variant is planned.
        timeout: Duration,
    },
}
2076
/// Action to take when an until-ready wait times out.
///
/// The default is [`OnTimeoutAction::Commit`]; `Default` is derived with the
/// `#[default]` marker instead of a hand-written impl.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum OnTimeoutAction {
    /// Commit on timeout (the default).
    #[default]
    Commit,
    /// Roll back on timeout.
    Rollback,
}
2088
2089impl TryFrom<&str> for OnTimeoutAction {
2090 type Error = PlanError;
2091 fn try_from(value: &str) -> Result<Self, Self::Error> {
2092 match value.to_uppercase().as_str() {
2093 "COMMIT" => Ok(Self::Commit),
2094 "ROLLBACK" => Ok(Self::Rollback),
2095 _ => Err(PlanError::Unstructured(
2096 "Valid options are COMMIT, ROLLBACK".into(),
2097 )),
2098 }
2099 }
2100}
2101
2102impl AlterClusterPlanStrategy {
2103 pub fn is_none(&self) -> bool {
2104 matches!(self, Self::None)
2105 }
2106 pub fn is_some(&self) -> bool {
2107 !matches!(self, Self::None)
2108 }
2109}
2110
2111impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2112 type Error = PlanError;
2113
2114 fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2115 Ok(match value.wait {
2116 Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2117 Some(ClusterAlterOptionValue::UntilReady(options)) => {
2118 let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2119 Self::UntilReady {
2120 timeout: match extracted.timeout {
2121 Some(d) => d,
2122 None => Err(PlanError::UntilReadyTimeoutRequired)?,
2123 },
2124 on_timeout: match extracted.on_timeout {
2125 Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2126 PlanError::InvalidOptionValue {
2127 option_name: "ON TIMEOUT".into(),
2128 err: Box::new(e),
2129 }
2130 })?,
2131 None => OnTimeoutAction::default(),
2132 },
2133 }
2134 }
2135 None => Self::None,
2136 })
2137 }
2138}
2139
#[derive(Debug, Clone)]
/// Parameter values together with two lists of their types.
pub struct Params {
    /// The parameter values, packed into a single row.
    pub datums: Row,
    /// Types recorded for execution (presumably the types the values were
    /// supplied as; confirm).
    pub execute_types: Vec<SqlScalarType>,
    /// Types the statement expects (presumably determined at prepare time;
    /// confirm).
    pub expected_types: Vec<SqlScalarType>,
}
2150
2151impl Params {
2152 pub fn empty() -> Params {
2154 Params {
2155 datums: Row::pack_slice(&[]),
2156 execute_types: vec![],
2157 expected_types: vec![],
2158 }
2159 }
2160}
2161
#[derive(
    Ord,
    PartialOrd,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    Hash,
    Copy
)]
/// Context made available to planning.
pub struct PlanContext {
    /// The wall-clock time recorded for planning, in UTC.
    pub wall_time: DateTime<Utc>,
    /// Flag that, per its name, makes planning ignore "if exists" errors;
    /// `new` and `zero` initialize it to `false`.
    pub ignore_if_exists_errors: bool,
}
2179
2180impl PlanContext {
2181 pub fn new(wall_time: DateTime<Utc>) -> Self {
2182 Self {
2183 wall_time,
2184 ignore_if_exists_errors: false,
2185 }
2186 }
2187
2188 pub fn zero() -> Self {
2192 PlanContext {
2193 wall_time: now::to_datetime(NOW_ZERO()),
2194 ignore_if_exists_errors: false,
2195 }
2196 }
2197
2198 pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2199 self.ignore_if_exists_errors = value;
2200 self
2201 }
2202}