1use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{
41 CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing,
42};
43use mz_ore::now::{self, NOW_ZERO};
44use mz_pgcopy::CopyFormatParams;
45use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
46use mz_repr::explain::{ExplainConfig, ExplainFormat};
47use mz_repr::network_policy_id::NetworkPolicyId;
48use mz_repr::optimize::OptimizerFeatureOverrides;
49use mz_repr::refresh_schedule::RefreshSchedule;
50use mz_repr::role_id::RoleId;
51use mz_repr::{
52 CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, ReprColumnType, Row,
53 SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
54};
55use mz_sql_parser::ast::{
56 AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
57 RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
58 Value, WithOptionValue,
59};
60use mz_ssh_util::keys::SshKeyPair;
61use mz_storage_types::connections::aws::AwsConnection;
62use mz_storage_types::connections::inline::ReferencedConnection;
63use mz_storage_types::connections::{
64 AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
65 MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70 SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76 ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77 TransactionAccessMode,
78};
79use crate::catalog::{
80 CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81 RoleAttributesRaw,
82};
83use crate::names::{
84 Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110 AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111 WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119 AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120 PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123 StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124 resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
130#[derive(Debug, EnumKind)]
132#[enum_kind(PlanKind)]
133pub enum Plan {
134 CreateConnection(CreateConnectionPlan),
135 CreateDatabase(CreateDatabasePlan),
136 CreateSchema(CreateSchemaPlan),
137 CreateRole(CreateRolePlan),
138 CreateCluster(CreateClusterPlan),
139 CreateClusterReplica(CreateClusterReplicaPlan),
140 CreateSource(CreateSourcePlan),
141 CreateSources(Vec<CreateSourcePlanBundle>),
142 CreateSecret(CreateSecretPlan),
143 CreateSink(CreateSinkPlan),
144 CreateTable(CreateTablePlan),
145 CreateView(CreateViewPlan),
146 CreateMaterializedView(CreateMaterializedViewPlan),
147 CreateContinualTask(CreateContinualTaskPlan),
148 CreateNetworkPolicy(CreateNetworkPolicyPlan),
149 CreateIndex(CreateIndexPlan),
150 CreateType(CreateTypePlan),
151 Comment(CommentPlan),
152 DiscardTemp,
153 DiscardAll,
154 DropObjects(DropObjectsPlan),
155 DropOwned(DropOwnedPlan),
156 EmptyQuery,
157 ShowAllVariables,
158 ShowCreate(ShowCreatePlan),
159 ShowColumns(ShowColumnsPlan),
160 ShowVariable(ShowVariablePlan),
161 InspectShard(InspectShardPlan),
162 SetVariable(SetVariablePlan),
163 ResetVariable(ResetVariablePlan),
164 SetTransaction(SetTransactionPlan),
165 StartTransaction(StartTransactionPlan),
166 CommitTransaction(CommitTransactionPlan),
167 AbortTransaction(AbortTransactionPlan),
168 Select(SelectPlan),
169 Subscribe(SubscribePlan),
170 CopyFrom(CopyFromPlan),
171 CopyTo(CopyToPlan),
172 ExplainPlan(ExplainPlanPlan),
173 ExplainPushdown(ExplainPushdownPlan),
174 ExplainTimestamp(ExplainTimestampPlan),
175 ExplainSinkSchema(ExplainSinkSchemaPlan),
176 Insert(InsertPlan),
177 AlterCluster(AlterClusterPlan),
178 AlterClusterSwap(AlterClusterSwapPlan),
179 AlterNoop(AlterNoopPlan),
180 AlterSetCluster(AlterSetClusterPlan),
181 AlterConnection(AlterConnectionPlan),
182 AlterSource(AlterSourcePlan),
183 AlterClusterRename(AlterClusterRenamePlan),
184 AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
185 AlterItemRename(AlterItemRenamePlan),
186 AlterSchemaRename(AlterSchemaRenamePlan),
187 AlterSchemaSwap(AlterSchemaSwapPlan),
188 AlterSecret(AlterSecretPlan),
189 AlterSink(AlterSinkPlan),
190 AlterSystemSet(AlterSystemSetPlan),
191 AlterSystemReset(AlterSystemResetPlan),
192 AlterSystemResetAll(AlterSystemResetAllPlan),
193 AlterRole(AlterRolePlan),
194 AlterOwner(AlterOwnerPlan),
195 AlterTableAddColumn(AlterTablePlan),
196 AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
197 AlterNetworkPolicy(AlterNetworkPolicyPlan),
198 Declare(DeclarePlan),
199 Fetch(FetchPlan),
200 Close(ClosePlan),
201 ReadThenWrite(ReadThenWritePlan),
202 Prepare(PreparePlan),
203 Execute(ExecutePlan),
204 Deallocate(DeallocatePlan),
205 Raise(RaisePlan),
206 GrantRole(GrantRolePlan),
207 RevokeRole(RevokeRolePlan),
208 GrantPrivileges(GrantPrivilegesPlan),
209 RevokePrivileges(RevokePrivilegesPlan),
210 AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
211 ReassignOwned(ReassignOwnedPlan),
212 SideEffectingFunc(SideEffectingFunc),
213 ValidateConnection(ValidateConnectionPlan),
214 AlterRetainHistory(AlterRetainHistoryPlan),
215 AlterSourceTimestampInterval(AlterSourceTimestampIntervalPlan),
216}
217
218impl Plan {
219 pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
222 match stmt {
223 StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
224 StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
225 StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
226 StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
227 StatementKind::AlterObjectRename => &[
228 PlanKind::AlterClusterRename,
229 PlanKind::AlterClusterReplicaRename,
230 PlanKind::AlterItemRename,
231 PlanKind::AlterSchemaRename,
232 PlanKind::AlterNoop,
233 ],
234 StatementKind::AlterObjectSwap => &[
235 PlanKind::AlterClusterSwap,
236 PlanKind::AlterSchemaSwap,
237 PlanKind::AlterNoop,
238 ],
239 StatementKind::AlterRole => &[PlanKind::AlterRole],
240 StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
241 StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
242 StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
243 StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
244 StatementKind::AlterSource => &[
245 PlanKind::AlterNoop,
246 PlanKind::AlterSource,
247 PlanKind::AlterRetainHistory,
248 PlanKind::AlterSourceTimestampInterval,
249 ],
250 StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
251 StatementKind::AlterSystemResetAll => {
252 &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
253 }
254 StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
255 StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
256 StatementKind::AlterTableAddColumn => {
257 &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
258 }
259 StatementKind::AlterMaterializedViewApplyReplacement => &[
260 PlanKind::AlterNoop,
261 PlanKind::AlterMaterializedViewApplyReplacement,
262 ],
263 StatementKind::Close => &[PlanKind::Close],
264 StatementKind::Comment => &[PlanKind::Comment],
265 StatementKind::Commit => &[PlanKind::CommitTransaction],
266 StatementKind::Copy => &[
267 PlanKind::CopyFrom,
268 PlanKind::Select,
269 PlanKind::Subscribe,
270 PlanKind::CopyTo,
271 ],
272 StatementKind::CreateCluster => &[PlanKind::CreateCluster],
273 StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
274 StatementKind::CreateConnection => &[PlanKind::CreateConnection],
275 StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
276 StatementKind::CreateIndex => &[PlanKind::CreateIndex],
277 StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
278 StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
279 StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
280 StatementKind::CreateRole => &[PlanKind::CreateRole],
281 StatementKind::CreateSchema => &[PlanKind::CreateSchema],
282 StatementKind::CreateSecret => &[PlanKind::CreateSecret],
283 StatementKind::CreateSink => &[PlanKind::CreateSink],
284 StatementKind::CreateSource | StatementKind::CreateSubsource => {
285 &[PlanKind::CreateSource]
286 }
287 StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
288 StatementKind::CreateTable => &[PlanKind::CreateTable],
289 StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
290 StatementKind::CreateType => &[PlanKind::CreateType],
291 StatementKind::CreateView => &[PlanKind::CreateView],
292 StatementKind::Deallocate => &[PlanKind::Deallocate],
293 StatementKind::Declare => &[PlanKind::Declare],
294 StatementKind::Delete => &[PlanKind::ReadThenWrite],
295 StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
296 StatementKind::DropObjects => &[PlanKind::DropObjects],
297 StatementKind::DropOwned => &[PlanKind::DropOwned],
298 StatementKind::Execute => &[PlanKind::Execute],
299 StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
300 StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
301 StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
302 StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
303 StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
304 StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
305 StatementKind::Fetch => &[PlanKind::Fetch],
306 StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
307 StatementKind::GrantRole => &[PlanKind::GrantRole],
308 StatementKind::Insert => &[PlanKind::Insert],
309 StatementKind::Prepare => &[PlanKind::Prepare],
310 StatementKind::Raise => &[PlanKind::Raise],
311 StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
312 StatementKind::ResetVariable => &[PlanKind::ResetVariable],
313 StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
314 StatementKind::RevokeRole => &[PlanKind::RevokeRole],
315 StatementKind::Rollback => &[PlanKind::AbortTransaction],
316 StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
317 StatementKind::SetTransaction => &[PlanKind::SetTransaction],
318 StatementKind::SetVariable => &[PlanKind::SetVariable],
319 StatementKind::Show => &[
320 PlanKind::Select,
321 PlanKind::ShowVariable,
322 PlanKind::ShowCreate,
323 PlanKind::ShowColumns,
324 PlanKind::ShowAllVariables,
325 PlanKind::InspectShard,
326 ],
327 StatementKind::StartTransaction => &[PlanKind::StartTransaction],
328 StatementKind::Subscribe => &[PlanKind::Subscribe],
329 StatementKind::Update => &[PlanKind::ReadThenWrite],
330 StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
331 StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
332 }
333 }
334
335 pub fn name(&self) -> &str {
337 match self {
338 Plan::CreateConnection(_) => "create connection",
339 Plan::CreateDatabase(_) => "create database",
340 Plan::CreateSchema(_) => "create schema",
341 Plan::CreateRole(_) => "create role",
342 Plan::CreateCluster(_) => "create cluster",
343 Plan::CreateClusterReplica(_) => "create cluster replica",
344 Plan::CreateSource(_) => "create source",
345 Plan::CreateSources(_) => "create source",
346 Plan::CreateSecret(_) => "create secret",
347 Plan::CreateSink(_) => "create sink",
348 Plan::CreateTable(_) => "create table",
349 Plan::CreateView(_) => "create view",
350 Plan::CreateMaterializedView(_) => "create materialized view",
351 Plan::CreateContinualTask(_) => "create continual task",
352 Plan::CreateIndex(_) => "create index",
353 Plan::CreateType(_) => "create type",
354 Plan::CreateNetworkPolicy(_) => "create network policy",
355 Plan::Comment(_) => "comment",
356 Plan::DiscardTemp => "discard temp",
357 Plan::DiscardAll => "discard all",
358 Plan::DropObjects(plan) => match plan.object_type {
359 ObjectType::Table => "drop table",
360 ObjectType::View => "drop view",
361 ObjectType::MaterializedView => "drop materialized view",
362 ObjectType::Source => "drop source",
363 ObjectType::Sink => "drop sink",
364 ObjectType::Index => "drop index",
365 ObjectType::Type => "drop type",
366 ObjectType::Role => "drop roles",
367 ObjectType::Cluster => "drop clusters",
368 ObjectType::ClusterReplica => "drop cluster replicas",
369 ObjectType::Secret => "drop secret",
370 ObjectType::Connection => "drop connection",
371 ObjectType::Database => "drop database",
372 ObjectType::Schema => "drop schema",
373 ObjectType::Func => "drop function",
374 ObjectType::ContinualTask => "drop continual task",
375 ObjectType::NetworkPolicy => "drop network policy",
376 },
377 Plan::DropOwned(_) => "drop owned",
378 Plan::EmptyQuery => "do nothing",
379 Plan::ShowAllVariables => "show all variables",
380 Plan::ShowCreate(_) => "show create",
381 Plan::ShowColumns(_) => "show columns",
382 Plan::ShowVariable(_) => "show variable",
383 Plan::InspectShard(_) => "inspect shard",
384 Plan::SetVariable(_) => "set variable",
385 Plan::ResetVariable(_) => "reset variable",
386 Plan::SetTransaction(_) => "set transaction",
387 Plan::StartTransaction(_) => "start transaction",
388 Plan::CommitTransaction(_) => "commit",
389 Plan::AbortTransaction(_) => "abort",
390 Plan::Select(_) => "select",
391 Plan::Subscribe(_) => "subscribe",
392 Plan::CopyFrom(_) => "copy from",
393 Plan::CopyTo(_) => "copy to",
394 Plan::ExplainPlan(_) => "explain plan",
395 Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
396 Plan::ExplainTimestamp(_) => "explain timestamp",
397 Plan::ExplainSinkSchema(_) => "explain schema",
398 Plan::Insert(_) => "insert",
399 Plan::AlterNoop(plan) => match plan.object_type {
400 ObjectType::Table => "alter table",
401 ObjectType::View => "alter view",
402 ObjectType::MaterializedView => "alter materialized view",
403 ObjectType::Source => "alter source",
404 ObjectType::Sink => "alter sink",
405 ObjectType::Index => "alter index",
406 ObjectType::Type => "alter type",
407 ObjectType::Role => "alter role",
408 ObjectType::Cluster => "alter cluster",
409 ObjectType::ClusterReplica => "alter cluster replica",
410 ObjectType::Secret => "alter secret",
411 ObjectType::Connection => "alter connection",
412 ObjectType::Database => "alter database",
413 ObjectType::Schema => "alter schema",
414 ObjectType::Func => "alter function",
415 ObjectType::ContinualTask => "alter continual task",
416 ObjectType::NetworkPolicy => "alter network policy",
417 },
418 Plan::AlterCluster(_) => "alter cluster",
419 Plan::AlterClusterRename(_) => "alter cluster rename",
420 Plan::AlterClusterSwap(_) => "alter cluster swap",
421 Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
422 Plan::AlterSetCluster(_) => "alter set cluster",
423 Plan::AlterConnection(_) => "alter connection",
424 Plan::AlterSource(_) => "alter source",
425 Plan::AlterItemRename(_) => "rename item",
426 Plan::AlterSchemaRename(_) => "alter rename schema",
427 Plan::AlterSchemaSwap(_) => "alter swap schema",
428 Plan::AlterSecret(_) => "alter secret",
429 Plan::AlterSink(_) => "alter sink",
430 Plan::AlterSystemSet(_) => "alter system",
431 Plan::AlterSystemReset(_) => "alter system",
432 Plan::AlterSystemResetAll(_) => "alter system",
433 Plan::AlterRole(_) => "alter role",
434 Plan::AlterNetworkPolicy(_) => "alter network policy",
435 Plan::AlterOwner(plan) => match plan.object_type {
436 ObjectType::Table => "alter table owner",
437 ObjectType::View => "alter view owner",
438 ObjectType::MaterializedView => "alter materialized view owner",
439 ObjectType::Source => "alter source owner",
440 ObjectType::Sink => "alter sink owner",
441 ObjectType::Index => "alter index owner",
442 ObjectType::Type => "alter type owner",
443 ObjectType::Role => "alter role owner",
444 ObjectType::Cluster => "alter cluster owner",
445 ObjectType::ClusterReplica => "alter cluster replica owner",
446 ObjectType::Secret => "alter secret owner",
447 ObjectType::Connection => "alter connection owner",
448 ObjectType::Database => "alter database owner",
449 ObjectType::Schema => "alter schema owner",
450 ObjectType::Func => "alter function owner",
451 ObjectType::ContinualTask => "alter continual task owner",
452 ObjectType::NetworkPolicy => "alter network policy owner",
453 },
454 Plan::AlterTableAddColumn(_) => "alter table add column",
455 Plan::AlterMaterializedViewApplyReplacement(_) => {
456 "alter materialized view apply replacement"
457 }
458 Plan::Declare(_) => "declare",
459 Plan::Fetch(_) => "fetch",
460 Plan::Close(_) => "close",
461 Plan::ReadThenWrite(plan) => match plan.kind {
462 MutationKind::Insert => "insert into select",
463 MutationKind::Update => "update",
464 MutationKind::Delete => "delete",
465 },
466 Plan::Prepare(_) => "prepare",
467 Plan::Execute(_) => "execute",
468 Plan::Deallocate(_) => "deallocate",
469 Plan::Raise(_) => "raise",
470 Plan::GrantRole(_) => "grant role",
471 Plan::RevokeRole(_) => "revoke role",
472 Plan::GrantPrivileges(_) => "grant privilege",
473 Plan::RevokePrivileges(_) => "revoke privilege",
474 Plan::AlterDefaultPrivileges(_) => "alter default privileges",
475 Plan::ReassignOwned(_) => "reassign owned",
476 Plan::SideEffectingFunc(_) => "side effecting func",
477 Plan::ValidateConnection(_) => "validate connection",
478 Plan::AlterRetainHistory(_) => "alter retain history",
479 Plan::AlterSourceTimestampInterval(_) => "alter source timestamp interval",
480 }
481 }
482
483 pub fn allowed_in_read_only(&self) -> bool {
489 match self {
490 Plan::SetVariable(_) => true,
493 Plan::ResetVariable(_) => true,
494 Plan::SetTransaction(_) => true,
495 Plan::StartTransaction(_) => true,
496 Plan::CommitTransaction(_) => true,
497 Plan::AbortTransaction(_) => true,
498 Plan::Select(_) => true,
499 Plan::EmptyQuery => true,
500 Plan::ShowAllVariables => true,
501 Plan::ShowCreate(_) => true,
502 Plan::ShowColumns(_) => true,
503 Plan::ShowVariable(_) => true,
504 Plan::InspectShard(_) => true,
505 Plan::Subscribe(_) => true,
506 Plan::CopyTo(_) => true,
507 Plan::ExplainPlan(_) => true,
508 Plan::ExplainPushdown(_) => true,
509 Plan::ExplainTimestamp(_) => true,
510 Plan::ExplainSinkSchema(_) => true,
511 Plan::ValidateConnection(_) => true,
512 _ => false,
513 }
514 }
515}
516
517#[derive(Debug)]
518pub struct StartTransactionPlan {
519 pub access: Option<TransactionAccessMode>,
520 pub isolation_level: Option<TransactionIsolationLevel>,
521}
522
523#[derive(Debug)]
524pub enum TransactionType {
525 Explicit,
526 Implicit,
527}
528
529impl TransactionType {
530 pub fn is_explicit(&self) -> bool {
531 matches!(self, TransactionType::Explicit)
532 }
533
534 pub fn is_implicit(&self) -> bool {
535 matches!(self, TransactionType::Implicit)
536 }
537}
538
539#[derive(Debug)]
540pub struct CommitTransactionPlan {
541 pub transaction_type: TransactionType,
542}
543
544#[derive(Debug)]
545pub struct AbortTransactionPlan {
546 pub transaction_type: TransactionType,
547}
548
549#[derive(Debug)]
550pub struct CreateDatabasePlan {
551 pub name: String,
552 pub if_not_exists: bool,
553}
554
555#[derive(Debug)]
556pub struct CreateSchemaPlan {
557 pub database_spec: ResolvedDatabaseSpecifier,
558 pub schema_name: String,
559 pub if_not_exists: bool,
560}
561
562#[derive(Debug)]
563pub struct CreateRolePlan {
564 pub name: String,
565 pub attributes: RoleAttributesRaw,
566}
567
568#[derive(Debug, PartialEq, Eq, Clone)]
569pub struct CreateClusterPlan {
570 pub name: String,
571 pub variant: CreateClusterVariant,
572 pub workload_class: Option<String>,
573}
574
575#[derive(Debug, PartialEq, Eq, Clone)]
576pub enum CreateClusterVariant {
577 Managed(CreateClusterManagedPlan),
578 Unmanaged(CreateClusterUnmanagedPlan),
579}
580
581#[derive(Debug, PartialEq, Eq, Clone)]
582pub struct CreateClusterUnmanagedPlan {
583 pub replicas: Vec<(String, ReplicaConfig)>,
584}
585
586#[derive(Debug, PartialEq, Eq, Clone)]
587pub struct CreateClusterManagedPlan {
588 pub replication_factor: u32,
589 pub size: String,
590 pub availability_zones: Vec<String>,
591 pub compute: ComputeReplicaConfig,
592 pub optimizer_feature_overrides: OptimizerFeatureOverrides,
593 pub schedule: ClusterSchedule,
594}
595
596#[derive(Debug)]
597pub struct CreateClusterReplicaPlan {
598 pub cluster_id: ClusterId,
599 pub name: String,
600 pub config: ReplicaConfig,
601}
602
603#[derive(
605 Clone,
606 Copy,
607 Debug,
608 Serialize,
609 Deserialize,
610 PartialOrd,
611 Ord,
612 PartialEq,
613 Eq
614)]
615pub struct ComputeReplicaIntrospectionConfig {
616 pub debugging: bool,
618 pub interval: Duration,
620}
621
622#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
623pub struct ComputeReplicaConfig {
624 pub introspection: Option<ComputeReplicaIntrospectionConfig>,
625}
626
627#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
628pub enum ReplicaConfig {
629 Unorchestrated {
630 storagectl_addrs: Vec<String>,
631 computectl_addrs: Vec<String>,
632 compute: ComputeReplicaConfig,
633 },
634 Orchestrated {
635 size: String,
636 availability_zone: Option<String>,
637 compute: ComputeReplicaConfig,
638 internal: bool,
639 billed_as: Option<String>,
640 },
641}
642
643#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
644pub enum ClusterSchedule {
645 Manual,
647 Refresh { hydration_time_estimate: Duration },
651}
652
653impl Default for ClusterSchedule {
654 fn default() -> Self {
655 ClusterSchedule::Manual
657 }
658}
659
660#[derive(Debug)]
661pub struct CreateSourcePlan {
662 pub name: QualifiedItemName,
663 pub source: Source,
664 pub if_not_exists: bool,
665 pub timeline: Timeline,
666 pub in_cluster: Option<ClusterId>,
668}
669
670#[derive(Clone, Debug, PartialEq, Eq)]
671pub struct SourceReferences {
672 pub updated_at: u64,
673 pub references: Vec<SourceReference>,
674}
675
676#[derive(Clone, Debug, PartialEq, Eq)]
679pub struct SourceReference {
680 pub name: String,
681 pub namespace: Option<String>,
682 pub columns: Vec<String>,
683}
684
685#[derive(Debug)]
687pub struct CreateSourcePlanBundle {
688 pub item_id: CatalogItemId,
690 pub global_id: GlobalId,
692 pub plan: CreateSourcePlan,
694 pub resolved_ids: ResolvedIds,
696 pub available_source_references: Option<SourceReferences>,
700}
701
702#[derive(Debug)]
703pub struct CreateConnectionPlan {
704 pub name: QualifiedItemName,
705 pub if_not_exists: bool,
706 pub connection: Connection,
707 pub validate: bool,
708}
709
710#[derive(Debug)]
711pub struct ValidateConnectionPlan {
712 pub id: CatalogItemId,
714 pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
716}
717
718#[derive(Debug)]
719pub struct CreateSecretPlan {
720 pub name: QualifiedItemName,
721 pub secret: Secret,
722 pub if_not_exists: bool,
723}
724
725#[derive(Debug)]
726pub struct CreateSinkPlan {
727 pub name: QualifiedItemName,
728 pub sink: Sink,
729 pub with_snapshot: bool,
730 pub if_not_exists: bool,
731 pub in_cluster: ClusterId,
732}
733
734#[derive(Debug)]
735pub struct CreateTablePlan {
736 pub name: QualifiedItemName,
737 pub table: Table,
738 pub if_not_exists: bool,
739}
740
741#[derive(Debug, Clone)]
742pub struct CreateViewPlan {
743 pub name: QualifiedItemName,
744 pub view: View,
745 pub replace: Option<CatalogItemId>,
747 pub drop_ids: Vec<CatalogItemId>,
749 pub if_not_exists: bool,
750 pub ambiguous_columns: bool,
753}
754
755#[derive(Debug, Clone)]
756pub struct CreateMaterializedViewPlan {
757 pub name: QualifiedItemName,
758 pub materialized_view: MaterializedView,
759 pub replace: Option<CatalogItemId>,
761 pub drop_ids: Vec<CatalogItemId>,
763 pub if_not_exists: bool,
764 pub ambiguous_columns: bool,
767}
768
769#[derive(Debug, Clone)]
770pub struct CreateContinualTaskPlan {
771 pub name: QualifiedItemName,
772 pub placeholder_id: Option<mz_expr::LocalId>,
775 pub desc: RelationDesc,
776 pub input_id: GlobalId,
778 pub with_snapshot: bool,
779 pub continual_task: MaterializedView,
781}
782
783#[derive(Debug, Clone)]
784pub struct CreateNetworkPolicyPlan {
785 pub name: String,
786 pub rules: Vec<NetworkPolicyRule>,
787}
788
789#[derive(Debug, Clone)]
790pub struct AlterNetworkPolicyPlan {
791 pub id: NetworkPolicyId,
792 pub name: String,
793 pub rules: Vec<NetworkPolicyRule>,
794}
795
796#[derive(Debug, Clone)]
797pub struct CreateIndexPlan {
798 pub name: QualifiedItemName,
799 pub index: Index,
800 pub if_not_exists: bool,
801}
802
803#[derive(Debug)]
804pub struct CreateTypePlan {
805 pub name: QualifiedItemName,
806 pub typ: Type,
807}
808
809#[derive(Debug)]
810pub struct DropObjectsPlan {
811 pub referenced_ids: Vec<ObjectId>,
813 pub drop_ids: Vec<ObjectId>,
815 pub object_type: ObjectType,
818}
819
820#[derive(Debug)]
821pub struct DropOwnedPlan {
822 pub role_ids: Vec<RoleId>,
824 pub drop_ids: Vec<ObjectId>,
826 pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
828 pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
830}
831
832#[derive(Debug)]
833pub struct ShowVariablePlan {
834 pub name: String,
835}
836
837#[derive(Debug)]
838pub struct InspectShardPlan {
839 pub id: GlobalId,
841}
842
843#[derive(Debug)]
844pub struct SetVariablePlan {
845 pub name: String,
846 pub value: VariableValue,
847 pub local: bool,
848}
849
850#[derive(Debug)]
851pub enum VariableValue {
852 Default,
853 Values(Vec<String>),
854}
855
856#[derive(Debug)]
857pub struct ResetVariablePlan {
858 pub name: String,
859}
860
861#[derive(Debug)]
862pub struct SetTransactionPlan {
863 pub local: bool,
864 pub modes: Vec<TransactionMode>,
865}
866
867#[derive(Clone, Debug)]
869pub struct SelectPlan {
870 pub select: Option<Box<SelectStatement<Aug>>>,
873 pub source: HirRelationExpr,
875 pub when: QueryWhen,
877 pub finishing: RowSetFinishing,
879 pub copy_to: Option<CopyFormat>,
881}
882
883impl SelectPlan {
884 pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
885 let arity = typ.arity();
886 SelectPlan {
887 select: None,
888 source: HirRelationExpr::Constant { rows, typ },
889 when: QueryWhen::Immediately,
890 finishing: RowSetFinishing::trivial(arity),
891 copy_to: None,
892 }
893 }
894}
895
896#[derive(Debug, Clone)]
897pub enum SubscribeOutput {
898 Diffs,
899 WithinTimestampOrderBy {
900 order_by: Vec<ColumnOrder>,
902 },
903 EnvelopeUpsert {
904 order_by_keys: Vec<ColumnOrder>,
906 },
907 EnvelopeDebezium {
908 order_by_keys: Vec<ColumnOrder>,
910 },
911}
912
913#[derive(Debug, Clone)]
914pub struct SubscribePlan {
915 pub from: SubscribeFrom,
916 pub with_snapshot: bool,
917 pub when: QueryWhen,
918 pub up_to: Option<Timestamp>,
919 pub copy_to: Option<CopyFormat>,
920 pub emit_progress: bool,
921 pub output: SubscribeOutput,
922}
923
924#[derive(Debug, Clone)]
925pub enum SubscribeFrom {
926 Id(GlobalId),
928 Query {
930 expr: MirRelationExpr,
931 desc: RelationDesc,
932 },
933}
934
935impl SubscribeFrom {
936 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
937 match self {
938 SubscribeFrom::Id(id) => BTreeSet::from([*id]),
939 SubscribeFrom::Query { expr, .. } => expr.depends_on(),
940 }
941 }
942
943 pub fn contains_temporal(&self) -> bool {
944 match self {
945 SubscribeFrom::Id(_) => false,
946 SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
947 }
948 }
949}
950
951#[derive(Debug)]
952pub struct ShowCreatePlan {
953 pub id: ObjectId,
954 pub row: Row,
955}
956
957#[derive(Debug)]
958pub struct ShowColumnsPlan {
959 pub id: CatalogItemId,
960 pub select_plan: SelectPlan,
961 pub new_resolved_ids: ResolvedIds,
962}
963
964#[derive(Debug)]
965pub struct CopyFromPlan {
966 pub target_id: CatalogItemId,
968 pub target_name: String,
970 pub source: CopyFromSource,
972 pub columns: Vec<ColumnIndex>,
976 pub source_desc: RelationDesc,
978 pub mfp: MapFilterProject,
980 pub params: CopyFormatParams<'static>,
982 pub filter: Option<CopyFromFilter>,
984}
985
986#[derive(Debug)]
987pub enum CopyFromSource {
988 Stdin,
990 Url(HirScalarExpr),
994 AwsS3 {
996 uri: HirScalarExpr,
998 connection: AwsConnection,
1000 connection_id: CatalogItemId,
1002 },
1003}
1004
1005#[derive(Debug)]
1006pub enum CopyFromFilter {
1007 Files(Vec<String>),
1008 Pattern(String),
1009}
1010
1011#[derive(Debug, Clone)]
1016pub struct CopyToPlan {
1017 pub select_plan: SelectPlan,
1019 pub desc: RelationDesc,
1020 pub to: HirScalarExpr,
1022 pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
1023 pub connection_id: CatalogItemId,
1025 pub format: S3SinkFormat,
1026 pub max_file_size: u64,
1027}
1028
1029#[derive(Clone, Debug)]
1030pub struct ExplainPlanPlan {
1031 pub stage: ExplainStage,
1032 pub format: ExplainFormat,
1033 pub config: ExplainConfig,
1034 pub explainee: Explainee,
1035}
1036
1037#[derive(Clone, Debug)]
1039pub enum Explainee {
1040 View(CatalogItemId),
1042 MaterializedView(CatalogItemId),
1044 Index(CatalogItemId),
1046 ReplanView(CatalogItemId),
1048 ReplanMaterializedView(CatalogItemId),
1050 ReplanIndex(CatalogItemId),
1052 Statement(ExplaineeStatement),
1054}
1055
1056#[derive(Clone, Debug, EnumKind)]
1058#[enum_kind(ExplaineeStatementKind)]
1059pub enum ExplaineeStatement {
1060 Select {
1062 broken: bool,
1064 plan: plan::SelectPlan,
1065 desc: RelationDesc,
1066 },
1067 CreateView {
1069 broken: bool,
1071 plan: plan::CreateViewPlan,
1072 },
1073 CreateMaterializedView {
1075 broken: bool,
1077 plan: plan::CreateMaterializedViewPlan,
1078 },
1079 CreateIndex {
1081 broken: bool,
1083 plan: plan::CreateIndexPlan,
1084 },
1085 Subscribe {
1087 broken: bool,
1089 plan: plan::SubscribePlan,
1090 },
1091}
1092
1093impl ExplaineeStatement {
1094 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1095 match self {
1096 Self::Select { plan, .. } => plan.source.depends_on(),
1097 Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1098 Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1099 Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1100 Self::Subscribe { plan, .. } => plan.from.depends_on(),
1101 }
1102 }
1103
1104 pub fn broken(&self) -> bool {
1115 match self {
1116 Self::Select { broken, .. } => *broken,
1117 Self::CreateView { broken, .. } => *broken,
1118 Self::CreateMaterializedView { broken, .. } => *broken,
1119 Self::CreateIndex { broken, .. } => *broken,
1120 Self::Subscribe { broken, .. } => *broken,
1121 }
1122 }
1123}
1124
1125impl ExplaineeStatementKind {
1126 pub fn supports(&self, stage: &ExplainStage) -> bool {
1127 use ExplainStage::*;
1128 match self {
1129 Self::Select => true,
1130 Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1131 Self::CreateMaterializedView => true,
1132 Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1133 Self::Subscribe => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1136 }
1137 }
1138}
1139
1140impl std::fmt::Display for ExplaineeStatementKind {
1141 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1142 match self {
1143 Self::Select => write!(f, "SELECT"),
1144 Self::CreateView => write!(f, "CREATE VIEW"),
1145 Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1146 Self::CreateIndex => write!(f, "CREATE INDEX"),
1147 Self::Subscribe => write!(f, "SUBSCRIBE"),
1148 }
1149 }
1150}
1151
1152#[derive(Clone, Debug)]
1153pub struct ExplainPushdownPlan {
1154 pub explainee: Explainee,
1155}
1156
1157#[derive(Clone, Debug)]
1158pub struct ExplainTimestampPlan {
1159 pub format: ExplainFormat,
1160 pub raw_plan: HirRelationExpr,
1161 pub when: QueryWhen,
1162}
1163
1164#[derive(Debug)]
1165pub struct ExplainSinkSchemaPlan {
1166 pub sink_from: GlobalId,
1167 pub json_schema: String,
1168}
1169
1170#[derive(Debug)]
1171pub struct SendDiffsPlan {
1172 pub id: CatalogItemId,
1173 pub updates: Vec<(Row, Diff)>,
1174 pub kind: MutationKind,
1175 pub returning: Vec<(Row, NonZeroUsize)>,
1176 pub max_result_size: u64,
1177}
1178
1179#[derive(Debug)]
1180pub struct InsertPlan {
1181 pub id: CatalogItemId,
1182 pub values: HirRelationExpr,
1183 pub returning: Vec<mz_expr::MirScalarExpr>,
1184}
1185
1186#[derive(Debug)]
1187pub struct ReadThenWritePlan {
1188 pub id: CatalogItemId,
1189 pub selection: HirRelationExpr,
1190 pub finishing: RowSetFinishing,
1191 pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
1192 pub kind: MutationKind,
1193 pub returning: Vec<mz_expr::MirScalarExpr>,
1194}
1195
1196#[derive(Debug)]
1198pub struct AlterNoopPlan {
1199 pub object_type: ObjectType,
1200}
1201
1202#[derive(Debug)]
1203pub struct AlterSetClusterPlan {
1204 pub id: CatalogItemId,
1205 pub set_cluster: ClusterId,
1206}
1207
1208#[derive(Debug)]
1209pub struct AlterRetainHistoryPlan {
1210 pub id: CatalogItemId,
1211 pub value: Option<Value>,
1212 pub window: CompactionWindow,
1213 pub object_type: ObjectType,
1214}
1215
1216#[derive(Debug)]
1217pub struct AlterSourceTimestampIntervalPlan {
1218 pub id: CatalogItemId,
1219 pub value: Option<Value>,
1220 pub interval: Duration,
1221}
1222
1223#[derive(Debug, Clone)]
1224
1225pub enum AlterOptionParameter<T = String> {
1226 Set(T),
1227 Reset,
1228 Unchanged,
1229}
1230
1231#[derive(Debug)]
1232pub enum AlterConnectionAction {
1233 RotateKeys,
1234 AlterOptions {
1235 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
1236 drop_options: BTreeSet<ConnectionOptionName>,
1237 validate: bool,
1238 },
1239}
1240
1241#[derive(Debug)]
1242pub struct AlterConnectionPlan {
1243 pub id: CatalogItemId,
1244 pub action: AlterConnectionAction,
1245}
1246
1247#[derive(Debug)]
1248pub enum AlterSourceAction {
1249 AddSubsourceExports {
1250 subsources: Vec<CreateSourcePlanBundle>,
1251 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1252 },
1253 RefreshReferences {
1254 references: SourceReferences,
1255 },
1256}
1257
1258#[derive(Debug)]
1259pub struct AlterSourcePlan {
1260 pub item_id: CatalogItemId,
1261 pub ingestion_id: GlobalId,
1262 pub action: AlterSourceAction,
1263}
1264
1265#[derive(Debug, Clone)]
1266pub struct AlterSinkPlan {
1267 pub item_id: CatalogItemId,
1268 pub global_id: GlobalId,
1269 pub sink: Sink,
1270 pub with_snapshot: bool,
1271 pub in_cluster: ClusterId,
1272}
1273
1274#[derive(Debug, Clone)]
1275pub struct AlterClusterPlan {
1276 pub id: ClusterId,
1277 pub name: String,
1278 pub options: PlanClusterOption,
1279 pub strategy: AlterClusterPlanStrategy,
1280}
1281
1282#[derive(Debug)]
1283pub struct AlterClusterRenamePlan {
1284 pub id: ClusterId,
1285 pub name: String,
1286 pub to_name: String,
1287}
1288
1289#[derive(Debug)]
1290pub struct AlterClusterReplicaRenamePlan {
1291 pub cluster_id: ClusterId,
1292 pub replica_id: ReplicaId,
1293 pub name: QualifiedReplica,
1294 pub to_name: String,
1295}
1296
1297#[derive(Debug)]
1298pub struct AlterItemRenamePlan {
1299 pub id: CatalogItemId,
1300 pub current_full_name: FullItemName,
1301 pub to_name: String,
1302 pub object_type: ObjectType,
1303}
1304
1305#[derive(Debug)]
1306pub struct AlterSchemaRenamePlan {
1307 pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1308 pub new_schema_name: String,
1309}
1310
1311#[derive(Debug)]
1312pub struct AlterSchemaSwapPlan {
1313 pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1314 pub schema_a_name: String,
1315 pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1316 pub schema_b_name: String,
1317 pub name_temp: String,
1318}
1319
1320#[derive(Debug)]
1321pub struct AlterClusterSwapPlan {
1322 pub id_a: ClusterId,
1323 pub id_b: ClusterId,
1324 pub name_a: String,
1325 pub name_b: String,
1326 pub name_temp: String,
1327}
1328
1329#[derive(Debug)]
1330pub struct AlterSecretPlan {
1331 pub id: CatalogItemId,
1332 pub secret_as: MirScalarExpr,
1333}
1334
1335#[derive(Debug)]
1336pub struct AlterSystemSetPlan {
1337 pub name: String,
1338 pub value: VariableValue,
1339}
1340
1341#[derive(Debug)]
1342pub struct AlterSystemResetPlan {
1343 pub name: String,
1344}
1345
1346#[derive(Debug)]
1347pub struct AlterSystemResetAllPlan {}
1348
1349#[derive(Debug)]
1350pub struct AlterRolePlan {
1351 pub id: RoleId,
1352 pub name: String,
1353 pub option: PlannedAlterRoleOption,
1354}
1355
1356#[derive(Debug)]
1357pub struct AlterOwnerPlan {
1358 pub id: ObjectId,
1359 pub object_type: ObjectType,
1360 pub new_owner: RoleId,
1361}
1362
1363#[derive(Debug)]
1364pub struct AlterTablePlan {
1365 pub relation_id: CatalogItemId,
1366 pub column_name: ColumnName,
1367 pub column_type: SqlColumnType,
1368 pub raw_sql_type: RawDataType,
1369}
1370
1371#[derive(Debug, Clone)]
1372pub struct AlterMaterializedViewApplyReplacementPlan {
1373 pub id: CatalogItemId,
1374 pub replacement_id: CatalogItemId,
1375}
1376
1377#[derive(Debug)]
1378pub struct DeclarePlan {
1379 pub name: String,
1380 pub stmt: Statement<Raw>,
1381 pub sql: String,
1382 pub params: Params,
1383}
1384
1385#[derive(Debug)]
1386pub struct FetchPlan {
1387 pub name: String,
1388 pub count: Option<FetchDirection>,
1389 pub timeout: ExecuteTimeout,
1390}
1391
1392#[derive(Debug)]
1393pub struct ClosePlan {
1394 pub name: String,
1395}
1396
1397#[derive(Debug)]
1398pub struct PreparePlan {
1399 pub name: String,
1400 pub stmt: Statement<Raw>,
1401 pub sql: String,
1402 pub desc: StatementDesc,
1403}
1404
1405#[derive(Debug)]
1406pub struct ExecutePlan {
1407 pub name: String,
1408 pub params: Params,
1409}
1410
1411#[derive(Debug)]
1412pub struct DeallocatePlan {
1413 pub name: Option<String>,
1414}
1415
1416#[derive(Debug)]
1417pub struct RaisePlan {
1418 pub severity: NoticeSeverity,
1419}
1420
1421#[derive(Debug)]
1422pub struct GrantRolePlan {
1423 pub role_ids: Vec<RoleId>,
1425 pub member_ids: Vec<RoleId>,
1427 pub grantor_id: RoleId,
1429}
1430
1431#[derive(Debug)]
1432pub struct RevokeRolePlan {
1433 pub role_ids: Vec<RoleId>,
1435 pub member_ids: Vec<RoleId>,
1437 pub grantor_id: RoleId,
1439}
1440
1441#[derive(Debug)]
1442pub struct UpdatePrivilege {
1443 pub acl_mode: AclMode,
1445 pub target_id: SystemObjectId,
1447 pub grantor: RoleId,
1449}
1450
1451#[derive(Debug)]
1452pub struct GrantPrivilegesPlan {
1453 pub update_privileges: Vec<UpdatePrivilege>,
1455 pub grantees: Vec<RoleId>,
1457}
1458
1459#[derive(Debug)]
1460pub struct RevokePrivilegesPlan {
1461 pub update_privileges: Vec<UpdatePrivilege>,
1463 pub revokees: Vec<RoleId>,
1465}
1466#[derive(Debug)]
1467pub struct AlterDefaultPrivilegesPlan {
1468 pub privilege_objects: Vec<DefaultPrivilegeObject>,
1470 pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
1472 pub is_grant: bool,
1474}
1475
1476#[derive(Debug)]
1477pub struct ReassignOwnedPlan {
1478 pub old_roles: Vec<RoleId>,
1480 pub new_role: RoleId,
1482 pub reassign_ids: Vec<ObjectId>,
1484}
1485
1486#[derive(Debug)]
1487pub struct CommentPlan {
1488 pub object_id: CommentObjectId,
1490 pub sub_component: Option<usize>,
1494 pub comment: Option<String>,
1496}
1497
1498#[derive(Clone, Debug)]
1499pub enum TableDataSource {
1500 TableWrites { defaults: Vec<Expr<Aug>> },
1502
1503 DataSource {
1506 desc: DataSourceDesc,
1507 timeline: Timeline,
1508 },
1509}
1510
1511#[derive(Clone, Debug)]
1512pub struct Table {
1513 pub create_sql: String,
1514 pub desc: VersionedRelationDesc,
1515 pub temporary: bool,
1516 pub compaction_window: Option<CompactionWindow>,
1517 pub data_source: TableDataSource,
1518}
1519
1520#[derive(Clone, Debug)]
1521pub struct Source {
1522 pub create_sql: String,
1523 pub data_source: DataSourceDesc,
1524 pub desc: RelationDesc,
1525 pub compaction_window: Option<CompactionWindow>,
1526}
1527
1528#[derive(Debug, Clone)]
1529pub enum DataSourceDesc {
1530 Ingestion(SourceDesc<ReferencedConnection>),
1532 OldSyntaxIngestion {
1534 desc: SourceDesc<ReferencedConnection>,
1535 progress_subsource: CatalogItemId,
1538 data_config: SourceExportDataConfig<ReferencedConnection>,
1539 details: SourceExportDetails,
1540 },
1541 IngestionExport {
1544 ingestion_id: CatalogItemId,
1545 external_reference: UnresolvedItemName,
1546 details: SourceExportDetails,
1547 data_config: SourceExportDataConfig<ReferencedConnection>,
1548 },
1549 Progress,
1551 Webhook {
1553 validate_using: Option<WebhookValidation>,
1554 body_format: WebhookBodyFormat,
1555 headers: WebhookHeaders,
1556 cluster_id: Option<StorageInstanceId>,
1558 },
1559}
1560
1561#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
1562pub struct WebhookValidation {
1563 pub expression: MirScalarExpr,
1565 pub relation_desc: RelationDesc,
1567 pub bodies: Vec<(usize, bool)>,
1569 pub headers: Vec<(usize, bool)>,
1571 pub secrets: Vec<WebhookValidationSecret>,
1573}
1574
1575impl WebhookValidation {
1576 const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1577
1578 pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1583 let WebhookValidation {
1584 expression,
1585 relation_desc,
1586 ..
1587 } = self;
1588
1589 let mut expression_ = expression.clone();
1591 let desc_ = relation_desc.clone();
1592 let reduce_task = mz_ore::task::spawn_blocking(
1593 || "webhook-validation-reduce",
1594 move || {
1595 let repr_col_types: Vec<ReprColumnType> = desc_
1596 .typ()
1597 .column_types
1598 .iter()
1599 .map(ReprColumnType::from)
1600 .collect();
1601 expression_.reduce(&repr_col_types);
1602 expression_
1603 },
1604 );
1605
1606 match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1607 Ok(reduced_expr) => {
1608 *expression = reduced_expr;
1609 Ok(())
1610 }
1611 Err(_) => Err("timeout"),
1612 }
1613 }
1614}
1615
1616#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
1617pub struct WebhookHeaders {
1618 pub header_column: Option<WebhookHeaderFilters>,
1620 pub mapped_headers: BTreeMap<usize, (String, bool)>,
1622}
1623
1624impl WebhookHeaders {
1625 pub fn num_columns(&self) -> usize {
1627 let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1628 let mapped_headers = self.mapped_headers.len();
1629
1630 header_column + mapped_headers
1631 }
1632}
1633
1634#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
1635pub struct WebhookHeaderFilters {
1636 pub block: BTreeSet<String>,
1637 pub allow: BTreeSet<String>,
1638}
1639
1640#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Arbitrary)]
1641pub enum WebhookBodyFormat {
1642 Json { array: bool },
1643 Bytes,
1644 Text,
1645}
1646
1647impl From<WebhookBodyFormat> for SqlScalarType {
1648 fn from(value: WebhookBodyFormat) -> Self {
1649 match value {
1650 WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1651 WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1652 WebhookBodyFormat::Text => SqlScalarType::String,
1653 }
1654 }
1655}
1656
1657#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
1658pub struct WebhookValidationSecret {
1659 pub id: CatalogItemId,
1661 pub column_idx: usize,
1663 pub use_bytes: bool,
1665}
1666
1667#[derive(Clone, Debug)]
1668pub struct Connection {
1669 pub create_sql: String,
1670 pub details: ConnectionDetails,
1671}
1672
1673#[derive(Clone, Debug, Serialize)]
1674pub enum ConnectionDetails {
1675 Kafka(KafkaConnection<ReferencedConnection>),
1676 Csr(CsrConnection<ReferencedConnection>),
1677 Postgres(PostgresConnection<ReferencedConnection>),
1678 Ssh {
1679 connection: SshConnection,
1680 key_1: SshKey,
1681 key_2: SshKey,
1682 },
1683 Aws(AwsConnection),
1684 AwsPrivatelink(AwsPrivatelinkConnection),
1685 MySql(MySqlConnection<ReferencedConnection>),
1686 SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
1687 IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
1688}
1689
1690impl ConnectionDetails {
1691 pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1692 match self {
1693 ConnectionDetails::Kafka(c) => {
1694 mz_storage_types::connections::Connection::Kafka(c.clone())
1695 }
1696 ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1697 ConnectionDetails::Postgres(c) => {
1698 mz_storage_types::connections::Connection::Postgres(c.clone())
1699 }
1700 ConnectionDetails::Ssh { connection, .. } => {
1701 mz_storage_types::connections::Connection::Ssh(connection.clone())
1702 }
1703 ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1704 ConnectionDetails::AwsPrivatelink(c) => {
1705 mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1706 }
1707 ConnectionDetails::MySql(c) => {
1708 mz_storage_types::connections::Connection::MySql(c.clone())
1709 }
1710 ConnectionDetails::SqlServer(c) => {
1711 mz_storage_types::connections::Connection::SqlServer(c.clone())
1712 }
1713 ConnectionDetails::IcebergCatalog(c) => {
1714 mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1715 }
1716 }
1717 }
1718}
1719
1720#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1721pub struct NetworkPolicyRule {
1722 pub name: String,
1723 pub action: NetworkPolicyRuleAction,
1724 pub address: PolicyAddress,
1725 pub direction: NetworkPolicyRuleDirection,
1726}
1727
1728#[derive(
1729 Debug,
1730 Clone,
1731 Serialize,
1732 Deserialize,
1733 PartialEq,
1734 Eq,
1735 Ord,
1736 PartialOrd,
1737 Hash
1738)]
1739pub enum NetworkPolicyRuleAction {
1740 Allow,
1741}
1742
1743impl std::fmt::Display for NetworkPolicyRuleAction {
1744 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1745 match self {
1746 Self::Allow => write!(f, "allow"),
1747 }
1748 }
1749}
1750impl TryFrom<&str> for NetworkPolicyRuleAction {
1751 type Error = PlanError;
1752 fn try_from(value: &str) -> Result<Self, Self::Error> {
1753 match value.to_uppercase().as_str() {
1754 "ALLOW" => Ok(Self::Allow),
1755 _ => Err(PlanError::Unstructured(
1756 "Allow is the only valid option".into(),
1757 )),
1758 }
1759 }
1760}
1761
1762#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1763pub enum NetworkPolicyRuleDirection {
1764 Ingress,
1765}
1766impl std::fmt::Display for NetworkPolicyRuleDirection {
1767 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1768 match self {
1769 Self::Ingress => write!(f, "ingress"),
1770 }
1771 }
1772}
1773impl TryFrom<&str> for NetworkPolicyRuleDirection {
1774 type Error = PlanError;
1775 fn try_from(value: &str) -> Result<Self, Self::Error> {
1776 match value.to_uppercase().as_str() {
1777 "INGRESS" => Ok(Self::Ingress),
1778 _ => Err(PlanError::Unstructured(
1779 "Ingress is the only valid option".into(),
1780 )),
1781 }
1782 }
1783}
1784
1785#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1786pub struct PolicyAddress(pub IpNet);
1787impl std::fmt::Display for PolicyAddress {
1788 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1789 write!(f, "{}", &self.0.to_string())
1790 }
1791}
1792impl From<String> for PolicyAddress {
1793 fn from(value: String) -> Self {
1794 Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1795 }
1796}
1797impl TryFrom<&str> for PolicyAddress {
1798 type Error = PlanError;
1799 fn try_from(value: &str) -> Result<Self, Self::Error> {
1800 let net = IpNet::from_str(value)
1801 .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1802 Ok(Self(net))
1803 }
1804}
1805
1806impl Serialize for PolicyAddress {
1807 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1808 where
1809 S: serde::Serializer,
1810 {
1811 serializer.serialize_str(&format!("{}", &self.0))
1812 }
1813}
1814
1815#[derive(Clone, Debug, Serialize)]
1816pub enum SshKey {
1817 PublicOnly(String),
1818 Both(SshKeyPair),
1819}
1820
1821impl SshKey {
1822 pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1823 match self {
1824 SshKey::PublicOnly(_) => None,
1825 SshKey::Both(key_pair) => Some(key_pair),
1826 }
1827 }
1828
1829 pub fn public_key(&self) -> String {
1830 match self {
1831 SshKey::PublicOnly(s) => s.into(),
1832 SshKey::Both(p) => p.ssh_public_key(),
1833 }
1834 }
1835}
1836
1837#[derive(Clone, Debug)]
1838pub struct Secret {
1839 pub create_sql: String,
1840 pub secret_as: MirScalarExpr,
1841}
1842
1843#[derive(Clone, Debug)]
1844pub struct Sink {
1845 pub create_sql: String,
1847 pub from: GlobalId,
1849 pub connection: StorageSinkConnection<ReferencedConnection>,
1851 pub envelope: SinkEnvelope,
1853 pub version: u64,
1854 pub commit_interval: Option<Duration>,
1855}
1856
1857#[derive(Clone, Debug)]
1858pub struct View {
1859 pub create_sql: String,
1861 pub expr: HirRelationExpr,
1863 pub dependencies: DependencyIds,
1865 pub column_names: Vec<ColumnName>,
1867 pub temporary: bool,
1869}
1870
1871#[derive(Clone, Debug)]
1872pub struct MaterializedView {
1873 pub create_sql: String,
1875 pub expr: HirRelationExpr,
1877 pub dependencies: DependencyIds,
1879 pub column_names: Vec<ColumnName>,
1881 pub replacement_target: Option<CatalogItemId>,
1882 pub cluster_id: ClusterId,
1884 pub target_replica: Option<ReplicaId>,
1886 pub non_null_assertions: Vec<usize>,
1887 pub compaction_window: Option<CompactionWindow>,
1888 pub refresh_schedule: Option<RefreshSchedule>,
1889 pub as_of: Option<Timestamp>,
1890}
1891
1892#[derive(Clone, Debug)]
1893pub struct Index {
1894 pub create_sql: String,
1896 pub on: GlobalId,
1898 pub keys: Vec<mz_expr::MirScalarExpr>,
1899 pub compaction_window: Option<CompactionWindow>,
1900 pub cluster_id: ClusterId,
1901}
1902
1903#[derive(Clone, Debug)]
1904pub struct Type {
1905 pub create_sql: String,
1906 pub inner: CatalogType<IdReference>,
1907}
1908
1909#[derive(Deserialize, Clone, Debug, PartialEq)]
1911pub enum QueryWhen {
1912 Immediately,
1915 FreshestTableWrite,
1918 AtTimestamp(Timestamp),
1923 AtLeastTimestamp(Timestamp),
1926}
1927
1928impl QueryWhen {
1929 pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1931 match self {
1932 QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1933 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1934 }
1935 }
1936 pub fn constrains_upper(&self) -> bool {
1940 match self {
1941 QueryWhen::AtTimestamp(_) => true,
1942 QueryWhen::AtLeastTimestamp(_)
1943 | QueryWhen::Immediately
1944 | QueryWhen::FreshestTableWrite => false,
1945 }
1946 }
1947 pub fn advance_to_since(&self) -> bool {
1949 match self {
1950 QueryWhen::Immediately
1951 | QueryWhen::AtLeastTimestamp(_)
1952 | QueryWhen::FreshestTableWrite => true,
1953 QueryWhen::AtTimestamp(_) => false,
1954 }
1955 }
1956 pub fn can_advance_to_upper(&self) -> bool {
1958 match self {
1959 QueryWhen::Immediately => true,
1960 QueryWhen::FreshestTableWrite
1961 | QueryWhen::AtTimestamp(_)
1962 | QueryWhen::AtLeastTimestamp(_) => false,
1963 }
1964 }
1965
1966 pub fn can_advance_to_timeline_ts(&self) -> bool {
1968 match self {
1969 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1970 QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1971 }
1972 }
1973 pub fn must_advance_to_timeline_ts(&self) -> bool {
1975 match self {
1976 QueryWhen::FreshestTableWrite => true,
1977 QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1978 false
1979 }
1980 }
1981 }
1982 pub fn is_transactional(&self) -> bool {
1984 match self {
1985 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1986 QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1987 }
1988 }
1989}
1990
1991#[derive(Debug, Copy, Clone)]
1992pub enum MutationKind {
1993 Insert,
1994 Update,
1995 Delete,
1996}
1997
1998#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
1999pub enum CopyFormat {
2000 Text,
2001 Csv,
2002 Binary,
2003 Parquet,
2004}
2005
2006#[derive(Debug, Copy, Clone)]
2007pub enum ExecuteTimeout {
2008 None,
2009 Seconds(f64),
2010 WaitOnce,
2011}
2012
2013#[derive(Clone, Debug)]
2014pub enum IndexOption {
2015 RetainHistory(CompactionWindow),
2017}
2018
2019#[derive(Clone, Debug)]
2020pub enum TableOption {
2021 RetainHistory(CompactionWindow),
2023}
2024
2025#[derive(Clone, Debug)]
2026pub struct PlanClusterOption {
2027 pub availability_zones: AlterOptionParameter<Vec<String>>,
2028 pub introspection_debugging: AlterOptionParameter<bool>,
2029 pub introspection_interval: AlterOptionParameter<OptionalDuration>,
2030 pub managed: AlterOptionParameter<bool>,
2031 pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
2032 pub replication_factor: AlterOptionParameter<u32>,
2033 pub size: AlterOptionParameter,
2034 pub schedule: AlterOptionParameter<ClusterSchedule>,
2035 pub workload_class: AlterOptionParameter<Option<String>>,
2036}
2037
2038impl Default for PlanClusterOption {
2039 fn default() -> Self {
2040 Self {
2041 availability_zones: AlterOptionParameter::Unchanged,
2042 introspection_debugging: AlterOptionParameter::Unchanged,
2043 introspection_interval: AlterOptionParameter::Unchanged,
2044 managed: AlterOptionParameter::Unchanged,
2045 replicas: AlterOptionParameter::Unchanged,
2046 replication_factor: AlterOptionParameter::Unchanged,
2047 size: AlterOptionParameter::Unchanged,
2048 schedule: AlterOptionParameter::Unchanged,
2049 workload_class: AlterOptionParameter::Unchanged,
2050 }
2051 }
2052}
2053
2054#[derive(Clone, Debug, PartialEq, Eq)]
2055pub enum AlterClusterPlanStrategy {
2056 None,
2057 For(Duration),
2058 UntilReady {
2059 on_timeout: OnTimeoutAction,
2060 timeout: Duration,
2061 },
2062}
2063
2064#[derive(Clone, Debug, PartialEq, Eq)]
2065pub enum OnTimeoutAction {
2066 Commit,
2067 Rollback,
2068}
2069
2070impl Default for OnTimeoutAction {
2071 fn default() -> Self {
2072 Self::Commit
2073 }
2074}
2075
2076impl TryFrom<&str> for OnTimeoutAction {
2077 type Error = PlanError;
2078 fn try_from(value: &str) -> Result<Self, Self::Error> {
2079 match value.to_uppercase().as_str() {
2080 "COMMIT" => Ok(Self::Commit),
2081 "ROLLBACK" => Ok(Self::Rollback),
2082 _ => Err(PlanError::Unstructured(
2083 "Valid options are COMMIT, ROLLBACK".into(),
2084 )),
2085 }
2086 }
2087}
2088
2089impl AlterClusterPlanStrategy {
2090 pub fn is_none(&self) -> bool {
2091 matches!(self, Self::None)
2092 }
2093 pub fn is_some(&self) -> bool {
2094 !matches!(self, Self::None)
2095 }
2096}
2097
2098impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2099 type Error = PlanError;
2100
2101 fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2102 Ok(match value.wait {
2103 Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2104 Some(ClusterAlterOptionValue::UntilReady(options)) => {
2105 let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2106 Self::UntilReady {
2107 timeout: match extracted.timeout {
2108 Some(d) => d,
2109 None => Err(PlanError::UntilReadyTimeoutRequired)?,
2110 },
2111 on_timeout: match extracted.on_timeout {
2112 Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2113 PlanError::InvalidOptionValue {
2114 option_name: "ON TIMEOUT".into(),
2115 err: Box::new(e),
2116 }
2117 })?,
2118 None => OnTimeoutAction::default(),
2119 },
2120 }
2121 }
2122 None => Self::None,
2123 })
2124 }
2125}
2126
2127#[derive(Debug, Clone)]
2129pub struct Params {
2130 pub datums: Row,
2132 pub execute_types: Vec<SqlScalarType>,
2134 pub expected_types: Vec<SqlScalarType>,
2136}
2137
2138impl Params {
2139 pub fn empty() -> Params {
2141 Params {
2142 datums: Row::pack_slice(&[]),
2143 execute_types: vec![],
2144 expected_types: vec![],
2145 }
2146 }
2147}
2148
2149#[derive(
2151 Ord,
2152 PartialOrd,
2153 Clone,
2154 Debug,
2155 Eq,
2156 PartialEq,
2157 Serialize,
2158 Deserialize,
2159 Hash,
2160 Copy
2161)]
2162pub struct PlanContext {
2163 pub wall_time: DateTime<Utc>,
2164 pub ignore_if_exists_errors: bool,
2165}
2166
2167impl PlanContext {
2168 pub fn new(wall_time: DateTime<Utc>) -> Self {
2169 Self {
2170 wall_time,
2171 ignore_if_exists_errors: false,
2172 }
2173 }
2174
2175 pub fn zero() -> Self {
2179 PlanContext {
2180 wall_time: now::to_datetime(NOW_ZERO()),
2181 ignore_if_exists_errors: false,
2182 }
2183 }
2184
2185 pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2186 self.ignore_if_exists_errors = value;
2187 self
2188 }
2189}