1use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{CollectionPlan, ColumnOrder, MapFilterProject, MirScalarExpr, RowSetFinishing};
41use mz_ore::now::{self, NOW_ZERO};
42use mz_pgcopy::CopyFormatParams;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
44use mz_repr::explain::{ExplainConfig, ExplainFormat};
45use mz_repr::network_policy_id::NetworkPolicyId;
46use mz_repr::optimize::OptimizerFeatureOverrides;
47use mz_repr::refresh_schedule::RefreshSchedule;
48use mz_repr::role_id::RoleId;
49use mz_repr::{
50 CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, ReprColumnType, Row,
51 SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
52};
53use mz_sql_parser::ast::{
54 AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
55 RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
56 Value, WithOptionValue,
57};
58use mz_ssh_util::keys::SshKeyPair;
59use mz_storage_types::connections::aws::AwsConnection;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::connections::{
62 AwsPrivatelinkConnection, CsrConnection, GlueSchemaRegistryConnection,
63 IcebergCatalogConnection, KafkaConnection, MySqlConnection, PostgresConnection,
64 SqlServerConnectionDetails, SshConnection,
65};
66use mz_storage_types::instances::StorageInstanceId;
67use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
68use mz_storage_types::sources::{
69 SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
70};
71use proptest_derive::Arbitrary;
72use serde::{Deserialize, Serialize};
73
74use crate::ast::{
75 ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
76 TransactionAccessMode,
77};
78use crate::catalog::{
79 CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
80 RoleAttributesRaw,
81};
82use crate::names::{
83 Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
84 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
85};
86
87pub(crate) mod error;
88pub(crate) mod explain;
89pub(crate) mod hir;
90pub(crate) mod literal;
91pub(crate) mod lowering;
92pub(crate) mod notice;
93pub(crate) mod plan_utils;
94pub(crate) mod query;
95pub(crate) mod scope;
96pub(crate) mod side_effecting_func;
97pub(crate) mod statement;
98pub(crate) mod transform_ast;
99pub(crate) mod transform_hir;
100pub(crate) mod typeconv;
101pub(crate) mod with_options;
102
103use crate::plan;
104use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
105use crate::plan::with_options::OptionalDuration;
106pub use error::PlanError;
107pub use explain::normalize_subqueries;
108pub use hir::{
109 AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
110 WindowExprType,
111};
112pub use lowering::Config as HirToMirConfig;
113pub use notice::PlanNotice;
114pub use query::{ExprContext, QueryContext, QueryLifetime};
115pub use scope::Scope;
116pub use side_effecting_func::SideEffectingFunc;
117pub use statement::ddl::{
118 AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
119 PlannedAlterRoleOption, PlannedRoleAttributes, PlannedRoleVariable,
120 SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123 StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124 resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
130#[derive(Debug, EnumKind)]
132#[enum_kind(PlanKind)]
133pub enum Plan {
134 CreateConnection(CreateConnectionPlan),
135 CreateDatabase(CreateDatabasePlan),
136 CreateSchema(CreateSchemaPlan),
137 CreateRole(CreateRolePlan),
138 CreateCluster(CreateClusterPlan),
139 CreateClusterReplica(CreateClusterReplicaPlan),
140 CreateSource(CreateSourcePlan),
141 CreateSources(Vec<CreateSourcePlanBundle>),
142 CreateSecret(CreateSecretPlan),
143 CreateSink(CreateSinkPlan),
144 CreateTable(CreateTablePlan),
145 CreateView(CreateViewPlan),
146 CreateMaterializedView(CreateMaterializedViewPlan),
147 CreateNetworkPolicy(CreateNetworkPolicyPlan),
148 CreateIndex(CreateIndexPlan),
149 CreateType(CreateTypePlan),
150 Comment(CommentPlan),
151 DiscardTemp,
152 DiscardAll,
153 DropObjects(DropObjectsPlan),
154 DropOwned(DropOwnedPlan),
155 EmptyQuery,
156 ShowAllVariables,
157 ShowCreate(ShowCreatePlan),
158 ShowColumns(ShowColumnsPlan),
159 ShowVariable(ShowVariablePlan),
160 InspectShard(InspectShardPlan),
161 SetVariable(SetVariablePlan),
162 ResetVariable(ResetVariablePlan),
163 SetTransaction(SetTransactionPlan),
164 StartTransaction(StartTransactionPlan),
165 CommitTransaction(CommitTransactionPlan),
166 AbortTransaction(AbortTransactionPlan),
167 Select(SelectPlan),
168 Subscribe(SubscribePlan),
169 CopyFrom(CopyFromPlan),
170 CopyTo(CopyToPlan),
171 ExplainPlan(ExplainPlanPlan),
172 ExplainPushdown(ExplainPushdownPlan),
173 ExplainTimestamp(ExplainTimestampPlan),
174 ExplainSinkSchema(ExplainSinkSchemaPlan),
175 Insert(InsertPlan),
176 AlterCluster(AlterClusterPlan),
177 AlterClusterSwap(AlterClusterSwapPlan),
178 AlterNoop(AlterNoopPlan),
179 AlterSetCluster(AlterSetClusterPlan),
180 AlterConnection(AlterConnectionPlan),
181 AlterSource(AlterSourcePlan),
182 AlterClusterRename(AlterClusterRenamePlan),
183 AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
184 AlterItemRename(AlterItemRenamePlan),
185 AlterSchemaRename(AlterSchemaRenamePlan),
186 AlterSchemaSwap(AlterSchemaSwapPlan),
187 AlterSecret(AlterSecretPlan),
188 AlterSink(AlterSinkPlan),
189 AlterSystemSet(AlterSystemSetPlan),
190 AlterSystemReset(AlterSystemResetPlan),
191 AlterSystemResetAll(AlterSystemResetAllPlan),
192 AlterRole(AlterRolePlan),
193 AlterOwner(AlterOwnerPlan),
194 AlterTableAddColumn(AlterTablePlan),
195 AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
196 AlterNetworkPolicy(AlterNetworkPolicyPlan),
197 Declare(DeclarePlan),
198 Fetch(FetchPlan),
199 Close(ClosePlan),
200 ReadThenWrite(ReadThenWritePlan),
201 Prepare(PreparePlan),
202 Execute(ExecutePlan),
203 Deallocate(DeallocatePlan),
204 Raise(RaisePlan),
205 GrantRole(GrantRolePlan),
206 RevokeRole(RevokeRolePlan),
207 GrantPrivileges(GrantPrivilegesPlan),
208 RevokePrivileges(RevokePrivilegesPlan),
209 AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
210 ReassignOwned(ReassignOwnedPlan),
211 SideEffectingFunc(SideEffectingFunc),
212 ValidateConnection(ValidateConnectionPlan),
213 AlterRetainHistory(AlterRetainHistoryPlan),
214 AlterSourceTimestampInterval(AlterSourceTimestampIntervalPlan),
215}
216
217impl Plan {
218 pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
221 match stmt {
222 StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
223 StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
224 StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
225 StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
226 StatementKind::AlterObjectRename => &[
227 PlanKind::AlterClusterRename,
228 PlanKind::AlterClusterReplicaRename,
229 PlanKind::AlterItemRename,
230 PlanKind::AlterSchemaRename,
231 PlanKind::AlterNoop,
232 ],
233 StatementKind::AlterObjectSwap => &[
234 PlanKind::AlterClusterSwap,
235 PlanKind::AlterSchemaSwap,
236 PlanKind::AlterNoop,
237 ],
238 StatementKind::AlterRole => &[PlanKind::AlterRole],
239 StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
240 StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
241 StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
242 StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
243 StatementKind::AlterSource => &[
244 PlanKind::AlterNoop,
245 PlanKind::AlterSource,
246 PlanKind::AlterRetainHistory,
247 PlanKind::AlterSourceTimestampInterval,
248 ],
249 StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
250 StatementKind::AlterSystemResetAll => {
251 &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
252 }
253 StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
254 StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
255 StatementKind::AlterTableAddColumn => {
256 &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
257 }
258 StatementKind::AlterMaterializedViewApplyReplacement => &[
259 PlanKind::AlterNoop,
260 PlanKind::AlterMaterializedViewApplyReplacement,
261 ],
262 StatementKind::Close => &[PlanKind::Close],
263 StatementKind::Comment => &[PlanKind::Comment],
264 StatementKind::Commit => &[PlanKind::CommitTransaction],
265 StatementKind::Copy => &[
266 PlanKind::CopyFrom,
267 PlanKind::Select,
268 PlanKind::Subscribe,
269 PlanKind::CopyTo,
270 ],
271 StatementKind::CreateCluster => &[PlanKind::CreateCluster],
272 StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
273 StatementKind::CreateConnection => &[PlanKind::CreateConnection],
274 StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
275 StatementKind::CreateIndex => &[PlanKind::CreateIndex],
276 StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
277 StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
278 StatementKind::CreateRole => &[PlanKind::CreateRole],
279 StatementKind::CreateSchema => &[PlanKind::CreateSchema],
280 StatementKind::CreateSecret => &[PlanKind::CreateSecret],
281 StatementKind::CreateSink => &[PlanKind::CreateSink],
282 StatementKind::CreateSource | StatementKind::CreateSubsource => {
283 &[PlanKind::CreateSource]
284 }
285 StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
286 StatementKind::CreateTable => &[PlanKind::CreateTable],
287 StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
288 StatementKind::CreateType => &[PlanKind::CreateType],
289 StatementKind::CreateView => &[PlanKind::CreateView],
290 StatementKind::Deallocate => &[PlanKind::Deallocate],
291 StatementKind::Declare => &[PlanKind::Declare],
292 StatementKind::Delete => &[PlanKind::ReadThenWrite],
293 StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
294 StatementKind::DropObjects => &[PlanKind::DropObjects],
295 StatementKind::DropOwned => &[PlanKind::DropOwned],
296 StatementKind::Execute => &[PlanKind::Execute],
297 StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
298 StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
299 StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
300 StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
301 StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
302 StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
303 StatementKind::Fetch => &[PlanKind::Fetch],
304 StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
305 StatementKind::GrantRole => &[PlanKind::GrantRole],
306 StatementKind::Insert => &[PlanKind::Insert],
307 StatementKind::Prepare => &[PlanKind::Prepare],
308 StatementKind::Raise => &[PlanKind::Raise],
309 StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
310 StatementKind::ResetVariable => &[PlanKind::ResetVariable],
311 StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
312 StatementKind::RevokeRole => &[PlanKind::RevokeRole],
313 StatementKind::Rollback => &[PlanKind::AbortTransaction],
314 StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
315 StatementKind::SetTransaction => &[PlanKind::SetTransaction],
316 StatementKind::SetVariable => &[PlanKind::SetVariable],
317 StatementKind::Show => &[
318 PlanKind::Select,
319 PlanKind::ShowVariable,
320 PlanKind::ShowCreate,
321 PlanKind::ShowColumns,
322 PlanKind::ShowAllVariables,
323 PlanKind::InspectShard,
324 ],
325 StatementKind::StartTransaction => &[PlanKind::StartTransaction],
326 StatementKind::Subscribe => &[PlanKind::Subscribe],
327 StatementKind::Update => &[PlanKind::ReadThenWrite],
328 StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
329 StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
330 StatementKind::ExecuteUnitTest => &[],
331 }
332 }
333
334 pub fn name(&self) -> &str {
336 match self {
337 Plan::CreateConnection(_) => "create connection",
338 Plan::CreateDatabase(_) => "create database",
339 Plan::CreateSchema(_) => "create schema",
340 Plan::CreateRole(_) => "create role",
341 Plan::CreateCluster(_) => "create cluster",
342 Plan::CreateClusterReplica(_) => "create cluster replica",
343 Plan::CreateSource(_) => "create source",
344 Plan::CreateSources(_) => "create source",
345 Plan::CreateSecret(_) => "create secret",
346 Plan::CreateSink(_) => "create sink",
347 Plan::CreateTable(_) => "create table",
348 Plan::CreateView(_) => "create view",
349 Plan::CreateMaterializedView(_) => "create materialized view",
350 Plan::CreateIndex(_) => "create index",
351 Plan::CreateType(_) => "create type",
352 Plan::CreateNetworkPolicy(_) => "create network policy",
353 Plan::Comment(_) => "comment",
354 Plan::DiscardTemp => "discard temp",
355 Plan::DiscardAll => "discard all",
356 Plan::DropObjects(plan) => match plan.object_type {
357 ObjectType::Table => "drop table",
358 ObjectType::View => "drop view",
359 ObjectType::MaterializedView => "drop materialized view",
360 ObjectType::Source => "drop source",
361 ObjectType::Sink => "drop sink",
362 ObjectType::Index => "drop index",
363 ObjectType::Type => "drop type",
364 ObjectType::Role => "drop roles",
365 ObjectType::Cluster => "drop clusters",
366 ObjectType::ClusterReplica => "drop cluster replicas",
367 ObjectType::Secret => "drop secret",
368 ObjectType::Connection => "drop connection",
369 ObjectType::Database => "drop database",
370 ObjectType::Schema => "drop schema",
371 ObjectType::Func => "drop function",
372 ObjectType::NetworkPolicy => "drop network policy",
373 },
374 Plan::DropOwned(_) => "drop owned",
375 Plan::EmptyQuery => "do nothing",
376 Plan::ShowAllVariables => "show all variables",
377 Plan::ShowCreate(_) => "show create",
378 Plan::ShowColumns(_) => "show columns",
379 Plan::ShowVariable(_) => "show variable",
380 Plan::InspectShard(_) => "inspect shard",
381 Plan::SetVariable(_) => "set variable",
382 Plan::ResetVariable(_) => "reset variable",
383 Plan::SetTransaction(_) => "set transaction",
384 Plan::StartTransaction(_) => "start transaction",
385 Plan::CommitTransaction(_) => "commit",
386 Plan::AbortTransaction(_) => "abort",
387 Plan::Select(_) => "select",
388 Plan::Subscribe(_) => "subscribe",
389 Plan::CopyFrom(_) => "copy from",
390 Plan::CopyTo(_) => "copy to",
391 Plan::ExplainPlan(_) => "explain plan",
392 Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
393 Plan::ExplainTimestamp(_) => "explain timestamp",
394 Plan::ExplainSinkSchema(_) => "explain schema",
395 Plan::Insert(_) => "insert",
396 Plan::AlterNoop(plan) => match plan.object_type {
397 ObjectType::Table => "alter table",
398 ObjectType::View => "alter view",
399 ObjectType::MaterializedView => "alter materialized view",
400 ObjectType::Source => "alter source",
401 ObjectType::Sink => "alter sink",
402 ObjectType::Index => "alter index",
403 ObjectType::Type => "alter type",
404 ObjectType::Role => "alter role",
405 ObjectType::Cluster => "alter cluster",
406 ObjectType::ClusterReplica => "alter cluster replica",
407 ObjectType::Secret => "alter secret",
408 ObjectType::Connection => "alter connection",
409 ObjectType::Database => "alter database",
410 ObjectType::Schema => "alter schema",
411 ObjectType::Func => "alter function",
412 ObjectType::NetworkPolicy => "alter network policy",
413 },
414 Plan::AlterCluster(_) => "alter cluster",
415 Plan::AlterClusterRename(_) => "alter cluster rename",
416 Plan::AlterClusterSwap(_) => "alter cluster swap",
417 Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
418 Plan::AlterSetCluster(_) => "alter set cluster",
419 Plan::AlterConnection(_) => "alter connection",
420 Plan::AlterSource(_) => "alter source",
421 Plan::AlterItemRename(_) => "rename item",
422 Plan::AlterSchemaRename(_) => "alter rename schema",
423 Plan::AlterSchemaSwap(_) => "alter swap schema",
424 Plan::AlterSecret(_) => "alter secret",
425 Plan::AlterSink(_) => "alter sink",
426 Plan::AlterSystemSet(_) => "alter system",
427 Plan::AlterSystemReset(_) => "alter system",
428 Plan::AlterSystemResetAll(_) => "alter system",
429 Plan::AlterRole(_) => "alter role",
430 Plan::AlterNetworkPolicy(_) => "alter network policy",
431 Plan::AlterOwner(plan) => match plan.object_type {
432 ObjectType::Table => "alter table owner",
433 ObjectType::View => "alter view owner",
434 ObjectType::MaterializedView => "alter materialized view owner",
435 ObjectType::Source => "alter source owner",
436 ObjectType::Sink => "alter sink owner",
437 ObjectType::Index => "alter index owner",
438 ObjectType::Type => "alter type owner",
439 ObjectType::Role => "alter role owner",
440 ObjectType::Cluster => "alter cluster owner",
441 ObjectType::ClusterReplica => "alter cluster replica owner",
442 ObjectType::Secret => "alter secret owner",
443 ObjectType::Connection => "alter connection owner",
444 ObjectType::Database => "alter database owner",
445 ObjectType::Schema => "alter schema owner",
446 ObjectType::Func => "alter function owner",
447 ObjectType::NetworkPolicy => "alter network policy owner",
448 },
449 Plan::AlterTableAddColumn(_) => "alter table add column",
450 Plan::AlterMaterializedViewApplyReplacement(_) => {
451 "alter materialized view apply replacement"
452 }
453 Plan::Declare(_) => "declare",
454 Plan::Fetch(_) => "fetch",
455 Plan::Close(_) => "close",
456 Plan::ReadThenWrite(plan) => match plan.kind {
457 MutationKind::Insert => "insert into select",
458 MutationKind::Update => "update",
459 MutationKind::Delete => "delete",
460 },
461 Plan::Prepare(_) => "prepare",
462 Plan::Execute(_) => "execute",
463 Plan::Deallocate(_) => "deallocate",
464 Plan::Raise(_) => "raise",
465 Plan::GrantRole(_) => "grant role",
466 Plan::RevokeRole(_) => "revoke role",
467 Plan::GrantPrivileges(_) => "grant privilege",
468 Plan::RevokePrivileges(_) => "revoke privilege",
469 Plan::AlterDefaultPrivileges(_) => "alter default privileges",
470 Plan::ReassignOwned(_) => "reassign owned",
471 Plan::SideEffectingFunc(_) => "side effecting func",
472 Plan::ValidateConnection(_) => "validate connection",
473 Plan::AlterRetainHistory(_) => "alter retain history",
474 Plan::AlterSourceTimestampInterval(_) => "alter source timestamp interval",
475 }
476 }
477
478 pub fn allowed_in_read_only(&self) -> bool {
484 match self {
485 Plan::SetVariable(_) => true,
488 Plan::ResetVariable(_) => true,
489 Plan::SetTransaction(_) => true,
490 Plan::StartTransaction(_) => true,
491 Plan::CommitTransaction(_) => true,
492 Plan::AbortTransaction(_) => true,
493 Plan::Select(_) => true,
494 Plan::EmptyQuery => true,
495 Plan::ShowAllVariables => true,
496 Plan::ShowCreate(_) => true,
497 Plan::ShowColumns(_) => true,
498 Plan::ShowVariable(_) => true,
499 Plan::InspectShard(_) => true,
500 Plan::Subscribe(_) => true,
501 Plan::CopyTo(_) => true,
502 Plan::ExplainPlan(_) => true,
503 Plan::ExplainPushdown(_) => true,
504 Plan::ExplainTimestamp(_) => true,
505 Plan::ExplainSinkSchema(_) => true,
506 Plan::ValidateConnection(_) => true,
507 _ => false,
508 }
509 }
510}
511
512#[derive(Debug)]
513pub struct StartTransactionPlan {
514 pub access: Option<TransactionAccessMode>,
515 pub isolation_level: Option<TransactionIsolationLevel>,
516}
517
518#[derive(Debug)]
519pub enum TransactionType {
520 Explicit,
521 Implicit,
522}
523
524impl TransactionType {
525 pub fn is_explicit(&self) -> bool {
526 matches!(self, TransactionType::Explicit)
527 }
528
529 pub fn is_implicit(&self) -> bool {
530 matches!(self, TransactionType::Implicit)
531 }
532}
533
534#[derive(Debug)]
535pub struct CommitTransactionPlan {
536 pub transaction_type: TransactionType,
537}
538
539#[derive(Debug)]
540pub struct AbortTransactionPlan {
541 pub transaction_type: TransactionType,
542}
543
544#[derive(Debug)]
545pub struct CreateDatabasePlan {
546 pub name: String,
547 pub if_not_exists: bool,
548}
549
550#[derive(Debug)]
551pub struct CreateSchemaPlan {
552 pub database_spec: ResolvedDatabaseSpecifier,
553 pub schema_name: String,
554 pub if_not_exists: bool,
555}
556
557#[derive(Debug)]
558pub struct CreateRolePlan {
559 pub name: String,
560 pub attributes: RoleAttributesRaw,
561}
562
563#[derive(Debug, PartialEq, Eq, Clone)]
564pub struct CreateClusterPlan {
565 pub name: String,
566 pub variant: CreateClusterVariant,
567 pub workload_class: Option<String>,
568}
569
570#[derive(Debug, PartialEq, Eq, Clone)]
571pub enum CreateClusterVariant {
572 Managed(CreateClusterManagedPlan),
573 Unmanaged(CreateClusterUnmanagedPlan),
574}
575
576#[derive(Debug, PartialEq, Eq, Clone)]
577pub struct CreateClusterUnmanagedPlan {
578 pub replicas: Vec<(String, ReplicaConfig)>,
579}
580
581#[derive(Debug, PartialEq, Eq, Clone)]
582pub struct CreateClusterManagedPlan {
583 pub replication_factor: u32,
584 pub size: String,
585 pub availability_zones: Vec<String>,
586 pub compute: ComputeReplicaConfig,
587 pub optimizer_feature_overrides: OptimizerFeatureOverrides,
588 pub schedule: ClusterSchedule,
589}
590
591#[derive(Debug)]
592pub struct CreateClusterReplicaPlan {
593 pub cluster_id: ClusterId,
594 pub name: String,
595 pub config: ReplicaConfig,
596}
597
598#[derive(
600 Clone,
601 Copy,
602 Debug,
603 Serialize,
604 Deserialize,
605 PartialOrd,
606 Ord,
607 PartialEq,
608 Eq
609)]
610pub struct ComputeReplicaIntrospectionConfig {
611 pub debugging: bool,
613 pub interval: Duration,
615}
616
617#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
618pub struct ComputeReplicaConfig {
619 pub introspection: Option<ComputeReplicaIntrospectionConfig>,
620}
621
622#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
623pub enum ReplicaConfig {
624 Unorchestrated {
625 storagectl_addrs: Vec<String>,
626 computectl_addrs: Vec<String>,
627 compute: ComputeReplicaConfig,
628 },
629 Orchestrated {
630 size: String,
631 availability_zone: Option<String>,
632 compute: ComputeReplicaConfig,
633 internal: bool,
634 billed_as: Option<String>,
635 },
636}
637
638#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
639pub enum ClusterSchedule {
640 Manual,
642 Refresh { hydration_time_estimate: Duration },
646}
647
648impl Default for ClusterSchedule {
649 fn default() -> Self {
650 ClusterSchedule::Manual
652 }
653}
654
655#[derive(Debug)]
656pub struct CreateSourcePlan {
657 pub name: QualifiedItemName,
658 pub source: Source,
659 pub if_not_exists: bool,
660 pub timeline: Timeline,
661 pub in_cluster: Option<ClusterId>,
663}
664
665#[derive(Clone, Debug, PartialEq, Eq)]
666pub struct SourceReferences {
667 pub updated_at: u64,
668 pub references: Vec<SourceReference>,
669}
670
671#[derive(Clone, Debug, PartialEq, Eq)]
674pub struct SourceReference {
675 pub name: String,
676 pub namespace: Option<String>,
677 pub columns: Vec<String>,
678}
679
680#[derive(Debug)]
682pub struct CreateSourcePlanBundle {
683 pub item_id: CatalogItemId,
685 pub global_id: GlobalId,
687 pub plan: CreateSourcePlan,
689 pub resolved_ids: ResolvedIds,
691 pub available_source_references: Option<SourceReferences>,
695}
696
697#[derive(Debug)]
698pub struct CreateConnectionPlan {
699 pub name: QualifiedItemName,
700 pub if_not_exists: bool,
701 pub connection: Connection,
702 pub validate: bool,
703}
704
705#[derive(Debug)]
706pub struct ValidateConnectionPlan {
707 pub id: CatalogItemId,
709 pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
711}
712
713#[derive(Debug)]
714pub struct CreateSecretPlan {
715 pub name: QualifiedItemName,
716 pub secret: Secret,
717 pub if_not_exists: bool,
718}
719
720#[derive(Debug)]
721pub struct CreateSinkPlan {
722 pub name: QualifiedItemName,
723 pub sink: Sink,
724 pub with_snapshot: bool,
725 pub if_not_exists: bool,
726 pub in_cluster: ClusterId,
727}
728
729#[derive(Debug)]
730pub struct CreateTablePlan {
731 pub name: QualifiedItemName,
732 pub table: Table,
733 pub if_not_exists: bool,
734}
735
736#[derive(Debug, Clone)]
737pub struct CreateViewPlan {
738 pub name: QualifiedItemName,
739 pub view: View,
740 pub replace: Option<CatalogItemId>,
742 pub drop_ids: Vec<CatalogItemId>,
744 pub if_not_exists: bool,
745 pub ambiguous_columns: bool,
748}
749
750#[derive(Debug, Clone)]
751pub struct CreateMaterializedViewPlan {
752 pub name: QualifiedItemName,
753 pub materialized_view: MaterializedView,
754 pub replace: Option<CatalogItemId>,
756 pub drop_ids: Vec<CatalogItemId>,
758 pub if_not_exists: bool,
759 pub ambiguous_columns: bool,
762}
763
764#[derive(Debug, Clone)]
765pub struct CreateNetworkPolicyPlan {
766 pub name: String,
767 pub rules: Vec<NetworkPolicyRule>,
768}
769
770#[derive(Debug, Clone)]
771pub struct AlterNetworkPolicyPlan {
772 pub id: NetworkPolicyId,
773 pub name: String,
774 pub rules: Vec<NetworkPolicyRule>,
775}
776
777#[derive(Debug, Clone)]
778pub struct CreateIndexPlan {
779 pub name: QualifiedItemName,
780 pub index: Index,
781 pub if_not_exists: bool,
782}
783
784#[derive(Debug)]
785pub struct CreateTypePlan {
786 pub name: QualifiedItemName,
787 pub typ: Type,
788}
789
790#[derive(Debug)]
791pub struct DropObjectsPlan {
792 pub referenced_ids: Vec<ObjectId>,
794 pub drop_ids: Vec<ObjectId>,
796 pub object_type: ObjectType,
799}
800
801#[derive(Debug)]
802pub struct DropOwnedPlan {
803 pub role_ids: Vec<RoleId>,
805 pub drop_ids: Vec<ObjectId>,
807 pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
809 pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
811}
812
813#[derive(Debug)]
814pub struct ShowVariablePlan {
815 pub name: String,
816}
817
818#[derive(Debug)]
819pub struct InspectShardPlan {
820 pub id: GlobalId,
822}
823
824#[derive(Debug)]
825pub struct SetVariablePlan {
826 pub name: String,
827 pub value: VariableValue,
828 pub local: bool,
829}
830
831#[derive(Debug)]
832pub enum VariableValue {
833 Default,
834 Values(Vec<String>),
835}
836
837#[derive(Debug)]
838pub struct ResetVariablePlan {
839 pub name: String,
840}
841
842#[derive(Debug)]
843pub struct SetTransactionPlan {
844 pub local: bool,
845 pub modes: Vec<TransactionMode>,
846}
847
848#[derive(Clone, Debug)]
850pub struct SelectPlan {
851 pub select: Option<Box<SelectStatement<Aug>>>,
854 pub source: HirRelationExpr,
856 pub when: QueryWhen,
858 pub finishing: RowSetFinishing,
860 pub copy_to: Option<CopyFormat>,
862}
863
864impl SelectPlan {
865 pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
866 let arity = typ.arity();
867 SelectPlan {
868 select: None,
869 source: HirRelationExpr::Constant { rows, typ },
870 when: QueryWhen::Immediately,
871 finishing: RowSetFinishing::trivial(arity),
872 copy_to: None,
873 }
874 }
875}
876
877#[derive(Debug, Clone)]
878pub enum SubscribeOutput {
879 Diffs,
880 WithinTimestampOrderBy {
881 order_by: Vec<ColumnOrder>,
883 },
884 EnvelopeUpsert {
885 order_by_keys: Vec<ColumnOrder>,
887 },
888 EnvelopeDebezium {
889 order_by_keys: Vec<ColumnOrder>,
891 },
892}
893
894impl SubscribeOutput {
895 pub fn row_order(&self) -> &[ColumnOrder] {
896 match self {
897 SubscribeOutput::Diffs => &[],
898 SubscribeOutput::WithinTimestampOrderBy { .. } => &[],
900 SubscribeOutput::EnvelopeUpsert { order_by_keys } => order_by_keys,
901 SubscribeOutput::EnvelopeDebezium { order_by_keys } => order_by_keys,
902 }
903 }
904}
905
906#[derive(Debug, Clone)]
907pub struct SubscribePlan {
908 pub from: SubscribeFrom,
909 pub with_snapshot: bool,
910 pub when: QueryWhen,
911 pub up_to: Option<Timestamp>,
912 pub copy_to: Option<CopyFormat>,
913 pub emit_progress: bool,
914 pub output: SubscribeOutput,
915}
916
917#[derive(Debug, Clone)]
918pub enum SubscribeFrom {
919 Id(GlobalId),
921 Query {
923 expr: HirRelationExpr,
924 desc: RelationDesc,
925 },
926}
927
928impl SubscribeFrom {
929 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
930 match self {
931 SubscribeFrom::Id(id) => BTreeSet::from([*id]),
932 SubscribeFrom::Query { expr, .. } => expr.depends_on(),
933 }
934 }
935
936 pub fn contains_temporal(&self) -> bool {
937 match self {
938 SubscribeFrom::Id(_) => false,
939 SubscribeFrom::Query { expr, .. } => expr
940 .contains_temporal()
941 .expect("Unexpected error in `visit_scalars` call"),
942 }
943 }
944}
945
946#[derive(Debug)]
947pub struct ShowCreatePlan {
948 pub id: ObjectId,
949 pub row: Row,
950}
951
952#[derive(Debug)]
953pub struct ShowColumnsPlan {
954 pub id: CatalogItemId,
955 pub select_plan: SelectPlan,
956 pub new_resolved_ids: ResolvedIds,
957}
958
959#[derive(Debug)]
960pub struct CopyFromPlan {
961 pub target_id: CatalogItemId,
963 pub target_name: String,
965 pub source: CopyFromSource,
967 pub columns: Vec<ColumnIndex>,
971 pub source_desc: RelationDesc,
973 pub mfp: MapFilterProject,
975 pub params: CopyFormatParams<'static>,
977 pub filter: Option<CopyFromFilter>,
979}
980
981#[derive(Debug)]
982pub enum CopyFromSource {
983 Stdin,
985 Url(HirScalarExpr),
989 AwsS3 {
991 uri: HirScalarExpr,
993 connection: AwsConnection,
995 connection_id: CatalogItemId,
997 },
998}
999
1000#[derive(Debug)]
1001pub enum CopyFromFilter {
1002 Files(Vec<String>),
1003 Pattern(String),
1004}
1005
1006#[derive(Debug, Clone)]
1011pub struct CopyToPlan {
1012 pub select_plan: SelectPlan,
1014 pub desc: RelationDesc,
1015 pub to: HirScalarExpr,
1017 pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
1018 pub connection_id: CatalogItemId,
1020 pub format: S3SinkFormat,
1021 pub max_file_size: u64,
1022}
1023
1024#[derive(Clone, Debug)]
1025pub struct ExplainPlanPlan {
1026 pub stage: ExplainStage,
1027 pub format: ExplainFormat,
1028 pub config: ExplainConfig,
1029 pub explainee: Explainee,
1030}
1031
1032#[derive(Clone, Debug)]
1034pub enum Explainee {
1035 View(CatalogItemId),
1037 MaterializedView(CatalogItemId),
1039 Index(CatalogItemId),
1041 ReplanView(CatalogItemId),
1043 ReplanMaterializedView(CatalogItemId),
1045 ReplanIndex(CatalogItemId),
1047 Statement(ExplaineeStatement),
1049}
1050
1051#[derive(Clone, Debug, EnumKind)]
1053#[enum_kind(ExplaineeStatementKind)]
1054pub enum ExplaineeStatement {
1055 Select {
1057 broken: bool,
1059 plan: plan::SelectPlan,
1060 desc: RelationDesc,
1061 },
1062 CreateView {
1064 broken: bool,
1066 plan: plan::CreateViewPlan,
1067 },
1068 CreateMaterializedView {
1070 broken: bool,
1072 plan: plan::CreateMaterializedViewPlan,
1073 },
1074 CreateIndex {
1076 broken: bool,
1078 plan: plan::CreateIndexPlan,
1079 },
1080 Subscribe {
1082 broken: bool,
1084 plan: plan::SubscribePlan,
1085 },
1086}
1087
1088impl ExplaineeStatement {
1089 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1090 match self {
1091 Self::Select { plan, .. } => plan.source.depends_on(),
1092 Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1093 Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1094 Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1095 Self::Subscribe { plan, .. } => plan.from.depends_on(),
1096 }
1097 }
1098
1099 pub fn broken(&self) -> bool {
1110 match self {
1111 Self::Select { broken, .. } => *broken,
1112 Self::CreateView { broken, .. } => *broken,
1113 Self::CreateMaterializedView { broken, .. } => *broken,
1114 Self::CreateIndex { broken, .. } => *broken,
1115 Self::Subscribe { broken, .. } => *broken,
1116 }
1117 }
1118}
1119
1120impl ExplaineeStatementKind {
1121 pub fn supports(&self, stage: &ExplainStage) -> bool {
1122 use ExplainStage::*;
1123 match self {
1124 Self::Select => true,
1125 Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1126 Self::CreateMaterializedView => true,
1127 Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1128 Self::Subscribe => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1131 }
1132 }
1133}
1134
1135impl std::fmt::Display for ExplaineeStatementKind {
1136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1137 match self {
1138 Self::Select => write!(f, "SELECT"),
1139 Self::CreateView => write!(f, "CREATE VIEW"),
1140 Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1141 Self::CreateIndex => write!(f, "CREATE INDEX"),
1142 Self::Subscribe => write!(f, "SUBSCRIBE"),
1143 }
1144 }
1145}
1146
1147#[derive(Clone, Debug)]
1148pub struct ExplainPushdownPlan {
1149 pub explainee: Explainee,
1150}
1151
1152#[derive(Clone, Debug)]
1153pub struct ExplainTimestampPlan {
1154 pub format: ExplainFormat,
1155 pub raw_plan: HirRelationExpr,
1156 pub when: QueryWhen,
1157}
1158
1159#[derive(Debug)]
1160pub struct ExplainSinkSchemaPlan {
1161 pub sink_from: GlobalId,
1162 pub json_schema: String,
1163}
1164
1165#[derive(Debug)]
1166pub struct SendDiffsPlan {
1167 pub id: CatalogItemId,
1168 pub updates: Vec<(Row, Diff)>,
1169 pub kind: MutationKind,
1170 pub returning: Vec<(Row, NonZeroUsize)>,
1171 pub max_result_size: u64,
1172}
1173
1174#[derive(Debug)]
1175pub struct InsertPlan {
1176 pub id: CatalogItemId,
1177 pub values: HirRelationExpr,
1178 pub returning: Vec<mz_expr::MirScalarExpr>,
1179}
1180
1181#[derive(Debug)]
1182pub struct ReadThenWritePlan {
1183 pub id: CatalogItemId,
1184 pub selection: HirRelationExpr,
1185 pub finishing: RowSetFinishing,
1186 pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
1187 pub kind: MutationKind,
1188 pub returning: Vec<mz_expr::MirScalarExpr>,
1189}
1190
1191#[derive(Debug)]
1193pub struct AlterNoopPlan {
1194 pub object_type: ObjectType,
1195}
1196
1197#[derive(Debug)]
1198pub struct AlterSetClusterPlan {
1199 pub id: CatalogItemId,
1200 pub set_cluster: ClusterId,
1201}
1202
1203#[derive(Debug)]
1204pub struct AlterRetainHistoryPlan {
1205 pub id: CatalogItemId,
1206 pub value: Option<Value>,
1207 pub window: CompactionWindow,
1208 pub object_type: ObjectType,
1209}
1210
1211#[derive(Debug)]
1212pub struct AlterSourceTimestampIntervalPlan {
1213 pub id: CatalogItemId,
1214 pub value: Option<Value>,
1215 pub interval: Duration,
1216}
1217
1218#[derive(Debug, Clone)]
1219
1220pub enum AlterOptionParameter<T = String> {
1221 Set(T),
1222 Reset,
1223 Unchanged,
1224}
1225
1226#[derive(Debug)]
1227pub enum AlterConnectionAction {
1228 RotateKeys,
1229 AlterOptions {
1230 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
1231 drop_options: BTreeSet<ConnectionOptionName>,
1232 validate: bool,
1233 },
1234}
1235
1236#[derive(Debug)]
1237pub struct AlterConnectionPlan {
1238 pub id: CatalogItemId,
1239 pub action: AlterConnectionAction,
1240}
1241
1242#[derive(Debug)]
1243pub enum AlterSourceAction {
1244 AddSubsourceExports {
1245 subsources: Vec<CreateSourcePlanBundle>,
1246 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1247 },
1248 RefreshReferences {
1249 references: SourceReferences,
1250 },
1251}
1252
1253#[derive(Debug)]
1254pub struct AlterSourcePlan {
1255 pub item_id: CatalogItemId,
1256 pub ingestion_id: GlobalId,
1257 pub action: AlterSourceAction,
1258}
1259
1260#[derive(Debug, Clone)]
1261pub struct AlterSinkPlan {
1262 pub item_id: CatalogItemId,
1263 pub global_id: GlobalId,
1264 pub sink: Sink,
1265 pub with_snapshot: bool,
1266 pub in_cluster: ClusterId,
1267}
1268
1269#[derive(Debug, Clone)]
1270pub struct AlterClusterPlan {
1271 pub id: ClusterId,
1272 pub name: String,
1273 pub options: PlanClusterOption,
1274 pub strategy: AlterClusterPlanStrategy,
1275}
1276
1277#[derive(Debug)]
1278pub struct AlterClusterRenamePlan {
1279 pub id: ClusterId,
1280 pub name: String,
1281 pub to_name: String,
1282}
1283
1284#[derive(Debug)]
1285pub struct AlterClusterReplicaRenamePlan {
1286 pub cluster_id: ClusterId,
1287 pub replica_id: ReplicaId,
1288 pub name: QualifiedReplica,
1289 pub to_name: String,
1290}
1291
1292#[derive(Debug)]
1293pub struct AlterItemRenamePlan {
1294 pub id: CatalogItemId,
1295 pub current_full_name: FullItemName,
1296 pub to_name: String,
1297 pub object_type: ObjectType,
1298}
1299
1300#[derive(Debug)]
1301pub struct AlterSchemaRenamePlan {
1302 pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1303 pub new_schema_name: String,
1304}
1305
1306#[derive(Debug)]
1307pub struct AlterSchemaSwapPlan {
1308 pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1309 pub schema_a_name: String,
1310 pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1311 pub schema_b_name: String,
1312 pub name_temp: String,
1313}
1314
1315#[derive(Debug)]
1316pub struct AlterClusterSwapPlan {
1317 pub id_a: ClusterId,
1318 pub id_b: ClusterId,
1319 pub name_a: String,
1320 pub name_b: String,
1321 pub name_temp: String,
1322}
1323
1324#[derive(Debug)]
1325pub struct AlterSecretPlan {
1326 pub id: CatalogItemId,
1327 pub secret_as: MirScalarExpr,
1328}
1329
1330#[derive(Debug)]
1331pub struct AlterSystemSetPlan {
1332 pub name: String,
1333 pub value: VariableValue,
1334}
1335
1336#[derive(Debug)]
1337pub struct AlterSystemResetPlan {
1338 pub name: String,
1339}
1340
1341#[derive(Debug)]
1342pub struct AlterSystemResetAllPlan {}
1343
1344#[derive(Debug)]
1345pub struct AlterRolePlan {
1346 pub id: RoleId,
1347 pub name: String,
1348 pub option: PlannedAlterRoleOption,
1349}
1350
1351#[derive(Debug)]
1352pub struct AlterOwnerPlan {
1353 pub id: ObjectId,
1354 pub object_type: ObjectType,
1355 pub new_owner: RoleId,
1356}
1357
1358#[derive(Debug)]
1359pub struct AlterTablePlan {
1360 pub relation_id: CatalogItemId,
1361 pub column_name: ColumnName,
1362 pub column_type: SqlColumnType,
1363 pub raw_sql_type: RawDataType,
1364}
1365
1366#[derive(Debug, Clone)]
1367pub struct AlterMaterializedViewApplyReplacementPlan {
1368 pub id: CatalogItemId,
1369 pub replacement_id: CatalogItemId,
1370}
1371
1372#[derive(Debug)]
1373pub struct DeclarePlan {
1374 pub name: String,
1375 pub stmt: Statement<Raw>,
1376 pub sql: String,
1377 pub params: Params,
1378}
1379
1380#[derive(Debug)]
1381pub struct FetchPlan {
1382 pub name: String,
1383 pub count: Option<FetchDirection>,
1384 pub timeout: ExecuteTimeout,
1385}
1386
1387#[derive(Debug)]
1388pub struct ClosePlan {
1389 pub name: String,
1390}
1391
1392#[derive(Debug)]
1393pub struct PreparePlan {
1394 pub name: String,
1395 pub stmt: Statement<Raw>,
1396 pub sql: String,
1397 pub desc: StatementDesc,
1398}
1399
1400#[derive(Debug)]
1401pub struct ExecutePlan {
1402 pub name: String,
1403 pub params: Params,
1404}
1405
1406#[derive(Debug)]
1407pub struct DeallocatePlan {
1408 pub name: Option<String>,
1409}
1410
1411#[derive(Debug)]
1412pub struct RaisePlan {
1413 pub severity: NoticeSeverity,
1414}
1415
1416#[derive(Debug)]
1417pub struct GrantRolePlan {
1418 pub role_ids: Vec<RoleId>,
1420 pub member_ids: Vec<RoleId>,
1422 pub grantor_id: RoleId,
1424}
1425
1426#[derive(Debug)]
1427pub struct RevokeRolePlan {
1428 pub role_ids: Vec<RoleId>,
1430 pub member_ids: Vec<RoleId>,
1432 pub grantor_id: RoleId,
1434}
1435
1436#[derive(Debug)]
1437pub struct UpdatePrivilege {
1438 pub acl_mode: AclMode,
1440 pub target_id: SystemObjectId,
1442 pub grantor: RoleId,
1444}
1445
1446#[derive(Debug)]
1447pub struct GrantPrivilegesPlan {
1448 pub update_privileges: Vec<UpdatePrivilege>,
1450 pub grantees: Vec<RoleId>,
1452}
1453
1454#[derive(Debug)]
1455pub struct RevokePrivilegesPlan {
1456 pub update_privileges: Vec<UpdatePrivilege>,
1458 pub revokees: Vec<RoleId>,
1460}
1461#[derive(Debug)]
1462pub struct AlterDefaultPrivilegesPlan {
1463 pub privilege_objects: Vec<DefaultPrivilegeObject>,
1465 pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
1467 pub is_grant: bool,
1469}
1470
1471#[derive(Debug)]
1472pub struct ReassignOwnedPlan {
1473 pub old_roles: Vec<RoleId>,
1475 pub new_role: RoleId,
1477 pub reassign_ids: Vec<ObjectId>,
1479}
1480
1481#[derive(Debug)]
1482pub struct CommentPlan {
1483 pub object_id: CommentObjectId,
1485 pub sub_component: Option<usize>,
1489 pub comment: Option<String>,
1491}
1492
1493#[derive(Clone, Debug)]
1494pub enum TableDataSource {
1495 TableWrites { defaults: Vec<Expr<Aug>> },
1497
1498 DataSource {
1501 desc: DataSourceDesc,
1502 timeline: Timeline,
1503 },
1504}
1505
1506#[derive(Clone, Debug)]
1507pub struct Table {
1508 pub create_sql: String,
1509 pub desc: VersionedRelationDesc,
1510 pub temporary: bool,
1511 pub compaction_window: Option<CompactionWindow>,
1512 pub data_source: TableDataSource,
1513}
1514
1515#[derive(Clone, Debug)]
1516pub struct Source {
1517 pub create_sql: String,
1518 pub data_source: DataSourceDesc,
1519 pub desc: RelationDesc,
1520 pub compaction_window: Option<CompactionWindow>,
1521}
1522
1523#[derive(Debug, Clone)]
1524pub enum DataSourceDesc {
1525 Ingestion(SourceDesc<ReferencedConnection>),
1527 OldSyntaxIngestion {
1529 desc: SourceDesc<ReferencedConnection>,
1530 progress_subsource: CatalogItemId,
1533 data_config: SourceExportDataConfig<ReferencedConnection>,
1534 details: SourceExportDetails,
1535 },
1536 IngestionExport {
1539 ingestion_id: CatalogItemId,
1540 external_reference: UnresolvedItemName,
1541 details: SourceExportDetails,
1542 data_config: SourceExportDataConfig<ReferencedConnection>,
1543 },
1544 Progress,
1546 Webhook {
1548 validate_using: Option<WebhookValidation>,
1549 body_format: WebhookBodyFormat,
1550 headers: WebhookHeaders,
1551 cluster_id: Option<StorageInstanceId>,
1553 },
1554}
1555
1556#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
1557pub struct WebhookValidation {
1558 pub expression: MirScalarExpr,
1560 pub relation_desc: RelationDesc,
1562 pub bodies: Vec<(usize, bool)>,
1564 pub headers: Vec<(usize, bool)>,
1566 pub secrets: Vec<WebhookValidationSecret>,
1568}
1569
1570impl WebhookValidation {
1571 const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1572
1573 pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1578 let WebhookValidation {
1579 expression,
1580 relation_desc,
1581 ..
1582 } = self;
1583
1584 let mut expression_ = expression.clone();
1586 let desc_ = relation_desc.clone();
1587 let reduce_task = mz_ore::task::spawn_blocking(
1588 || "webhook-validation-reduce",
1589 move || {
1590 let repr_col_types: Vec<ReprColumnType> = desc_
1591 .typ()
1592 .column_types
1593 .iter()
1594 .map(ReprColumnType::from)
1595 .collect();
1596 expression_.reduce(&repr_col_types);
1597 expression_
1598 },
1599 );
1600
1601 match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1602 Ok(reduced_expr) => {
1603 *expression = reduced_expr;
1604 Ok(())
1605 }
1606 Err(_) => Err("timeout"),
1607 }
1608 }
1609}
1610
1611#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
1612pub struct WebhookHeaders {
1613 pub header_column: Option<WebhookHeaderFilters>,
1615 pub mapped_headers: BTreeMap<usize, (String, bool)>,
1617}
1618
1619impl WebhookHeaders {
1620 pub fn num_columns(&self) -> usize {
1622 let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1623 let mapped_headers = self.mapped_headers.len();
1624
1625 header_column + mapped_headers
1626 }
1627}
1628
1629#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
1630pub struct WebhookHeaderFilters {
1631 pub block: BTreeSet<String>,
1632 pub allow: BTreeSet<String>,
1633}
1634
1635#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Arbitrary)]
1636pub enum WebhookBodyFormat {
1637 Json { array: bool },
1638 Bytes,
1639 Text,
1640}
1641
1642impl From<WebhookBodyFormat> for SqlScalarType {
1643 fn from(value: WebhookBodyFormat) -> Self {
1644 match value {
1645 WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1646 WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1647 WebhookBodyFormat::Text => SqlScalarType::String,
1648 }
1649 }
1650}
1651
1652#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
1653pub struct WebhookValidationSecret {
1654 pub id: CatalogItemId,
1656 pub column_idx: usize,
1658 pub use_bytes: bool,
1660}
1661
1662#[derive(Clone, Debug)]
1663pub struct Connection {
1664 pub create_sql: String,
1665 pub details: ConnectionDetails,
1666}
1667
1668#[derive(Clone, Debug, Serialize)]
1669pub enum ConnectionDetails {
1670 Kafka(KafkaConnection<ReferencedConnection>),
1671 Csr(CsrConnection<ReferencedConnection>),
1672 GlueSchemaRegistry(GlueSchemaRegistryConnection<ReferencedConnection>),
1673 Postgres(PostgresConnection<ReferencedConnection>),
1674 Ssh {
1675 connection: SshConnection,
1676 key_1: SshKey,
1677 key_2: SshKey,
1678 },
1679 Aws(AwsConnection),
1680 AwsPrivatelink(AwsPrivatelinkConnection),
1681 MySql(MySqlConnection<ReferencedConnection>),
1682 SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
1683 IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
1684}
1685
1686impl ConnectionDetails {
1687 pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1688 match self {
1689 ConnectionDetails::Kafka(c) => {
1690 mz_storage_types::connections::Connection::Kafka(c.clone())
1691 }
1692 ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1693 ConnectionDetails::GlueSchemaRegistry(c) => {
1694 mz_storage_types::connections::Connection::GlueSchemaRegistry(c.clone())
1695 }
1696 ConnectionDetails::Postgres(c) => {
1697 mz_storage_types::connections::Connection::Postgres(c.clone())
1698 }
1699 ConnectionDetails::Ssh { connection, .. } => {
1700 mz_storage_types::connections::Connection::Ssh(connection.clone())
1701 }
1702 ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1703 ConnectionDetails::AwsPrivatelink(c) => {
1704 mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1705 }
1706 ConnectionDetails::MySql(c) => {
1707 mz_storage_types::connections::Connection::MySql(c.clone())
1708 }
1709 ConnectionDetails::SqlServer(c) => {
1710 mz_storage_types::connections::Connection::SqlServer(c.clone())
1711 }
1712 ConnectionDetails::IcebergCatalog(c) => {
1713 mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1714 }
1715 }
1716 }
1717}
1718
1719#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1720pub struct NetworkPolicyRule {
1721 pub name: String,
1722 pub action: NetworkPolicyRuleAction,
1723 pub address: PolicyAddress,
1724 pub direction: NetworkPolicyRuleDirection,
1725}
1726
1727#[derive(
1728 Debug,
1729 Clone,
1730 Serialize,
1731 Deserialize,
1732 PartialEq,
1733 Eq,
1734 Ord,
1735 PartialOrd,
1736 Hash
1737)]
1738pub enum NetworkPolicyRuleAction {
1739 Allow,
1740}
1741
1742impl std::fmt::Display for NetworkPolicyRuleAction {
1743 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1744 match self {
1745 Self::Allow => write!(f, "allow"),
1746 }
1747 }
1748}
1749impl TryFrom<&str> for NetworkPolicyRuleAction {
1750 type Error = PlanError;
1751 fn try_from(value: &str) -> Result<Self, Self::Error> {
1752 match value.to_uppercase().as_str() {
1753 "ALLOW" => Ok(Self::Allow),
1754 _ => Err(PlanError::Unstructured(
1755 "Allow is the only valid option".into(),
1756 )),
1757 }
1758 }
1759}
1760
1761#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1762pub enum NetworkPolicyRuleDirection {
1763 Ingress,
1764}
1765impl std::fmt::Display for NetworkPolicyRuleDirection {
1766 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1767 match self {
1768 Self::Ingress => write!(f, "ingress"),
1769 }
1770 }
1771}
1772impl TryFrom<&str> for NetworkPolicyRuleDirection {
1773 type Error = PlanError;
1774 fn try_from(value: &str) -> Result<Self, Self::Error> {
1775 match value.to_uppercase().as_str() {
1776 "INGRESS" => Ok(Self::Ingress),
1777 _ => Err(PlanError::Unstructured(
1778 "Ingress is the only valid option".into(),
1779 )),
1780 }
1781 }
1782}
1783
1784#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1785pub struct PolicyAddress(pub IpNet);
1786impl std::fmt::Display for PolicyAddress {
1787 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1788 write!(f, "{}", &self.0.to_string())
1789 }
1790}
1791impl From<String> for PolicyAddress {
1792 fn from(value: String) -> Self {
1793 Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1794 }
1795}
1796impl TryFrom<&str> for PolicyAddress {
1797 type Error = PlanError;
1798 fn try_from(value: &str) -> Result<Self, Self::Error> {
1799 let net = IpNet::from_str(value)
1800 .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1801 Ok(Self(net))
1802 }
1803}
1804
1805impl Serialize for PolicyAddress {
1806 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1807 where
1808 S: serde::Serializer,
1809 {
1810 serializer.serialize_str(&format!("{}", &self.0))
1811 }
1812}
1813
1814#[derive(Clone, Debug, Serialize)]
1815pub enum SshKey {
1816 PublicOnly(String),
1817 Both(SshKeyPair),
1818}
1819
1820impl SshKey {
1821 pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1822 match self {
1823 SshKey::PublicOnly(_) => None,
1824 SshKey::Both(key_pair) => Some(key_pair),
1825 }
1826 }
1827
1828 pub fn public_key(&self) -> String {
1829 match self {
1830 SshKey::PublicOnly(s) => s.into(),
1831 SshKey::Both(p) => p.ssh_public_key(),
1832 }
1833 }
1834}
1835
1836#[derive(Clone, Debug)]
1837pub struct Secret {
1838 pub create_sql: String,
1839 pub secret_as: MirScalarExpr,
1840}
1841
1842#[derive(Clone, Debug)]
1843pub struct Sink {
1844 pub create_sql: String,
1846 pub from: GlobalId,
1848 pub connection: StorageSinkConnection<ReferencedConnection>,
1850 pub envelope: SinkEnvelope,
1852 pub version: u64,
1853 pub commit_interval: Option<Duration>,
1854}
1855
1856#[derive(Clone, Debug)]
1857pub struct View {
1858 pub create_sql: String,
1860 pub expr: HirRelationExpr,
1862 pub dependencies: DependencyIds,
1864 pub column_names: Vec<ColumnName>,
1866 pub temporary: bool,
1868}
1869
1870#[derive(Clone, Debug)]
1871pub struct MaterializedView {
1872 pub create_sql: String,
1874 pub expr: HirRelationExpr,
1876 pub dependencies: DependencyIds,
1878 pub column_names: Vec<ColumnName>,
1880 pub replacement_target: Option<CatalogItemId>,
1881 pub cluster_id: ClusterId,
1883 pub target_replica: Option<ReplicaId>,
1885 pub non_null_assertions: Vec<usize>,
1886 pub compaction_window: Option<CompactionWindow>,
1887 pub refresh_schedule: Option<RefreshSchedule>,
1888 pub as_of: Option<Timestamp>,
1889}
1890
1891#[derive(Clone, Debug)]
1892pub struct Index {
1893 pub create_sql: String,
1895 pub on: GlobalId,
1897 pub keys: Vec<mz_expr::MirScalarExpr>,
1898 pub compaction_window: Option<CompactionWindow>,
1899 pub cluster_id: ClusterId,
1900}
1901
1902#[derive(Clone, Debug)]
1903pub struct Type {
1904 pub create_sql: String,
1905 pub inner: CatalogType<IdReference>,
1906}
1907
1908#[derive(Deserialize, Clone, Debug, PartialEq)]
1910pub enum QueryWhen {
1911 Immediately,
1914 FreshestTableWrite,
1917 AtTimestamp(Timestamp),
1922 AtLeastTimestamp(Timestamp),
1925}
1926
1927impl QueryWhen {
1928 pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1930 match self {
1931 QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1932 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1933 }
1934 }
1935 pub fn constrains_upper(&self) -> bool {
1939 match self {
1940 QueryWhen::AtTimestamp(_) => true,
1941 QueryWhen::AtLeastTimestamp(_)
1942 | QueryWhen::Immediately
1943 | QueryWhen::FreshestTableWrite => false,
1944 }
1945 }
1946 pub fn advance_to_since(&self) -> bool {
1948 match self {
1949 QueryWhen::Immediately
1950 | QueryWhen::AtLeastTimestamp(_)
1951 | QueryWhen::FreshestTableWrite => true,
1952 QueryWhen::AtTimestamp(_) => false,
1953 }
1954 }
1955 pub fn can_advance_to_upper(&self) -> bool {
1957 match self {
1958 QueryWhen::Immediately => true,
1959 QueryWhen::FreshestTableWrite
1960 | QueryWhen::AtTimestamp(_)
1961 | QueryWhen::AtLeastTimestamp(_) => false,
1962 }
1963 }
1964
1965 pub fn can_advance_to_timeline_ts(&self) -> bool {
1967 match self {
1968 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1969 QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1970 }
1971 }
1972 pub fn must_advance_to_timeline_ts(&self) -> bool {
1974 match self {
1975 QueryWhen::FreshestTableWrite => true,
1976 QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1977 false
1978 }
1979 }
1980 }
1981 pub fn is_transactional(&self) -> bool {
1983 match self {
1984 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1985 QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1986 }
1987 }
1988}
1989
1990#[derive(Debug, Copy, Clone)]
1991pub enum MutationKind {
1992 Insert,
1993 Update,
1994 Delete,
1995}
1996
1997#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
1998pub enum CopyFormat {
1999 Text,
2000 Csv,
2001 Binary,
2002 Parquet,
2003}
2004
2005#[derive(Debug, Copy, Clone)]
2006pub enum ExecuteTimeout {
2007 None,
2008 Seconds(f64),
2009 WaitOnce,
2010}
2011
2012#[derive(Clone, Debug)]
2013pub enum IndexOption {
2014 RetainHistory(CompactionWindow),
2016}
2017
2018#[derive(Clone, Debug)]
2019pub enum TableOption {
2020 RetainHistory(CompactionWindow),
2022}
2023
2024#[derive(Clone, Debug)]
2025pub struct PlanClusterOption {
2026 pub availability_zones: AlterOptionParameter<Vec<String>>,
2027 pub introspection_debugging: AlterOptionParameter<bool>,
2028 pub introspection_interval: AlterOptionParameter<OptionalDuration>,
2029 pub managed: AlterOptionParameter<bool>,
2030 pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
2031 pub replication_factor: AlterOptionParameter<u32>,
2032 pub size: AlterOptionParameter,
2033 pub schedule: AlterOptionParameter<ClusterSchedule>,
2034 pub workload_class: AlterOptionParameter<Option<String>>,
2035}
2036
2037impl Default for PlanClusterOption {
2038 fn default() -> Self {
2039 Self {
2040 availability_zones: AlterOptionParameter::Unchanged,
2041 introspection_debugging: AlterOptionParameter::Unchanged,
2042 introspection_interval: AlterOptionParameter::Unchanged,
2043 managed: AlterOptionParameter::Unchanged,
2044 replicas: AlterOptionParameter::Unchanged,
2045 replication_factor: AlterOptionParameter::Unchanged,
2046 size: AlterOptionParameter::Unchanged,
2047 schedule: AlterOptionParameter::Unchanged,
2048 workload_class: AlterOptionParameter::Unchanged,
2049 }
2050 }
2051}
2052
2053#[derive(Clone, Debug, PartialEq, Eq)]
2054pub enum AlterClusterPlanStrategy {
2055 None,
2056 For(Duration),
2057 UntilReady {
2058 on_timeout: OnTimeoutAction,
2059 timeout: Duration,
2060 },
2061}
2062
2063#[derive(Clone, Debug, PartialEq, Eq)]
2064pub enum OnTimeoutAction {
2065 Commit,
2066 Rollback,
2067}
2068
2069impl Default for OnTimeoutAction {
2070 fn default() -> Self {
2071 Self::Commit
2072 }
2073}
2074
2075impl TryFrom<&str> for OnTimeoutAction {
2076 type Error = PlanError;
2077 fn try_from(value: &str) -> Result<Self, Self::Error> {
2078 match value.to_uppercase().as_str() {
2079 "COMMIT" => Ok(Self::Commit),
2080 "ROLLBACK" => Ok(Self::Rollback),
2081 _ => Err(PlanError::Unstructured(
2082 "Valid options are COMMIT, ROLLBACK".into(),
2083 )),
2084 }
2085 }
2086}
2087
2088impl AlterClusterPlanStrategy {
2089 pub fn is_none(&self) -> bool {
2090 matches!(self, Self::None)
2091 }
2092 pub fn is_some(&self) -> bool {
2093 !matches!(self, Self::None)
2094 }
2095}
2096
2097impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2098 type Error = PlanError;
2099
2100 fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2101 Ok(match value.wait {
2102 Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2103 Some(ClusterAlterOptionValue::UntilReady(options)) => {
2104 let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2105 Self::UntilReady {
2106 timeout: match extracted.timeout {
2107 Some(d) => d,
2108 None => Err(PlanError::UntilReadyTimeoutRequired)?,
2109 },
2110 on_timeout: match extracted.on_timeout {
2111 Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2112 PlanError::InvalidOptionValue {
2113 option_name: "ON TIMEOUT".into(),
2114 err: Box::new(e),
2115 }
2116 })?,
2117 None => OnTimeoutAction::default(),
2118 },
2119 }
2120 }
2121 None => Self::None,
2122 })
2123 }
2124}
2125
2126#[derive(Debug, Clone)]
2128pub struct Params {
2129 pub datums: Row,
2131 pub execute_types: Vec<SqlScalarType>,
2133 pub expected_types: Vec<SqlScalarType>,
2135}
2136
2137impl Params {
2138 pub fn empty() -> Params {
2140 Params {
2141 datums: Row::pack_slice(&[]),
2142 execute_types: vec![],
2143 expected_types: vec![],
2144 }
2145 }
2146}
2147
2148#[derive(
2150 Ord,
2151 PartialOrd,
2152 Clone,
2153 Debug,
2154 Eq,
2155 PartialEq,
2156 Serialize,
2157 Deserialize,
2158 Hash,
2159 Copy
2160)]
2161pub struct PlanContext {
2162 pub wall_time: DateTime<Utc>,
2163 pub ignore_if_exists_errors: bool,
2164}
2165
2166impl PlanContext {
2167 pub fn new(wall_time: DateTime<Utc>) -> Self {
2168 Self {
2169 wall_time,
2170 ignore_if_exists_errors: false,
2171 }
2172 }
2173
2174 pub fn zero() -> Self {
2178 PlanContext {
2179 wall_time: now::to_datetime(NOW_ZERO()),
2180 ignore_if_exists_errors: false,
2181 }
2182 }
2183
2184 pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2185 self.ignore_if_exists_errors = value;
2186 self
2187 }
2188}