1use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{
41 CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing,
42};
43use mz_ore::now::{self, NOW_ZERO};
44use mz_pgcopy::CopyFormatParams;
45use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
46use mz_repr::explain::{ExplainConfig, ExplainFormat};
47use mz_repr::network_policy_id::NetworkPolicyId;
48use mz_repr::optimize::OptimizerFeatureOverrides;
49use mz_repr::refresh_schedule::RefreshSchedule;
50use mz_repr::role_id::RoleId;
51use mz_repr::{
52 CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, Row, SqlColumnType,
53 SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
54};
55use mz_sql_parser::ast::{
56 AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
57 RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
58 Value, WithOptionValue,
59};
60use mz_ssh_util::keys::SshKeyPair;
61use mz_storage_types::connections::aws::AwsConnection;
62use mz_storage_types::connections::inline::ReferencedConnection;
63use mz_storage_types::connections::{
64 AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
65 MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70 SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76 ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77 TransactionAccessMode,
78};
79use crate::catalog::{
80 CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81 RoleAttributesRaw,
82};
83use crate::names::{
84 Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110 AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111 WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119 AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120 PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123 StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124 resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
/// A planned statement.
///
/// Each variant names one kind of plan; most wrap a struct holding that plan's
/// data, a few (`DiscardTemp`, `DiscardAll`, `EmptyQuery`, `ShowAllVariables`)
/// carry nothing. The `EnumKind` derive together with `#[enum_kind(PlanKind)]`
/// generates the payload-free companion enum `PlanKind`, which is what
/// `Plan::generated_from` returns.
#[derive(Debug, EnumKind)]
#[enum_kind(PlanKind)]
pub enum Plan {
    CreateConnection(CreateConnectionPlan),
    CreateDatabase(CreateDatabasePlan),
    CreateSchema(CreateSchemaPlan),
    CreateRole(CreateRolePlan),
    CreateCluster(CreateClusterPlan),
    CreateClusterReplica(CreateClusterReplicaPlan),
    CreateSource(CreateSourcePlan),
    // Several source plans at once; `Plan::generated_from` never lists this kind.
    CreateSources(Vec<CreateSourcePlanBundle>),
    CreateSecret(CreateSecretPlan),
    CreateSink(CreateSinkPlan),
    CreateTable(CreateTablePlan),
    CreateView(CreateViewPlan),
    CreateMaterializedView(CreateMaterializedViewPlan),
    CreateContinualTask(CreateContinualTaskPlan),
    CreateNetworkPolicy(CreateNetworkPolicyPlan),
    CreateIndex(CreateIndexPlan),
    CreateType(CreateTypePlan),
    Comment(CommentPlan),
    DiscardTemp,
    DiscardAll,
    DropObjects(DropObjectsPlan),
    DropOwned(DropOwnedPlan),
    EmptyQuery,
    ShowAllVariables,
    ShowCreate(ShowCreatePlan),
    ShowColumns(ShowColumnsPlan),
    ShowVariable(ShowVariablePlan),
    InspectShard(InspectShardPlan),
    SetVariable(SetVariablePlan),
    ResetVariable(ResetVariablePlan),
    SetTransaction(SetTransactionPlan),
    StartTransaction(StartTransactionPlan),
    CommitTransaction(CommitTransactionPlan),
    AbortTransaction(AbortTransactionPlan),
    Select(SelectPlan),
    Subscribe(SubscribePlan),
    CopyFrom(CopyFromPlan),
    CopyTo(CopyToPlan),
    ExplainPlan(ExplainPlanPlan),
    ExplainPushdown(ExplainPushdownPlan),
    ExplainTimestamp(ExplainTimestampPlan),
    ExplainSinkSchema(ExplainSinkSchemaPlan),
    Insert(InsertPlan),
    AlterCluster(AlterClusterPlan),
    AlterClusterSwap(AlterClusterSwapPlan),
    // An ALTER that resolves to "nothing to do"; only the object type is kept.
    AlterNoop(AlterNoopPlan),
    AlterSetCluster(AlterSetClusterPlan),
    AlterConnection(AlterConnectionPlan),
    AlterSource(AlterSourcePlan),
    AlterClusterRename(AlterClusterRenamePlan),
    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
    AlterItemRename(AlterItemRenamePlan),
    AlterSchemaRename(AlterSchemaRenamePlan),
    AlterSchemaSwap(AlterSchemaSwapPlan),
    AlterSecret(AlterSecretPlan),
    AlterSink(AlterSinkPlan),
    AlterSystemSet(AlterSystemSetPlan),
    AlterSystemReset(AlterSystemResetPlan),
    AlterSystemResetAll(AlterSystemResetAllPlan),
    AlterRole(AlterRolePlan),
    AlterOwner(AlterOwnerPlan),
    // Note: the payload type is `AlterTablePlan`, not `AlterTableAddColumnPlan`.
    AlterTableAddColumn(AlterTablePlan),
    AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
    AlterNetworkPolicy(AlterNetworkPolicyPlan),
    Declare(DeclarePlan),
    Fetch(FetchPlan),
    Close(ClosePlan),
    // Shared by UPDATE and DELETE (see `Plan::generated_from`); `kind` tells them apart.
    ReadThenWrite(ReadThenWritePlan),
    Prepare(PreparePlan),
    Execute(ExecutePlan),
    Deallocate(DeallocatePlan),
    Raise(RaisePlan),
    GrantRole(GrantRolePlan),
    RevokeRole(RevokeRolePlan),
    GrantPrivileges(GrantPrivilegesPlan),
    RevokePrivileges(RevokePrivilegesPlan),
    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
    ReassignOwned(ReassignOwnedPlan),
    SideEffectingFunc(SideEffectingFunc),
    ValidateConnection(ValidateConnectionPlan),
    AlterRetainHistory(AlterRetainHistoryPlan),
}
216
217impl Plan {
218 pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
221 match stmt {
222 StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
223 StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
224 StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
225 StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
226 StatementKind::AlterObjectRename => &[
227 PlanKind::AlterClusterRename,
228 PlanKind::AlterClusterReplicaRename,
229 PlanKind::AlterItemRename,
230 PlanKind::AlterSchemaRename,
231 PlanKind::AlterNoop,
232 ],
233 StatementKind::AlterObjectSwap => &[
234 PlanKind::AlterClusterSwap,
235 PlanKind::AlterSchemaSwap,
236 PlanKind::AlterNoop,
237 ],
238 StatementKind::AlterRole => &[PlanKind::AlterRole],
239 StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
240 StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
241 StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
242 StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
243 StatementKind::AlterSource => &[
244 PlanKind::AlterNoop,
245 PlanKind::AlterSource,
246 PlanKind::AlterRetainHistory,
247 ],
248 StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
249 StatementKind::AlterSystemResetAll => {
250 &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
251 }
252 StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
253 StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
254 StatementKind::AlterTableAddColumn => {
255 &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
256 }
257 StatementKind::AlterMaterializedViewApplyReplacement => &[
258 PlanKind::AlterNoop,
259 PlanKind::AlterMaterializedViewApplyReplacement,
260 ],
261 StatementKind::Close => &[PlanKind::Close],
262 StatementKind::Comment => &[PlanKind::Comment],
263 StatementKind::Commit => &[PlanKind::CommitTransaction],
264 StatementKind::Copy => &[
265 PlanKind::CopyFrom,
266 PlanKind::Select,
267 PlanKind::Subscribe,
268 PlanKind::CopyTo,
269 ],
270 StatementKind::CreateCluster => &[PlanKind::CreateCluster],
271 StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
272 StatementKind::CreateConnection => &[PlanKind::CreateConnection],
273 StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
274 StatementKind::CreateIndex => &[PlanKind::CreateIndex],
275 StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
276 StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
277 StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
278 StatementKind::CreateRole => &[PlanKind::CreateRole],
279 StatementKind::CreateSchema => &[PlanKind::CreateSchema],
280 StatementKind::CreateSecret => &[PlanKind::CreateSecret],
281 StatementKind::CreateSink => &[PlanKind::CreateSink],
282 StatementKind::CreateSource | StatementKind::CreateSubsource => {
283 &[PlanKind::CreateSource]
284 }
285 StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
286 StatementKind::CreateTable => &[PlanKind::CreateTable],
287 StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
288 StatementKind::CreateType => &[PlanKind::CreateType],
289 StatementKind::CreateView => &[PlanKind::CreateView],
290 StatementKind::Deallocate => &[PlanKind::Deallocate],
291 StatementKind::Declare => &[PlanKind::Declare],
292 StatementKind::Delete => &[PlanKind::ReadThenWrite],
293 StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
294 StatementKind::DropObjects => &[PlanKind::DropObjects],
295 StatementKind::DropOwned => &[PlanKind::DropOwned],
296 StatementKind::Execute => &[PlanKind::Execute],
297 StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
298 StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
299 StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
300 StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
301 StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
302 StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
303 StatementKind::Fetch => &[PlanKind::Fetch],
304 StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
305 StatementKind::GrantRole => &[PlanKind::GrantRole],
306 StatementKind::Insert => &[PlanKind::Insert],
307 StatementKind::Prepare => &[PlanKind::Prepare],
308 StatementKind::Raise => &[PlanKind::Raise],
309 StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
310 StatementKind::ResetVariable => &[PlanKind::ResetVariable],
311 StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
312 StatementKind::RevokeRole => &[PlanKind::RevokeRole],
313 StatementKind::Rollback => &[PlanKind::AbortTransaction],
314 StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
315 StatementKind::SetTransaction => &[PlanKind::SetTransaction],
316 StatementKind::SetVariable => &[PlanKind::SetVariable],
317 StatementKind::Show => &[
318 PlanKind::Select,
319 PlanKind::ShowVariable,
320 PlanKind::ShowCreate,
321 PlanKind::ShowColumns,
322 PlanKind::ShowAllVariables,
323 PlanKind::InspectShard,
324 ],
325 StatementKind::StartTransaction => &[PlanKind::StartTransaction],
326 StatementKind::Subscribe => &[PlanKind::Subscribe],
327 StatementKind::Update => &[PlanKind::ReadThenWrite],
328 StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
329 StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
330 }
331 }
332
333 pub fn name(&self) -> &str {
335 match self {
336 Plan::CreateConnection(_) => "create connection",
337 Plan::CreateDatabase(_) => "create database",
338 Plan::CreateSchema(_) => "create schema",
339 Plan::CreateRole(_) => "create role",
340 Plan::CreateCluster(_) => "create cluster",
341 Plan::CreateClusterReplica(_) => "create cluster replica",
342 Plan::CreateSource(_) => "create source",
343 Plan::CreateSources(_) => "create source",
344 Plan::CreateSecret(_) => "create secret",
345 Plan::CreateSink(_) => "create sink",
346 Plan::CreateTable(_) => "create table",
347 Plan::CreateView(_) => "create view",
348 Plan::CreateMaterializedView(_) => "create materialized view",
349 Plan::CreateContinualTask(_) => "create continual task",
350 Plan::CreateIndex(_) => "create index",
351 Plan::CreateType(_) => "create type",
352 Plan::CreateNetworkPolicy(_) => "create network policy",
353 Plan::Comment(_) => "comment",
354 Plan::DiscardTemp => "discard temp",
355 Plan::DiscardAll => "discard all",
356 Plan::DropObjects(plan) => match plan.object_type {
357 ObjectType::Table => "drop table",
358 ObjectType::View => "drop view",
359 ObjectType::MaterializedView => "drop materialized view",
360 ObjectType::Source => "drop source",
361 ObjectType::Sink => "drop sink",
362 ObjectType::Index => "drop index",
363 ObjectType::Type => "drop type",
364 ObjectType::Role => "drop roles",
365 ObjectType::Cluster => "drop clusters",
366 ObjectType::ClusterReplica => "drop cluster replicas",
367 ObjectType::Secret => "drop secret",
368 ObjectType::Connection => "drop connection",
369 ObjectType::Database => "drop database",
370 ObjectType::Schema => "drop schema",
371 ObjectType::Func => "drop function",
372 ObjectType::ContinualTask => "drop continual task",
373 ObjectType::NetworkPolicy => "drop network policy",
374 },
375 Plan::DropOwned(_) => "drop owned",
376 Plan::EmptyQuery => "do nothing",
377 Plan::ShowAllVariables => "show all variables",
378 Plan::ShowCreate(_) => "show create",
379 Plan::ShowColumns(_) => "show columns",
380 Plan::ShowVariable(_) => "show variable",
381 Plan::InspectShard(_) => "inspect shard",
382 Plan::SetVariable(_) => "set variable",
383 Plan::ResetVariable(_) => "reset variable",
384 Plan::SetTransaction(_) => "set transaction",
385 Plan::StartTransaction(_) => "start transaction",
386 Plan::CommitTransaction(_) => "commit",
387 Plan::AbortTransaction(_) => "abort",
388 Plan::Select(_) => "select",
389 Plan::Subscribe(_) => "subscribe",
390 Plan::CopyFrom(_) => "copy from",
391 Plan::CopyTo(_) => "copy to",
392 Plan::ExplainPlan(_) => "explain plan",
393 Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
394 Plan::ExplainTimestamp(_) => "explain timestamp",
395 Plan::ExplainSinkSchema(_) => "explain schema",
396 Plan::Insert(_) => "insert",
397 Plan::AlterNoop(plan) => match plan.object_type {
398 ObjectType::Table => "alter table",
399 ObjectType::View => "alter view",
400 ObjectType::MaterializedView => "alter materialized view",
401 ObjectType::Source => "alter source",
402 ObjectType::Sink => "alter sink",
403 ObjectType::Index => "alter index",
404 ObjectType::Type => "alter type",
405 ObjectType::Role => "alter role",
406 ObjectType::Cluster => "alter cluster",
407 ObjectType::ClusterReplica => "alter cluster replica",
408 ObjectType::Secret => "alter secret",
409 ObjectType::Connection => "alter connection",
410 ObjectType::Database => "alter database",
411 ObjectType::Schema => "alter schema",
412 ObjectType::Func => "alter function",
413 ObjectType::ContinualTask => "alter continual task",
414 ObjectType::NetworkPolicy => "alter network policy",
415 },
416 Plan::AlterCluster(_) => "alter cluster",
417 Plan::AlterClusterRename(_) => "alter cluster rename",
418 Plan::AlterClusterSwap(_) => "alter cluster swap",
419 Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
420 Plan::AlterSetCluster(_) => "alter set cluster",
421 Plan::AlterConnection(_) => "alter connection",
422 Plan::AlterSource(_) => "alter source",
423 Plan::AlterItemRename(_) => "rename item",
424 Plan::AlterSchemaRename(_) => "alter rename schema",
425 Plan::AlterSchemaSwap(_) => "alter swap schema",
426 Plan::AlterSecret(_) => "alter secret",
427 Plan::AlterSink(_) => "alter sink",
428 Plan::AlterSystemSet(_) => "alter system",
429 Plan::AlterSystemReset(_) => "alter system",
430 Plan::AlterSystemResetAll(_) => "alter system",
431 Plan::AlterRole(_) => "alter role",
432 Plan::AlterNetworkPolicy(_) => "alter network policy",
433 Plan::AlterOwner(plan) => match plan.object_type {
434 ObjectType::Table => "alter table owner",
435 ObjectType::View => "alter view owner",
436 ObjectType::MaterializedView => "alter materialized view owner",
437 ObjectType::Source => "alter source owner",
438 ObjectType::Sink => "alter sink owner",
439 ObjectType::Index => "alter index owner",
440 ObjectType::Type => "alter type owner",
441 ObjectType::Role => "alter role owner",
442 ObjectType::Cluster => "alter cluster owner",
443 ObjectType::ClusterReplica => "alter cluster replica owner",
444 ObjectType::Secret => "alter secret owner",
445 ObjectType::Connection => "alter connection owner",
446 ObjectType::Database => "alter database owner",
447 ObjectType::Schema => "alter schema owner",
448 ObjectType::Func => "alter function owner",
449 ObjectType::ContinualTask => "alter continual task owner",
450 ObjectType::NetworkPolicy => "alter network policy owner",
451 },
452 Plan::AlterTableAddColumn(_) => "alter table add column",
453 Plan::AlterMaterializedViewApplyReplacement(_) => {
454 "alter materialized view apply replacement"
455 }
456 Plan::Declare(_) => "declare",
457 Plan::Fetch(_) => "fetch",
458 Plan::Close(_) => "close",
459 Plan::ReadThenWrite(plan) => match plan.kind {
460 MutationKind::Insert => "insert into select",
461 MutationKind::Update => "update",
462 MutationKind::Delete => "delete",
463 },
464 Plan::Prepare(_) => "prepare",
465 Plan::Execute(_) => "execute",
466 Plan::Deallocate(_) => "deallocate",
467 Plan::Raise(_) => "raise",
468 Plan::GrantRole(_) => "grant role",
469 Plan::RevokeRole(_) => "revoke role",
470 Plan::GrantPrivileges(_) => "grant privilege",
471 Plan::RevokePrivileges(_) => "revoke privilege",
472 Plan::AlterDefaultPrivileges(_) => "alter default privileges",
473 Plan::ReassignOwned(_) => "reassign owned",
474 Plan::SideEffectingFunc(_) => "side effecting func",
475 Plan::ValidateConnection(_) => "validate connection",
476 Plan::AlterRetainHistory(_) => "alter retain history",
477 }
478 }
479
480 pub fn allowed_in_read_only(&self) -> bool {
486 match self {
487 Plan::SetVariable(_) => true,
490 Plan::ResetVariable(_) => true,
491 Plan::SetTransaction(_) => true,
492 Plan::StartTransaction(_) => true,
493 Plan::CommitTransaction(_) => true,
494 Plan::AbortTransaction(_) => true,
495 Plan::Select(_) => true,
496 Plan::EmptyQuery => true,
497 Plan::ShowAllVariables => true,
498 Plan::ShowCreate(_) => true,
499 Plan::ShowColumns(_) => true,
500 Plan::ShowVariable(_) => true,
501 Plan::InspectShard(_) => true,
502 Plan::Subscribe(_) => true,
503 Plan::CopyTo(_) => true,
504 Plan::ExplainPlan(_) => true,
505 Plan::ExplainPushdown(_) => true,
506 Plan::ExplainTimestamp(_) => true,
507 Plan::ExplainSinkSchema(_) => true,
508 Plan::ValidateConnection(_) => true,
509 _ => false,
510 }
511 }
512}
513
/// Data carried by [`Plan::StartTransaction`].
///
/// Both settings are optional.
/// NOTE(review): `None` presumably means the statement did not specify the
/// setting — confirm against the planner.
#[derive(Debug)]
pub struct StartTransactionPlan {
    /// Requested access mode, if any.
    pub access: Option<TransactionAccessMode>,
    /// Requested isolation level, if any.
    pub isolation_level: Option<TransactionIsolationLevel>,
}
519
/// Distinguishes the two kinds of transaction recorded in
/// `CommitTransactionPlan` and `AbortTransactionPlan`.
#[derive(Debug)]
pub enum TransactionType {
    Explicit,
    Implicit,
}

impl TransactionType {
    /// Returns `true` only for [`TransactionType::Explicit`].
    pub fn is_explicit(&self) -> bool {
        match self {
            Self::Explicit => true,
            Self::Implicit => false,
        }
    }

    /// Returns `true` only for [`TransactionType::Implicit`].
    pub fn is_implicit(&self) -> bool {
        match self {
            Self::Implicit => true,
            Self::Explicit => false,
        }
    }
}
535
/// Data carried by [`Plan::CommitTransaction`].
#[derive(Debug)]
pub struct CommitTransactionPlan {
    /// Whether the transaction being committed is explicit or implicit.
    pub transaction_type: TransactionType,
}
540
/// Data carried by [`Plan::AbortTransaction`].
#[derive(Debug)]
pub struct AbortTransactionPlan {
    /// Whether the transaction being aborted is explicit or implicit.
    pub transaction_type: TransactionType,
}
545
/// Data carried by [`Plan::CreateDatabase`].
#[derive(Debug)]
pub struct CreateDatabasePlan {
    /// Name of the database.
    pub name: String,
    // NOTE(review): presumably set when the statement used `IF NOT EXISTS` — confirm.
    pub if_not_exists: bool,
}
551
/// Data carried by [`Plan::CreateSchema`].
#[derive(Debug)]
pub struct CreateSchemaPlan {
    /// Already-resolved specifier of the database concerned.
    pub database_spec: ResolvedDatabaseSpecifier,
    /// Name of the schema.
    pub schema_name: String,
    // NOTE(review): presumably set when the statement used `IF NOT EXISTS` — confirm.
    pub if_not_exists: bool,
}
558
/// Data carried by [`Plan::CreateRole`].
#[derive(Debug)]
pub struct CreateRolePlan {
    /// Name of the role.
    pub name: String,
    /// Role attributes in their raw (`RoleAttributesRaw`) form.
    pub attributes: RoleAttributesRaw,
}
564
/// Data carried by [`Plan::CreateCluster`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterPlan {
    /// Name of the cluster.
    pub name: String,
    /// Managed or unmanaged configuration; see [`CreateClusterVariant`].
    pub variant: CreateClusterVariant,
    /// Optional workload class label.
    pub workload_class: Option<String>,
}
571
/// The two shapes a [`CreateClusterPlan`] can take, each wrapping its own
/// plan struct.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum CreateClusterVariant {
    Managed(CreateClusterManagedPlan),
    Unmanaged(CreateClusterUnmanagedPlan),
}
577
/// Payload of [`CreateClusterVariant::Unmanaged`]: an explicit list of replicas.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterUnmanagedPlan {
    // NOTE(review): the `String` is presumably the replica name — confirm.
    pub replicas: Vec<(String, ReplicaConfig)>,
}
582
/// Payload of [`CreateClusterVariant::Managed`].
///
/// Unlike the unmanaged variant, which lists replicas individually, this
/// describes the cluster through a replication factor and shared settings.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterManagedPlan {
    pub replication_factor: u32,
    pub size: String,
    pub availability_zones: Vec<String>,
    pub compute: ComputeReplicaConfig,
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    pub schedule: ClusterSchedule,
}
592
/// Data carried by [`Plan::CreateClusterReplica`].
#[derive(Debug)]
pub struct CreateClusterReplicaPlan {
    /// Cluster the replica is created in.
    pub cluster_id: ClusterId,
    /// Name of the replica.
    pub name: String,
    /// Configuration of the replica.
    pub config: ReplicaConfig,
}
599
/// Introspection settings for a compute replica; held optionally by
/// [`ComputeReplicaConfig`].
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq)]
pub struct ComputeReplicaIntrospectionConfig {
    // NOTE(review): exact effect of this flag is not visible here — confirm.
    pub debugging: bool,
    // NOTE(review): presumably the period between introspection updates — confirm.
    pub interval: Duration,
}
608
/// Compute-related configuration shared by managed clusters and individual
/// replicas (see `CreateClusterManagedPlan` and `ReplicaConfig`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComputeReplicaConfig {
    // NOTE(review): `None` presumably means introspection is disabled — confirm.
    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
}
613
/// Configuration of a single cluster replica.
///
/// `Unorchestrated` is described by explicit controller addresses, whereas
/// `Orchestrated` is described by a size and optional placement/billing
/// settings.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicaConfig {
    Unorchestrated {
        storagectl_addrs: Vec<String>,
        computectl_addrs: Vec<String>,
        compute: ComputeReplicaConfig,
    },
    Orchestrated {
        size: String,
        availability_zone: Option<String>,
        compute: ComputeReplicaConfig,
        // NOTE(review): meaning of `internal` is not visible here — confirm.
        internal: bool,
        billed_as: Option<String>,
    },
}
629
630#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
631pub enum ClusterSchedule {
632 Manual,
634 Refresh { hydration_time_estimate: Duration },
638}
639
640impl Default for ClusterSchedule {
641 fn default() -> Self {
642 ClusterSchedule::Manual
644 }
645}
646
/// Data carried by [`Plan::CreateSource`]; also embedded in
/// `CreateSourcePlanBundle`.
#[derive(Debug)]
pub struct CreateSourcePlan {
    /// Qualified name of the source.
    pub name: QualifiedItemName,
    /// The planned source itself.
    pub source: Source,
    // NOTE(review): presumably set when the statement used `IF NOT EXISTS` — confirm.
    pub if_not_exists: bool,
    pub timeline: Timeline,
    /// Cluster for the source, if one is set (optional here, unlike
    /// `CreateSinkPlan::in_cluster`).
    pub in_cluster: Option<ClusterId>,
}
656
/// A set of [`SourceReference`]s together with the time it was last updated.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReferences {
    // NOTE(review): unit and epoch of this timestamp are not visible here — confirm.
    pub updated_at: u64,
    pub references: Vec<SourceReference>,
}
662
/// One entry of [`SourceReferences`]: a name, an optional namespace and a
/// list of column names.
// NOTE(review): presumably describes an upstream object (e.g. a table) that a
// source can reference — confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReference {
    pub name: String,
    pub namespace: Option<String>,
    pub columns: Vec<String>,
}
671
/// A [`CreateSourcePlan`] bundled with the identifiers and metadata that go
/// with it; the element type of [`Plan::CreateSources`].
#[derive(Debug)]
pub struct CreateSourcePlanBundle {
    /// Catalog item ID for the source.
    pub item_id: CatalogItemId,
    /// Global ID for the source.
    pub global_id: GlobalId,
    /// The plan for the source itself.
    pub plan: CreateSourcePlan,
    /// Resolved IDs recorded for the source.
    pub resolved_ids: ResolvedIds,
    // NOTE(review): presumably only populated for sources that expose
    // upstream references — confirm.
    pub available_source_references: Option<SourceReferences>,
}
688
/// Data carried by [`Plan::CreateConnection`].
#[derive(Debug)]
pub struct CreateConnectionPlan {
    /// Qualified name of the connection.
    pub name: QualifiedItemName,
    // NOTE(review): presumably set when the statement used `IF NOT EXISTS` — confirm.
    pub if_not_exists: bool,
    /// The planned connection.
    pub connection: Connection,
    // NOTE(review): presumably requests validating the connection on creation — confirm.
    pub validate: bool,
}
696
/// Data carried by [`Plan::ValidateConnection`].
#[derive(Debug)]
pub struct ValidateConnectionPlan {
    /// Catalog item ID of the connection.
    pub id: CatalogItemId,
    /// The connection, in its storage-layer form with connections held by
    /// reference (`ReferencedConnection`).
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
}
704
/// Data carried by [`Plan::CreateSecret`].
#[derive(Debug)]
pub struct CreateSecretPlan {
    /// Qualified name of the secret.
    pub name: QualifiedItemName,
    /// The planned secret.
    pub secret: Secret,
    // NOTE(review): presumably set when the statement used `IF NOT EXISTS` — confirm.
    pub if_not_exists: bool,
}
711
/// Data carried by [`Plan::CreateSink`].
#[derive(Debug)]
pub struct CreateSinkPlan {
    /// Qualified name of the sink.
    pub name: QualifiedItemName,
    /// The planned sink.
    pub sink: Sink,
    // NOTE(review): presumably whether the sink emits an initial snapshot — confirm.
    pub with_snapshot: bool,
    // NOTE(review): presumably set when the statement used `IF NOT EXISTS` — confirm.
    pub if_not_exists: bool,
    /// Cluster for the sink (always present, unlike
    /// `CreateSourcePlan::in_cluster`).
    pub in_cluster: ClusterId,
}
720
/// Data carried by [`Plan::CreateTable`].
#[derive(Debug)]
pub struct CreateTablePlan {
    /// Qualified name of the table.
    pub name: QualifiedItemName,
    /// The planned table.
    pub table: Table,
    // NOTE(review): presumably set when the statement used `IF NOT EXISTS` — confirm.
    pub if_not_exists: bool,
}
727
/// Data carried by [`Plan::CreateView`]; also wrapped by
/// `ExplaineeStatement::CreateView`.
#[derive(Debug, Clone)]
pub struct CreateViewPlan {
    /// Qualified name of the view.
    pub name: QualifiedItemName,
    /// The planned view.
    pub view: View,
    // NOTE(review): presumably the existing item this view replaces — confirm.
    pub replace: Option<CatalogItemId>,
    // NOTE(review): presumably items to drop as part of a replacement — confirm.
    pub drop_ids: Vec<CatalogItemId>,
    // NOTE(review): presumably set when the statement used `IF NOT EXISTS` — confirm.
    pub if_not_exists: bool,
    // NOTE(review): presumably flags ambiguous output column names — confirm.
    pub ambiguous_columns: bool,
}
741
/// Data carried by [`Plan::CreateMaterializedView`]; also wrapped by
/// `ExplaineeStatement::CreateMaterializedView`.
///
/// Apart from the view payload, the fields match those of `CreateViewPlan`.
#[derive(Debug, Clone)]
pub struct CreateMaterializedViewPlan {
    /// Qualified name of the materialized view.
    pub name: QualifiedItemName,
    /// The planned materialized view.
    pub materialized_view: MaterializedView,
    // NOTE(review): presumably the existing item this view replaces — confirm.
    pub replace: Option<CatalogItemId>,
    // NOTE(review): presumably items to drop as part of a replacement — confirm.
    pub drop_ids: Vec<CatalogItemId>,
    // NOTE(review): presumably set when the statement used `IF NOT EXISTS` — confirm.
    pub if_not_exists: bool,
    // NOTE(review): presumably flags ambiguous output column names — confirm.
    pub ambiguous_columns: bool,
}
755
/// Data carried by [`Plan::CreateContinualTask`].
#[derive(Debug, Clone)]
pub struct CreateContinualTaskPlan {
    /// Qualified name of the continual task.
    pub name: QualifiedItemName,
    // NOTE(review): presumably a local ID standing in for the task itself
    // inside its own definition — confirm.
    pub placeholder_id: Option<mz_expr::LocalId>,
    /// Relation description of the task.
    pub desc: RelationDesc,
    /// Global ID of the task's input.
    pub input_id: GlobalId,
    pub with_snapshot: bool,
    /// The task definition; note that it reuses the `MaterializedView` type.
    pub continual_task: MaterializedView,
}
769
/// Data carried by [`Plan::CreateNetworkPolicy`].
#[derive(Debug, Clone)]
pub struct CreateNetworkPolicyPlan {
    /// Name of the policy.
    pub name: String,
    /// Rules making up the policy.
    pub rules: Vec<NetworkPolicyRule>,
}
775
/// Data carried by [`Plan::AlterNetworkPolicy`].
///
/// Same fields as `CreateNetworkPolicyPlan`, plus the ID of the policy being
/// altered.
#[derive(Debug, Clone)]
pub struct AlterNetworkPolicyPlan {
    /// ID of the policy being altered.
    pub id: NetworkPolicyId,
    /// Name of the policy.
    pub name: String,
    /// Rules for the policy.
    pub rules: Vec<NetworkPolicyRule>,
}
782
/// Data carried by [`Plan::CreateIndex`]; also wrapped by
/// `ExplaineeStatement::CreateIndex`.
#[derive(Debug, Clone)]
pub struct CreateIndexPlan {
    /// Qualified name of the index.
    pub name: QualifiedItemName,
    /// The planned index.
    pub index: Index,
    // NOTE(review): presumably set when the statement used `IF NOT EXISTS` — confirm.
    pub if_not_exists: bool,
}
789
/// Data carried by [`Plan::CreateType`].
#[derive(Debug)]
pub struct CreateTypePlan {
    /// Qualified name of the type.
    pub name: QualifiedItemName,
    /// The planned type.
    pub typ: Type,
}
795
/// Data carried by [`Plan::DropObjects`].
#[derive(Debug)]
pub struct DropObjectsPlan {
    // NOTE(review): presumably only the objects named directly in the
    // statement — confirm.
    pub referenced_ids: Vec<ObjectId>,
    // NOTE(review): presumably every object to drop, including dependents —
    // confirm.
    pub drop_ids: Vec<ObjectId>,
    /// Type of the dropped objects; `Plan::name` uses it to pick the label
    /// (e.g. "drop table").
    pub object_type: ObjectType,
}
806
/// Data carried by [`Plan::DropOwned`].
#[derive(Debug)]
pub struct DropOwnedPlan {
    /// Roles the statement applies to.
    pub role_ids: Vec<RoleId>,
    /// Objects to drop.
    pub drop_ids: Vec<ObjectId>,
    /// Privileges to revoke, each paired with the object it applies to.
    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
    /// Default privileges to revoke.
    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
}
818
/// Data carried by [`Plan::ShowVariable`].
#[derive(Debug)]
pub struct ShowVariablePlan {
    /// Name of the variable to show.
    pub name: String,
}
823
/// Data carried by [`Plan::InspectShard`].
#[derive(Debug)]
pub struct InspectShardPlan {
    // NOTE(review): presumably the collection whose shard is inspected — confirm.
    pub id: GlobalId,
}
829
/// Data carried by [`Plan::SetVariable`].
#[derive(Debug)]
pub struct SetVariablePlan {
    /// Name of the variable to set.
    pub name: String,
    /// New value, or a request for the default; see [`VariableValue`].
    pub value: VariableValue,
    // NOTE(review): presumably corresponds to `SET LOCAL` — confirm.
    pub local: bool,
}
836
/// The value assigned by a [`SetVariablePlan`]: either the default or an
/// explicit list of string values.
#[derive(Debug)]
pub enum VariableValue {
    Default,
    Values(Vec<String>),
}
842
/// Data carried by [`Plan::ResetVariable`].
#[derive(Debug)]
pub struct ResetVariablePlan {
    /// Name of the variable to reset.
    pub name: String,
}
847
/// Data carried by [`Plan::SetTransaction`].
#[derive(Debug)]
pub struct SetTransactionPlan {
    // NOTE(review): scope implied by `local` is not visible here — confirm.
    pub local: bool,
    /// Transaction modes to apply.
    pub modes: Vec<TransactionMode>,
}
853
/// Data carried by [`Plan::Select`]; also embedded in `ShowColumnsPlan`,
/// `CopyToPlan` and `ExplaineeStatement::Select`.
#[derive(Clone, Debug)]
pub struct SelectPlan {
    /// The originating `SELECT` statement, when there is one
    /// (`SelectPlan::immediate` leaves it `None`).
    pub select: Option<Box<SelectStatement<Aug>>>,
    /// The relation expression to evaluate.
    pub source: HirRelationExpr,
    /// When the query should be evaluated.
    pub when: QueryWhen,
    /// Finishing to apply to the result rows.
    pub finishing: RowSetFinishing,
    /// Output format for a `COPY`, if any.
    pub copy_to: Option<CopyFormat>,
}
869
870impl SelectPlan {
871 pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
872 let arity = typ.arity();
873 SelectPlan {
874 select: None,
875 source: HirRelationExpr::Constant { rows, typ },
876 when: QueryWhen::Immediately,
877 finishing: RowSetFinishing::trivial(arity),
878 copy_to: None,
879 }
880 }
881}
882
/// Output shape requested for a subscription (see `SubscribePlan::output`).
///
/// Every variant except `Diffs` carries a list of column orderings.
#[derive(Debug)]
pub enum SubscribeOutput {
    Diffs,
    WithinTimestampOrderBy {
        /// Ordering to apply.
        // NOTE(review): presumably applied among rows sharing a timestamp — confirm.
        order_by: Vec<ColumnOrder>,
    },
    EnvelopeUpsert {
        /// Ordering over the key columns.
        order_by_keys: Vec<ColumnOrder>,
    },
    EnvelopeDebezium {
        /// Ordering over the key columns.
        order_by_keys: Vec<ColumnOrder>,
    },
}
899
/// Data carried by [`Plan::Subscribe`].
#[derive(Debug)]
pub struct SubscribePlan {
    /// What is being subscribed to: an existing object or an ad-hoc query.
    pub from: SubscribeFrom,
    pub with_snapshot: bool,
    /// When the subscription should start reading.
    pub when: QueryWhen,
    // NOTE(review): presumably the timestamp at which the subscription ends — confirm.
    pub up_to: Option<Timestamp>,
    /// Output format for a `COPY`, if any.
    pub copy_to: Option<CopyFormat>,
    pub emit_progress: bool,
    /// Requested output shape; see [`SubscribeOutput`].
    pub output: SubscribeOutput,
}
910
/// The target of a subscription: either an existing collection referenced by
/// ID or an ad-hoc query.
#[derive(Debug, Clone)]
pub enum SubscribeFrom {
    /// Subscribe to the collection with this ID.
    Id(GlobalId),
    /// Subscribe to the results of a query.
    Query {
        /// The query as a MIR relation expression.
        expr: MirRelationExpr,
        /// Relation description of the query's output.
        desc: RelationDesc,
    },
}
921
922impl SubscribeFrom {
923 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
924 match self {
925 SubscribeFrom::Id(id) => BTreeSet::from([*id]),
926 SubscribeFrom::Query { expr, .. } => expr.depends_on(),
927 }
928 }
929
930 pub fn contains_temporal(&self) -> bool {
931 match self {
932 SubscribeFrom::Id(_) => false,
933 SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
934 }
935 }
936}
937
/// Data carried by [`Plan::ShowCreate`].
#[derive(Debug)]
pub struct ShowCreatePlan {
    /// Object being shown.
    pub id: ObjectId,
    // NOTE(review): presumably the precomputed result row — confirm.
    pub row: Row,
}
943
/// Data carried by [`Plan::ShowColumns`].
#[derive(Debug)]
pub struct ShowColumnsPlan {
    /// Catalog item whose columns are shown.
    pub id: CatalogItemId,
    /// The `SELECT` plan embedded in this plan.
    // NOTE(review): presumably the query that produces the column listing — confirm.
    pub select_plan: SelectPlan,
    /// Resolved IDs accompanying `select_plan`.
    pub new_resolved_ids: ResolvedIds,
}
950
/// Data carried by [`Plan::CopyFrom`].
#[derive(Debug)]
pub struct CopyFromPlan {
    /// Catalog item ID of the copy target.
    pub target_id: CatalogItemId,
    /// Name of the copy target.
    pub target_name: String,
    /// Where the data comes from; see [`CopyFromSource`].
    pub source: CopyFromSource,
    // NOTE(review): presumably the target columns being filled, in source
    // order — confirm.
    pub columns: Vec<ColumnIndex>,
    /// Relation description of the source data.
    pub source_desc: RelationDesc,
    // NOTE(review): presumably maps source rows onto the target's shape — confirm.
    pub mfp: MapFilterProject,
    /// Format parameters for decoding the input.
    pub params: CopyFormatParams<'static>,
    /// Optional restriction on which inputs are read; see [`CopyFromFilter`].
    pub filter: Option<CopyFromFilter>,
}
972
/// Where a [`CopyFromPlan`] reads its data from.
#[derive(Debug)]
pub enum CopyFromSource {
    /// Data arrives on standard input.
    Stdin,
    /// Data is read from a URL given as a scalar expression.
    Url(HirScalarExpr),
    /// Data is read from AWS S3 through a connection.
    AwsS3 {
        /// The S3 URI, as a scalar expression.
        uri: HirScalarExpr,
        /// The AWS connection details.
        connection: AwsConnection,
        /// Catalog item ID of that connection.
        connection_id: CatalogItemId,
    },
}
991
/// Optional input filter of a [`CopyFromPlan`]: either an explicit list of
/// files or a single pattern.
// NOTE(review): pattern syntax (glob vs. regex) is not visible here — confirm.
#[derive(Debug)]
pub enum CopyFromFilter {
    Files(Vec<String>),
    Pattern(String),
}
997
/// Data carried by [`Plan::CopyTo`].
#[derive(Debug, Clone)]
pub struct CopyToPlan {
    /// The query producing the rows to copy out.
    pub select_plan: SelectPlan,
    /// Relation description of the copied rows.
    pub desc: RelationDesc,
    /// Destination, as a scalar expression.
    pub to: HirScalarExpr,
    /// Connection used to reach the destination.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
    /// Catalog item ID of that connection.
    pub connection_id: CatalogItemId,
    /// Output file format.
    pub format: S3SinkFormat,
    // NOTE(review): presumably in bytes — confirm.
    pub max_file_size: u64,
}
1015
/// Data carried by [`Plan::ExplainPlan`].
#[derive(Clone, Debug)]
pub struct ExplainPlanPlan {
    /// Stage to explain.
    pub stage: ExplainStage,
    /// Output format of the explanation.
    pub format: ExplainFormat,
    /// Explain configuration.
    pub config: ExplainConfig,
    /// What is being explained; see [`Explainee`].
    pub explainee: Explainee,
}
1023
/// The subject of an `EXPLAIN`: an existing catalog item (referenced by ID)
/// or a statement.
// NOTE(review): the `Replan*` variants presumably request planning the item
// afresh rather than explaining its existing plan — confirm.
#[derive(Clone, Debug)]
pub enum Explainee {
    View(CatalogItemId),
    MaterializedView(CatalogItemId),
    Index(CatalogItemId),
    ReplanView(CatalogItemId),
    ReplanMaterializedView(CatalogItemId),
    ReplanIndex(CatalogItemId),
    Statement(ExplaineeStatement),
}
1042
/// A statement that can be the subject of an `EXPLAIN` (see
/// `Explainee::Statement`).
///
/// Every variant pairs a `broken` flag with the plan of the statement. The
/// `EnumKind` derive with `#[enum_kind(ExplaineeStatementKind)]` generates
/// the payload-free companion enum `ExplaineeStatementKind`.
// NOTE(review): the exact meaning of `broken` is not visible here — confirm.
#[derive(Clone, Debug, EnumKind)]
#[enum_kind(ExplaineeStatementKind)]
pub enum ExplaineeStatement {
    Select {
        broken: bool,
        plan: plan::SelectPlan,
        /// Relation description of the query's output.
        desc: RelationDesc,
    },
    CreateView {
        broken: bool,
        plan: plan::CreateViewPlan,
    },
    CreateMaterializedView {
        broken: bool,
        plan: plan::CreateMaterializedViewPlan,
    },
    CreateIndex {
        broken: bool,
        plan: plan::CreateIndexPlan,
    },
}
1073
1074impl ExplaineeStatement {
1075 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1076 match self {
1077 Self::Select { plan, .. } => plan.source.depends_on(),
1078 Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1079 Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1080 Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1081 }
1082 }
1083
1084 pub fn broken(&self) -> bool {
1095 match self {
1096 Self::Select { broken, .. } => *broken,
1097 Self::CreateView { broken, .. } => *broken,
1098 Self::CreateMaterializedView { broken, .. } => *broken,
1099 Self::CreateIndex { broken, .. } => *broken,
1100 }
1101 }
1102}
1103
1104impl ExplaineeStatementKind {
1105 pub fn supports(&self, stage: &ExplainStage) -> bool {
1106 use ExplainStage::*;
1107 match self {
1108 Self::Select => true,
1109 Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1110 Self::CreateMaterializedView => true,
1111 Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1112 }
1113 }
1114}
1115
1116impl std::fmt::Display for ExplaineeStatementKind {
1117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1118 match self {
1119 Self::Select => write!(f, "SELECT"),
1120 Self::CreateView => write!(f, "CREATE VIEW"),
1121 Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1122 Self::CreateIndex => write!(f, "CREATE INDEX"),
1123 }
1124 }
1125}
1126
/// Plan for a pushdown explanation of an [`Explainee`].
///
/// NOTE(review): presumably backs `EXPLAIN FILTER PUSHDOWN` — confirm.
#[derive(Clone, Debug)]
pub struct ExplainPushdownPlan {
    /// The object or statement being explained.
    pub explainee: Explainee,
}
1131
/// Plan for a timestamp explanation of a query.
///
/// NOTE(review): presumably backs `EXPLAIN TIMESTAMP` — confirm.
#[derive(Clone, Debug)]
pub struct ExplainTimestampPlan {
    /// The [`ExplainFormat`] to render the output in.
    pub format: ExplainFormat,
    /// The query, as a [`HirRelationExpr`].
    pub raw_plan: HirRelationExpr,
    /// The [`QueryWhen`] associated with the query.
    pub when: QueryWhen,
}
1138
/// Plan for explaining the schema of a sink.
#[derive(Debug)]
pub struct ExplainSinkSchemaPlan {
    /// The [`GlobalId`] the sink reads from (per the field name — TODO confirm).
    pub sink_from: GlobalId,
    /// The schema, already rendered as a JSON string.
    pub json_schema: String,
}
1144
/// Plan for sending a set of row updates to a catalog item.
#[derive(Debug)]
pub struct SendDiffsPlan {
    /// The catalog item the updates target.
    pub id: CatalogItemId,
    /// The updates, as `(row, diff)` pairs.
    pub updates: Vec<(Row, Diff)>,
    /// Which kind of mutation produced the updates.
    pub kind: MutationKind,
    /// Rows to return, each paired with a non-zero count
    /// (presumably its multiplicity — TODO confirm).
    pub returning: Vec<(Row, NonZeroUsize)>,
    /// A result-size limit (units not visible here; presumably bytes —
    /// TODO confirm).
    pub max_result_size: u64,
}
1153
/// Plan for an insert into a catalog item.
#[derive(Debug)]
pub struct InsertPlan {
    /// The catalog item being inserted into.
    pub id: CatalogItemId,
    /// The relation expression producing the values to insert.
    pub values: HirRelationExpr,
    /// Scalar expressions for the `returning` output
    /// (presumably a `RETURNING` clause — TODO confirm).
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1160
/// Plan for a mutation that first reads a selection and then writes based
/// on it.
///
/// NOTE(review): presumably used for `UPDATE` / `DELETE`-style statements —
/// confirm against the planner.
#[derive(Debug)]
pub struct ReadThenWritePlan {
    /// The catalog item being mutated.
    pub id: CatalogItemId,
    /// The relation expression selecting the rows to operate on.
    pub selection: HirRelationExpr,
    /// The [`RowSetFinishing`] to apply to the selection.
    pub finishing: RowSetFinishing,
    /// Scalar expressions keyed by `usize`
    /// (presumably column index -> new value — TODO confirm).
    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
    /// Which kind of mutation this is.
    pub kind: MutationKind,
    /// Scalar expressions for the `returning` output.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1170
/// Plan for an `ALTER` that carries nothing but the type of the object.
///
/// NOTE(review): the name suggests the statement has no effect — confirm.
#[derive(Debug)]
pub struct AlterNoopPlan {
    /// The type of object the statement referred to.
    pub object_type: ObjectType,
}
1176
/// Plan for changing the cluster associated with a catalog item.
#[derive(Debug)]
pub struct AlterSetClusterPlan {
    /// The catalog item being altered.
    pub id: CatalogItemId,
    /// The cluster to set.
    pub set_cluster: ClusterId,
}
1182
/// Plan for altering the retain-history setting of a catalog item.
#[derive(Debug)]
pub struct AlterRetainHistoryPlan {
    /// The catalog item being altered.
    pub id: CatalogItemId,
    /// The option value as an AST [`Value`], if any
    /// (presumably `None` means the option is being reset — TODO confirm).
    pub value: Option<Value>,
    /// The [`CompactionWindow`] carried by the plan.
    pub window: CompactionWindow,
    /// The type of the object being altered.
    pub object_type: ObjectType,
}
1190
/// The requested change to a single option in an `ALTER` plan.
///
/// `T` is the option's value type and defaults to `String`.
#[derive(Debug, Clone)]

pub enum AlterOptionParameter<T = String> {
    /// Set the option to the contained value.
    Set(T),
    /// Reset the option (presumably to its default — TODO confirm).
    Reset,
    /// Leave the option as it is.
    Unchanged,
}
1198
/// The action to perform when altering a connection.
#[derive(Debug)]
pub enum AlterConnectionAction {
    /// Rotate the connection's keys
    /// (presumably SSH key pairs — TODO confirm).
    RotateKeys,
    /// Change the connection's options.
    AlterOptions {
        /// Options to set, keyed by name; the value is optional.
        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
        /// Names of options to drop.
        drop_options: BTreeSet<ConnectionOptionName>,
        /// Validation flag (presumably whether to validate the altered
        /// connection — TODO confirm).
        validate: bool,
    },
}
1208
/// Plan for altering a connection.
#[derive(Debug)]
pub struct AlterConnectionPlan {
    /// The catalog item id of the connection.
    pub id: CatalogItemId,
    /// What to do to the connection.
    pub action: AlterConnectionAction,
}
1214
/// The action to perform when altering a source.
#[derive(Debug)]
pub enum AlterSourceAction {
    /// Add subsource exports to the source.
    AddSubsourceExports {
        /// One [`CreateSourcePlanBundle`] per subsource to add.
        subsources: Vec<CreateSourcePlanBundle>,
        /// The options supplied with the statement.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    },
    /// Refresh the source's references.
    RefreshReferences {
        /// The [`SourceReferences`] to use
        /// (presumably the freshly fetched set — TODO confirm).
        references: SourceReferences,
    },
}
1225
/// Plan for altering a source.
#[derive(Debug)]
pub struct AlterSourcePlan {
    /// The catalog item id of the source.
    pub item_id: CatalogItemId,
    /// The [`GlobalId`] of the ingestion (per the field name — TODO confirm).
    pub ingestion_id: GlobalId,
    /// What to do to the source.
    pub action: AlterSourceAction,
}
1232
/// Plan for altering a sink.
#[derive(Debug, Clone)]
pub struct AlterSinkPlan {
    /// The catalog item id of the sink.
    pub item_id: CatalogItemId,
    /// The [`GlobalId`] of the sink (per the field name — TODO confirm).
    pub global_id: GlobalId,
    /// The [`Sink`] definition carried by the plan
    /// (presumably the new definition — TODO confirm).
    pub sink: Sink,
    /// Snapshot flag (presumably whether the sink emits a snapshot —
    /// TODO confirm).
    pub with_snapshot: bool,
    /// The cluster the sink runs in.
    pub in_cluster: ClusterId,
}
1241
/// Plan for altering a cluster's options.
#[derive(Debug, Clone)]
pub struct AlterClusterPlan {
    /// The cluster being altered.
    pub id: ClusterId,
    /// The cluster's name.
    pub name: String,
    /// Per-option change requests.
    pub options: PlanClusterOption,
    /// The [`AlterClusterPlanStrategy`] to follow when applying the change.
    pub strategy: AlterClusterPlanStrategy,
}
1249
/// Plan for renaming a cluster.
#[derive(Debug)]
pub struct AlterClusterRenamePlan {
    /// The cluster being renamed.
    pub id: ClusterId,
    /// The cluster's name (presumably the current one — TODO confirm).
    pub name: String,
    /// The name to rename to.
    pub to_name: String,
}
1256
/// Plan for renaming a cluster replica.
#[derive(Debug)]
pub struct AlterClusterReplicaRenamePlan {
    /// The cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The replica being renamed.
    pub replica_id: ReplicaId,
    /// The replica's qualified name (presumably the current one —
    /// TODO confirm).
    pub name: QualifiedReplica,
    /// The name to rename to.
    pub to_name: String,
}
1264
/// Plan for renaming a catalog item.
#[derive(Debug)]
pub struct AlterItemRenamePlan {
    /// The catalog item being renamed.
    pub id: CatalogItemId,
    /// The item's current fully qualified name.
    pub current_full_name: FullItemName,
    /// The name to rename to.
    pub to_name: String,
    /// The type of the item.
    pub object_type: ObjectType,
}
1272
/// Plan for renaming a schema.
#[derive(Debug)]
pub struct AlterSchemaRenamePlan {
    /// Database and schema specifiers identifying the current schema.
    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// The schema's new name.
    pub new_schema_name: String,
}
1278
/// Plan for swapping two schemas, referred to as "a" and "b".
#[derive(Debug)]
pub struct AlterSchemaSwapPlan {
    /// Database and schema specifiers identifying schema "a".
    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// The name of schema "a".
    pub schema_a_name: String,
    /// Database and schema specifiers identifying schema "b".
    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// The name of schema "b".
    pub schema_b_name: String,
    /// A temporary name (presumably used as the intermediate name while
    /// swapping — TODO confirm).
    pub name_temp: String,
}
1287
/// Plan for swapping two clusters, referred to as "a" and "b".
#[derive(Debug)]
pub struct AlterClusterSwapPlan {
    /// The id of cluster "a".
    pub id_a: ClusterId,
    /// The id of cluster "b".
    pub id_b: ClusterId,
    /// The name of cluster "a".
    pub name_a: String,
    /// The name of cluster "b".
    pub name_b: String,
    /// A temporary name (presumably used as the intermediate name while
    /// swapping — TODO confirm).
    pub name_temp: String,
}
1296
/// Plan for altering a secret.
#[derive(Debug)]
pub struct AlterSecretPlan {
    /// The catalog item id of the secret.
    pub id: CatalogItemId,
    /// Scalar expression for the secret's contents
    /// (presumably evaluated to obtain the new value — TODO confirm).
    pub secret_as: MirScalarExpr,
}
1302
/// Plan for setting a system variable.
#[derive(Debug)]
pub struct AlterSystemSetPlan {
    /// The name of the variable.
    pub name: String,
    /// The value to assign.
    pub value: VariableValue,
}
1308
/// Plan for resetting a single system variable.
#[derive(Debug)]
pub struct AlterSystemResetPlan {
    /// The name of the variable to reset.
    pub name: String,
}
1313
/// Plan for resetting all system variables; it carries no data.
#[derive(Debug)]
pub struct AlterSystemResetAllPlan {}
1316
/// Plan for altering a role.
#[derive(Debug)]
pub struct AlterRolePlan {
    /// The role being altered.
    pub id: RoleId,
    /// The role's name.
    pub name: String,
    /// The planned change.
    pub option: PlannedAlterRoleOption,
}
1323
/// Plan for changing the owner of an object.
#[derive(Debug)]
pub struct AlterOwnerPlan {
    /// The object whose owner changes.
    pub id: ObjectId,
    /// The type of that object.
    pub object_type: ObjectType,
    /// The role that becomes the owner.
    pub new_owner: RoleId,
}
1330
/// Plan for altering a table; it describes a single column.
///
/// NOTE(review): presumably the column is being added — confirm against the
/// planner.
#[derive(Debug)]
pub struct AlterTablePlan {
    /// The catalog item id of the relation being altered.
    pub relation_id: CatalogItemId,
    /// The name of the column.
    pub column_name: ColumnName,
    /// The column's type as a [`SqlColumnType`].
    pub column_type: SqlColumnType,
    /// The column's type as written in SQL, unresolved.
    pub raw_sql_type: RawDataType,
}
1338
/// Plan for applying a replacement to a materialized view.
#[derive(Debug)]
pub struct AlterMaterializedViewApplyReplacementPlan {
    /// The catalog item id of the materialized view being altered.
    pub id: CatalogItemId,
    /// The catalog item id of the replacement.
    pub replacement_id: CatalogItemId,
}
1344
/// Plan for a `DECLARE`-style statement that binds a name to a statement.
///
/// NOTE(review): presumably `name` is a cursor name — confirm.
#[derive(Debug)]
pub struct DeclarePlan {
    /// The declared name.
    pub name: String,
    /// The statement, still in raw (unresolved) AST form.
    pub stmt: Statement<Raw>,
    /// The SQL text (presumably of `stmt` — TODO confirm).
    pub sql: String,
    /// The parameters associated with the statement.
    pub params: Params,
}
1352
/// Plan for a `FETCH`-style statement.
#[derive(Debug)]
pub struct FetchPlan {
    /// The name to fetch from (presumably a cursor name — TODO confirm).
    pub name: String,
    /// The optional [`FetchDirection`] given with the statement.
    pub count: Option<FetchDirection>,
    /// The timeout behaviour for the fetch.
    pub timeout: ExecuteTimeout,
}
1359
/// Plan for a `CLOSE`-style statement.
#[derive(Debug)]
pub struct ClosePlan {
    /// The name to close (presumably a cursor name — TODO confirm).
    pub name: String,
}
1364
/// Plan for preparing a named statement.
#[derive(Debug)]
pub struct PreparePlan {
    /// The name of the prepared statement.
    pub name: String,
    /// The statement, still in raw (unresolved) AST form.
    pub stmt: Statement<Raw>,
    /// The SQL text (presumably of `stmt` — TODO confirm).
    pub sql: String,
    /// The [`StatementDesc`] describing the statement.
    pub desc: StatementDesc,
}
1372
/// Plan for executing a named (prepared) statement.
#[derive(Debug)]
pub struct ExecutePlan {
    /// The name of the statement to execute.
    pub name: String,
    /// The parameters to execute it with.
    pub params: Params,
}
1378
/// Plan for deallocating prepared statements.
#[derive(Debug)]
pub struct DeallocatePlan {
    /// The name of the statement to deallocate, if one was given
    /// (presumably `None` means "all" — TODO confirm).
    pub name: Option<String>,
}
1383
/// Plan for raising a notice.
#[derive(Debug)]
pub struct RaisePlan {
    /// The severity of the notice.
    pub severity: NoticeSeverity,
}
1388
/// Plan for granting role membership.
#[derive(Debug)]
pub struct GrantRolePlan {
    /// The roles being granted.
    pub role_ids: Vec<RoleId>,
    /// The roles that gain membership (per the field name — TODO confirm).
    pub member_ids: Vec<RoleId>,
    /// The role performing the grant.
    pub grantor_id: RoleId,
}
1398
/// Plan for revoking role membership.
#[derive(Debug)]
pub struct RevokeRolePlan {
    /// The roles being revoked.
    pub role_ids: Vec<RoleId>,
    /// The roles that lose membership (per the field name — TODO confirm).
    pub member_ids: Vec<RoleId>,
    /// The grantor role recorded for the revocation.
    pub grantor_id: RoleId,
}
1408
/// A single privilege change on one object.
///
/// Used by [`GrantPrivilegesPlan`] and [`RevokePrivilegesPlan`].
#[derive(Debug)]
pub struct UpdatePrivilege {
    /// The privileges involved, as an [`AclMode`].
    pub acl_mode: AclMode,
    /// The object the privileges apply to.
    pub target_id: SystemObjectId,
    /// The grantor role of the privileges.
    pub grantor: RoleId,
}
1418
/// Plan for granting privileges.
#[derive(Debug)]
pub struct GrantPrivilegesPlan {
    /// The privilege changes to apply.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles receiving the privileges.
    pub grantees: Vec<RoleId>,
}
1426
/// Plan for revoking privileges.
#[derive(Debug)]
pub struct RevokePrivilegesPlan {
    /// The privilege changes to apply.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles losing the privileges.
    pub revokees: Vec<RoleId>,
}
/// Plan for altering default privileges.
#[derive(Debug)]
pub struct AlterDefaultPrivilegesPlan {
    /// The default-privilege objects affected.
    pub privilege_objects: Vec<DefaultPrivilegeObject>,
    /// The ACL items to grant or revoke.
    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
    /// Grant flag (presumably `true` for a grant and `false` for a revoke —
    /// TODO confirm).
    pub is_grant: bool,
}
1443
/// Plan for reassigning ownership of objects from a set of roles to another
/// role.
#[derive(Debug)]
pub struct ReassignOwnedPlan {
    /// The roles that currently own the objects.
    pub old_roles: Vec<RoleId>,
    /// The role that receives ownership.
    pub new_role: RoleId,
    /// The objects whose ownership is reassigned.
    pub reassign_ids: Vec<ObjectId>,
}
1453
/// Plan for setting or clearing a comment on an object.
#[derive(Debug)]
pub struct CommentPlan {
    /// The object the comment is attached to.
    pub object_id: CommentObjectId,
    /// An optional sub-component of the object
    /// (presumably a column position — TODO confirm, including its base).
    pub sub_component: Option<usize>,
    /// The comment text, if any
    /// (presumably `None` removes the comment — TODO confirm).
    pub comment: Option<String>,
}
1465
/// Where the data of a [`Table`] comes from.
#[derive(Clone, Debug)]
pub enum TableDataSource {
    /// The table is populated by writes; `defaults` holds one expression per
    /// entry (presumably the per-column default expressions — TODO confirm).
    TableWrites { defaults: Vec<Expr<Aug>> },

    /// The table is populated from a data source.
    DataSource {
        /// Description of the data source.
        desc: DataSourceDesc,
        /// The [`Timeline`] associated with the data source.
        timeline: Timeline,
    },
}
1478
/// Planned definition of a table.
#[derive(Clone, Debug)]
pub struct Table {
    /// The SQL used to create the table.
    pub create_sql: String,
    /// The table's versioned relation description.
    pub desc: VersionedRelationDesc,
    /// Whether the table is temporary.
    pub temporary: bool,
    /// The table's compaction window, if one is set.
    pub compaction_window: Option<CompactionWindow>,
    /// Where the table's data comes from.
    pub data_source: TableDataSource,
}
1487
/// Planned definition of a source.
#[derive(Clone, Debug)]
pub struct Source {
    /// The SQL used to create the source.
    pub create_sql: String,
    /// Description of where the source's data comes from.
    pub data_source: DataSourceDesc,
    /// The source's relation description.
    pub desc: RelationDesc,
    /// The source's compaction window, if one is set.
    pub compaction_window: Option<CompactionWindow>,
}
1495
/// Description of the origin of a source's (or table's) data.
#[derive(Debug, Clone)]
pub enum DataSourceDesc {
    /// An ingestion described by a [`SourceDesc`].
    Ingestion(SourceDesc<ReferencedConnection>),
    /// An ingestion created with the "old syntax".
    ///
    /// NOTE(review): it carries both the ingestion description and export
    /// details, so presumably the ingestion also acts as its own export —
    /// confirm.
    OldSyntaxIngestion {
        /// The ingestion description.
        desc: SourceDesc<ReferencedConnection>,
        /// The catalog item id of the progress subsource.
        progress_subsource: CatalogItemId,
        /// Data configuration of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
        /// Details of the export.
        details: SourceExportDetails,
    },
    /// An export of an existing ingestion.
    IngestionExport {
        /// The catalog item id of the ingestion being exported from.
        ingestion_id: CatalogItemId,
        /// The name of the exported object in the external system.
        external_reference: UnresolvedItemName,
        /// Details of the export.
        details: SourceExportDetails,
        /// Data configuration of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// A progress collection.
    Progress,
    /// A webhook source.
    Webhook {
        /// Optional validation to apply
        /// (presumably to incoming requests — TODO confirm).
        validate_using: Option<WebhookValidation>,
        /// The format of the request body.
        body_format: WebhookBodyFormat,
        /// How request headers are exposed as columns.
        headers: WebhookHeaders,
        /// The cluster associated with the webhook, if any.
        cluster_id: Option<StorageInstanceId>,
    },
}
1528
/// Validation configuration for a webhook source.
#[derive(Clone, Debug, Serialize)]
pub struct WebhookValidation {
    /// The validation expression; [`WebhookValidation::reduce_expression`]
    /// reduces it in place.
    pub expression: MirScalarExpr,
    /// The relation description whose column types are used when reducing
    /// `expression`.
    pub relation_desc: RelationDesc,
    /// `(usize, bool)` pairs describing body inputs (presumably a column
    /// index plus a "provide as bytes" flag — TODO confirm).
    pub bodies: Vec<(usize, bool)>,
    /// `(usize, bool)` pairs describing header inputs (presumably the same
    /// encoding as `bodies` — TODO confirm).
    pub headers: Vec<(usize, bool)>,
    /// The secrets made available to the expression.
    pub secrets: Vec<WebhookValidationSecret>,
}
1542
1543impl WebhookValidation {
1544 const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1545
1546 pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1551 let WebhookValidation {
1552 expression,
1553 relation_desc,
1554 ..
1555 } = self;
1556
1557 let mut expression_ = expression.clone();
1559 let desc_ = relation_desc.clone();
1560 let reduce_task = mz_ore::task::spawn_blocking(
1561 || "webhook-validation-reduce",
1562 move || {
1563 expression_.reduce(&desc_.typ().column_types);
1564 expression_
1565 },
1566 );
1567
1568 match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1569 Ok(reduced_expr) => {
1570 *expression = reduced_expr;
1571 Ok(())
1572 }
1573 Err(_) => Err("timeout"),
1574 }
1575 }
1576}
1577
/// Describes how webhook request headers are exposed as columns.
#[derive(Clone, Debug, Default, Serialize)]
pub struct WebhookHeaders {
    /// When `Some`, a single column holding the headers is present, with the
    /// given filters; it counts as one column in
    /// [`WebhookHeaders::num_columns`].
    pub header_column: Option<WebhookHeaderFilters>,
    /// Individually mapped headers, keyed by `usize`; each entry counts as
    /// one column (presumably key = column index and value = (header name,
    /// "as bytes" flag) — TODO confirm).
    pub mapped_headers: BTreeMap<usize, (String, bool)>,
}
1585
1586impl WebhookHeaders {
1587 pub fn num_columns(&self) -> usize {
1589 let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1590 let mapped_headers = self.mapped_headers.len();
1591
1592 header_column + mapped_headers
1593 }
1594}
1595
/// Header-name filters for the webhook header column.
///
/// NOTE(review): how `block` and `allow` interact when both are non-empty is
/// not visible here — confirm with the code that applies them.
#[derive(Clone, Debug, Default, Serialize)]
pub struct WebhookHeaderFilters {
    /// Header names to block.
    pub block: BTreeSet<String>,
    /// Header names to allow.
    pub allow: BTreeSet<String>,
}
1601
/// The format of a webhook request body.
///
/// Converts into the [`SqlScalarType`] of the body column via `From`.
#[derive(Copy, Clone, Debug, Serialize, Arbitrary)]
pub enum WebhookBodyFormat {
    /// JSON body; maps to `SqlScalarType::Jsonb`. The `array` flag does not
    /// affect the column type (presumably it controls whether a top-level
    /// array is expanded into multiple rows — TODO confirm).
    Json { array: bool },
    /// Raw bytes; maps to `SqlScalarType::Bytes`.
    Bytes,
    /// Text; maps to `SqlScalarType::String`.
    Text,
}
1608
1609impl From<WebhookBodyFormat> for SqlScalarType {
1610 fn from(value: WebhookBodyFormat) -> Self {
1611 match value {
1612 WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1613 WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1614 WebhookBodyFormat::Text => SqlScalarType::String,
1615 }
1616 }
1617}
1618
/// A secret made available to a webhook validation expression.
#[derive(Clone, Debug, Serialize)]
pub struct WebhookValidationSecret {
    /// The catalog item id of the secret.
    pub id: CatalogItemId,
    /// The column index associated with the secret (presumably where the
    /// expression expects to find it — TODO confirm).
    pub column_idx: usize,
    /// Bytes flag (presumably whether the secret is provided as bytes rather
    /// than text — TODO confirm).
    pub use_bytes: bool,
}
1628
/// Planned definition of a connection.
#[derive(Clone, Debug)]
pub struct Connection {
    /// The SQL used to create the connection.
    pub create_sql: String,
    /// The type-specific details of the connection.
    pub details: ConnectionDetails,
}
1634
/// Type-specific details of a planned connection.
///
/// [`ConnectionDetails::to_connection`] converts each variant into the
/// corresponding `mz_storage_types::connections::Connection` variant.
#[derive(Clone, Debug, Serialize)]
pub enum ConnectionDetails {
    /// A Kafka connection.
    Kafka(KafkaConnection<ReferencedConnection>),
    /// A CSR connection (presumably Confluent Schema Registry — TODO confirm).
    Csr(CsrConnection<ReferencedConnection>),
    /// A PostgreSQL connection.
    Postgres(PostgresConnection<ReferencedConnection>),
    /// An SSH connection together with two keys.
    ///
    /// Only `connection` is carried over by `to_connection`; the keys are
    /// not part of the converted value.
    Ssh {
        /// The SSH connection itself.
        connection: SshConnection,
        /// The first key.
        key_1: SshKey,
        /// The second key (presumably kept to support rotation —
        /// TODO confirm).
        key_2: SshKey,
    },
    /// An AWS connection.
    Aws(AwsConnection),
    /// An AWS PrivateLink connection.
    AwsPrivatelink(AwsPrivatelinkConnection),
    /// A MySQL connection.
    MySql(MySqlConnection<ReferencedConnection>),
    /// A SQL Server connection.
    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
    /// An Iceberg catalog connection.
    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
}
1651
1652impl ConnectionDetails {
1653 pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1654 match self {
1655 ConnectionDetails::Kafka(c) => {
1656 mz_storage_types::connections::Connection::Kafka(c.clone())
1657 }
1658 ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1659 ConnectionDetails::Postgres(c) => {
1660 mz_storage_types::connections::Connection::Postgres(c.clone())
1661 }
1662 ConnectionDetails::Ssh { connection, .. } => {
1663 mz_storage_types::connections::Connection::Ssh(connection.clone())
1664 }
1665 ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1666 ConnectionDetails::AwsPrivatelink(c) => {
1667 mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1668 }
1669 ConnectionDetails::MySql(c) => {
1670 mz_storage_types::connections::Connection::MySql(c.clone())
1671 }
1672 ConnectionDetails::SqlServer(c) => {
1673 mz_storage_types::connections::Connection::SqlServer(c.clone())
1674 }
1675 ConnectionDetails::IcebergCatalog(c) => {
1676 mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1677 }
1678 }
1679 }
1680}
1681
/// A single rule of a network policy.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct NetworkPolicyRule {
    /// The name of the rule.
    pub name: String,
    /// The action the rule takes.
    pub action: NetworkPolicyRuleAction,
    /// The network address (CIDR) the rule applies to.
    pub address: PolicyAddress,
    /// The traffic direction the rule applies to.
    pub direction: NetworkPolicyRuleDirection,
}
1689
/// The action of a [`NetworkPolicyRule`].
///
/// `Allow` is currently the only variant; it displays as `allow`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleAction {
    /// Allow the traffic.
    Allow,
}
1694
1695impl std::fmt::Display for NetworkPolicyRuleAction {
1696 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1697 match self {
1698 Self::Allow => write!(f, "allow"),
1699 }
1700 }
1701}
1702impl TryFrom<&str> for NetworkPolicyRuleAction {
1703 type Error = PlanError;
1704 fn try_from(value: &str) -> Result<Self, Self::Error> {
1705 match value.to_uppercase().as_str() {
1706 "ALLOW" => Ok(Self::Allow),
1707 _ => Err(PlanError::Unstructured(
1708 "Allow is the only valid option".into(),
1709 )),
1710 }
1711 }
1712}
1713
/// The traffic direction of a [`NetworkPolicyRule`].
///
/// `Ingress` is currently the only variant; it displays as `ingress`.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleDirection {
    /// Incoming traffic.
    Ingress,
}
1718impl std::fmt::Display for NetworkPolicyRuleDirection {
1719 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1720 match self {
1721 Self::Ingress => write!(f, "ingress"),
1722 }
1723 }
1724}
1725impl TryFrom<&str> for NetworkPolicyRuleDirection {
1726 type Error = PlanError;
1727 fn try_from(value: &str) -> Result<Self, Self::Error> {
1728 match value.to_uppercase().as_str() {
1729 "INGRESS" => Ok(Self::Ingress),
1730 _ => Err(PlanError::Unstructured(
1731 "Ingress is the only valid option".into(),
1732 )),
1733 }
1734 }
1735}
1736
/// The address of a [`NetworkPolicyRule`]: a thin wrapper around an
/// [`IpNet`].
///
/// It displays and serializes as the textual form of the wrapped network.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct PolicyAddress(pub IpNet);
1739impl std::fmt::Display for PolicyAddress {
1740 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1741 write!(f, "{}", &self.0.to_string())
1742 }
1743}
1744impl From<String> for PolicyAddress {
1745 fn from(value: String) -> Self {
1746 Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1747 }
1748}
1749impl TryFrom<&str> for PolicyAddress {
1750 type Error = PlanError;
1751 fn try_from(value: &str) -> Result<Self, Self::Error> {
1752 let net = IpNet::from_str(value)
1753 .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1754 Ok(Self(net))
1755 }
1756}
1757
1758impl Serialize for PolicyAddress {
1759 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1760 where
1761 S: serde::Serializer,
1762 {
1763 serializer.serialize_str(&format!("{}", &self.0))
1764 }
1765}
1766
/// An SSH key, held either as a public key alone or as a full key pair.
#[derive(Clone, Debug, Serialize)]
pub enum SshKey {
    /// Only the public key, as a string.
    PublicOnly(String),
    /// A full key pair.
    Both(SshKeyPair),
}
1772
1773impl SshKey {
1774 pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1775 match self {
1776 SshKey::PublicOnly(_) => None,
1777 SshKey::Both(key_pair) => Some(key_pair),
1778 }
1779 }
1780
1781 pub fn public_key(&self) -> String {
1782 match self {
1783 SshKey::PublicOnly(s) => s.into(),
1784 SshKey::Both(p) => p.ssh_public_key(),
1785 }
1786 }
1787}
1788
/// Planned definition of a secret.
#[derive(Clone, Debug)]
pub struct Secret {
    /// The SQL used to create the secret.
    pub create_sql: String,
    /// Scalar expression for the secret's contents.
    pub secret_as: MirScalarExpr,
}
1794
/// Planned definition of a sink.
#[derive(Clone, Debug)]
pub struct Sink {
    /// The SQL used to create the sink.
    pub create_sql: String,
    /// The [`GlobalId`] of the collection the sink reads from.
    pub from: GlobalId,
    /// The connection to the external system the sink writes to.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// The envelope of the sink.
    pub envelope: SinkEnvelope,
    /// A version number for the sink (how it is assigned is not visible
    /// here — TODO confirm).
    pub version: u64,
}
1807
/// Planned definition of a view.
#[derive(Clone, Debug)]
pub struct View {
    /// The SQL used to create the view.
    pub create_sql: String,
    /// The view's query as a [`HirRelationExpr`].
    pub expr: HirRelationExpr,
    /// The ids the view depends on.
    pub dependencies: DependencyIds,
    /// The names of the view's columns.
    pub column_names: Vec<ColumnName>,
    /// Whether the view is temporary.
    pub temporary: bool,
}
1821
/// Planned definition of a materialized view.
#[derive(Clone, Debug)]
pub struct MaterializedView {
    /// The SQL used to create the materialized view.
    pub create_sql: String,
    /// The view's query as a [`HirRelationExpr`].
    pub expr: HirRelationExpr,
    /// The ids the materialized view depends on.
    pub dependencies: DependencyIds,
    /// The names of the view's columns.
    pub column_names: Vec<ColumnName>,
    /// The catalog item this materialized view is a replacement for, if any
    /// (per the field name — TODO confirm).
    pub replacement_target: Option<CatalogItemId>,
    /// The cluster the materialized view runs on.
    pub cluster_id: ClusterId,
    /// `usize` entries for non-null assertions (presumably the indexes of
    /// columns asserted to be non-null — TODO confirm).
    pub non_null_assertions: Vec<usize>,
    /// The compaction window, if one is set.
    pub compaction_window: Option<CompactionWindow>,
    /// The refresh schedule, if one is set.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// An optional `as_of` timestamp.
    pub as_of: Option<Timestamp>,
}
1840
/// Planned definition of an index.
#[derive(Clone, Debug)]
pub struct Index {
    /// The SQL used to create the index.
    pub create_sql: String,
    /// The [`GlobalId`] of the collection the index is built on.
    pub on: GlobalId,
    /// The key expressions of the index.
    pub keys: Vec<mz_expr::MirScalarExpr>,
    /// The compaction window, if one is set.
    pub compaction_window: Option<CompactionWindow>,
    /// The cluster the index lives on.
    pub cluster_id: ClusterId,
}
1851
/// Planned definition of a user-defined type.
#[derive(Clone, Debug)]
pub struct Type {
    /// The SQL used to create the type.
    pub create_sql: String,
    /// The type's definition, with references expressed as [`IdReference`].
    pub inner: CatalogType<IdReference>,
}
1857
/// Specifies when a query should occur.
///
/// How each variant constrains timestamp selection is spelled out by the
/// predicate methods on this type (`advance_to_timestamp`,
/// `constrains_upper`, `advance_to_since`, ...).
#[derive(Deserialize, Clone, Debug, PartialEq)]
pub enum QueryWhen {
    /// No explicit timestamp; the only variant for which
    /// `can_advance_to_upper` returns `true`.
    Immediately,
    /// No explicit timestamp; the only variant for which
    /// `must_advance_to_timeline_ts` returns `true`.
    FreshestTableWrite,
    /// An explicit timestamp; the only variant for which `constrains_upper`
    /// returns `true`.
    AtTimestamp(Timestamp),
    /// An explicit timestamp that, unlike `AtTimestamp`, does not constrain
    /// the upper bound and may still advance to the since.
    AtLeastTimestamp(Timestamp),
}
1876
1877impl QueryWhen {
1878 pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1880 match self {
1881 QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1882 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1883 }
1884 }
1885 pub fn constrains_upper(&self) -> bool {
1889 match self {
1890 QueryWhen::AtTimestamp(_) => true,
1891 QueryWhen::AtLeastTimestamp(_)
1892 | QueryWhen::Immediately
1893 | QueryWhen::FreshestTableWrite => false,
1894 }
1895 }
1896 pub fn advance_to_since(&self) -> bool {
1898 match self {
1899 QueryWhen::Immediately
1900 | QueryWhen::AtLeastTimestamp(_)
1901 | QueryWhen::FreshestTableWrite => true,
1902 QueryWhen::AtTimestamp(_) => false,
1903 }
1904 }
1905 pub fn can_advance_to_upper(&self) -> bool {
1907 match self {
1908 QueryWhen::Immediately => true,
1909 QueryWhen::FreshestTableWrite
1910 | QueryWhen::AtTimestamp(_)
1911 | QueryWhen::AtLeastTimestamp(_) => false,
1912 }
1913 }
1914
1915 pub fn can_advance_to_timeline_ts(&self) -> bool {
1917 match self {
1918 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1919 QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1920 }
1921 }
1922 pub fn must_advance_to_timeline_ts(&self) -> bool {
1924 match self {
1925 QueryWhen::FreshestTableWrite => true,
1926 QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1927 false
1928 }
1929 }
1930 }
1931 pub fn is_transactional(&self) -> bool {
1933 match self {
1934 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1935 QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1936 }
1937 }
1938}
1939
/// The kind of mutation a plan performs.
#[derive(Debug, Copy, Clone)]
pub enum MutationKind {
    /// Rows are inserted.
    Insert,
    /// Rows are updated.
    Update,
    /// Rows are deleted.
    Delete,
}
1946
/// The data format of a `COPY`.
///
/// Note that the derived `PartialOrd`/`Ord` follow the declaration order of
/// the variants.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyFormat {
    /// Text format.
    Text,
    /// CSV format.
    Csv,
    /// Binary format.
    Binary,
    /// Parquet format.
    Parquet,
}
1954
/// Timeout behaviour for an execution (used by [`FetchPlan`]).
#[derive(Debug, Copy, Clone)]
pub enum ExecuteTimeout {
    /// No timeout was specified.
    None,
    /// A timeout expressed in (fractional) seconds.
    Seconds(f64),
    /// NOTE(review): presumably "wait for a single batch/response, then
    /// return" — confirm with the code that interprets it.
    WaitOnce,
}
1961
/// A planned option of an index.
#[derive(Clone, Debug)]
pub enum IndexOption {
    /// The retain-history setting, as a [`CompactionWindow`].
    RetainHistory(CompactionWindow),
}
1967
/// A planned option of a table.
#[derive(Clone, Debug)]
pub enum TableOption {
    /// The retain-history setting, as a [`CompactionWindow`].
    RetainHistory(CompactionWindow),
}
1973
/// The per-option change requests of an `ALTER CLUSTER` plan.
///
/// Each field says whether the corresponding option is set, reset, or left
/// unchanged; the `Default` implementation leaves every option unchanged.
#[derive(Clone, Debug)]
pub struct PlanClusterOption {
    /// Change to the availability zones.
    pub availability_zones: AlterOptionParameter<Vec<String>>,
    /// Change to the introspection-debugging flag.
    pub introspection_debugging: AlterOptionParameter<bool>,
    /// Change to the introspection interval.
    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
    /// Change to the managed flag.
    pub managed: AlterOptionParameter<bool>,
    /// Change to the replicas, given as `(name, config)` pairs.
    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
    /// Change to the replication factor.
    pub replication_factor: AlterOptionParameter<u32>,
    /// Change to the size, as a string.
    pub size: AlterOptionParameter,
    /// Change to the cluster schedule.
    pub schedule: AlterOptionParameter<ClusterSchedule>,
    /// Change to the workload class; `Set(None)` is representable
    /// (presumably meaning "set to no class" — TODO confirm).
    pub workload_class: AlterOptionParameter<Option<String>>,
}
1986
1987impl Default for PlanClusterOption {
1988 fn default() -> Self {
1989 Self {
1990 availability_zones: AlterOptionParameter::Unchanged,
1991 introspection_debugging: AlterOptionParameter::Unchanged,
1992 introspection_interval: AlterOptionParameter::Unchanged,
1993 managed: AlterOptionParameter::Unchanged,
1994 replicas: AlterOptionParameter::Unchanged,
1995 replication_factor: AlterOptionParameter::Unchanged,
1996 size: AlterOptionParameter::Unchanged,
1997 schedule: AlterOptionParameter::Unchanged,
1998 workload_class: AlterOptionParameter::Unchanged,
1999 }
2000 }
2001}
2002
/// The strategy attached to an `ALTER CLUSTER` plan, derived from the
/// statement's wait option.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AlterClusterPlanStrategy {
    /// No wait option was given.
    None,
    /// A `For` wait option with the given duration.
    For(Duration),
    /// An `UntilReady` wait option.
    UntilReady {
        /// What to do when `timeout` elapses.
        on_timeout: OnTimeoutAction,
        /// The timeout; it is required when this strategy is built from the
        /// statement options.
        timeout: Duration,
    },
}
2012
/// What to do when an `UntilReady` strategy times out.
///
/// The default is [`OnTimeoutAction::Commit`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OnTimeoutAction {
    /// Commit on timeout.
    Commit,
    /// Roll back on timeout.
    Rollback,
}
2018
2019impl Default for OnTimeoutAction {
2020 fn default() -> Self {
2021 Self::Commit
2022 }
2023}
2024
2025impl TryFrom<&str> for OnTimeoutAction {
2026 type Error = PlanError;
2027 fn try_from(value: &str) -> Result<Self, Self::Error> {
2028 match value.to_uppercase().as_str() {
2029 "COMMIT" => Ok(Self::Commit),
2030 "ROLLBACK" => Ok(Self::Rollback),
2031 _ => Err(PlanError::Unstructured(
2032 "Valid options are COMMIT, ROLLBACK".into(),
2033 )),
2034 }
2035 }
2036}
2037
2038impl AlterClusterPlanStrategy {
2039 pub fn is_none(&self) -> bool {
2040 matches!(self, Self::None)
2041 }
2042 pub fn is_some(&self) -> bool {
2043 !matches!(self, Self::None)
2044 }
2045}
2046
2047impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2048 type Error = PlanError;
2049
2050 fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2051 Ok(match value.wait {
2052 Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2053 Some(ClusterAlterOptionValue::UntilReady(options)) => {
2054 let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2055 Self::UntilReady {
2056 timeout: match extracted.timeout {
2057 Some(d) => d,
2058 None => Err(PlanError::UntilReadyTimeoutRequired)?,
2059 },
2060 on_timeout: match extracted.on_timeout {
2061 Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2062 PlanError::InvalidOptionValue {
2063 option_name: "ON TIMEOUT".into(),
2064 err: Box::new(e),
2065 }
2066 })?,
2067 None => OnTimeoutAction::default(),
2068 },
2069 }
2070 }
2071 None => Self::None,
2072 })
2073 }
2074}
2075
/// A set of parameter values together with type information.
///
/// NOTE(review): presumably the values bound to the placeholders of a
/// prepared statement — confirm.
#[derive(Debug, Clone)]
pub struct Params {
    /// The parameter values, packed into a single [`Row`].
    pub datums: Row,
    /// Scalar types tied to execution (presumably the types of `datums` as
    /// supplied — TODO confirm).
    pub execute_types: Vec<SqlScalarType>,
    /// Scalar types that are expected (presumably by the statement being
    /// executed — TODO confirm).
    pub expected_types: Vec<SqlScalarType>,
}
2086
2087impl Params {
2088 pub fn empty() -> Params {
2090 Params {
2091 datums: Row::pack_slice(&[]),
2092 execute_types: vec![],
2093 expected_types: vec![],
2094 }
2095 }
2096}
2097
/// Context supplied to planning.
#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Copy)]
pub struct PlanContext {
    /// The wall-clock time associated with planning.
    pub wall_time: DateTime<Utc>,
    /// Flag for ignoring "if exists" errors; `false` unless changed through
    /// `with_ignore_if_exists_errors` (how planning uses it is not visible
    /// here — TODO confirm).
    pub ignore_if_exists_errors: bool,
}
2104
2105impl PlanContext {
2106 pub fn new(wall_time: DateTime<Utc>) -> Self {
2107 Self {
2108 wall_time,
2109 ignore_if_exists_errors: false,
2110 }
2111 }
2112
2113 pub fn zero() -> Self {
2117 PlanContext {
2118 wall_time: now::to_datetime(NOW_ZERO()),
2119 ignore_if_exists_errors: false,
2120 }
2121 }
2122
2123 pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2124 self.ignore_if_exists_errors = value;
2125 self
2126 }
2127}