1use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{CollectionPlan, ColumnOrder, MapFilterProject, MirScalarExpr, RowSetFinishing};
41use mz_ore::now::{self, NOW_ZERO};
42use mz_pgcopy::CopyFormatParams;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
44use mz_repr::explain::{ExplainConfig, ExplainFormat};
45use mz_repr::network_policy_id::NetworkPolicyId;
46use mz_repr::optimize::OptimizerFeatureOverrides;
47use mz_repr::refresh_schedule::RefreshSchedule;
48use mz_repr::role_id::RoleId;
49use mz_repr::{
50 CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, ReprColumnType, Row,
51 SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
52};
53use mz_sql_parser::ast::{
54 AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
55 RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
56 Value, WithOptionValue,
57};
58use mz_ssh_util::keys::SshKeyPair;
59use mz_storage_types::connections::aws::AwsConnection;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::connections::{
62 AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
63 MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
64};
65use mz_storage_types::instances::StorageInstanceId;
66use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
67use mz_storage_types::sources::{
68 SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
69};
70use proptest_derive::Arbitrary;
71use serde::{Deserialize, Serialize};
72
73use crate::ast::{
74 ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
75 TransactionAccessMode,
76};
77use crate::catalog::{
78 CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
79 RoleAttributesRaw,
80};
81use crate::names::{
82 Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
83 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
84};
85
86pub(crate) mod error;
87pub(crate) mod explain;
88pub(crate) mod hir;
89pub(crate) mod literal;
90pub(crate) mod lowering;
91pub(crate) mod notice;
92pub(crate) mod plan_utils;
93pub(crate) mod query;
94pub(crate) mod scope;
95pub(crate) mod side_effecting_func;
96pub(crate) mod statement;
97pub(crate) mod transform_ast;
98pub(crate) mod transform_hir;
99pub(crate) mod typeconv;
100pub(crate) mod with_options;
101
102use crate::plan;
103use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
104use crate::plan::with_options::OptionalDuration;
105pub use error::PlanError;
106pub use explain::normalize_subqueries;
107pub use hir::{
108 AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
109 WindowExprType,
110};
111pub use lowering::Config as HirToMirConfig;
112pub use notice::PlanNotice;
113pub use query::{ExprContext, QueryContext, QueryLifetime};
114pub use scope::Scope;
115pub use side_effecting_func::SideEffectingFunc;
116pub use statement::ddl::{
117 AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
118 PlannedAlterRoleOption, PlannedRoleAttributes, PlannedRoleVariable,
119 SqlServerConfigOptionExtracted,
120};
121pub use statement::{
122 StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
123 resolve_cluster_for_materialized_view,
124};
125
126use self::statement::ddl::ClusterAlterOptionExtracted;
127use self::with_options::TryFromValue;
128
/// The result of planning a statement: one variant per kind of plan.
///
/// Most variants wrap a dedicated `*Plan` payload type; a few
/// (`DiscardTemp`, `DiscardAll`, `EmptyQuery`, `ShowAllVariables`) carry no
/// data. The `EnumKind` derive together with `#[enum_kind(PlanKind)]`
/// generates a companion `PlanKind` enum that mirrors these variants without
/// their payloads.
#[derive(Debug, EnumKind)]
#[enum_kind(PlanKind)]
pub enum Plan {
    CreateConnection(CreateConnectionPlan),
    CreateDatabase(CreateDatabasePlan),
    CreateSchema(CreateSchemaPlan),
    CreateRole(CreateRolePlan),
    CreateCluster(CreateClusterPlan),
    CreateClusterReplica(CreateClusterReplicaPlan),
    CreateSource(CreateSourcePlan),
    // Several source plans bundled into a single plan.
    CreateSources(Vec<CreateSourcePlanBundle>),
    CreateSecret(CreateSecretPlan),
    CreateSink(CreateSinkPlan),
    CreateTable(CreateTablePlan),
    CreateView(CreateViewPlan),
    CreateMaterializedView(CreateMaterializedViewPlan),
    CreateNetworkPolicy(CreateNetworkPolicyPlan),
    CreateIndex(CreateIndexPlan),
    CreateType(CreateTypePlan),
    Comment(CommentPlan),
    DiscardTemp,
    DiscardAll,
    DropObjects(DropObjectsPlan),
    DropOwned(DropOwnedPlan),
    EmptyQuery,
    ShowAllVariables,
    ShowCreate(ShowCreatePlan),
    ShowColumns(ShowColumnsPlan),
    ShowVariable(ShowVariablePlan),
    InspectShard(InspectShardPlan),
    SetVariable(SetVariablePlan),
    ResetVariable(ResetVariablePlan),
    SetTransaction(SetTransactionPlan),
    StartTransaction(StartTransactionPlan),
    CommitTransaction(CommitTransactionPlan),
    AbortTransaction(AbortTransactionPlan),
    Select(SelectPlan),
    Subscribe(SubscribePlan),
    CopyFrom(CopyFromPlan),
    CopyTo(CopyToPlan),
    ExplainPlan(ExplainPlanPlan),
    ExplainPushdown(ExplainPushdownPlan),
    ExplainTimestamp(ExplainTimestampPlan),
    ExplainSinkSchema(ExplainSinkSchemaPlan),
    Insert(InsertPlan),
    AlterCluster(AlterClusterPlan),
    AlterClusterSwap(AlterClusterSwapPlan),
    AlterNoop(AlterNoopPlan),
    AlterSetCluster(AlterSetClusterPlan),
    AlterConnection(AlterConnectionPlan),
    AlterSource(AlterSourcePlan),
    AlterClusterRename(AlterClusterRenamePlan),
    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
    AlterItemRename(AlterItemRenamePlan),
    AlterSchemaRename(AlterSchemaRenamePlan),
    AlterSchemaSwap(AlterSchemaSwapPlan),
    AlterSecret(AlterSecretPlan),
    AlterSink(AlterSinkPlan),
    AlterSystemSet(AlterSystemSetPlan),
    AlterSystemReset(AlterSystemResetPlan),
    AlterSystemResetAll(AlterSystemResetAllPlan),
    AlterRole(AlterRolePlan),
    AlterOwner(AlterOwnerPlan),
    // Note: the payload type is `AlterTablePlan`, not `AlterTableAddColumnPlan`.
    AlterTableAddColumn(AlterTablePlan),
    AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
    AlterNetworkPolicy(AlterNetworkPolicyPlan),
    Declare(DeclarePlan),
    Fetch(FetchPlan),
    Close(ClosePlan),
    ReadThenWrite(ReadThenWritePlan),
    Prepare(PreparePlan),
    Execute(ExecutePlan),
    Deallocate(DeallocatePlan),
    Raise(RaisePlan),
    GrantRole(GrantRolePlan),
    RevokeRole(RevokeRolePlan),
    GrantPrivileges(GrantPrivilegesPlan),
    RevokePrivileges(RevokePrivilegesPlan),
    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
    ReassignOwned(ReassignOwnedPlan),
    // The payload is the `SideEffectingFunc` type itself; there is no `*Plan` wrapper.
    SideEffectingFunc(SideEffectingFunc),
    ValidateConnection(ValidateConnectionPlan),
    AlterRetainHistory(AlterRetainHistoryPlan),
    AlterSourceTimestampInterval(AlterSourceTimestampIntervalPlan),
}
215
216impl Plan {
    /// Maps a [`StatementKind`] to the set of [`PlanKind`]s associated with it.
    ///
    /// The match is exhaustive over `StatementKind` (no wildcard arm), so
    /// adding a statement kind forces this table to be updated. The returned
    /// slice is a static constant; `StatementKind::ExecuteUnitTest` maps to
    /// the empty slice.
    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
        match stmt {
            // Most `ALTER` statement kinds list `AlterNoop` next to their real plan kind.
            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
            StatementKind::AlterObjectRename => &[
                PlanKind::AlterClusterRename,
                PlanKind::AlterClusterReplicaRename,
                PlanKind::AlterItemRename,
                PlanKind::AlterSchemaRename,
                PlanKind::AlterNoop,
            ],
            StatementKind::AlterObjectSwap => &[
                PlanKind::AlterClusterSwap,
                PlanKind::AlterSchemaSwap,
                PlanKind::AlterNoop,
            ],
            StatementKind::AlterRole => &[PlanKind::AlterRole],
            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
            StatementKind::AlterSource => &[
                PlanKind::AlterNoop,
                PlanKind::AlterSource,
                PlanKind::AlterRetainHistory,
                PlanKind::AlterSourceTimestampInterval,
            ],
            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
            StatementKind::AlterSystemResetAll => {
                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
            }
            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
            StatementKind::AlterTableAddColumn => {
                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
            }
            StatementKind::AlterMaterializedViewApplyReplacement => &[
                PlanKind::AlterNoop,
                PlanKind::AlterMaterializedViewApplyReplacement,
            ],
            StatementKind::Close => &[PlanKind::Close],
            StatementKind::Comment => &[PlanKind::Comment],
            StatementKind::Commit => &[PlanKind::CommitTransaction],
            // A `Copy` statement is associated with four different plan kinds.
            StatementKind::Copy => &[
                PlanKind::CopyFrom,
                PlanKind::Select,
                PlanKind::Subscribe,
                PlanKind::CopyTo,
            ],
            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
            StatementKind::CreateRole => &[PlanKind::CreateRole],
            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
            StatementKind::CreateSink => &[PlanKind::CreateSink],
            StatementKind::CreateSource | StatementKind::CreateSubsource => {
                &[PlanKind::CreateSource]
            }
            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
            StatementKind::CreateTable => &[PlanKind::CreateTable],
            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
            StatementKind::CreateType => &[PlanKind::CreateType],
            StatementKind::CreateView => &[PlanKind::CreateView],
            StatementKind::Deallocate => &[PlanKind::Deallocate],
            StatementKind::Declare => &[PlanKind::Declare],
            // `Delete` and `Update` both map to `ReadThenWrite`.
            StatementKind::Delete => &[PlanKind::ReadThenWrite],
            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
            StatementKind::DropObjects => &[PlanKind::DropObjects],
            StatementKind::DropOwned => &[PlanKind::DropOwned],
            StatementKind::Execute => &[PlanKind::Execute],
            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
            // The two `ExplainAnalyze*` kinds map to `Select`.
            StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
            StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
            StatementKind::Fetch => &[PlanKind::Fetch],
            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
            StatementKind::GrantRole => &[PlanKind::GrantRole],
            StatementKind::Insert => &[PlanKind::Insert],
            StatementKind::Prepare => &[PlanKind::Prepare],
            StatementKind::Raise => &[PlanKind::Raise],
            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
            StatementKind::Rollback => &[PlanKind::AbortTransaction],
            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
            StatementKind::SetVariable => &[PlanKind::SetVariable],
            StatementKind::Show => &[
                PlanKind::Select,
                PlanKind::ShowVariable,
                PlanKind::ShowCreate,
                PlanKind::ShowColumns,
                PlanKind::ShowAllVariables,
                PlanKind::InspectShard,
            ],
            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
            StatementKind::Subscribe => &[PlanKind::Subscribe],
            StatementKind::Update => &[PlanKind::ReadThenWrite],
            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
            // No plan kinds are associated with this statement kind.
            StatementKind::ExecuteUnitTest => &[],
        }
    }
332
    /// Returns a short human-readable label for this plan.
    ///
    /// Every arm yields a string literal. For `DropObjects`, `AlterNoop` and
    /// `AlterOwner` the label depends on the plan's `object_type`; for
    /// `ReadThenWrite` it depends on the plan's `kind`. The match is
    /// exhaustive, so a new `Plan` variant must be given a label here.
    pub fn name(&self) -> &str {
        match self {
            Plan::CreateConnection(_) => "create connection",
            Plan::CreateDatabase(_) => "create database",
            Plan::CreateSchema(_) => "create schema",
            Plan::CreateRole(_) => "create role",
            Plan::CreateCluster(_) => "create cluster",
            Plan::CreateClusterReplica(_) => "create cluster replica",
            Plan::CreateSource(_) => "create source",
            // Shares its label with the single-source variant above.
            Plan::CreateSources(_) => "create source",
            Plan::CreateSecret(_) => "create secret",
            Plan::CreateSink(_) => "create sink",
            Plan::CreateTable(_) => "create table",
            Plan::CreateView(_) => "create view",
            Plan::CreateMaterializedView(_) => "create materialized view",
            Plan::CreateIndex(_) => "create index",
            Plan::CreateType(_) => "create type",
            Plan::CreateNetworkPolicy(_) => "create network policy",
            Plan::Comment(_) => "comment",
            Plan::DiscardTemp => "discard temp",
            Plan::DiscardAll => "discard all",
            // Note: the role, cluster and cluster-replica labels are plural,
            // unlike the others.
            Plan::DropObjects(plan) => match plan.object_type {
                ObjectType::Table => "drop table",
                ObjectType::View => "drop view",
                ObjectType::MaterializedView => "drop materialized view",
                ObjectType::Source => "drop source",
                ObjectType::Sink => "drop sink",
                ObjectType::Index => "drop index",
                ObjectType::Type => "drop type",
                ObjectType::Role => "drop roles",
                ObjectType::Cluster => "drop clusters",
                ObjectType::ClusterReplica => "drop cluster replicas",
                ObjectType::Secret => "drop secret",
                ObjectType::Connection => "drop connection",
                ObjectType::Database => "drop database",
                ObjectType::Schema => "drop schema",
                ObjectType::Func => "drop function",
                ObjectType::NetworkPolicy => "drop network policy",
            },
            Plan::DropOwned(_) => "drop owned",
            Plan::EmptyQuery => "do nothing",
            Plan::ShowAllVariables => "show all variables",
            Plan::ShowCreate(_) => "show create",
            Plan::ShowColumns(_) => "show columns",
            Plan::ShowVariable(_) => "show variable",
            Plan::InspectShard(_) => "inspect shard",
            Plan::SetVariable(_) => "set variable",
            Plan::ResetVariable(_) => "reset variable",
            Plan::SetTransaction(_) => "set transaction",
            Plan::StartTransaction(_) => "start transaction",
            Plan::CommitTransaction(_) => "commit",
            Plan::AbortTransaction(_) => "abort",
            Plan::Select(_) => "select",
            Plan::Subscribe(_) => "subscribe",
            Plan::CopyFrom(_) => "copy from",
            Plan::CopyTo(_) => "copy to",
            Plan::ExplainPlan(_) => "explain plan",
            // Note: the only upper-case label in this table.
            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
            Plan::ExplainTimestamp(_) => "explain timestamp",
            Plan::ExplainSinkSchema(_) => "explain schema",
            Plan::Insert(_) => "insert",
            Plan::AlterNoop(plan) => match plan.object_type {
                ObjectType::Table => "alter table",
                ObjectType::View => "alter view",
                ObjectType::MaterializedView => "alter materialized view",
                ObjectType::Source => "alter source",
                ObjectType::Sink => "alter sink",
                ObjectType::Index => "alter index",
                ObjectType::Type => "alter type",
                ObjectType::Role => "alter role",
                ObjectType::Cluster => "alter cluster",
                ObjectType::ClusterReplica => "alter cluster replica",
                ObjectType::Secret => "alter secret",
                ObjectType::Connection => "alter connection",
                ObjectType::Database => "alter database",
                ObjectType::Schema => "alter schema",
                ObjectType::Func => "alter function",
                ObjectType::NetworkPolicy => "alter network policy",
            },
            Plan::AlterCluster(_) => "alter cluster",
            Plan::AlterClusterRename(_) => "alter cluster rename",
            Plan::AlterClusterSwap(_) => "alter cluster swap",
            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
            Plan::AlterSetCluster(_) => "alter set cluster",
            Plan::AlterConnection(_) => "alter connection",
            Plan::AlterSource(_) => "alter source",
            Plan::AlterItemRename(_) => "rename item",
            Plan::AlterSchemaRename(_) => "alter rename schema",
            Plan::AlterSchemaSwap(_) => "alter swap schema",
            Plan::AlterSecret(_) => "alter secret",
            Plan::AlterSink(_) => "alter sink",
            // All three `AlterSystem*` variants share one label.
            Plan::AlterSystemSet(_) => "alter system",
            Plan::AlterSystemReset(_) => "alter system",
            Plan::AlterSystemResetAll(_) => "alter system",
            Plan::AlterRole(_) => "alter role",
            Plan::AlterNetworkPolicy(_) => "alter network policy",
            Plan::AlterOwner(plan) => match plan.object_type {
                ObjectType::Table => "alter table owner",
                ObjectType::View => "alter view owner",
                ObjectType::MaterializedView => "alter materialized view owner",
                ObjectType::Source => "alter source owner",
                ObjectType::Sink => "alter sink owner",
                ObjectType::Index => "alter index owner",
                ObjectType::Type => "alter type owner",
                ObjectType::Role => "alter role owner",
                ObjectType::Cluster => "alter cluster owner",
                ObjectType::ClusterReplica => "alter cluster replica owner",
                ObjectType::Secret => "alter secret owner",
                ObjectType::Connection => "alter connection owner",
                ObjectType::Database => "alter database owner",
                ObjectType::Schema => "alter schema owner",
                ObjectType::Func => "alter function owner",
                ObjectType::NetworkPolicy => "alter network policy owner",
            },
            Plan::AlterTableAddColumn(_) => "alter table add column",
            Plan::AlterMaterializedViewApplyReplacement(_) => {
                "alter materialized view apply replacement"
            }
            Plan::Declare(_) => "declare",
            Plan::Fetch(_) => "fetch",
            Plan::Close(_) => "close",
            Plan::ReadThenWrite(plan) => match plan.kind {
                MutationKind::Insert => "insert into select",
                MutationKind::Update => "update",
                MutationKind::Delete => "delete",
            },
            Plan::Prepare(_) => "prepare",
            Plan::Execute(_) => "execute",
            Plan::Deallocate(_) => "deallocate",
            Plan::Raise(_) => "raise",
            Plan::GrantRole(_) => "grant role",
            Plan::RevokeRole(_) => "revoke role",
            Plan::GrantPrivileges(_) => "grant privilege",
            Plan::RevokePrivileges(_) => "revoke privilege",
            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
            Plan::ReassignOwned(_) => "reassign owned",
            Plan::SideEffectingFunc(_) => "side effecting func",
            Plan::ValidateConnection(_) => "validate connection",
            Plan::AlterRetainHistory(_) => "alter retain history",
            Plan::AlterSourceTimestampInterval(_) => "alter source timestamp interval",
        }
    }
476
477 pub fn allowed_in_read_only(&self) -> bool {
483 match self {
484 Plan::SetVariable(_) => true,
487 Plan::ResetVariable(_) => true,
488 Plan::SetTransaction(_) => true,
489 Plan::StartTransaction(_) => true,
490 Plan::CommitTransaction(_) => true,
491 Plan::AbortTransaction(_) => true,
492 Plan::Select(_) => true,
493 Plan::EmptyQuery => true,
494 Plan::ShowAllVariables => true,
495 Plan::ShowCreate(_) => true,
496 Plan::ShowColumns(_) => true,
497 Plan::ShowVariable(_) => true,
498 Plan::InspectShard(_) => true,
499 Plan::Subscribe(_) => true,
500 Plan::CopyTo(_) => true,
501 Plan::ExplainPlan(_) => true,
502 Plan::ExplainPushdown(_) => true,
503 Plan::ExplainTimestamp(_) => true,
504 Plan::ExplainSinkSchema(_) => true,
505 Plan::ValidateConnection(_) => true,
506 _ => false,
507 }
508 }
509}
510
/// Payload of [`Plan::StartTransaction`].
///
/// Both settings are optional.
#[derive(Debug)]
pub struct StartTransactionPlan {
    /// Requested access mode, if any.
    pub access: Option<TransactionAccessMode>,
    /// Requested isolation level, if any.
    pub isolation_level: Option<TransactionIsolationLevel>,
}
516
/// Distinguishes the two kinds of transaction referenced by
/// [`CommitTransactionPlan`] and [`AbortTransactionPlan`].
#[derive(Debug)]
pub enum TransactionType {
    Explicit,
    Implicit,
}

impl TransactionType {
    /// Returns `true` for [`TransactionType::Explicit`], `false` otherwise.
    pub fn is_explicit(&self) -> bool {
        match self {
            Self::Explicit => true,
            Self::Implicit => false,
        }
    }

    /// Returns `true` for [`TransactionType::Implicit`], `false` otherwise.
    pub fn is_implicit(&self) -> bool {
        match self {
            Self::Implicit => true,
            Self::Explicit => false,
        }
    }
}
532
/// Payload of [`Plan::CommitTransaction`].
#[derive(Debug)]
pub struct CommitTransactionPlan {
    /// Whether the transaction being committed is explicit or implicit.
    pub transaction_type: TransactionType,
}
537
/// Payload of [`Plan::AbortTransaction`].
#[derive(Debug)]
pub struct AbortTransactionPlan {
    /// Whether the transaction being aborted is explicit or implicit.
    pub transaction_type: TransactionType,
}
542
/// Payload of [`Plan::CreateDatabase`].
#[derive(Debug)]
pub struct CreateDatabasePlan {
    /// Name of the database to create.
    pub name: String,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
}
548
/// Payload of [`Plan::CreateSchema`].
#[derive(Debug)]
pub struct CreateSchemaPlan {
    /// The already-resolved database specifier for the new schema.
    pub database_spec: ResolvedDatabaseSpecifier,
    /// Name of the schema to create.
    pub schema_name: String,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
}
555
/// Payload of [`Plan::CreateRole`].
#[derive(Debug)]
pub struct CreateRolePlan {
    /// Name of the role to create.
    pub name: String,
    /// Attributes for the new role, in their raw (`RoleAttributesRaw`) form.
    pub attributes: RoleAttributesRaw,
}
561
/// Payload of [`Plan::CreateCluster`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterPlan {
    /// Name of the cluster to create.
    pub name: String,
    /// Managed or unmanaged configuration of the cluster.
    pub variant: CreateClusterVariant,
    /// Optional workload class label.
    pub workload_class: Option<String>,
}
568
/// The two shapes a [`CreateClusterPlan`] can take.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum CreateClusterVariant {
    /// Cluster described by a replication factor and size; see
    /// [`CreateClusterManagedPlan`].
    Managed(CreateClusterManagedPlan),
    /// Cluster described by an explicit replica list; see
    /// [`CreateClusterUnmanagedPlan`].
    Unmanaged(CreateClusterUnmanagedPlan),
}
574
/// Configuration carried by [`CreateClusterVariant::Unmanaged`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterUnmanagedPlan {
    /// Explicit replicas, each a `(String, ReplicaConfig)` pair.
    // NOTE(review): the `String` is presumably the replica name — confirm.
    pub replicas: Vec<(String, ReplicaConfig)>,
}
579
/// Configuration carried by [`CreateClusterVariant::Managed`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterManagedPlan {
    /// Replication factor of the cluster.
    pub replication_factor: u32,
    /// Size of the cluster, as a string.
    pub size: String,
    /// Availability zones for the cluster (may be empty).
    pub availability_zones: Vec<String>,
    /// Compute-specific replica configuration.
    pub compute: ComputeReplicaConfig,
    /// Optimizer feature overrides attached to the cluster.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Scheduling policy of the cluster.
    pub schedule: ClusterSchedule,
}
589
/// Payload of [`Plan::CreateClusterReplica`].
#[derive(Debug)]
pub struct CreateClusterReplicaPlan {
    /// The cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// Name of the replica to create.
    pub name: String,
    /// Configuration of the new replica.
    pub config: ReplicaConfig,
}
596
/// Introspection settings for a compute replica.
///
/// The derived `PartialOrd`/`Ord` compare fields in declaration order
/// (`debugging`, then `interval`), so reordering fields changes ordering.
#[derive(
    Clone,
    Copy,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    Ord,
    PartialEq,
    Eq
)]
pub struct ComputeReplicaIntrospectionConfig {
    /// Debugging flag for introspection.
    pub debugging: bool,
    /// Introspection interval.
    pub interval: Duration,
}
615
/// Compute-specific configuration of a replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComputeReplicaConfig {
    /// Introspection settings; `None` when no settings are given.
    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
}
620
/// Configuration of a cluster replica, in one of two shapes.
///
/// Both variants carry a [`ComputeReplicaConfig`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicaConfig {
    /// Replica described by explicit lists of controller addresses.
    Unorchestrated {
        storagectl_addrs: Vec<String>,
        computectl_addrs: Vec<String>,
        compute: ComputeReplicaConfig,
    },
    /// Replica described by a size and an optional availability zone.
    Orchestrated {
        size: String,
        availability_zone: Option<String>,
        compute: ComputeReplicaConfig,
        internal: bool,
        billed_as: Option<String>,
    },
}
636
/// Scheduling policy of a cluster.
///
/// `Manual` is the value produced by the `Default` impl. The derived
/// `PartialOrd`/`Ord` follow variant declaration order.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterSchedule {
    Manual,
    /// Refresh-based schedule carrying a hydration time estimate.
    Refresh { hydration_time_estimate: Duration },
}
646
647impl Default for ClusterSchedule {
648 fn default() -> Self {
649 ClusterSchedule::Manual
651 }
652}
653
/// Payload of [`Plan::CreateSource`]; also embedded in
/// [`CreateSourcePlanBundle`].
#[derive(Debug)]
pub struct CreateSourcePlan {
    /// Qualified name of the source to create.
    pub name: QualifiedItemName,
    /// The source definition.
    pub source: Source,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
    /// Timeline of the source.
    pub timeline: Timeline,
    /// Cluster to place the source in, if one is given.
    pub in_cluster: Option<ClusterId>,
}
663
/// A list of [`SourceReference`]s together with an `updated_at` value.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReferences {
    // NOTE(review): unit/epoch of this timestamp is not visible here — confirm with producers.
    pub updated_at: u64,
    /// The individual references.
    pub references: Vec<SourceReference>,
}
669
/// One entry of [`SourceReferences`]: a name, an optional namespace, and a
/// list of column names.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReference {
    pub name: String,
    pub namespace: Option<String>,
    pub columns: Vec<String>,
}
678
/// Element type of [`Plan::CreateSources`]: a [`CreateSourcePlan`] bundled
/// with the ids and references that accompany it.
#[derive(Debug)]
pub struct CreateSourcePlanBundle {
    /// Catalog item id associated with the source.
    pub item_id: CatalogItemId,
    /// Global id associated with the source.
    pub global_id: GlobalId,
    /// The plan for the source itself.
    pub plan: CreateSourcePlan,
    /// Resolved ids accompanying the plan.
    pub resolved_ids: ResolvedIds,
    /// Source references, when available.
    pub available_source_references: Option<SourceReferences>,
}
695
/// Payload of [`Plan::CreateConnection`].
#[derive(Debug)]
pub struct CreateConnectionPlan {
    /// Qualified name of the connection to create.
    pub name: QualifiedItemName,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
    /// The connection definition.
    pub connection: Connection,
    /// Validation flag for the connection.
    // NOTE(review): exact validation semantics live outside this file section.
    pub validate: bool,
}
703
/// Payload of [`Plan::ValidateConnection`].
#[derive(Debug)]
pub struct ValidateConnectionPlan {
    /// Catalog item id of the connection.
    pub id: CatalogItemId,
    /// The storage-layer connection, parameterized by `ReferencedConnection`.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
}
711
/// Payload of [`Plan::CreateSecret`].
#[derive(Debug)]
pub struct CreateSecretPlan {
    /// Qualified name of the secret to create.
    pub name: QualifiedItemName,
    /// The secret definition.
    pub secret: Secret,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
}
718
/// Payload of [`Plan::CreateSink`].
///
/// Unlike [`CreateSourcePlan::in_cluster`], the cluster here is mandatory.
#[derive(Debug)]
pub struct CreateSinkPlan {
    /// Qualified name of the sink to create.
    pub name: QualifiedItemName,
    /// The sink definition.
    pub sink: Sink,
    /// Snapshot flag for the sink.
    pub with_snapshot: bool,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
    /// Cluster to place the sink in.
    pub in_cluster: ClusterId,
}
727
/// Payload of [`Plan::CreateTable`].
#[derive(Debug)]
pub struct CreateTablePlan {
    /// Qualified name of the table to create.
    pub name: QualifiedItemName,
    /// The table definition.
    pub table: Table,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
}
734
/// Payload of [`Plan::CreateView`]; also embedded in
/// [`ExplaineeStatement::CreateView`].
#[derive(Debug, Clone)]
pub struct CreateViewPlan {
    /// Qualified name of the view to create.
    pub name: QualifiedItemName,
    /// The view definition.
    pub view: View,
    /// Id of an existing item to replace, if any.
    pub replace: Option<CatalogItemId>,
    /// Ids of items to drop.
    // NOTE(review): presumably tied to `replace` — confirm in the planner.
    pub drop_ids: Vec<CatalogItemId>,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
    /// Flag recording whether the view's columns are ambiguous.
    pub ambiguous_columns: bool,
}
748
/// Payload of [`Plan::CreateMaterializedView`]; also embedded in
/// [`ExplaineeStatement::CreateMaterializedView`].
///
/// Field layout parallels [`CreateViewPlan`].
#[derive(Debug, Clone)]
pub struct CreateMaterializedViewPlan {
    /// Qualified name of the materialized view to create.
    pub name: QualifiedItemName,
    /// The materialized view definition.
    pub materialized_view: MaterializedView,
    /// Id of an existing item to replace, if any.
    pub replace: Option<CatalogItemId>,
    /// Ids of items to drop.
    // NOTE(review): presumably tied to `replace` — confirm in the planner.
    pub drop_ids: Vec<CatalogItemId>,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
    /// Flag recording whether the view's columns are ambiguous.
    pub ambiguous_columns: bool,
}
762
/// Payload of [`Plan::CreateNetworkPolicy`].
#[derive(Debug, Clone)]
pub struct CreateNetworkPolicyPlan {
    /// Name of the network policy to create.
    pub name: String,
    /// Rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
}
768
/// Payload of [`Plan::AlterNetworkPolicy`].
///
/// Same fields as [`CreateNetworkPolicyPlan`] plus the id of the existing
/// policy.
#[derive(Debug, Clone)]
pub struct AlterNetworkPolicyPlan {
    /// Id of the policy being altered.
    pub id: NetworkPolicyId,
    /// Name of the policy.
    pub name: String,
    /// Rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
}
775
/// Payload of [`Plan::CreateIndex`]; also embedded in
/// [`ExplaineeStatement::CreateIndex`].
#[derive(Debug, Clone)]
pub struct CreateIndexPlan {
    /// Qualified name of the index to create.
    pub name: QualifiedItemName,
    /// The index definition.
    pub index: Index,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm in the planner.
    pub if_not_exists: bool,
}
782
/// Payload of [`Plan::CreateType`].
#[derive(Debug)]
pub struct CreateTypePlan {
    /// Qualified name of the type to create.
    pub name: QualifiedItemName,
    /// The type definition.
    pub typ: Type,
}
788
/// Payload of [`Plan::DropObjects`].
#[derive(Debug)]
pub struct DropObjectsPlan {
    /// Object ids referenced by the statement.
    // NOTE(review): relationship to `drop_ids` (e.g. subset vs. cascade closure) is
    // not visible here — confirm in the planner.
    pub referenced_ids: Vec<ObjectId>,
    /// Object ids to drop.
    pub drop_ids: Vec<ObjectId>,
    /// Object type of the drop; also selects the label in [`Plan::name`].
    pub object_type: ObjectType,
}
799
/// Payload of [`Plan::DropOwned`].
#[derive(Debug)]
pub struct DropOwnedPlan {
    /// Roles the statement applies to.
    pub role_ids: Vec<RoleId>,
    /// Object ids to drop.
    pub drop_ids: Vec<ObjectId>,
    /// Privileges to revoke, as `(object, ACL item)` pairs.
    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
    /// Default privileges to revoke, as `(object, ACL item)` pairs.
    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
}
811
/// Payload of [`Plan::ShowVariable`].
#[derive(Debug)]
pub struct ShowVariablePlan {
    /// Name of the variable to show.
    pub name: String,
}
816
/// Payload of [`Plan::InspectShard`].
#[derive(Debug)]
pub struct InspectShardPlan {
    /// Global id identifying what to inspect.
    pub id: GlobalId,
}
822
/// Payload of [`Plan::SetVariable`].
#[derive(Debug)]
pub struct SetVariablePlan {
    /// Name of the variable to set.
    pub name: String,
    /// New value: either the default or explicit values.
    pub value: VariableValue,
    // NOTE(review): presumably mirrors `SET LOCAL` — confirm in the planner.
    pub local: bool,
}
829
/// Value carried by a [`SetVariablePlan`].
#[derive(Debug)]
pub enum VariableValue {
    /// Use the variable's default.
    Default,
    /// Explicit values, as strings.
    Values(Vec<String>),
}
835
/// Payload of [`Plan::ResetVariable`].
#[derive(Debug)]
pub struct ResetVariablePlan {
    /// Name of the variable to reset.
    pub name: String,
}
840
/// Payload of [`Plan::SetTransaction`].
#[derive(Debug)]
pub struct SetTransactionPlan {
    // NOTE(review): scope implied by `local` is not visible here — confirm in the planner.
    pub local: bool,
    /// Transaction modes to apply.
    pub modes: Vec<TransactionMode>,
}
846
/// Payload of [`Plan::Select`]; also embedded in [`ShowColumnsPlan`],
/// [`CopyToPlan`] and [`ExplaineeStatement::Select`].
#[derive(Clone, Debug)]
pub struct SelectPlan {
    /// The `SELECT` statement AST, when there is one
    /// ([`SelectPlan::immediate`] sets this to `None`).
    pub select: Option<Box<SelectStatement<Aug>>>,
    /// The relation expression to evaluate.
    pub source: HirRelationExpr,
    /// When the query should be evaluated.
    pub when: QueryWhen,
    /// Finishing applied to the result rows.
    pub finishing: RowSetFinishing,
    /// Copy format, if one is set.
    pub copy_to: Option<CopyFormat>,
}
862
863impl SelectPlan {
864 pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
865 let arity = typ.arity();
866 SelectPlan {
867 select: None,
868 source: HirRelationExpr::Constant { rows, typ },
869 when: QueryWhen::Immediately,
870 finishing: RowSetFinishing::trivial(arity),
871 copy_to: None,
872 }
873 }
874}
875
/// Output shape of a [`SubscribePlan`].
///
/// See `SubscribeOutput::row_order` for which variants contribute a row
/// ordering.
#[derive(Debug, Clone)]
pub enum SubscribeOutput {
    Diffs,
    WithinTimestampOrderBy {
        /// Requested ordering; not returned by `row_order`.
        order_by: Vec<ColumnOrder>,
    },
    EnvelopeUpsert {
        /// Ordering over the key columns; returned by `row_order`.
        order_by_keys: Vec<ColumnOrder>,
    },
    EnvelopeDebezium {
        /// Ordering over the key columns; returned by `row_order`.
        order_by_keys: Vec<ColumnOrder>,
    },
}
892
893impl SubscribeOutput {
894 pub fn row_order(&self) -> &[ColumnOrder] {
895 match self {
896 SubscribeOutput::Diffs => &[],
897 SubscribeOutput::WithinTimestampOrderBy { .. } => &[],
899 SubscribeOutput::EnvelopeUpsert { order_by_keys } => order_by_keys,
900 SubscribeOutput::EnvelopeDebezium { order_by_keys } => order_by_keys,
901 }
902 }
903}
904
/// Payload of [`Plan::Subscribe`]; also embedded in
/// [`ExplaineeStatement::Subscribe`].
#[derive(Debug, Clone)]
pub struct SubscribePlan {
    /// What to subscribe to: an existing object or an ad-hoc query.
    pub from: SubscribeFrom,
    /// Snapshot flag for the subscription.
    pub with_snapshot: bool,
    /// When the subscription should start reading.
    pub when: QueryWhen,
    /// Optional upper timestamp bound.
    // NOTE(review): inclusive vs. exclusive is not visible here — confirm.
    pub up_to: Option<Timestamp>,
    /// Copy format, if one is set.
    pub copy_to: Option<CopyFormat>,
    /// Progress-emission flag.
    pub emit_progress: bool,
    /// Output shape of the subscription.
    pub output: SubscribeOutput,
}
915
/// The input of a [`SubscribePlan`].
#[derive(Debug, Clone)]
pub enum SubscribeFrom {
    /// An object identified by its global id.
    Id(GlobalId),
    /// An ad-hoc query, given as an expression plus its relation description.
    Query {
        expr: HirRelationExpr,
        desc: RelationDesc,
    },
}
926
927impl SubscribeFrom {
928 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
929 match self {
930 SubscribeFrom::Id(id) => BTreeSet::from([*id]),
931 SubscribeFrom::Query { expr, .. } => expr.depends_on(),
932 }
933 }
934
935 pub fn contains_temporal(&self) -> bool {
936 match self {
937 SubscribeFrom::Id(_) => false,
938 SubscribeFrom::Query { expr, .. } => expr
939 .contains_temporal()
940 .expect("Unexpected error in `visit_scalars` call"),
941 }
942 }
943}
944
/// Payload of [`Plan::ShowCreate`].
#[derive(Debug)]
pub struct ShowCreatePlan {
    /// Id of the object being shown.
    pub id: ObjectId,
    /// The row carried by the plan.
    pub row: Row,
}
950
/// Payload of [`Plan::ShowColumns`].
#[derive(Debug)]
pub struct ShowColumnsPlan {
    /// Catalog item id of the item whose columns are shown.
    pub id: CatalogItemId,
    /// The select plan carried by this plan.
    pub select_plan: SelectPlan,
    /// Resolved ids accompanying `select_plan`.
    pub new_resolved_ids: ResolvedIds,
}
957
/// Payload of [`Plan::CopyFrom`].
#[derive(Debug)]
pub struct CopyFromPlan {
    /// Catalog item id of the copy target.
    pub target_id: CatalogItemId,
    /// Name of the copy target.
    pub target_name: String,
    /// Where the data comes from.
    pub source: CopyFromSource,
    /// Column indexes involved in the copy.
    // NOTE(review): presumably indexes into the target relation — confirm.
    pub columns: Vec<ColumnIndex>,
    /// Relation description of the incoming data.
    pub source_desc: RelationDesc,
    /// Map/filter/project applied as part of the copy.
    pub mfp: MapFilterProject,
    /// Format parameters for decoding the input.
    pub params: CopyFormatParams<'static>,
    /// Optional restriction on which inputs are read.
    pub filter: Option<CopyFromFilter>,
}
979
/// Data source of a [`CopyFromPlan`].
#[derive(Debug)]
pub enum CopyFromSource {
    /// Standard input.
    Stdin,
    /// A URL, given as a scalar expression.
    Url(HirScalarExpr),
    /// An AWS S3 location reached through an AWS connection.
    AwsS3 {
        /// The URI, given as a scalar expression.
        uri: HirScalarExpr,
        /// The AWS connection to use.
        connection: AwsConnection,
        /// Catalog item id of that connection.
        connection_id: CatalogItemId,
    },
}
998
/// Input filter of a [`CopyFromPlan`].
#[derive(Debug)]
pub enum CopyFromFilter {
    /// An explicit list of strings.
    // NOTE(review): presumably file names or paths — confirm.
    Files(Vec<String>),
    /// A single pattern string.
    // NOTE(review): pattern syntax is not visible here — confirm.
    Pattern(String),
}
1004
/// Payload of [`Plan::CopyTo`].
#[derive(Debug, Clone)]
pub struct CopyToPlan {
    /// The select plan carried by this plan.
    pub select_plan: SelectPlan,
    /// Relation description accompanying `select_plan`.
    pub desc: RelationDesc,
    /// Destination, given as a scalar expression.
    pub to: HirScalarExpr,
    /// The storage-layer connection, parameterized by `ReferencedConnection`.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
    /// Catalog item id of that connection.
    pub connection_id: CatalogItemId,
    /// Output format.
    pub format: S3SinkFormat,
    /// Maximum file size.
    // NOTE(review): presumably bytes — confirm.
    pub max_file_size: u64,
}
1022
/// Payload of [`Plan::ExplainPlan`].
#[derive(Clone, Debug)]
pub struct ExplainPlanPlan {
    /// The explain stage requested.
    pub stage: ExplainStage,
    /// The explain output format.
    pub format: ExplainFormat,
    /// The explain configuration.
    pub config: ExplainConfig,
    /// The thing being explained.
    pub explainee: Explainee,
}
1030
/// The subject of an [`ExplainPlanPlan`] or [`ExplainPushdownPlan`].
///
/// Six variants reference a catalog item by id; `Statement` embeds a
/// statement.
#[derive(Clone, Debug)]
pub enum Explainee {
    View(CatalogItemId),
    MaterializedView(CatalogItemId),
    Index(CatalogItemId),
    // NOTE(review): the `Replan*` variants presumably explain a fresh re-planning of
    // the existing item rather than its stored plan — confirm.
    ReplanView(CatalogItemId),
    ReplanMaterializedView(CatalogItemId),
    ReplanIndex(CatalogItemId),
    /// A statement, with its plan embedded.
    Statement(ExplaineeStatement),
}
1049
/// A statement embedded in [`Explainee::Statement`].
///
/// Every variant carries a `broken` flag and the plan of the statement. The
/// `EnumKind` derive with `#[enum_kind(ExplaineeStatementKind)]` generates the
/// payload-free companion enum `ExplaineeStatementKind`.
#[derive(Clone, Debug, EnumKind)]
#[enum_kind(ExplaineeStatementKind)]
pub enum ExplaineeStatement {
    Select {
        // NOTE(review): semantics of `broken` are defined by consumers outside this
        // file section — confirm before relying on it.
        broken: bool,
        plan: plan::SelectPlan,
        /// Relation description accompanying the select plan.
        desc: RelationDesc,
    },
    CreateView {
        broken: bool,
        plan: plan::CreateViewPlan,
    },
    CreateMaterializedView {
        broken: bool,
        plan: plan::CreateMaterializedViewPlan,
    },
    CreateIndex {
        broken: bool,
        plan: plan::CreateIndexPlan,
    },
    Subscribe {
        broken: bool,
        plan: plan::SubscribePlan,
    },
}
1086
1087impl ExplaineeStatement {
1088 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1089 match self {
1090 Self::Select { plan, .. } => plan.source.depends_on(),
1091 Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1092 Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1093 Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1094 Self::Subscribe { plan, .. } => plan.from.depends_on(),
1095 }
1096 }
1097
1098 pub fn broken(&self) -> bool {
1109 match self {
1110 Self::Select { broken, .. } => *broken,
1111 Self::CreateView { broken, .. } => *broken,
1112 Self::CreateMaterializedView { broken, .. } => *broken,
1113 Self::CreateIndex { broken, .. } => *broken,
1114 Self::Subscribe { broken, .. } => *broken,
1115 }
1116 }
1117}
1118
1119impl ExplaineeStatementKind {
1120 pub fn supports(&self, stage: &ExplainStage) -> bool {
1121 use ExplainStage::*;
1122 match self {
1123 Self::Select => true,
1124 Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1125 Self::CreateMaterializedView => true,
1126 Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1127 Self::Subscribe => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1130 }
1131 }
1132}
1133
1134impl std::fmt::Display for ExplaineeStatementKind {
1135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1136 match self {
1137 Self::Select => write!(f, "SELECT"),
1138 Self::CreateView => write!(f, "CREATE VIEW"),
1139 Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1140 Self::CreateIndex => write!(f, "CREATE INDEX"),
1141 Self::Subscribe => write!(f, "SUBSCRIBE"),
1142 }
1143 }
1144}
1145
/// Plan payload for an explain-pushdown request; it carries only the target.
#[derive(Clone, Debug)]
pub struct ExplainPushdownPlan {
    /// Catalog item or inline statement to explain.
    pub explainee: Explainee,
}
1150
/// Plan payload for explaining timestamp selection of a query.
#[derive(Clone, Debug)]
pub struct ExplainTimestampPlan {
    /// Output format of the explanation.
    pub format: ExplainFormat,
    /// The query, as a HIR relation expression.
    pub raw_plan: HirRelationExpr,
    /// The timing constraint attached to the query.
    pub when: QueryWhen,
}
1157
/// Plan payload for explaining the schema of a sink.
#[derive(Debug)]
pub struct ExplainSinkSchemaPlan {
    /// Id of the collection the sink reads from.
    pub sink_from: GlobalId,
    /// The schema rendered as a string.
    // NOTE(review): presumably JSON text, per the field name — confirm.
    pub json_schema: String,
}
1163
/// Plan payload carrying already-computed row updates for a catalog item.
#[derive(Debug)]
pub struct SendDiffsPlan {
    /// The catalog item the updates apply to.
    pub id: CatalogItemId,
    /// Rows paired with their [`Diff`].
    pub updates: Vec<(Row, Diff)>,
    /// Which kind of mutation produced the updates.
    pub kind: MutationKind,
    /// Rows to hand back, each paired with a non-zero count.
    // NOTE(review): presumably the `RETURNING` output with multiplicities — confirm.
    pub returning: Vec<(Row, NonZeroUsize)>,
    /// Upper bound on result size.
    // NOTE(review): unit (bytes?) is not visible here — TODO confirm.
    pub max_result_size: u64,
}
1172
/// Plan payload for an insert into a catalog item.
#[derive(Debug)]
pub struct InsertPlan {
    /// The catalog item receiving the rows.
    pub id: CatalogItemId,
    /// Relation expression producing the rows to insert.
    pub values: HirRelationExpr,
    /// Scalar expressions evaluated to build the returned rows.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1179
/// Plan payload for a mutation that first reads a selection and then writes.
#[derive(Debug)]
pub struct ReadThenWritePlan {
    /// The catalog item being mutated.
    pub id: CatalogItemId,
    /// Relation expression selecting the affected rows.
    pub selection: HirRelationExpr,
    /// Finishing applied to the selected rows.
    pub finishing: RowSetFinishing,
    /// Scalar expressions keyed by a `usize`.
    // NOTE(review): the key is presumably the target column index — confirm.
    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
    /// Which kind of mutation this is.
    pub kind: MutationKind,
    /// Scalar expressions evaluated to build the returned rows.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1189
/// Plan payload for an `ALTER` that records only the kind of object involved.
// NOTE(review): per the name, this presumably represents an `ALTER` that
// performs no change — confirm.
#[derive(Debug)]
pub struct AlterNoopPlan {
    /// The kind of object named by the statement.
    pub object_type: ObjectType,
}
1195
/// Plan payload for assigning a catalog item to a cluster.
#[derive(Debug)]
pub struct AlterSetClusterPlan {
    /// The catalog item being altered.
    pub id: CatalogItemId,
    /// The cluster to assign.
    pub set_cluster: ClusterId,
}
1201
/// Plan payload for altering the retain-history setting of a catalog item.
#[derive(Debug)]
pub struct AlterRetainHistoryPlan {
    /// The catalog item being altered.
    pub id: CatalogItemId,
    /// The option value as an AST [`Value`], if one was given.
    pub value: Option<Value>,
    /// The compaction window to apply.
    pub window: CompactionWindow,
    /// The kind of object being altered.
    pub object_type: ObjectType,
}
1209
/// Plan payload for altering the timestamp interval of a source.
#[derive(Debug)]
pub struct AlterSourceTimestampIntervalPlan {
    /// The catalog item being altered.
    pub id: CatalogItemId,
    /// The option value as an AST [`Value`], if one was given.
    pub value: Option<Value>,
    /// The interval as a [`Duration`].
    pub interval: Duration,
}
1216
#[derive(Debug, Clone)]
/// Tri-state describing what an `ALTER` does to a single option.
///
/// The value type `T` defaults to `String`.
pub enum AlterOptionParameter<T = String> {
    /// Set the option to the given value.
    Set(T),
    /// Reset the option.
    Reset,
    /// Leave the option as it is.
    Unchanged,
}
1224
/// The action requested by an `ALTER CONNECTION`.
#[derive(Debug)]
pub enum AlterConnectionAction {
    /// Rotate the connection's keys.
    RotateKeys,
    /// Change the connection's options.
    AlterOptions {
        /// Options to set, each with an optional value.
        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
        /// Options to drop.
        drop_options: BTreeSet<ConnectionOptionName>,
        /// Validation flag.
        // NOTE(review): presumably "validate the altered connection" — confirm.
        validate: bool,
    },
}
1234
/// Plan payload for an `ALTER CONNECTION`.
#[derive(Debug)]
pub struct AlterConnectionPlan {
    /// The connection being altered.
    pub id: CatalogItemId,
    /// What to do to the connection.
    pub action: AlterConnectionAction,
}
1240
/// The action requested by an `ALTER SOURCE`.
#[derive(Debug)]
pub enum AlterSourceAction {
    /// Add subsource exports to the source.
    AddSubsourceExports {
        /// Plans for the subsources to create.
        subsources: Vec<CreateSourcePlanBundle>,
        /// Options given with the statement.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    },
    /// Refresh the source's references.
    RefreshReferences {
        /// The references to store.
        references: SourceReferences,
    },
}
1251
/// Plan payload for an `ALTER SOURCE`.
#[derive(Debug)]
pub struct AlterSourcePlan {
    /// Catalog item id of the source.
    pub item_id: CatalogItemId,
    /// Global id of the ingestion.
    pub ingestion_id: GlobalId,
    /// What to do to the source.
    pub action: AlterSourceAction,
}
1258
/// Plan payload for an `ALTER SINK`.
#[derive(Debug, Clone)]
pub struct AlterSinkPlan {
    /// Catalog item id of the sink.
    pub item_id: CatalogItemId,
    /// Global id of the sink.
    pub global_id: GlobalId,
    /// The sink definition to apply.
    pub sink: Sink,
    /// Snapshot flag.
    // NOTE(review): presumably whether the sink emits an initial snapshot — confirm.
    pub with_snapshot: bool,
    /// The cluster associated with the sink.
    pub in_cluster: ClusterId,
}
1267
/// Plan payload for an `ALTER CLUSTER` that changes options.
#[derive(Debug, Clone)]
pub struct AlterClusterPlan {
    /// The cluster being altered.
    pub id: ClusterId,
    /// Name of the cluster.
    pub name: String,
    /// Per-option changes.
    pub options: PlanClusterOption,
    /// Strategy selected for the alteration.
    pub strategy: AlterClusterPlanStrategy,
}
1275
/// Plan payload for renaming a cluster.
#[derive(Debug)]
pub struct AlterClusterRenamePlan {
    /// The cluster being renamed.
    pub id: ClusterId,
    /// Current name.
    pub name: String,
    /// New name.
    pub to_name: String,
}
1282
/// Plan payload for renaming a cluster replica.
#[derive(Debug)]
pub struct AlterClusterReplicaRenamePlan {
    /// The cluster owning the replica.
    pub cluster_id: ClusterId,
    /// The replica being renamed.
    pub replica_id: ReplicaId,
    /// Current qualified name of the replica.
    pub name: QualifiedReplica,
    /// New name.
    pub to_name: String,
}
1290
/// Plan payload for renaming a catalog item.
#[derive(Debug)]
pub struct AlterItemRenamePlan {
    /// The item being renamed.
    pub id: CatalogItemId,
    /// Current fully qualified name.
    pub current_full_name: FullItemName,
    /// New name.
    pub to_name: String,
    /// The kind of object being renamed.
    pub object_type: ObjectType,
}
1298
/// Plan payload for renaming a schema.
#[derive(Debug)]
pub struct AlterSchemaRenamePlan {
    /// Database and schema specifiers identifying the current schema.
    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// New name for the schema.
    pub new_schema_name: String,
}
1304
/// Plan payload for swapping two schemas.
#[derive(Debug)]
pub struct AlterSchemaSwapPlan {
    /// Database and schema specifiers of the first schema.
    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of the first schema.
    pub schema_a_name: String,
    /// Database and schema specifiers of the second schema.
    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of the second schema.
    pub schema_b_name: String,
    /// A temporary name.
    // NOTE(review): presumably used as the intermediate name during the swap — confirm.
    pub name_temp: String,
}
1313
/// Plan payload for swapping two clusters.
#[derive(Debug)]
pub struct AlterClusterSwapPlan {
    /// Id of the first cluster.
    pub id_a: ClusterId,
    /// Id of the second cluster.
    pub id_b: ClusterId,
    /// Name of the first cluster.
    pub name_a: String,
    /// Name of the second cluster.
    pub name_b: String,
    /// A temporary name.
    // NOTE(review): presumably used as the intermediate name during the swap — confirm.
    pub name_temp: String,
}
1322
/// Plan payload for an `ALTER SECRET`.
#[derive(Debug)]
pub struct AlterSecretPlan {
    /// The secret being altered.
    pub id: CatalogItemId,
    /// Scalar expression that yields the new secret contents.
    pub secret_as: MirScalarExpr,
}
1328
/// Plan payload for `ALTER SYSTEM SET`.
#[derive(Debug)]
pub struct AlterSystemSetPlan {
    /// Name of the system variable.
    pub name: String,
    /// Value to assign.
    pub value: VariableValue,
}
1334
/// Plan payload for `ALTER SYSTEM RESET` of a single variable.
#[derive(Debug)]
pub struct AlterSystemResetPlan {
    /// Name of the system variable.
    pub name: String,
}
1339
/// Plan payload for `ALTER SYSTEM RESET ALL`; it carries no data.
#[derive(Debug)]
pub struct AlterSystemResetAllPlan {}
1342
/// Plan payload for an `ALTER ROLE`.
#[derive(Debug)]
pub struct AlterRolePlan {
    /// The role being altered.
    pub id: RoleId,
    /// Name of the role.
    pub name: String,
    /// The planned change.
    pub option: PlannedAlterRoleOption,
}
1349
/// Plan payload for changing the owner of an object.
#[derive(Debug)]
pub struct AlterOwnerPlan {
    /// The object whose owner changes.
    pub id: ObjectId,
    /// The kind of object.
    pub object_type: ObjectType,
    /// The role that becomes the owner.
    pub new_owner: RoleId,
}
1356
/// Plan payload for an `ALTER TABLE` that describes a single column.
// NOTE(review): presumably this is `ADD COLUMN` — confirm.
#[derive(Debug)]
pub struct AlterTablePlan {
    /// The relation being altered.
    pub relation_id: CatalogItemId,
    /// Name of the column.
    pub column_name: ColumnName,
    /// Resolved SQL type of the column.
    pub column_type: SqlColumnType,
    /// The column's data type as it appeared in the AST.
    pub raw_sql_type: RawDataType,
}
1364
/// Plan payload for applying a replacement to a materialized view.
#[derive(Debug, Clone)]
pub struct AlterMaterializedViewApplyReplacementPlan {
    /// The materialized view being altered.
    pub id: CatalogItemId,
    /// The catalog item acting as the replacement.
    pub replacement_id: CatalogItemId,
}
1370
/// Plan payload for a `DECLARE` (cursor declaration).
#[derive(Debug)]
pub struct DeclarePlan {
    /// Name being declared.
    pub name: String,
    /// The unresolved statement attached to the declaration.
    pub stmt: Statement<Raw>,
    /// SQL text.
    // NOTE(review): presumably the text of `stmt` — confirm.
    pub sql: String,
    /// Parameters for the statement.
    pub params: Params,
}
1378
/// Plan payload for a `FETCH`.
#[derive(Debug)]
pub struct FetchPlan {
    /// Name of the cursor to fetch from.
    pub name: String,
    /// Fetch direction/count, if one was specified.
    pub count: Option<FetchDirection>,
    /// Timeout behavior for the fetch.
    pub timeout: ExecuteTimeout,
}
1385
/// Plan payload for a `CLOSE`.
#[derive(Debug)]
pub struct ClosePlan {
    /// Name of the cursor to close.
    pub name: String,
}
1390
/// Plan payload for a `PREPARE`.
#[derive(Debug)]
pub struct PreparePlan {
    /// Name of the prepared statement.
    pub name: String,
    /// The unresolved statement being prepared.
    pub stmt: Statement<Raw>,
    /// SQL text.
    // NOTE(review): presumably the text of `stmt` — confirm.
    pub sql: String,
    /// Description of the statement.
    pub desc: StatementDesc,
}
1398
/// Plan payload for an `EXECUTE`.
#[derive(Debug)]
pub struct ExecutePlan {
    /// Name of the prepared statement to run.
    pub name: String,
    /// Parameters supplied for the execution.
    pub params: Params,
}
1404
/// Plan payload for a `DEALLOCATE`.
#[derive(Debug)]
pub struct DeallocatePlan {
    /// Name of the prepared statement, if one was given.
    // NOTE(review): `None` presumably means "deallocate all" — confirm.
    pub name: Option<String>,
}
1409
/// Plan payload for a `RAISE`.
#[derive(Debug)]
pub struct RaisePlan {
    /// Severity of the notice to raise.
    pub severity: NoticeSeverity,
}
1414
/// Plan payload for granting role membership.
#[derive(Debug)]
pub struct GrantRolePlan {
    /// The roles being granted.
    // NOTE(review): direction (roles gaining members vs. members gaining roles)
    // is inferred from the field names — confirm.
    pub role_ids: Vec<RoleId>,
    /// The roles that become members.
    pub member_ids: Vec<RoleId>,
    /// The role recorded as grantor.
    pub grantor_id: RoleId,
}
1424
/// Plan payload for revoking role membership.
#[derive(Debug)]
pub struct RevokeRolePlan {
    /// The roles being revoked.
    // NOTE(review): direction is inferred from the field names — confirm.
    pub role_ids: Vec<RoleId>,
    /// The roles that lose membership.
    pub member_ids: Vec<RoleId>,
    /// The role recorded as grantor.
    pub grantor_id: RoleId,
}
1434
/// One privilege change: a set of ACL mode bits on one target, with a grantor.
#[derive(Debug)]
pub struct UpdatePrivilege {
    /// The privileges involved.
    pub acl_mode: AclMode,
    /// The object the privileges apply to.
    pub target_id: SystemObjectId,
    /// The role recorded as grantor.
    pub grantor: RoleId,
}
1444
/// Plan payload for granting privileges.
#[derive(Debug)]
pub struct GrantPrivilegesPlan {
    /// The privilege changes to apply.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles receiving the privileges.
    pub grantees: Vec<RoleId>,
}
1452
/// Plan payload for revoking privileges.
#[derive(Debug)]
pub struct RevokePrivilegesPlan {
    /// The privilege changes to apply.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles losing the privileges.
    pub revokees: Vec<RoleId>,
}
/// Plan payload for `ALTER DEFAULT PRIVILEGES`.
#[derive(Debug)]
pub struct AlterDefaultPrivilegesPlan {
    /// The default-privilege objects affected.
    pub privilege_objects: Vec<DefaultPrivilegeObject>,
    /// The ACL items to grant or revoke.
    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
    /// `true` for a grant.
    // NOTE(review): `false` presumably means revoke — confirm.
    pub is_grant: bool,
}
1469
/// Plan payload for `REASSIGN OWNED`.
#[derive(Debug)]
pub struct ReassignOwnedPlan {
    /// The roles giving up ownership.
    pub old_roles: Vec<RoleId>,
    /// The role taking ownership.
    pub new_role: RoleId,
    /// The objects whose ownership is reassigned.
    pub reassign_ids: Vec<ObjectId>,
}
1479
/// Plan payload for a `COMMENT ON`.
#[derive(Debug)]
pub struct CommentPlan {
    /// The object the comment is attached to.
    pub object_id: CommentObjectId,
    /// Optional index of a sub-component of the object.
    // NOTE(review): presumably a column position; indexing base not visible — confirm.
    pub sub_component: Option<usize>,
    /// The comment text, if any.
    // NOTE(review): `None` presumably removes the comment — confirm.
    pub comment: Option<String>,
}
1491
/// Where a table's data comes from.
#[derive(Clone, Debug)]
pub enum TableDataSource {
    /// The table is populated by writes; `defaults` holds one expression per
    /// entry.
    // NOTE(review): presumably one default expression per column — confirm.
    TableWrites { defaults: Vec<Expr<Aug>> },

    /// The table is populated from a data source.
    DataSource {
        /// Description of the data source.
        desc: DataSourceDesc,
        /// Timeline associated with the data.
        timeline: Timeline,
    },
}
1504
/// Planned definition of a table.
#[derive(Clone, Debug)]
pub struct Table {
    /// SQL text that creates the table.
    pub create_sql: String,
    /// Versioned relation description of the table.
    pub desc: VersionedRelationDesc,
    /// Whether the table is temporary.
    pub temporary: bool,
    /// Compaction window, if one is configured.
    pub compaction_window: Option<CompactionWindow>,
    /// Where the table's data comes from.
    pub data_source: TableDataSource,
}
1513
/// Planned definition of a source.
#[derive(Clone, Debug)]
pub struct Source {
    /// SQL text that creates the source.
    pub create_sql: String,
    /// Description of where the data comes from.
    pub data_source: DataSourceDesc,
    /// Relation description of the source.
    pub desc: RelationDesc,
    /// Compaction window, if one is configured.
    pub compaction_window: Option<CompactionWindow>,
}
1521
/// Describes the origin of data for a source or a source-fed table.
#[derive(Debug, Clone)]
pub enum DataSourceDesc {
    /// An ingestion described by a [`SourceDesc`].
    Ingestion(SourceDesc<ReferencedConnection>),
    /// An ingestion declared with the old syntax; it bundles the ingestion
    /// description with export details.
    OldSyntaxIngestion {
        /// The ingestion description.
        desc: SourceDesc<ReferencedConnection>,
        /// Catalog item id of the progress subsource.
        progress_subsource: CatalogItemId,
        /// Data configuration of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
        /// Details of the export.
        details: SourceExportDetails,
    },
    /// An export of an existing ingestion.
    IngestionExport {
        /// Catalog item id of the ingestion being exported from.
        ingestion_id: CatalogItemId,
        /// Name of the upstream object this export refers to.
        external_reference: UnresolvedItemName,
        /// Details of the export.
        details: SourceExportDetails,
        /// Data configuration of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// A progress collection.
    Progress,
    /// A webhook endpoint.
    Webhook {
        /// Optional request validation.
        validate_using: Option<WebhookValidation>,
        /// How the request body is interpreted.
        body_format: WebhookBodyFormat,
        /// Which headers are exposed, and how.
        headers: WebhookHeaders,
        /// Optional storage instance for the webhook.
        // NOTE(review): when this is `None` vs `Some` is not visible here — confirm.
        cluster_id: Option<StorageInstanceId>,
    },
}
1554
/// Validation applied to webhook requests.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct WebhookValidation {
    /// The validation expression.
    pub expression: MirScalarExpr,
    /// Relation description whose column types are used when reducing
    /// `expression` (see `reduce_expression`).
    pub relation_desc: RelationDesc,
    /// Pairs of `(usize, bool)` describing body inputs.
    // NOTE(review): presumably (column index, provide-as-bytes) — confirm.
    pub bodies: Vec<(usize, bool)>,
    /// Pairs of `(usize, bool)` describing header inputs.
    // NOTE(review): presumably (column index, provide-as-bytes) — confirm.
    pub headers: Vec<(usize, bool)>,
    /// Secrets referenced by the validation.
    pub secrets: Vec<WebhookValidationSecret>,
}
1568
1569impl WebhookValidation {
1570 const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1571
1572 pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1577 let WebhookValidation {
1578 expression,
1579 relation_desc,
1580 ..
1581 } = self;
1582
1583 let mut expression_ = expression.clone();
1585 let desc_ = relation_desc.clone();
1586 let reduce_task = mz_ore::task::spawn_blocking(
1587 || "webhook-validation-reduce",
1588 move || {
1589 let repr_col_types: Vec<ReprColumnType> = desc_
1590 .typ()
1591 .column_types
1592 .iter()
1593 .map(ReprColumnType::from)
1594 .collect();
1595 expression_.reduce(&repr_col_types);
1596 expression_
1597 },
1598 );
1599
1600 match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1601 Ok(reduced_expr) => {
1602 *expression = reduced_expr;
1603 Ok(())
1604 }
1605 Err(_) => Err("timeout"),
1606 }
1607 }
1608}
1609
/// Describes which webhook request headers are exposed.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct WebhookHeaders {
    /// Filters for a single headers column, if such a column is present.
    pub header_column: Option<WebhookHeaderFilters>,
    /// Individually mapped headers, keyed by a `usize`.
    // NOTE(review): presumably column index -> (header name, provide-as-bytes) — confirm.
    pub mapped_headers: BTreeMap<usize, (String, bool)>,
}
1617
1618impl WebhookHeaders {
1619 pub fn num_columns(&self) -> usize {
1621 let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1622 let mapped_headers = self.mapped_headers.len();
1623
1624 header_column + mapped_headers
1625 }
1626}
1627
/// Block and allow lists of header names.
// NOTE(review): precedence between the two sets is not visible here — confirm.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct WebhookHeaderFilters {
    /// Header names on the block list.
    pub block: BTreeSet<String>,
    /// Header names on the allow list.
    pub allow: BTreeSet<String>,
}
1633
/// How the body of a webhook request is interpreted.
///
/// Each variant maps to a [`SqlScalarType`] through the `From` impl for
/// `SqlScalarType`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Arbitrary)]
pub enum WebhookBodyFormat {
    /// JSON body.
    // NOTE(review): `array` presumably controls unpacking of a top-level JSON
    // array — confirm.
    Json { array: bool },
    /// Raw bytes.
    Bytes,
    /// Text.
    Text,
}
1640
1641impl From<WebhookBodyFormat> for SqlScalarType {
1642 fn from(value: WebhookBodyFormat) -> Self {
1643 match value {
1644 WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1645 WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1646 WebhookBodyFormat::Text => SqlScalarType::String,
1647 }
1648 }
1649}
1650
/// A secret referenced by webhook validation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct WebhookValidationSecret {
    /// Catalog item id of the secret.
    pub id: CatalogItemId,
    /// Column index associated with the secret.
    // NOTE(review): presumably the column of the validation expression's input
    // where the secret is provided — confirm.
    pub column_idx: usize,
    /// Bytes flag.
    // NOTE(review): presumably "provide the secret as bytes rather than text" — confirm.
    pub use_bytes: bool,
}
1660
/// Planned definition of a connection.
#[derive(Clone, Debug)]
pub struct Connection {
    /// SQL text that creates the connection.
    pub create_sql: String,
    /// Type-specific details of the connection.
    pub details: ConnectionDetails,
}
1666
/// Type-specific details of a planned connection.
///
/// [`ConnectionDetails::to_connection`] converts each variant into the
/// corresponding `mz_storage_types::connections::Connection` variant.
#[derive(Clone, Debug, Serialize)]
pub enum ConnectionDetails {
    /// Kafka connection.
    Kafka(KafkaConnection<ReferencedConnection>),
    /// Schema registry (CSR) connection.
    Csr(CsrConnection<ReferencedConnection>),
    /// PostgreSQL connection.
    Postgres(PostgresConnection<ReferencedConnection>),
    /// SSH connection together with two keys.
    // NOTE(review): the roles of `key_1` and `key_2` are not visible here — confirm.
    Ssh {
        /// The SSH connection itself; this is what `to_connection` keeps.
        connection: SshConnection,
        /// First key.
        key_1: SshKey,
        /// Second key.
        key_2: SshKey,
    },
    /// AWS connection.
    Aws(AwsConnection),
    /// AWS PrivateLink connection.
    AwsPrivatelink(AwsPrivatelinkConnection),
    /// MySQL connection.
    MySql(MySqlConnection<ReferencedConnection>),
    /// SQL Server connection.
    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
    /// Iceberg catalog connection.
    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
}
1683
1684impl ConnectionDetails {
1685 pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1686 match self {
1687 ConnectionDetails::Kafka(c) => {
1688 mz_storage_types::connections::Connection::Kafka(c.clone())
1689 }
1690 ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1691 ConnectionDetails::Postgres(c) => {
1692 mz_storage_types::connections::Connection::Postgres(c.clone())
1693 }
1694 ConnectionDetails::Ssh { connection, .. } => {
1695 mz_storage_types::connections::Connection::Ssh(connection.clone())
1696 }
1697 ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1698 ConnectionDetails::AwsPrivatelink(c) => {
1699 mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1700 }
1701 ConnectionDetails::MySql(c) => {
1702 mz_storage_types::connections::Connection::MySql(c.clone())
1703 }
1704 ConnectionDetails::SqlServer(c) => {
1705 mz_storage_types::connections::Connection::SqlServer(c.clone())
1706 }
1707 ConnectionDetails::IcebergCatalog(c) => {
1708 mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1709 }
1710 }
1711 }
1712}
1713
/// A single rule of a network policy.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct NetworkPolicyRule {
    /// Name of the rule.
    pub name: String,
    /// What the rule does with matching traffic.
    pub action: NetworkPolicyRuleAction,
    /// The network the rule applies to.
    pub address: PolicyAddress,
    /// The traffic direction the rule applies to.
    pub direction: NetworkPolicyRuleDirection,
}
1721
/// Action of a network policy rule. `Allow` is the only action defined.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    Ord,
    PartialOrd,
    Hash
)]
pub enum NetworkPolicyRuleAction {
    /// Allow matching traffic.
    Allow,
}
1736
1737impl std::fmt::Display for NetworkPolicyRuleAction {
1738 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1739 match self {
1740 Self::Allow => write!(f, "allow"),
1741 }
1742 }
1743}
1744impl TryFrom<&str> for NetworkPolicyRuleAction {
1745 type Error = PlanError;
1746 fn try_from(value: &str) -> Result<Self, Self::Error> {
1747 match value.to_uppercase().as_str() {
1748 "ALLOW" => Ok(Self::Allow),
1749 _ => Err(PlanError::Unstructured(
1750 "Allow is the only valid option".into(),
1751 )),
1752 }
1753 }
1754}
1755
/// Traffic direction of a network policy rule. `Ingress` is the only
/// direction defined.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleDirection {
    /// Incoming traffic.
    Ingress,
}
1760impl std::fmt::Display for NetworkPolicyRuleDirection {
1761 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1762 match self {
1763 Self::Ingress => write!(f, "ingress"),
1764 }
1765 }
1766}
1767impl TryFrom<&str> for NetworkPolicyRuleDirection {
1768 type Error = PlanError;
1769 fn try_from(value: &str) -> Result<Self, Self::Error> {
1770 match value.to_uppercase().as_str() {
1771 "INGRESS" => Ok(Self::Ingress),
1772 _ => Err(PlanError::Unstructured(
1773 "Ingress is the only valid option".into(),
1774 )),
1775 }
1776 }
1777}
1778
/// Newtype around [`IpNet`] used as the address of a network policy rule.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct PolicyAddress(pub IpNet);
1781impl std::fmt::Display for PolicyAddress {
1782 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1783 write!(f, "{}", &self.0.to_string())
1784 }
1785}
impl From<String> for PolicyAddress {
    /// Parses `value` as an [`IpNet`] and wraps it.
    ///
    /// # Panics
    ///
    /// Panics if `value` is not a valid `IpNet`. Use the `TryFrom<&str>`
    /// impl when the input may be invalid.
    fn from(value: String) -> Self {
        Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
    }
}
1791impl TryFrom<&str> for PolicyAddress {
1792 type Error = PlanError;
1793 fn try_from(value: &str) -> Result<Self, Self::Error> {
1794 let net = IpNet::from_str(value)
1795 .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1796 Ok(Self(net))
1797 }
1798}
1799
1800impl Serialize for PolicyAddress {
1801 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1802 where
1803 S: serde::Serializer,
1804 {
1805 serializer.serialize_str(&format!("{}", &self.0))
1806 }
1807}
1808
/// An SSH key, available either as a public key only or as a full key pair.
#[derive(Clone, Debug, Serialize)]
pub enum SshKey {
    /// Only the public key, as a string.
    PublicOnly(String),
    /// A complete key pair.
    Both(SshKeyPair),
}
1814
1815impl SshKey {
1816 pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1817 match self {
1818 SshKey::PublicOnly(_) => None,
1819 SshKey::Both(key_pair) => Some(key_pair),
1820 }
1821 }
1822
1823 pub fn public_key(&self) -> String {
1824 match self {
1825 SshKey::PublicOnly(s) => s.into(),
1826 SshKey::Both(p) => p.ssh_public_key(),
1827 }
1828 }
1829}
1830
/// Planned definition of a secret.
#[derive(Clone, Debug)]
pub struct Secret {
    /// SQL text that creates the secret.
    pub create_sql: String,
    /// Scalar expression that yields the secret contents.
    pub secret_as: MirScalarExpr,
}
1836
/// Planned definition of a sink.
#[derive(Clone, Debug)]
pub struct Sink {
    /// SQL text that creates the sink.
    pub create_sql: String,
    /// Id of the collection the sink reads from.
    pub from: GlobalId,
    /// Connection to the external system the sink writes to.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// Envelope used by the sink.
    pub envelope: SinkEnvelope,
    /// Version number of the sink.
    // NOTE(review): what increments this is not visible here — confirm.
    pub version: u64,
    /// Commit interval, if one is configured.
    pub commit_interval: Option<Duration>,
}
1850
/// Planned definition of a view.
#[derive(Clone, Debug)]
pub struct View {
    /// SQL text that creates the view.
    pub create_sql: String,
    /// The view's query as a HIR relation expression.
    pub expr: HirRelationExpr,
    /// Ids the view depends on.
    pub dependencies: DependencyIds,
    /// Names of the view's columns.
    pub column_names: Vec<ColumnName>,
    /// Whether the view is temporary.
    pub temporary: bool,
}
1864
/// Planned definition of a materialized view.
#[derive(Clone, Debug)]
pub struct MaterializedView {
    /// SQL text that creates the materialized view.
    pub create_sql: String,
    /// The view's query as a HIR relation expression.
    pub expr: HirRelationExpr,
    /// Ids the materialized view depends on.
    pub dependencies: DependencyIds,
    /// Names of the view's columns.
    pub column_names: Vec<ColumnName>,
    /// Catalog item this materialized view is meant to replace, if any.
    pub replacement_target: Option<CatalogItemId>,
    /// Cluster the materialized view runs on.
    pub cluster_id: ClusterId,
    /// Specific replica targeted, if any.
    pub target_replica: Option<ReplicaId>,
    /// Entries for non-null assertions.
    // NOTE(review): presumably indexes of columns asserted to be non-null — confirm.
    pub non_null_assertions: Vec<usize>,
    /// Compaction window, if one is configured.
    pub compaction_window: Option<CompactionWindow>,
    /// Refresh schedule, if one is configured.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// Explicit as-of timestamp, if any.
    pub as_of: Option<Timestamp>,
}
1885
/// Planned definition of an index.
#[derive(Clone, Debug)]
pub struct Index {
    /// SQL text that creates the index.
    pub create_sql: String,
    /// Id of the collection being indexed.
    pub on: GlobalId,
    /// Key expressions of the index.
    pub keys: Vec<mz_expr::MirScalarExpr>,
    /// Compaction window, if one is configured.
    pub compaction_window: Option<CompactionWindow>,
    /// Cluster the index lives on.
    pub cluster_id: ClusterId,
}
1896
/// Planned definition of a user-defined type.
#[derive(Clone, Debug)]
pub struct Type {
    /// SQL text that creates the type.
    pub create_sql: String,
    /// The catalog-level description of the type.
    pub inner: CatalogType<IdReference>,
}
1902
/// Timing constraint attached to a query.
///
/// The predicate methods on this type spell out how each variant constrains
/// timestamp selection.
#[derive(Deserialize, Clone, Debug, PartialEq)]
pub enum QueryWhen {
    /// No explicit timestamp was requested.
    Immediately,
    /// The only variant for which `must_advance_to_timeline_ts` is `true`.
    // NOTE(review): presumably "must observe the freshest table write" — confirm.
    FreshestTableWrite,
    /// Carries an explicit timestamp; the only variant for which
    /// `constrains_upper` is `true`.
    AtTimestamp(Timestamp),
    /// Carries a timestamp that, unlike `AtTimestamp`, does not constrain the
    /// upper (`constrains_upper` is `false`).
    AtLeastTimestamp(Timestamp),
}
1921
1922impl QueryWhen {
1923 pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1925 match self {
1926 QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1927 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1928 }
1929 }
1930 pub fn constrains_upper(&self) -> bool {
1934 match self {
1935 QueryWhen::AtTimestamp(_) => true,
1936 QueryWhen::AtLeastTimestamp(_)
1937 | QueryWhen::Immediately
1938 | QueryWhen::FreshestTableWrite => false,
1939 }
1940 }
1941 pub fn advance_to_since(&self) -> bool {
1943 match self {
1944 QueryWhen::Immediately
1945 | QueryWhen::AtLeastTimestamp(_)
1946 | QueryWhen::FreshestTableWrite => true,
1947 QueryWhen::AtTimestamp(_) => false,
1948 }
1949 }
1950 pub fn can_advance_to_upper(&self) -> bool {
1952 match self {
1953 QueryWhen::Immediately => true,
1954 QueryWhen::FreshestTableWrite
1955 | QueryWhen::AtTimestamp(_)
1956 | QueryWhen::AtLeastTimestamp(_) => false,
1957 }
1958 }
1959
1960 pub fn can_advance_to_timeline_ts(&self) -> bool {
1962 match self {
1963 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1964 QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1965 }
1966 }
1967 pub fn must_advance_to_timeline_ts(&self) -> bool {
1969 match self {
1970 QueryWhen::FreshestTableWrite => true,
1971 QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1972 false
1973 }
1974 }
1975 }
1976 pub fn is_transactional(&self) -> bool {
1978 match self {
1979 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1980 QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1981 }
1982 }
1983}
1984
/// The kind of data mutation a plan performs.
#[derive(Debug, Copy, Clone)]
pub enum MutationKind {
    /// Rows are inserted.
    Insert,
    /// Rows are updated.
    Update,
    /// Rows are deleted.
    Delete,
}
1991
/// Data formats available to `COPY`.
///
/// The derived ordering follows declaration order.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyFormat {
    /// Text format.
    Text,
    /// CSV format.
    Csv,
    /// Binary format.
    Binary,
    /// Parquet format.
    Parquet,
}
1999
/// Timeout behavior for an execution (used, for example, by `FetchPlan`).
#[derive(Debug, Copy, Clone)]
pub enum ExecuteTimeout {
    /// No timeout.
    None,
    /// A timeout expressed in (possibly fractional) seconds.
    Seconds(f64),
    /// Wait once.
    // NOTE(review): exact semantics are not visible in this chunk — confirm.
    WaitOnce,
}
2006
/// Options that can be specified for an index.
#[derive(Clone, Debug)]
pub enum IndexOption {
    /// Retain-history setting, expressed as a compaction window.
    RetainHistory(CompactionWindow),
}
2012
/// Options that can be specified for a table.
#[derive(Clone, Debug)]
pub enum TableOption {
    /// Retain-history setting, expressed as a compaction window.
    RetainHistory(CompactionWindow),
}
2018
/// Per-option changes requested by an `ALTER CLUSTER`.
///
/// Every field is an [`AlterOptionParameter`]; the `Default` impl sets all of
/// them to `Unchanged`.
#[derive(Clone, Debug)]
pub struct PlanClusterOption {
    /// Change to the availability zones.
    pub availability_zones: AlterOptionParameter<Vec<String>>,
    /// Change to the introspection debugging flag.
    pub introspection_debugging: AlterOptionParameter<bool>,
    /// Change to the introspection interval.
    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
    /// Change to the managed flag.
    pub managed: AlterOptionParameter<bool>,
    /// Change to the replica set, as `(name, config)` pairs.
    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
    /// Change to the replication factor.
    pub replication_factor: AlterOptionParameter<u32>,
    /// Change to the size (a `String`, the default parameter type).
    pub size: AlterOptionParameter,
    /// Change to the cluster schedule.
    pub schedule: AlterOptionParameter<ClusterSchedule>,
    /// Change to the workload class; the value itself is optional.
    pub workload_class: AlterOptionParameter<Option<String>>,
}
2031
2032impl Default for PlanClusterOption {
2033 fn default() -> Self {
2034 Self {
2035 availability_zones: AlterOptionParameter::Unchanged,
2036 introspection_debugging: AlterOptionParameter::Unchanged,
2037 introspection_interval: AlterOptionParameter::Unchanged,
2038 managed: AlterOptionParameter::Unchanged,
2039 replicas: AlterOptionParameter::Unchanged,
2040 replication_factor: AlterOptionParameter::Unchanged,
2041 size: AlterOptionParameter::Unchanged,
2042 schedule: AlterOptionParameter::Unchanged,
2043 workload_class: AlterOptionParameter::Unchanged,
2044 }
2045 }
2046}
2047
/// Strategy selected for an `ALTER CLUSTER`, derived from the statement's
/// `wait` option (see the `TryFrom<ClusterAlterOptionExtracted>` impl).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AlterClusterPlanStrategy {
    /// No `wait` option was given.
    None,
    /// Built from the `For` form of the option, with its duration.
    For(Duration),
    /// Built from the `UntilReady` form of the option.
    UntilReady {
        /// What to do when `timeout` elapses.
        on_timeout: OnTimeoutAction,
        /// The timeout; planning fails if it is not provided.
        timeout: Duration,
    },
}
2057
/// Action taken when the timeout of an `UntilReady` strategy elapses.
///
/// The default is [`OnTimeoutAction::Commit`]. `Default` is derived, with the
/// default variant marked via `#[default]`, instead of hand-written.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum OnTimeoutAction {
    /// Commit on timeout (the default).
    #[default]
    Commit,
    /// Roll back on timeout.
    Rollback,
}
2069
2070impl TryFrom<&str> for OnTimeoutAction {
2071 type Error = PlanError;
2072 fn try_from(value: &str) -> Result<Self, Self::Error> {
2073 match value.to_uppercase().as_str() {
2074 "COMMIT" => Ok(Self::Commit),
2075 "ROLLBACK" => Ok(Self::Rollback),
2076 _ => Err(PlanError::Unstructured(
2077 "Valid options are COMMIT, ROLLBACK".into(),
2078 )),
2079 }
2080 }
2081}
2082
2083impl AlterClusterPlanStrategy {
2084 pub fn is_none(&self) -> bool {
2085 matches!(self, Self::None)
2086 }
2087 pub fn is_some(&self) -> bool {
2088 !matches!(self, Self::None)
2089 }
2090}
2091
2092impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2093 type Error = PlanError;
2094
2095 fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2096 Ok(match value.wait {
2097 Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2098 Some(ClusterAlterOptionValue::UntilReady(options)) => {
2099 let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2100 Self::UntilReady {
2101 timeout: match extracted.timeout {
2102 Some(d) => d,
2103 None => Err(PlanError::UntilReadyTimeoutRequired)?,
2104 },
2105 on_timeout: match extracted.on_timeout {
2106 Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2107 PlanError::InvalidOptionValue {
2108 option_name: "ON TIMEOUT".into(),
2109 err: Box::new(e),
2110 }
2111 })?,
2112 None => OnTimeoutAction::default(),
2113 },
2114 }
2115 }
2116 None => Self::None,
2117 })
2118 }
2119}
2120
/// Parameters supplied to a statement.
#[derive(Debug, Clone)]
pub struct Params {
    /// The parameter values, packed into a single [`Row`].
    pub datums: Row,
    /// Scalar types recorded for execution.
    // NOTE(review): presumably the types of the values in `datums` — confirm.
    pub execute_types: Vec<SqlScalarType>,
    /// Scalar types the statement expects.
    pub expected_types: Vec<SqlScalarType>,
}
2131
2132impl Params {
2133 pub fn empty() -> Params {
2135 Params {
2136 datums: Row::pack_slice(&[]),
2137 execute_types: vec![],
2138 expected_types: vec![],
2139 }
2140 }
2141}
2142
/// Context passed along with planning.
#[derive(
    Ord,
    PartialOrd,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    Hash,
    Copy
)]
pub struct PlanContext {
    /// Wall-clock time associated with the planning request.
    pub wall_time: DateTime<Utc>,
    /// Flag for ignoring "if exists" errors; `false` unless set through
    /// `with_ignore_if_exists_errors`.
    // NOTE(review): which errors are suppressed is not visible here — confirm.
    pub ignore_if_exists_errors: bool,
}
2160
2161impl PlanContext {
2162 pub fn new(wall_time: DateTime<Utc>) -> Self {
2163 Self {
2164 wall_time,
2165 ignore_if_exists_errors: false,
2166 }
2167 }
2168
2169 pub fn zero() -> Self {
2173 PlanContext {
2174 wall_time: now::to_datetime(NOW_ZERO()),
2175 ignore_if_exists_errors: false,
2176 }
2177 }
2178
2179 pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2180 self.ignore_if_exists_errors = value;
2181 self
2182 }
2183}