1use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{
41 CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing,
42};
43use mz_ore::now::{self, NOW_ZERO};
44use mz_pgcopy::CopyFormatParams;
45use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
46use mz_repr::explain::{ExplainConfig, ExplainFormat};
47use mz_repr::network_policy_id::NetworkPolicyId;
48use mz_repr::optimize::OptimizerFeatureOverrides;
49use mz_repr::refresh_schedule::RefreshSchedule;
50use mz_repr::role_id::RoleId;
51use mz_repr::{
52 CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, Row, SqlColumnType,
53 SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
54};
55use mz_sql_parser::ast::{
56 AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
57 RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
58 Value, WithOptionValue,
59};
60use mz_ssh_util::keys::SshKeyPair;
61use mz_storage_types::connections::aws::AwsConnection;
62use mz_storage_types::connections::inline::ReferencedConnection;
63use mz_storage_types::connections::{
64 AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
65 MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70 SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76 ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77 TransactionAccessMode,
78};
79use crate::catalog::{
80 CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81 RoleAttributesRaw,
82};
83use crate::names::{
84 Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110 AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111 WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119 AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120 PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123 StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124 resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
/// The top-level plan type of this module: one variant per kind of plan.
///
/// Most variants wrap a dedicated `*Plan` payload struct; a handful
/// (`DiscardTemp`, `DiscardAll`, `EmptyQuery`, `ShowAllVariables`) carry no
/// data. The `EnumKind` derive together with `#[enum_kind(PlanKind)]`
/// additionally generates `PlanKind`, a companion enum that is used elsewhere
/// in this file with the same variant names but without payloads.
#[derive(Debug, EnumKind)]
#[enum_kind(PlanKind)]
pub enum Plan {
    CreateConnection(CreateConnectionPlan),
    CreateDatabase(CreateDatabasePlan),
    CreateSchema(CreateSchemaPlan),
    CreateRole(CreateRolePlan),
    CreateCluster(CreateClusterPlan),
    CreateClusterReplica(CreateClusterReplicaPlan),
    CreateSource(CreateSourcePlan),
    /// Several source plans bundled together, each with its own IDs.
    CreateSources(Vec<CreateSourcePlanBundle>),
    CreateSecret(CreateSecretPlan),
    CreateSink(CreateSinkPlan),
    CreateTable(CreateTablePlan),
    CreateView(CreateViewPlan),
    CreateMaterializedView(CreateMaterializedViewPlan),
    CreateContinualTask(CreateContinualTaskPlan),
    CreateNetworkPolicy(CreateNetworkPolicyPlan),
    CreateIndex(CreateIndexPlan),
    CreateType(CreateTypePlan),
    Comment(CommentPlan),
    DiscardTemp,
    DiscardAll,
    DropObjects(DropObjectsPlan),
    DropOwned(DropOwnedPlan),
    /// Named "do nothing" by [`Plan::name`].
    EmptyQuery,
    ShowAllVariables,
    ShowCreate(ShowCreatePlan),
    ShowColumns(ShowColumnsPlan),
    ShowVariable(ShowVariablePlan),
    InspectShard(InspectShardPlan),
    SetVariable(SetVariablePlan),
    ResetVariable(ResetVariablePlan),
    SetTransaction(SetTransactionPlan),
    StartTransaction(StartTransactionPlan),
    CommitTransaction(CommitTransactionPlan),
    AbortTransaction(AbortTransactionPlan),
    Select(SelectPlan),
    Subscribe(SubscribePlan),
    CopyFrom(CopyFromPlan),
    CopyTo(CopyToPlan),
    ExplainPlan(ExplainPlanPlan),
    ExplainPushdown(ExplainPushdownPlan),
    ExplainTimestamp(ExplainTimestampPlan),
    ExplainSinkSchema(ExplainSinkSchemaPlan),
    Insert(InsertPlan),
    AlterCluster(AlterClusterPlan),
    AlterClusterSwap(AlterClusterSwapPlan),
    /// An `ALTER` that only records the object type it was aimed at.
    AlterNoop(AlterNoopPlan),
    AlterSetCluster(AlterSetClusterPlan),
    AlterConnection(AlterConnectionPlan),
    AlterSource(AlterSourcePlan),
    AlterClusterRename(AlterClusterRenamePlan),
    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
    AlterItemRename(AlterItemRenamePlan),
    AlterSchemaRename(AlterSchemaRenamePlan),
    AlterSchemaSwap(AlterSchemaSwapPlan),
    AlterSecret(AlterSecretPlan),
    AlterSink(AlterSinkPlan),
    AlterSystemSet(AlterSystemSetPlan),
    AlterSystemReset(AlterSystemResetPlan),
    AlterSystemResetAll(AlterSystemResetAllPlan),
    AlterRole(AlterRolePlan),
    AlterOwner(AlterOwnerPlan),
    AlterTableAddColumn(AlterTablePlan),
    AlterNetworkPolicy(AlterNetworkPolicyPlan),
    Declare(DeclarePlan),
    Fetch(FetchPlan),
    Close(ClosePlan),
    /// Shared by `DELETE` and `UPDATE` (see [`Plan::generated_from`]); the
    /// payload's `kind` field tells the mutation kinds apart.
    ReadThenWrite(ReadThenWritePlan),
    Prepare(PreparePlan),
    Execute(ExecutePlan),
    Deallocate(DeallocatePlan),
    Raise(RaisePlan),
    GrantRole(GrantRolePlan),
    RevokeRole(RevokeRolePlan),
    GrantPrivileges(GrantPrivilegesPlan),
    RevokePrivileges(RevokePrivilegesPlan),
    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
    ReassignOwned(ReassignOwnedPlan),
    SideEffectingFunc(SideEffectingFunc),
    ValidateConnection(ValidateConnectionPlan),
    AlterRetainHistory(AlterRetainHistoryPlan),
}
215
216impl Plan {
217 pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
220 match stmt {
221 StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
222 StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
223 StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
224 StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
225 StatementKind::AlterObjectRename => &[
226 PlanKind::AlterClusterRename,
227 PlanKind::AlterClusterReplicaRename,
228 PlanKind::AlterItemRename,
229 PlanKind::AlterSchemaRename,
230 PlanKind::AlterNoop,
231 ],
232 StatementKind::AlterObjectSwap => &[
233 PlanKind::AlterClusterSwap,
234 PlanKind::AlterSchemaSwap,
235 PlanKind::AlterNoop,
236 ],
237 StatementKind::AlterRole => &[PlanKind::AlterRole],
238 StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
239 StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
240 StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
241 StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
242 StatementKind::AlterSource => &[
243 PlanKind::AlterNoop,
244 PlanKind::AlterSource,
245 PlanKind::AlterRetainHistory,
246 ],
247 StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
248 StatementKind::AlterSystemResetAll => {
249 &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
250 }
251 StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
252 StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
253 StatementKind::AlterTableAddColumn => {
254 &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
255 }
256 StatementKind::Close => &[PlanKind::Close],
257 StatementKind::Comment => &[PlanKind::Comment],
258 StatementKind::Commit => &[PlanKind::CommitTransaction],
259 StatementKind::Copy => &[
260 PlanKind::CopyFrom,
261 PlanKind::Select,
262 PlanKind::Subscribe,
263 PlanKind::CopyTo,
264 ],
265 StatementKind::CreateCluster => &[PlanKind::CreateCluster],
266 StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
267 StatementKind::CreateConnection => &[PlanKind::CreateConnection],
268 StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
269 StatementKind::CreateIndex => &[PlanKind::CreateIndex],
270 StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
271 StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
272 StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
273 StatementKind::CreateRole => &[PlanKind::CreateRole],
274 StatementKind::CreateSchema => &[PlanKind::CreateSchema],
275 StatementKind::CreateSecret => &[PlanKind::CreateSecret],
276 StatementKind::CreateSink => &[PlanKind::CreateSink],
277 StatementKind::CreateSource | StatementKind::CreateSubsource => {
278 &[PlanKind::CreateSource]
279 }
280 StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
281 StatementKind::CreateTable => &[PlanKind::CreateTable],
282 StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
283 StatementKind::CreateType => &[PlanKind::CreateType],
284 StatementKind::CreateView => &[PlanKind::CreateView],
285 StatementKind::Deallocate => &[PlanKind::Deallocate],
286 StatementKind::Declare => &[PlanKind::Declare],
287 StatementKind::Delete => &[PlanKind::ReadThenWrite],
288 StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
289 StatementKind::DropObjects => &[PlanKind::DropObjects],
290 StatementKind::DropOwned => &[PlanKind::DropOwned],
291 StatementKind::Execute => &[PlanKind::Execute],
292 StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
293 StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
294 StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
295 StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
296 StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
297 StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
298 StatementKind::Fetch => &[PlanKind::Fetch],
299 StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
300 StatementKind::GrantRole => &[PlanKind::GrantRole],
301 StatementKind::Insert => &[PlanKind::Insert],
302 StatementKind::Prepare => &[PlanKind::Prepare],
303 StatementKind::Raise => &[PlanKind::Raise],
304 StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
305 StatementKind::ResetVariable => &[PlanKind::ResetVariable],
306 StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
307 StatementKind::RevokeRole => &[PlanKind::RevokeRole],
308 StatementKind::Rollback => &[PlanKind::AbortTransaction],
309 StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
310 StatementKind::SetTransaction => &[PlanKind::SetTransaction],
311 StatementKind::SetVariable => &[PlanKind::SetVariable],
312 StatementKind::Show => &[
313 PlanKind::Select,
314 PlanKind::ShowVariable,
315 PlanKind::ShowCreate,
316 PlanKind::ShowColumns,
317 PlanKind::ShowAllVariables,
318 PlanKind::InspectShard,
319 ],
320 StatementKind::StartTransaction => &[PlanKind::StartTransaction],
321 StatementKind::Subscribe => &[PlanKind::Subscribe],
322 StatementKind::Update => &[PlanKind::ReadThenWrite],
323 StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
324 StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
325 }
326 }
327
328 pub fn name(&self) -> &str {
330 match self {
331 Plan::CreateConnection(_) => "create connection",
332 Plan::CreateDatabase(_) => "create database",
333 Plan::CreateSchema(_) => "create schema",
334 Plan::CreateRole(_) => "create role",
335 Plan::CreateCluster(_) => "create cluster",
336 Plan::CreateClusterReplica(_) => "create cluster replica",
337 Plan::CreateSource(_) => "create source",
338 Plan::CreateSources(_) => "create source",
339 Plan::CreateSecret(_) => "create secret",
340 Plan::CreateSink(_) => "create sink",
341 Plan::CreateTable(_) => "create table",
342 Plan::CreateView(_) => "create view",
343 Plan::CreateMaterializedView(_) => "create materialized view",
344 Plan::CreateContinualTask(_) => "create continual task",
345 Plan::CreateIndex(_) => "create index",
346 Plan::CreateType(_) => "create type",
347 Plan::CreateNetworkPolicy(_) => "create network policy",
348 Plan::Comment(_) => "comment",
349 Plan::DiscardTemp => "discard temp",
350 Plan::DiscardAll => "discard all",
351 Plan::DropObjects(plan) => match plan.object_type {
352 ObjectType::Table => "drop table",
353 ObjectType::View => "drop view",
354 ObjectType::MaterializedView => "drop materialized view",
355 ObjectType::Source => "drop source",
356 ObjectType::Sink => "drop sink",
357 ObjectType::Index => "drop index",
358 ObjectType::Type => "drop type",
359 ObjectType::Role => "drop roles",
360 ObjectType::Cluster => "drop clusters",
361 ObjectType::ClusterReplica => "drop cluster replicas",
362 ObjectType::Secret => "drop secret",
363 ObjectType::Connection => "drop connection",
364 ObjectType::Database => "drop database",
365 ObjectType::Schema => "drop schema",
366 ObjectType::Func => "drop function",
367 ObjectType::ContinualTask => "drop continual task",
368 ObjectType::NetworkPolicy => "drop network policy",
369 },
370 Plan::DropOwned(_) => "drop owned",
371 Plan::EmptyQuery => "do nothing",
372 Plan::ShowAllVariables => "show all variables",
373 Plan::ShowCreate(_) => "show create",
374 Plan::ShowColumns(_) => "show columns",
375 Plan::ShowVariable(_) => "show variable",
376 Plan::InspectShard(_) => "inspect shard",
377 Plan::SetVariable(_) => "set variable",
378 Plan::ResetVariable(_) => "reset variable",
379 Plan::SetTransaction(_) => "set transaction",
380 Plan::StartTransaction(_) => "start transaction",
381 Plan::CommitTransaction(_) => "commit",
382 Plan::AbortTransaction(_) => "abort",
383 Plan::Select(_) => "select",
384 Plan::Subscribe(_) => "subscribe",
385 Plan::CopyFrom(_) => "copy from",
386 Plan::CopyTo(_) => "copy to",
387 Plan::ExplainPlan(_) => "explain plan",
388 Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
389 Plan::ExplainTimestamp(_) => "explain timestamp",
390 Plan::ExplainSinkSchema(_) => "explain schema",
391 Plan::Insert(_) => "insert",
392 Plan::AlterNoop(plan) => match plan.object_type {
393 ObjectType::Table => "alter table",
394 ObjectType::View => "alter view",
395 ObjectType::MaterializedView => "alter materialized view",
396 ObjectType::Source => "alter source",
397 ObjectType::Sink => "alter sink",
398 ObjectType::Index => "alter index",
399 ObjectType::Type => "alter type",
400 ObjectType::Role => "alter role",
401 ObjectType::Cluster => "alter cluster",
402 ObjectType::ClusterReplica => "alter cluster replica",
403 ObjectType::Secret => "alter secret",
404 ObjectType::Connection => "alter connection",
405 ObjectType::Database => "alter database",
406 ObjectType::Schema => "alter schema",
407 ObjectType::Func => "alter function",
408 ObjectType::ContinualTask => "alter continual task",
409 ObjectType::NetworkPolicy => "alter network policy",
410 },
411 Plan::AlterCluster(_) => "alter cluster",
412 Plan::AlterClusterRename(_) => "alter cluster rename",
413 Plan::AlterClusterSwap(_) => "alter cluster swap",
414 Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
415 Plan::AlterSetCluster(_) => "alter set cluster",
416 Plan::AlterConnection(_) => "alter connection",
417 Plan::AlterSource(_) => "alter source",
418 Plan::AlterItemRename(_) => "rename item",
419 Plan::AlterSchemaRename(_) => "alter rename schema",
420 Plan::AlterSchemaSwap(_) => "alter swap schema",
421 Plan::AlterSecret(_) => "alter secret",
422 Plan::AlterSink(_) => "alter sink",
423 Plan::AlterSystemSet(_) => "alter system",
424 Plan::AlterSystemReset(_) => "alter system",
425 Plan::AlterSystemResetAll(_) => "alter system",
426 Plan::AlterRole(_) => "alter role",
427 Plan::AlterNetworkPolicy(_) => "alter network policy",
428 Plan::AlterOwner(plan) => match plan.object_type {
429 ObjectType::Table => "alter table owner",
430 ObjectType::View => "alter view owner",
431 ObjectType::MaterializedView => "alter materialized view owner",
432 ObjectType::Source => "alter source owner",
433 ObjectType::Sink => "alter sink owner",
434 ObjectType::Index => "alter index owner",
435 ObjectType::Type => "alter type owner",
436 ObjectType::Role => "alter role owner",
437 ObjectType::Cluster => "alter cluster owner",
438 ObjectType::ClusterReplica => "alter cluster replica owner",
439 ObjectType::Secret => "alter secret owner",
440 ObjectType::Connection => "alter connection owner",
441 ObjectType::Database => "alter database owner",
442 ObjectType::Schema => "alter schema owner",
443 ObjectType::Func => "alter function owner",
444 ObjectType::ContinualTask => "alter continual task owner",
445 ObjectType::NetworkPolicy => "alter network policy owner",
446 },
447 Plan::AlterTableAddColumn(_) => "alter table add column",
448 Plan::Declare(_) => "declare",
449 Plan::Fetch(_) => "fetch",
450 Plan::Close(_) => "close",
451 Plan::ReadThenWrite(plan) => match plan.kind {
452 MutationKind::Insert => "insert into select",
453 MutationKind::Update => "update",
454 MutationKind::Delete => "delete",
455 },
456 Plan::Prepare(_) => "prepare",
457 Plan::Execute(_) => "execute",
458 Plan::Deallocate(_) => "deallocate",
459 Plan::Raise(_) => "raise",
460 Plan::GrantRole(_) => "grant role",
461 Plan::RevokeRole(_) => "revoke role",
462 Plan::GrantPrivileges(_) => "grant privilege",
463 Plan::RevokePrivileges(_) => "revoke privilege",
464 Plan::AlterDefaultPrivileges(_) => "alter default privileges",
465 Plan::ReassignOwned(_) => "reassign owned",
466 Plan::SideEffectingFunc(_) => "side effecting func",
467 Plan::ValidateConnection(_) => "validate connection",
468 Plan::AlterRetainHistory(_) => "alter retain history",
469 }
470 }
471
472 pub fn allowed_in_read_only(&self) -> bool {
478 match self {
479 Plan::SetVariable(_) => true,
482 Plan::ResetVariable(_) => true,
483 Plan::SetTransaction(_) => true,
484 Plan::StartTransaction(_) => true,
485 Plan::CommitTransaction(_) => true,
486 Plan::AbortTransaction(_) => true,
487 Plan::Select(_) => true,
488 Plan::EmptyQuery => true,
489 Plan::ShowAllVariables => true,
490 Plan::ShowCreate(_) => true,
491 Plan::ShowColumns(_) => true,
492 Plan::ShowVariable(_) => true,
493 Plan::InspectShard(_) => true,
494 Plan::Subscribe(_) => true,
495 Plan::CopyTo(_) => true,
496 Plan::ExplainPlan(_) => true,
497 Plan::ExplainPushdown(_) => true,
498 Plan::ExplainTimestamp(_) => true,
499 Plan::ExplainSinkSchema(_) => true,
500 Plan::ValidateConnection(_) => true,
501 _ => false,
502 }
503 }
504}
505
/// Payload of [`Plan::StartTransaction`].
#[derive(Debug)]
pub struct StartTransactionPlan {
    /// Access mode carried by the plan, if any.
    pub access: Option<TransactionAccessMode>,
    /// Isolation level carried by the plan, if any.
    pub isolation_level: Option<TransactionIsolationLevel>,
}
511
/// Distinguishes the two kinds of transaction recorded on
/// [`CommitTransactionPlan`] and [`AbortTransactionPlan`].
// NOTE(review): presumably `Explicit` means the user wrote the statement and
// `Implicit` means the system issued it on the user's behalf — confirm.
#[derive(Debug)]
pub enum TransactionType {
    Explicit,
    Implicit,
}
517
518impl TransactionType {
519 pub fn is_explicit(&self) -> bool {
520 matches!(self, TransactionType::Explicit)
521 }
522
523 pub fn is_implicit(&self) -> bool {
524 matches!(self, TransactionType::Implicit)
525 }
526}
527
/// Payload of [`Plan::CommitTransaction`].
#[derive(Debug)]
pub struct CommitTransactionPlan {
    /// Whether the commit is explicit or implicit.
    pub transaction_type: TransactionType,
}
532
/// Payload of [`Plan::AbortTransaction`].
#[derive(Debug)]
pub struct AbortTransactionPlan {
    /// Whether the abort is explicit or implicit.
    pub transaction_type: TransactionType,
}
537
/// Payload of [`Plan::CreateDatabase`].
#[derive(Debug)]
pub struct CreateDatabasePlan {
    /// Database name carried by the plan.
    pub name: String,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
}
543
/// Payload of [`Plan::CreateSchema`].
#[derive(Debug)]
pub struct CreateSchemaPlan {
    /// Resolved specifier of the database the schema belongs to.
    pub database_spec: ResolvedDatabaseSpecifier,
    /// Schema name carried by the plan.
    pub schema_name: String,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
}
550
/// Payload of [`Plan::CreateRole`].
#[derive(Debug)]
pub struct CreateRolePlan {
    /// Role name carried by the plan.
    pub name: String,
    /// Role attributes in their raw (`RoleAttributesRaw`) form.
    pub attributes: RoleAttributesRaw,
}
556
/// Payload of [`Plan::CreateCluster`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterPlan {
    /// Cluster name carried by the plan.
    pub name: String,
    /// Managed or unmanaged configuration of the cluster.
    pub variant: CreateClusterVariant,
    /// Optional workload class label.
    pub workload_class: Option<String>,
}
563
/// The two shapes a [`CreateClusterPlan`] can take.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum CreateClusterVariant {
    /// Described by size and replication factor
    /// (see [`CreateClusterManagedPlan`]).
    Managed(CreateClusterManagedPlan),
    /// Described by an explicit list of named replicas
    /// (see [`CreateClusterUnmanagedPlan`]).
    Unmanaged(CreateClusterUnmanagedPlan),
}
569
/// Configuration carried by [`CreateClusterVariant::Unmanaged`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterUnmanagedPlan {
    /// `(name, config)` pairs, one per replica.
    pub replicas: Vec<(String, ReplicaConfig)>,
}
574
/// Configuration carried by [`CreateClusterVariant::Managed`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterManagedPlan {
    /// Replication factor of the cluster.
    pub replication_factor: u32,
    /// Size, as a string.
    pub size: String,
    /// Availability zones, as strings.
    pub availability_zones: Vec<String>,
    /// Compute-specific replica settings.
    pub compute: ComputeReplicaConfig,
    /// Optimizer feature overrides attached to the cluster.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Scheduling policy of the cluster.
    pub schedule: ClusterSchedule,
}
584
/// Payload of [`Plan::CreateClusterReplica`].
#[derive(Debug)]
pub struct CreateClusterReplicaPlan {
    /// ID of the cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// Replica name carried by the plan.
    pub name: String,
    /// Configuration of the replica.
    pub config: ReplicaConfig,
}
591
/// Introspection settings stored in [`ComputeReplicaConfig`].
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq)]
pub struct ComputeReplicaIntrospectionConfig {
    // NOTE(review): presumably enables extra debugging of the introspection
    // machinery itself — confirm against the consumer.
    pub debugging: bool,
    // NOTE(review): presumably the period between introspection runs —
    // confirm.
    pub interval: Duration,
}
600
/// Compute-specific portion of a replica's configuration.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComputeReplicaConfig {
    /// Introspection settings, if any.
    // NOTE(review): presumably `None` disables introspection — confirm.
    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
}
605
/// Configuration of a single cluster replica.
///
/// Both variants carry a [`ComputeReplicaConfig`]; they differ in how the
/// replica is located: by explicit address lists or by size.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicaConfig {
    /// A replica identified by explicit address lists.
    Unorchestrated {
        /// Addresses for the storage control endpoint(s), per the field name.
        storagectl_addrs: Vec<String>,
        /// Addresses for the compute control endpoint(s), per the field name.
        computectl_addrs: Vec<String>,
        compute: ComputeReplicaConfig,
    },
    /// A replica identified by size and an optional availability zone.
    Orchestrated {
        size: String,
        availability_zone: Option<String>,
        compute: ComputeReplicaConfig,
        // NOTE(review): presumably marks system-internal replicas — confirm.
        internal: bool,
        // NOTE(review): presumably an alternative size label used for
        // billing — confirm.
        billed_as: Option<String>,
    },
}
621
/// Scheduling policy stored on [`CreateClusterManagedPlan`].
///
/// `Manual` is the `Default` value.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterSchedule {
    // NOTE(review): presumably the cluster is never turned on or off
    // automatically under this policy — confirm.
    Manual,
    // NOTE(review): presumably the cluster is driven by refresh needs, with
    // `hydration_time_estimate` being how far ahead to start — confirm.
    Refresh { hydration_time_estimate: Duration },
}
631
632impl Default for ClusterSchedule {
633 fn default() -> Self {
634 ClusterSchedule::Manual
636 }
637}
638
/// Payload of [`Plan::CreateSource`]; also embedded in
/// [`CreateSourcePlanBundle`].
#[derive(Debug)]
pub struct CreateSourcePlan {
    /// Qualified name of the source.
    pub name: QualifiedItemName,
    /// The source definition.
    pub source: Source,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
    /// Timeline of the source.
    pub timeline: Timeline,
    /// Cluster associated with the source, if any.
    // NOTE(review): the meaning of `None` is not visible here — confirm.
    pub in_cluster: Option<ClusterId>,
}
648
/// A timestamped list of [`SourceReference`]s.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReferences {
    // NOTE(review): unit and epoch of this timestamp are not visible here —
    // confirm against the producer.
    pub updated_at: u64,
    /// The individual references.
    pub references: Vec<SourceReference>,
}
654
/// One entry of [`SourceReferences`]: a named object, an optional namespace,
/// and a list of column names.
// NOTE(review): presumably describes an upstream object a source could
// reference — confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReference {
    pub name: String,
    pub namespace: Option<String>,
    pub columns: Vec<String>,
}
663
/// A [`CreateSourcePlan`] together with the IDs and resolved dependencies
/// that accompany it; the element type of [`Plan::CreateSources`].
#[derive(Debug)]
pub struct CreateSourcePlanBundle {
    /// Catalog item ID paired with the plan.
    pub item_id: CatalogItemId,
    /// Global ID paired with the plan.
    pub global_id: GlobalId,
    /// The source plan itself.
    pub plan: CreateSourcePlan,
    /// IDs resolved for the plan.
    pub resolved_ids: ResolvedIds,
    /// References available for the source, if known.
    // NOTE(review): the meaning of `None` is not visible here — confirm.
    pub available_source_references: Option<SourceReferences>,
}
680
/// Payload of [`Plan::CreateConnection`].
#[derive(Debug)]
pub struct CreateConnectionPlan {
    /// Qualified name of the connection.
    pub name: QualifiedItemName,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
    /// The connection definition.
    pub connection: Connection,
    // NOTE(review): presumably requests validation of the connection as part
    // of creating it — confirm.
    pub validate: bool,
}
688
/// Payload of [`Plan::ValidateConnection`].
#[derive(Debug)]
pub struct ValidateConnectionPlan {
    /// Catalog item ID of the connection.
    pub id: CatalogItemId,
    /// The connection, in its storage-layer form parameterized by
    /// `ReferencedConnection`.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
}
696
/// Payload of [`Plan::CreateSecret`].
#[derive(Debug)]
pub struct CreateSecretPlan {
    /// Qualified name of the secret.
    pub name: QualifiedItemName,
    /// The secret definition.
    pub secret: Secret,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
}
703
/// Payload of [`Plan::CreateSink`].
#[derive(Debug)]
pub struct CreateSinkPlan {
    /// Qualified name of the sink.
    pub name: QualifiedItemName,
    /// The sink definition.
    pub sink: Sink,
    // NOTE(review): presumably controls whether the sink emits an initial
    // snapshot — confirm.
    pub with_snapshot: bool,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
    /// Cluster associated with the sink. Unlike
    /// [`CreateSourcePlan::in_cluster`], this is not optional.
    pub in_cluster: ClusterId,
}
712
/// Payload of [`Plan::CreateTable`].
#[derive(Debug)]
pub struct CreateTablePlan {
    /// Qualified name of the table.
    pub name: QualifiedItemName,
    /// The table definition.
    pub table: Table,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
}
719
/// Payload of [`Plan::CreateView`]; also embedded in
/// [`ExplaineeStatement::CreateView`].
#[derive(Debug, Clone)]
pub struct CreateViewPlan {
    /// Qualified name of the view.
    pub name: QualifiedItemName,
    /// The view definition.
    pub view: View,
    // NOTE(review): presumably the existing catalog item this view
    // replaces, if any — confirm.
    pub replace: Option<CatalogItemId>,
    // NOTE(review): presumably every item that must be dropped as part of
    // the replacement — confirm.
    pub drop_ids: Vec<CatalogItemId>,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
    // NOTE(review): presumably set when the view's column list cannot be
    // determined unambiguously from its definition — confirm.
    pub ambiguous_columns: bool,
}
733
/// Payload of [`Plan::CreateMaterializedView`]; also embedded in
/// [`ExplaineeStatement::CreateMaterializedView`].
///
/// Field layout parallels [`CreateViewPlan`].
#[derive(Debug, Clone)]
pub struct CreateMaterializedViewPlan {
    /// Qualified name of the materialized view.
    pub name: QualifiedItemName,
    /// The materialized view definition.
    pub materialized_view: MaterializedView,
    // NOTE(review): presumably the existing catalog item this materialized
    // view replaces, if any — confirm.
    pub replace: Option<CatalogItemId>,
    // NOTE(review): presumably every item that must be dropped as part of
    // the replacement — confirm.
    pub drop_ids: Vec<CatalogItemId>,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
    // NOTE(review): presumably set when the column list cannot be
    // determined unambiguously from the definition — confirm.
    pub ambiguous_columns: bool,
}
747
/// Payload of [`Plan::CreateContinualTask`].
#[derive(Debug, Clone)]
pub struct CreateContinualTaskPlan {
    /// Qualified name of the continual task.
    pub name: QualifiedItemName,
    // NOTE(review): presumably a local ID that stands in for the task's own
    // output while it is being planned — confirm.
    pub placeholder_id: Option<mz_expr::LocalId>,
    /// Relation description carried by the plan.
    pub desc: RelationDesc,
    /// Global ID of the task's input.
    pub input_id: GlobalId,
    // NOTE(review): presumably controls whether the input's initial
    // snapshot is processed — confirm.
    pub with_snapshot: bool,
    /// The task body. Note that it reuses the [`MaterializedView`] type.
    pub continual_task: MaterializedView,
}
761
/// Payload of [`Plan::CreateNetworkPolicy`].
#[derive(Debug, Clone)]
pub struct CreateNetworkPolicyPlan {
    /// Policy name carried by the plan.
    pub name: String,
    /// Rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
}
767
/// Payload of [`Plan::AlterNetworkPolicy`].
///
/// Same fields as [`CreateNetworkPolicyPlan`], plus the policy's ID.
#[derive(Debug, Clone)]
pub struct AlterNetworkPolicyPlan {
    /// ID of the network policy.
    pub id: NetworkPolicyId,
    /// Policy name carried by the plan.
    pub name: String,
    /// Rules carried by the plan.
    // NOTE(review): presumably these replace the existing rules wholesale —
    // confirm.
    pub rules: Vec<NetworkPolicyRule>,
}
774
/// Payload of [`Plan::CreateIndex`]; also embedded in
/// [`ExplaineeStatement::CreateIndex`].
#[derive(Debug, Clone)]
pub struct CreateIndexPlan {
    /// Qualified name of the index.
    pub name: QualifiedItemName,
    /// The index definition.
    pub index: Index,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
}
781
/// Payload of [`Plan::CreateType`].
#[derive(Debug)]
pub struct CreateTypePlan {
    /// Qualified name of the type.
    pub name: QualifiedItemName,
    /// The type definition.
    pub typ: Type,
}
787
/// Payload of [`Plan::DropObjects`].
#[derive(Debug)]
pub struct DropObjectsPlan {
    // NOTE(review): presumably only the objects named directly in the
    // statement — confirm.
    pub referenced_ids: Vec<ObjectId>,
    // NOTE(review): presumably the full set of objects to drop, which may be
    // larger than `referenced_ids` — confirm.
    pub drop_ids: Vec<ObjectId>,
    /// Object type of the drop; [`Plan::name`] uses it to pick the label.
    pub object_type: ObjectType,
}
798
/// Payload of [`Plan::DropOwned`].
#[derive(Debug)]
pub struct DropOwnedPlan {
    /// Roles the statement applies to.
    pub role_ids: Vec<RoleId>,
    /// Objects to drop.
    pub drop_ids: Vec<ObjectId>,
    /// `(object, ACL item)` pairs describing privileges to revoke.
    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
    /// `(default privilege object, ACL item)` pairs describing default
    /// privileges to revoke.
    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
}
810
/// Payload of [`Plan::ShowVariable`].
#[derive(Debug)]
pub struct ShowVariablePlan {
    /// Name of the variable.
    pub name: String,
}
815
/// Payload of [`Plan::InspectShard`].
#[derive(Debug)]
pub struct InspectShardPlan {
    /// Global ID carried by the plan.
    // NOTE(review): presumably identifies the collection whose shard is
    // inspected — confirm.
    pub id: GlobalId,
}
821
/// Payload of [`Plan::SetVariable`].
#[derive(Debug)]
pub struct SetVariablePlan {
    /// Name of the variable.
    pub name: String,
    /// The new value, or a request for the default.
    pub value: VariableValue,
    // NOTE(review): presumably mirrors `SET LOCAL` — confirm.
    pub local: bool,
}
828
/// The value side of a [`SetVariablePlan`].
#[derive(Debug)]
pub enum VariableValue {
    /// No explicit value.
    // NOTE(review): presumably resets the variable to its default — confirm.
    Default,
    /// One or more explicit values, as strings.
    Values(Vec<String>),
}
834
/// Payload of [`Plan::ResetVariable`].
#[derive(Debug)]
pub struct ResetVariablePlan {
    /// Name of the variable.
    pub name: String,
}
839
/// Payload of [`Plan::SetTransaction`].
#[derive(Debug)]
pub struct SetTransactionPlan {
    // NOTE(review): presumably `true` scopes the change to the current
    // transaction rather than the session — confirm.
    pub local: bool,
    /// Transaction modes carried by the plan.
    pub modes: Vec<TransactionMode>,
}
845
/// Payload of [`Plan::Select`]; also embedded in [`ShowColumnsPlan`],
/// [`CopyToPlan`] and [`ExplaineeStatement::Select`].
#[derive(Clone, Debug)]
pub struct SelectPlan {
    /// The `SELECT` statement AST, if one is attached.
    /// [`SelectPlan::immediate`] sets this to `None`.
    pub select: Option<Box<SelectStatement<Aug>>>,
    /// The relation expression to evaluate.
    pub source: HirRelationExpr,
    /// When the query should be evaluated.
    pub when: QueryWhen,
    /// Finishing applied to the result rows.
    pub finishing: RowSetFinishing,
    /// Copy format, if any.
    // NOTE(review): presumably set only when the select backs a `COPY TO` —
    // confirm.
    pub copy_to: Option<CopyFormat>,
}
861
862impl SelectPlan {
863 pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
864 let arity = typ.arity();
865 SelectPlan {
866 select: None,
867 source: HirRelationExpr::Constant { rows, typ },
868 when: QueryWhen::Immediately,
869 finishing: RowSetFinishing::trivial(arity),
870 copy_to: None,
871 }
872 }
873}
874
/// Output shape of a [`SubscribePlan`].
#[derive(Debug)]
pub enum SubscribeOutput {
    /// No additional configuration.
    // NOTE(review): presumably emits plain diffs — confirm.
    Diffs,
    /// Carries an ordering.
    // NOTE(review): presumably applied to rows sharing a timestamp —
    // confirm.
    WithinTimestampOrderBy {
        order_by: Vec<ColumnOrder>,
    },
    /// Carries an ordering over key columns, per the field name.
    EnvelopeUpsert {
        order_by_keys: Vec<ColumnOrder>,
    },
    /// Carries an ordering over key columns, per the field name.
    EnvelopeDebezium {
        order_by_keys: Vec<ColumnOrder>,
    },
}
891
/// Payload of [`Plan::Subscribe`].
#[derive(Debug)]
pub struct SubscribePlan {
    /// What is being subscribed to: an existing collection or a query.
    pub from: SubscribeFrom,
    // NOTE(review): presumably controls whether an initial snapshot is
    // emitted — confirm.
    pub with_snapshot: bool,
    /// When the subscription should start being evaluated.
    pub when: QueryWhen,
    /// Optional upper timestamp bound.
    // NOTE(review): whether the bound is inclusive is not visible here —
    // confirm.
    pub up_to: Option<Timestamp>,
    /// Copy format, if any.
    pub copy_to: Option<CopyFormat>,
    // NOTE(review): presumably requests progress messages in the output —
    // confirm.
    pub emit_progress: bool,
    /// Output shape of the subscription.
    pub output: SubscribeOutput,
}
902
/// The input of a [`SubscribePlan`].
#[derive(Debug, Clone)]
pub enum SubscribeFrom {
    /// An existing collection, named by its global ID.
    Id(GlobalId),
    /// An ad-hoc query: its MIR expression plus a description of its output
    /// relation.
    Query {
        expr: MirRelationExpr,
        desc: RelationDesc,
    },
}
913
914impl SubscribeFrom {
915 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
916 match self {
917 SubscribeFrom::Id(id) => BTreeSet::from([*id]),
918 SubscribeFrom::Query { expr, .. } => expr.depends_on(),
919 }
920 }
921
922 pub fn contains_temporal(&self) -> bool {
923 match self {
924 SubscribeFrom::Id(_) => false,
925 SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
926 }
927 }
928}
929
/// Payload of [`Plan::ShowCreate`].
#[derive(Debug)]
pub struct ShowCreatePlan {
    /// ID of the object being shown.
    pub id: ObjectId,
    /// Row carried by the plan.
    // NOTE(review): presumably the already-computed result row — confirm.
    pub row: Row,
}
935
/// Payload of [`Plan::ShowColumns`].
#[derive(Debug)]
pub struct ShowColumnsPlan {
    /// Catalog item ID of the object whose columns are shown.
    pub id: CatalogItemId,
    /// The select plan carried by this plan.
    // NOTE(review): presumably the query that produces the column listing —
    // confirm.
    pub select_plan: SelectPlan,
    /// Resolved IDs carried by this plan.
    // NOTE(review): presumably the dependencies of `select_plan` — confirm.
    pub new_resolved_ids: ResolvedIds,
}
942
/// Payload of [`Plan::CopyFrom`].
#[derive(Debug)]
pub struct CopyFromPlan {
    /// Catalog item ID of the copy target.
    pub target_id: CatalogItemId,
    /// Name of the copy target, as a string.
    pub target_name: String,
    /// Where the data comes from.
    pub source: CopyFromSource,
    /// Column indexes carried by the plan.
    // NOTE(review): presumably the target columns being written, in input
    // order — confirm.
    pub columns: Vec<ColumnIndex>,
    /// Relation description of the source data, per the field name.
    pub source_desc: RelationDesc,
    /// A map/filter/project carried by the plan.
    // NOTE(review): presumably transforms source rows into the target's
    // shape — confirm.
    pub mfp: MapFilterProject,
    /// Format parameters for the copy.
    pub params: CopyFormatParams<'static>,
    /// Optional restriction on which inputs are read.
    pub filter: Option<CopyFromFilter>,
}
964
/// The data source of a [`CopyFromPlan`].
#[derive(Debug)]
pub enum CopyFromSource {
    /// No payload.
    // NOTE(review): presumably data streamed by the client — confirm.
    Stdin,
    /// A location given by a scalar expression.
    Url(HirScalarExpr),
    /// A location given by a scalar expression, together with an AWS
    /// connection and that connection's catalog item ID.
    AwsS3 {
        uri: HirScalarExpr,
        connection: AwsConnection,
        connection_id: CatalogItemId,
    },
}
983
/// Optional filter stored on [`CopyFromPlan`].
#[derive(Debug)]
pub enum CopyFromFilter {
    /// A list of strings.
    // NOTE(review): presumably file names to read — confirm.
    Files(Vec<String>),
    /// A single string.
    // NOTE(review): presumably a pattern matched against file names; the
    // pattern syntax is not visible here — confirm.
    Pattern(String),
}
989
/// Payload of [`Plan::CopyTo`].
#[derive(Debug, Clone)]
pub struct CopyToPlan {
    /// The select plan carried by this plan.
    // NOTE(review): presumably the query whose result is copied out —
    // confirm.
    pub select_plan: SelectPlan,
    /// Relation description carried by the plan.
    pub desc: RelationDesc,
    /// Destination, as a scalar expression.
    pub to: HirScalarExpr,
    /// The connection, in its storage-layer form parameterized by
    /// `ReferencedConnection`.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
    /// Catalog item ID of that connection.
    pub connection_id: CatalogItemId,
    /// Output format.
    pub format: S3SinkFormat,
    /// Maximum file size.
    // NOTE(review): unit is not visible here, presumably bytes — confirm.
    pub max_file_size: u64,
}
1003
/// Payload of [`Plan::ExplainPlan`].
#[derive(Clone, Debug)]
pub struct ExplainPlanPlan {
    /// The stage to explain.
    pub stage: ExplainStage,
    /// Output format of the explanation.
    pub format: ExplainFormat,
    /// Additional explain configuration.
    pub config: ExplainConfig,
    /// The thing being explained.
    pub explainee: Explainee,
}
1011
/// The subject of an `EXPLAIN`: a catalog item, referenced by its ID, or a
/// statement.
// NOTE(review): presumably the plain variants explain the plan already
// stored for the item, while the `Replan*` variants plan it afresh —
// confirm.
#[derive(Clone, Debug)]
pub enum Explainee {
    View(CatalogItemId),
    MaterializedView(CatalogItemId),
    Index(CatalogItemId),
    ReplanView(CatalogItemId),
    ReplanMaterializedView(CatalogItemId),
    ReplanIndex(CatalogItemId),
    /// A statement that is planned in order to be explained.
    Statement(ExplaineeStatement),
}
1030
/// A statement that can be explained, together with its plan.
///
/// Every variant carries a `broken` flag (read back by
/// [`ExplaineeStatement::broken`]) and the plan of the corresponding
/// statement. The `EnumKind` derive generates the payload-free companion
/// enum `ExplaineeStatementKind`.
// NOTE(review): the exact meaning of `broken` is not visible in this part
// of the file — confirm before relying on it.
#[derive(Clone, Debug, EnumKind)]
#[enum_kind(ExplaineeStatementKind)]
pub enum ExplaineeStatement {
    /// A `SELECT`, with the description of its result relation.
    Select {
        broken: bool,
        plan: plan::SelectPlan,
        desc: RelationDesc,
    },
    /// A `CREATE VIEW`.
    CreateView {
        broken: bool,
        plan: plan::CreateViewPlan,
    },
    /// A `CREATE MATERIALIZED VIEW`.
    CreateMaterializedView {
        broken: bool,
        plan: plan::CreateMaterializedViewPlan,
    },
    /// A `CREATE INDEX`.
    CreateIndex {
        broken: bool,
        plan: plan::CreateIndexPlan,
    },
}
1061
1062impl ExplaineeStatement {
1063 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1064 match self {
1065 Self::Select { plan, .. } => plan.source.depends_on(),
1066 Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1067 Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1068 Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1069 }
1070 }
1071
1072 pub fn broken(&self) -> bool {
1083 match self {
1084 Self::Select { broken, .. } => *broken,
1085 Self::CreateView { broken, .. } => *broken,
1086 Self::CreateMaterializedView { broken, .. } => *broken,
1087 Self::CreateIndex { broken, .. } => *broken,
1088 }
1089 }
1090}
1091
1092impl ExplaineeStatementKind {
1093 pub fn supports(&self, stage: &ExplainStage) -> bool {
1094 use ExplainStage::*;
1095 match self {
1096 Self::Select => true,
1097 Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1098 Self::CreateMaterializedView => true,
1099 Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1100 }
1101 }
1102}
1103
1104impl std::fmt::Display for ExplaineeStatementKind {
1105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1106 match self {
1107 Self::Select => write!(f, "SELECT"),
1108 Self::CreateView => write!(f, "CREATE VIEW"),
1109 Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1110 Self::CreateIndex => write!(f, "CREATE INDEX"),
1111 }
1112 }
1113}
1114
1115#[derive(Clone, Debug)]
1116pub struct ExplainPushdownPlan {
1117 pub explainee: Explainee,
1118}
1119
1120#[derive(Clone, Debug)]
1121pub struct ExplainTimestampPlan {
1122 pub format: ExplainFormat,
1123 pub raw_plan: HirRelationExpr,
1124 pub when: QueryWhen,
1125}
1126
1127#[derive(Debug)]
1128pub struct ExplainSinkSchemaPlan {
1129 pub sink_from: GlobalId,
1130 pub json_schema: String,
1131}
1132
1133#[derive(Debug)]
1134pub struct SendDiffsPlan {
1135 pub id: CatalogItemId,
1136 pub updates: Vec<(Row, Diff)>,
1137 pub kind: MutationKind,
1138 pub returning: Vec<(Row, NonZeroUsize)>,
1139 pub max_result_size: u64,
1140}
1141
1142#[derive(Debug)]
1143pub struct InsertPlan {
1144 pub id: CatalogItemId,
1145 pub values: HirRelationExpr,
1146 pub returning: Vec<mz_expr::MirScalarExpr>,
1147}
1148
1149#[derive(Debug)]
1150pub struct ReadThenWritePlan {
1151 pub id: CatalogItemId,
1152 pub selection: HirRelationExpr,
1153 pub finishing: RowSetFinishing,
1154 pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
1155 pub kind: MutationKind,
1156 pub returning: Vec<mz_expr::MirScalarExpr>,
1157}
1158
1159#[derive(Debug)]
1161pub struct AlterNoopPlan {
1162 pub object_type: ObjectType,
1163}
1164
1165#[derive(Debug)]
1166pub struct AlterSetClusterPlan {
1167 pub id: CatalogItemId,
1168 pub set_cluster: ClusterId,
1169}
1170
1171#[derive(Debug)]
1172pub struct AlterRetainHistoryPlan {
1173 pub id: CatalogItemId,
1174 pub value: Option<Value>,
1175 pub window: CompactionWindow,
1176 pub object_type: ObjectType,
1177}
1178
/// The requested change for a single alterable option.
///
/// The payload type defaults to [`String`].
#[derive(Debug, Clone)]
pub enum AlterOptionParameter<T = String> {
    /// Set the option to the contained value.
    Set(T),
    /// Reset the option.
    Reset,
    /// Leave the option as it is.
    Unchanged,
}
1186
1187#[derive(Debug)]
1188pub enum AlterConnectionAction {
1189 RotateKeys,
1190 AlterOptions {
1191 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
1192 drop_options: BTreeSet<ConnectionOptionName>,
1193 validate: bool,
1194 },
1195}
1196
1197#[derive(Debug)]
1198pub struct AlterConnectionPlan {
1199 pub id: CatalogItemId,
1200 pub action: AlterConnectionAction,
1201}
1202
1203#[derive(Debug)]
1204pub enum AlterSourceAction {
1205 AddSubsourceExports {
1206 subsources: Vec<CreateSourcePlanBundle>,
1207 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1208 },
1209 RefreshReferences {
1210 references: SourceReferences,
1211 },
1212}
1213
1214#[derive(Debug)]
1215pub struct AlterSourcePlan {
1216 pub item_id: CatalogItemId,
1217 pub ingestion_id: GlobalId,
1218 pub action: AlterSourceAction,
1219}
1220
1221#[derive(Debug, Clone)]
1222pub struct AlterSinkPlan {
1223 pub item_id: CatalogItemId,
1224 pub global_id: GlobalId,
1225 pub sink: Sink,
1226 pub with_snapshot: bool,
1227 pub in_cluster: ClusterId,
1228}
1229
1230#[derive(Debug, Clone)]
1231pub struct AlterClusterPlan {
1232 pub id: ClusterId,
1233 pub name: String,
1234 pub options: PlanClusterOption,
1235 pub strategy: AlterClusterPlanStrategy,
1236}
1237
1238#[derive(Debug)]
1239pub struct AlterClusterRenamePlan {
1240 pub id: ClusterId,
1241 pub name: String,
1242 pub to_name: String,
1243}
1244
1245#[derive(Debug)]
1246pub struct AlterClusterReplicaRenamePlan {
1247 pub cluster_id: ClusterId,
1248 pub replica_id: ReplicaId,
1249 pub name: QualifiedReplica,
1250 pub to_name: String,
1251}
1252
1253#[derive(Debug)]
1254pub struct AlterItemRenamePlan {
1255 pub id: CatalogItemId,
1256 pub current_full_name: FullItemName,
1257 pub to_name: String,
1258 pub object_type: ObjectType,
1259}
1260
1261#[derive(Debug)]
1262pub struct AlterSchemaRenamePlan {
1263 pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1264 pub new_schema_name: String,
1265}
1266
1267#[derive(Debug)]
1268pub struct AlterSchemaSwapPlan {
1269 pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1270 pub schema_a_name: String,
1271 pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1272 pub schema_b_name: String,
1273 pub name_temp: String,
1274}
1275
1276#[derive(Debug)]
1277pub struct AlterClusterSwapPlan {
1278 pub id_a: ClusterId,
1279 pub id_b: ClusterId,
1280 pub name_a: String,
1281 pub name_b: String,
1282 pub name_temp: String,
1283}
1284
1285#[derive(Debug)]
1286pub struct AlterSecretPlan {
1287 pub id: CatalogItemId,
1288 pub secret_as: MirScalarExpr,
1289}
1290
1291#[derive(Debug)]
1292pub struct AlterSystemSetPlan {
1293 pub name: String,
1294 pub value: VariableValue,
1295}
1296
/// Plan data for resetting a single system variable.
#[derive(Debug)]
pub struct AlterSystemResetPlan {
    /// Name of the variable.
    pub name: String,
}
1301
/// Plan data for resetting all system variables; it carries no fields.
#[derive(Debug)]
pub struct AlterSystemResetAllPlan {}
1304
1305#[derive(Debug)]
1306pub struct AlterRolePlan {
1307 pub id: RoleId,
1308 pub name: String,
1309 pub option: PlannedAlterRoleOption,
1310}
1311
1312#[derive(Debug)]
1313pub struct AlterOwnerPlan {
1314 pub id: ObjectId,
1315 pub object_type: ObjectType,
1316 pub new_owner: RoleId,
1317}
1318
1319#[derive(Debug)]
1320pub struct AlterTablePlan {
1321 pub relation_id: CatalogItemId,
1322 pub column_name: ColumnName,
1323 pub column_type: SqlColumnType,
1324 pub raw_sql_type: RawDataType,
1325}
1326
1327#[derive(Debug)]
1328pub struct DeclarePlan {
1329 pub name: String,
1330 pub stmt: Statement<Raw>,
1331 pub sql: String,
1332 pub params: Params,
1333}
1334
1335#[derive(Debug)]
1336pub struct FetchPlan {
1337 pub name: String,
1338 pub count: Option<FetchDirection>,
1339 pub timeout: ExecuteTimeout,
1340}
1341
/// Plan data for a `CLOSE` statement.
#[derive(Debug)]
pub struct ClosePlan {
    /// Name to close.
    pub name: String,
}
1346
1347#[derive(Debug)]
1348pub struct PreparePlan {
1349 pub name: String,
1350 pub stmt: Statement<Raw>,
1351 pub sql: String,
1352 pub desc: StatementDesc,
1353}
1354
1355#[derive(Debug)]
1356pub struct ExecutePlan {
1357 pub name: String,
1358 pub params: Params,
1359}
1360
/// Plan data for a `DEALLOCATE` statement.
#[derive(Debug)]
pub struct DeallocatePlan {
    /// Name to deallocate, if one is given.
    // NOTE(review): `None` presumably means "deallocate all" — confirm.
    pub name: Option<String>,
}
1365
1366#[derive(Debug)]
1367pub struct RaisePlan {
1368 pub severity: NoticeSeverity,
1369}
1370
1371#[derive(Debug)]
1372pub struct GrantRolePlan {
1373 pub role_ids: Vec<RoleId>,
1375 pub member_ids: Vec<RoleId>,
1377 pub grantor_id: RoleId,
1379}
1380
1381#[derive(Debug)]
1382pub struct RevokeRolePlan {
1383 pub role_ids: Vec<RoleId>,
1385 pub member_ids: Vec<RoleId>,
1387 pub grantor_id: RoleId,
1389}
1390
1391#[derive(Debug)]
1392pub struct UpdatePrivilege {
1393 pub acl_mode: AclMode,
1395 pub target_id: SystemObjectId,
1397 pub grantor: RoleId,
1399}
1400
1401#[derive(Debug)]
1402pub struct GrantPrivilegesPlan {
1403 pub update_privileges: Vec<UpdatePrivilege>,
1405 pub grantees: Vec<RoleId>,
1407}
1408
1409#[derive(Debug)]
1410pub struct RevokePrivilegesPlan {
1411 pub update_privileges: Vec<UpdatePrivilege>,
1413 pub revokees: Vec<RoleId>,
1415}
1416#[derive(Debug)]
1417pub struct AlterDefaultPrivilegesPlan {
1418 pub privilege_objects: Vec<DefaultPrivilegeObject>,
1420 pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
1422 pub is_grant: bool,
1424}
1425
1426#[derive(Debug)]
1427pub struct ReassignOwnedPlan {
1428 pub old_roles: Vec<RoleId>,
1430 pub new_role: RoleId,
1432 pub reassign_ids: Vec<ObjectId>,
1434}
1435
1436#[derive(Debug)]
1437pub struct CommentPlan {
1438 pub object_id: CommentObjectId,
1440 pub sub_component: Option<usize>,
1444 pub comment: Option<String>,
1446}
1447
1448#[derive(Clone, Debug)]
1449pub enum TableDataSource {
1450 TableWrites { defaults: Vec<Expr<Aug>> },
1452
1453 DataSource {
1456 desc: DataSourceDesc,
1457 timeline: Timeline,
1458 },
1459}
1460
1461#[derive(Clone, Debug)]
1462pub struct Table {
1463 pub create_sql: String,
1464 pub desc: VersionedRelationDesc,
1465 pub temporary: bool,
1466 pub compaction_window: Option<CompactionWindow>,
1467 pub data_source: TableDataSource,
1468}
1469
1470#[derive(Clone, Debug)]
1471pub struct Source {
1472 pub create_sql: String,
1473 pub data_source: DataSourceDesc,
1474 pub desc: RelationDesc,
1475 pub compaction_window: Option<CompactionWindow>,
1476}
1477
1478#[derive(Debug, Clone)]
1479pub enum DataSourceDesc {
1480 Ingestion(SourceDesc<ReferencedConnection>),
1482 OldSyntaxIngestion {
1484 desc: SourceDesc<ReferencedConnection>,
1485 progress_subsource: CatalogItemId,
1488 data_config: SourceExportDataConfig<ReferencedConnection>,
1489 details: SourceExportDetails,
1490 },
1491 IngestionExport {
1494 ingestion_id: CatalogItemId,
1495 external_reference: UnresolvedItemName,
1496 details: SourceExportDetails,
1497 data_config: SourceExportDataConfig<ReferencedConnection>,
1498 },
1499 Progress,
1501 Webhook {
1503 validate_using: Option<WebhookValidation>,
1504 body_format: WebhookBodyFormat,
1505 headers: WebhookHeaders,
1506 cluster_id: Option<StorageInstanceId>,
1508 },
1509}
1510
1511#[derive(Clone, Debug, Serialize)]
1512pub struct WebhookValidation {
1513 pub expression: MirScalarExpr,
1515 pub relation_desc: RelationDesc,
1517 pub bodies: Vec<(usize, bool)>,
1519 pub headers: Vec<(usize, bool)>,
1521 pub secrets: Vec<WebhookValidationSecret>,
1523}
1524
1525impl WebhookValidation {
1526 const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1527
1528 pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1533 let WebhookValidation {
1534 expression,
1535 relation_desc,
1536 ..
1537 } = self;
1538
1539 let mut expression_ = expression.clone();
1541 let desc_ = relation_desc.clone();
1542 let reduce_task = mz_ore::task::spawn_blocking(
1543 || "webhook-validation-reduce",
1544 move || {
1545 expression_.reduce(&desc_.typ().column_types);
1546 expression_
1547 },
1548 );
1549
1550 match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1551 Ok(Ok(reduced_expr)) => {
1552 *expression = reduced_expr;
1553 Ok(())
1554 }
1555 Ok(Err(_)) => Err("joining task"),
1556 Err(_) => Err("timeout"),
1557 }
1558 }
1559}
1560
1561#[derive(Clone, Debug, Default, Serialize)]
1562pub struct WebhookHeaders {
1563 pub header_column: Option<WebhookHeaderFilters>,
1565 pub mapped_headers: BTreeMap<usize, (String, bool)>,
1567}
1568
1569impl WebhookHeaders {
1570 pub fn num_columns(&self) -> usize {
1572 let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1573 let mapped_headers = self.mapped_headers.len();
1574
1575 header_column + mapped_headers
1576 }
1577}
1578
1579#[derive(Clone, Debug, Default, Serialize)]
1580pub struct WebhookHeaderFilters {
1581 pub block: BTreeSet<String>,
1582 pub allow: BTreeSet<String>,
1583}
1584
1585#[derive(Copy, Clone, Debug, Serialize, Arbitrary)]
1586pub enum WebhookBodyFormat {
1587 Json { array: bool },
1588 Bytes,
1589 Text,
1590}
1591
1592impl From<WebhookBodyFormat> for SqlScalarType {
1593 fn from(value: WebhookBodyFormat) -> Self {
1594 match value {
1595 WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1596 WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1597 WebhookBodyFormat::Text => SqlScalarType::String,
1598 }
1599 }
1600}
1601
1602#[derive(Clone, Debug, Serialize)]
1603pub struct WebhookValidationSecret {
1604 pub id: CatalogItemId,
1606 pub column_idx: usize,
1608 pub use_bytes: bool,
1610}
1611
1612#[derive(Clone, Debug)]
1613pub struct Connection {
1614 pub create_sql: String,
1615 pub details: ConnectionDetails,
1616}
1617
1618#[derive(Clone, Debug, Serialize)]
1619pub enum ConnectionDetails {
1620 Kafka(KafkaConnection<ReferencedConnection>),
1621 Csr(CsrConnection<ReferencedConnection>),
1622 Postgres(PostgresConnection<ReferencedConnection>),
1623 Ssh {
1624 connection: SshConnection,
1625 key_1: SshKey,
1626 key_2: SshKey,
1627 },
1628 Aws(AwsConnection),
1629 AwsPrivatelink(AwsPrivatelinkConnection),
1630 MySql(MySqlConnection<ReferencedConnection>),
1631 SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
1632 IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
1633}
1634
1635impl ConnectionDetails {
1636 pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1637 match self {
1638 ConnectionDetails::Kafka(c) => {
1639 mz_storage_types::connections::Connection::Kafka(c.clone())
1640 }
1641 ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1642 ConnectionDetails::Postgres(c) => {
1643 mz_storage_types::connections::Connection::Postgres(c.clone())
1644 }
1645 ConnectionDetails::Ssh { connection, .. } => {
1646 mz_storage_types::connections::Connection::Ssh(connection.clone())
1647 }
1648 ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1649 ConnectionDetails::AwsPrivatelink(c) => {
1650 mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1651 }
1652 ConnectionDetails::MySql(c) => {
1653 mz_storage_types::connections::Connection::MySql(c.clone())
1654 }
1655 ConnectionDetails::SqlServer(c) => {
1656 mz_storage_types::connections::Connection::SqlServer(c.clone())
1657 }
1658 ConnectionDetails::IcebergCatalog(c) => {
1659 mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1660 }
1661 }
1662 }
1663}
1664
1665#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1666pub struct NetworkPolicyRule {
1667 pub name: String,
1668 pub action: NetworkPolicyRuleAction,
1669 pub address: PolicyAddress,
1670 pub direction: NetworkPolicyRuleDirection,
1671}
1672
1673#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1674pub enum NetworkPolicyRuleAction {
1675 Allow,
1676}
1677
1678impl std::fmt::Display for NetworkPolicyRuleAction {
1679 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1680 match self {
1681 Self::Allow => write!(f, "allow"),
1682 }
1683 }
1684}
1685impl TryFrom<&str> for NetworkPolicyRuleAction {
1686 type Error = PlanError;
1687 fn try_from(value: &str) -> Result<Self, Self::Error> {
1688 match value.to_uppercase().as_str() {
1689 "ALLOW" => Ok(Self::Allow),
1690 _ => Err(PlanError::Unstructured(
1691 "Allow is the only valid option".into(),
1692 )),
1693 }
1694 }
1695}
1696
1697#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1698pub enum NetworkPolicyRuleDirection {
1699 Ingress,
1700}
1701impl std::fmt::Display for NetworkPolicyRuleDirection {
1702 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1703 match self {
1704 Self::Ingress => write!(f, "ingress"),
1705 }
1706 }
1707}
1708impl TryFrom<&str> for NetworkPolicyRuleDirection {
1709 type Error = PlanError;
1710 fn try_from(value: &str) -> Result<Self, Self::Error> {
1711 match value.to_uppercase().as_str() {
1712 "INGRESS" => Ok(Self::Ingress),
1713 _ => Err(PlanError::Unstructured(
1714 "Ingress is the only valid option".into(),
1715 )),
1716 }
1717 }
1718}
1719
1720#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1721pub struct PolicyAddress(pub IpNet);
1722impl std::fmt::Display for PolicyAddress {
1723 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1724 write!(f, "{}", &self.0.to_string())
1725 }
1726}
1727impl From<String> for PolicyAddress {
1728 fn from(value: String) -> Self {
1729 Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1730 }
1731}
1732impl TryFrom<&str> for PolicyAddress {
1733 type Error = PlanError;
1734 fn try_from(value: &str) -> Result<Self, Self::Error> {
1735 let net = IpNet::from_str(value)
1736 .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1737 Ok(Self(net))
1738 }
1739}
1740
1741impl Serialize for PolicyAddress {
1742 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1743 where
1744 S: serde::Serializer,
1745 {
1746 serializer.serialize_str(&format!("{}", &self.0))
1747 }
1748}
1749
1750#[derive(Clone, Debug, Serialize)]
1751pub enum SshKey {
1752 PublicOnly(String),
1753 Both(SshKeyPair),
1754}
1755
1756impl SshKey {
1757 pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1758 match self {
1759 SshKey::PublicOnly(_) => None,
1760 SshKey::Both(key_pair) => Some(key_pair),
1761 }
1762 }
1763
1764 pub fn public_key(&self) -> String {
1765 match self {
1766 SshKey::PublicOnly(s) => s.into(),
1767 SshKey::Both(p) => p.ssh_public_key(),
1768 }
1769 }
1770}
1771
1772#[derive(Clone, Debug)]
1773pub struct Secret {
1774 pub create_sql: String,
1775 pub secret_as: MirScalarExpr,
1776}
1777
1778#[derive(Clone, Debug)]
1779pub struct Sink {
1780 pub create_sql: String,
1782 pub from: GlobalId,
1784 pub connection: StorageSinkConnection<ReferencedConnection>,
1786 pub envelope: SinkEnvelope,
1788 pub version: u64,
1789}
1790
1791#[derive(Clone, Debug)]
1792pub struct View {
1793 pub create_sql: String,
1795 pub expr: HirRelationExpr,
1797 pub dependencies: DependencyIds,
1799 pub column_names: Vec<ColumnName>,
1801 pub temporary: bool,
1803}
1804
1805#[derive(Clone, Debug)]
1806pub struct MaterializedView {
1807 pub create_sql: String,
1809 pub expr: HirRelationExpr,
1811 pub dependencies: DependencyIds,
1813 pub column_names: Vec<ColumnName>,
1815 pub cluster_id: ClusterId,
1817 pub non_null_assertions: Vec<usize>,
1818 pub compaction_window: Option<CompactionWindow>,
1819 pub refresh_schedule: Option<RefreshSchedule>,
1820 pub as_of: Option<Timestamp>,
1821}
1822
1823#[derive(Clone, Debug)]
1824pub struct Index {
1825 pub create_sql: String,
1827 pub on: GlobalId,
1829 pub keys: Vec<mz_expr::MirScalarExpr>,
1830 pub compaction_window: Option<CompactionWindow>,
1831 pub cluster_id: ClusterId,
1832}
1833
1834#[derive(Clone, Debug)]
1835pub struct Type {
1836 pub create_sql: String,
1837 pub inner: CatalogType<IdReference>,
1838}
1839
1840#[derive(Deserialize, Clone, Debug, PartialEq)]
1842pub enum QueryWhen {
1843 Immediately,
1846 FreshestTableWrite,
1849 AtTimestamp(Timestamp),
1854 AtLeastTimestamp(Timestamp),
1857}
1858
1859impl QueryWhen {
1860 pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1862 match self {
1863 QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1864 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1865 }
1866 }
1867 pub fn constrains_upper(&self) -> bool {
1871 match self {
1872 QueryWhen::AtTimestamp(_) => true,
1873 QueryWhen::AtLeastTimestamp(_)
1874 | QueryWhen::Immediately
1875 | QueryWhen::FreshestTableWrite => false,
1876 }
1877 }
1878 pub fn advance_to_since(&self) -> bool {
1880 match self {
1881 QueryWhen::Immediately
1882 | QueryWhen::AtLeastTimestamp(_)
1883 | QueryWhen::FreshestTableWrite => true,
1884 QueryWhen::AtTimestamp(_) => false,
1885 }
1886 }
1887 pub fn can_advance_to_upper(&self) -> bool {
1889 match self {
1890 QueryWhen::Immediately => true,
1891 QueryWhen::FreshestTableWrite
1892 | QueryWhen::AtTimestamp(_)
1893 | QueryWhen::AtLeastTimestamp(_) => false,
1894 }
1895 }
1896
1897 pub fn can_advance_to_timeline_ts(&self) -> bool {
1899 match self {
1900 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1901 QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1902 }
1903 }
1904 pub fn must_advance_to_timeline_ts(&self) -> bool {
1906 match self {
1907 QueryWhen::FreshestTableWrite => true,
1908 QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1909 false
1910 }
1911 }
1912 }
1913 pub fn is_transactional(&self) -> bool {
1915 match self {
1916 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1917 QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1918 }
1919 }
1920}
1921
/// The kind of a data mutation.
#[derive(Debug, Copy, Clone)]
pub enum MutationKind {
    /// Rows are inserted.
    Insert,
    /// Rows are updated.
    Update,
    /// Rows are deleted.
    Delete,
}
1928
/// Data format of a `COPY`.
///
/// The derived ordering follows declaration order.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyFormat {
    /// Text format.
    Text,
    /// CSV format.
    Csv,
    /// Binary format.
    Binary,
    /// Parquet format.
    Parquet,
}
1936
/// Timeout behavior, as used by [`FetchPlan`].
#[derive(Debug, Copy, Clone)]
pub enum ExecuteTimeout {
    /// No timeout value.
    None,
    /// A timeout in (possibly fractional) seconds.
    Seconds(f64),
    /// Wait once.
    // NOTE(review): exact waiting semantics are defined by the consumer — confirm.
    WaitOnce,
}
1943
1944#[derive(Clone, Debug)]
1945pub enum IndexOption {
1946 RetainHistory(CompactionWindow),
1948}
1949
1950#[derive(Clone, Debug)]
1951pub enum TableOption {
1952 RetainHistory(CompactionWindow),
1954}
1955
1956#[derive(Clone, Debug)]
1957pub struct PlanClusterOption {
1958 pub availability_zones: AlterOptionParameter<Vec<String>>,
1959 pub introspection_debugging: AlterOptionParameter<bool>,
1960 pub introspection_interval: AlterOptionParameter<OptionalDuration>,
1961 pub managed: AlterOptionParameter<bool>,
1962 pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
1963 pub replication_factor: AlterOptionParameter<u32>,
1964 pub size: AlterOptionParameter,
1965 pub schedule: AlterOptionParameter<ClusterSchedule>,
1966 pub workload_class: AlterOptionParameter<Option<String>>,
1967}
1968
1969impl Default for PlanClusterOption {
1970 fn default() -> Self {
1971 Self {
1972 availability_zones: AlterOptionParameter::Unchanged,
1973 introspection_debugging: AlterOptionParameter::Unchanged,
1974 introspection_interval: AlterOptionParameter::Unchanged,
1975 managed: AlterOptionParameter::Unchanged,
1976 replicas: AlterOptionParameter::Unchanged,
1977 replication_factor: AlterOptionParameter::Unchanged,
1978 size: AlterOptionParameter::Unchanged,
1979 schedule: AlterOptionParameter::Unchanged,
1980 workload_class: AlterOptionParameter::Unchanged,
1981 }
1982 }
1983}
1984
1985#[derive(Clone, Debug, PartialEq, Eq)]
1986pub enum AlterClusterPlanStrategy {
1987 None,
1988 For(Duration),
1989 UntilReady {
1990 on_timeout: OnTimeoutAction,
1991 timeout: Duration,
1992 },
1993}
1994
/// What to do when an `UNTIL READY` wait reaches its timeout.
///
/// The default is [`OnTimeoutAction::Commit`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum OnTimeoutAction {
    /// Commit; this is the default.
    #[default]
    Commit,
    /// Roll back.
    Rollback,
}
2006
2007impl TryFrom<&str> for OnTimeoutAction {
2008 type Error = PlanError;
2009 fn try_from(value: &str) -> Result<Self, Self::Error> {
2010 match value.to_uppercase().as_str() {
2011 "COMMIT" => Ok(Self::Commit),
2012 "ROLLBACK" => Ok(Self::Rollback),
2013 _ => Err(PlanError::Unstructured(
2014 "Valid options are COMMIT, ROLLBACK".into(),
2015 )),
2016 }
2017 }
2018}
2019
2020impl AlterClusterPlanStrategy {
2021 pub fn is_none(&self) -> bool {
2022 matches!(self, Self::None)
2023 }
2024 pub fn is_some(&self) -> bool {
2025 !matches!(self, Self::None)
2026 }
2027}
2028
2029impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2030 type Error = PlanError;
2031
2032 fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2033 Ok(match value.wait {
2034 Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2035 Some(ClusterAlterOptionValue::UntilReady(options)) => {
2036 let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2037 Self::UntilReady {
2038 timeout: match extracted.timeout {
2039 Some(d) => d,
2040 None => Err(PlanError::UntilReadyTimeoutRequired)?,
2041 },
2042 on_timeout: match extracted.on_timeout {
2043 Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2044 PlanError::InvalidOptionValue {
2045 option_name: "ON TIMEOUT".into(),
2046 err: Box::new(e),
2047 }
2048 })?,
2049 None => OnTimeoutAction::default(),
2050 },
2051 }
2052 }
2053 None => Self::None,
2054 })
2055 }
2056}
2057
2058#[derive(Debug, Clone)]
2060pub struct Params {
2061 pub datums: Row,
2063 pub execute_types: Vec<SqlScalarType>,
2065 pub expected_types: Vec<SqlScalarType>,
2067}
2068
2069impl Params {
2070 pub fn empty() -> Params {
2072 Params {
2073 datums: Row::pack_slice(&[]),
2074 execute_types: vec![],
2075 expected_types: vec![],
2076 }
2077 }
2078}
2079
2080#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Copy)]
2082pub struct PlanContext {
2083 pub wall_time: DateTime<Utc>,
2084 pub ignore_if_exists_errors: bool,
2085}
2086
2087impl PlanContext {
2088 pub fn new(wall_time: DateTime<Utc>) -> Self {
2089 Self {
2090 wall_time,
2091 ignore_if_exists_errors: false,
2092 }
2093 }
2094
2095 pub fn zero() -> Self {
2099 PlanContext {
2100 wall_time: now::to_datetime(NOW_ZERO()),
2101 ignore_if_exists_errors: false,
2102 }
2103 }
2104
2105 pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2106 self.ignore_if_exists_errors = value;
2107 self
2108 }
2109}