1use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{
41 CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing,
42};
43use mz_ore::now::{self, NOW_ZERO};
44use mz_pgcopy::CopyFormatParams;
45use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
46use mz_repr::explain::{ExplainConfig, ExplainFormat};
47use mz_repr::network_policy_id::NetworkPolicyId;
48use mz_repr::optimize::OptimizerFeatureOverrides;
49use mz_repr::refresh_schedule::RefreshSchedule;
50use mz_repr::role_id::RoleId;
51use mz_repr::{
52 CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, Row, SqlColumnType,
53 SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
54};
55use mz_sql_parser::ast::{
56 AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
57 RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
58 Value, WithOptionValue,
59};
60use mz_ssh_util::keys::SshKeyPair;
61use mz_storage_types::connections::aws::AwsConnection;
62use mz_storage_types::connections::inline::ReferencedConnection;
63use mz_storage_types::connections::{
64 AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
65 MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70 SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76 ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77 TransactionAccessMode,
78};
79use crate::catalog::{
80 CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81 RoleAttributesRaw,
82};
83use crate::names::{
84 Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110 AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111 WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119 AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120 PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123 StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124 resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
/// A plan produced for a SQL statement.
///
/// Each variant is one kind of plan; most carry a dedicated `*Plan` payload
/// type. The `EnumKind` derive together with `#[enum_kind(PlanKind)]`
/// generates the data-less companion enum `PlanKind`, which
/// [`Plan::generated_from`] uses to describe which plan kinds a statement kind
/// may yield.
#[derive(Debug, EnumKind)]
#[enum_kind(PlanKind)]
pub enum Plan {
    CreateConnection(CreateConnectionPlan),
    CreateDatabase(CreateDatabasePlan),
    CreateSchema(CreateSchemaPlan),
    CreateRole(CreateRolePlan),
    CreateCluster(CreateClusterPlan),
    CreateClusterReplica(CreateClusterReplicaPlan),
    CreateSource(CreateSourcePlan),
    CreateSources(Vec<CreateSourcePlanBundle>),
    CreateSecret(CreateSecretPlan),
    CreateSink(CreateSinkPlan),
    CreateTable(CreateTablePlan),
    CreateView(CreateViewPlan),
    CreateMaterializedView(CreateMaterializedViewPlan),
    CreateContinualTask(CreateContinualTaskPlan),
    CreateNetworkPolicy(CreateNetworkPolicyPlan),
    CreateIndex(CreateIndexPlan),
    CreateType(CreateTypePlan),
    Comment(CommentPlan),
    DiscardTemp,
    DiscardAll,
    DropObjects(DropObjectsPlan),
    DropOwned(DropOwnedPlan),
    EmptyQuery,
    ShowAllVariables,
    ShowCreate(ShowCreatePlan),
    ShowColumns(ShowColumnsPlan),
    ShowVariable(ShowVariablePlan),
    InspectShard(InspectShardPlan),
    SetVariable(SetVariablePlan),
    ResetVariable(ResetVariablePlan),
    SetTransaction(SetTransactionPlan),
    StartTransaction(StartTransactionPlan),
    CommitTransaction(CommitTransactionPlan),
    AbortTransaction(AbortTransactionPlan),
    Select(SelectPlan),
    Subscribe(SubscribePlan),
    CopyFrom(CopyFromPlan),
    CopyTo(CopyToPlan),
    ExplainPlan(ExplainPlanPlan),
    ExplainPushdown(ExplainPushdownPlan),
    ExplainTimestamp(ExplainTimestampPlan),
    ExplainSinkSchema(ExplainSinkSchemaPlan),
    Insert(InsertPlan),
    AlterCluster(AlterClusterPlan),
    AlterClusterSwap(AlterClusterSwapPlan),
    AlterNoop(AlterNoopPlan),
    AlterSetCluster(AlterSetClusterPlan),
    AlterConnection(AlterConnectionPlan),
    AlterSource(AlterSourcePlan),
    AlterClusterRename(AlterClusterRenamePlan),
    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
    AlterItemRename(AlterItemRenamePlan),
    AlterSchemaRename(AlterSchemaRenamePlan),
    AlterSchemaSwap(AlterSchemaSwapPlan),
    AlterSecret(AlterSecretPlan),
    AlterSink(AlterSinkPlan),
    AlterSystemSet(AlterSystemSetPlan),
    AlterSystemReset(AlterSystemResetPlan),
    AlterSystemResetAll(AlterSystemResetAllPlan),
    AlterRole(AlterRolePlan),
    AlterOwner(AlterOwnerPlan),
    // Note: the payload type is `AlterTablePlan`, not `AlterTableAddColumnPlan`.
    AlterTableAddColumn(AlterTablePlan),
    AlterNetworkPolicy(AlterNetworkPolicyPlan),
    Declare(DeclarePlan),
    Fetch(FetchPlan),
    Close(ClosePlan),
    ReadThenWrite(ReadThenWritePlan),
    Prepare(PreparePlan),
    Execute(ExecutePlan),
    Deallocate(DeallocatePlan),
    Raise(RaisePlan),
    GrantRole(GrantRolePlan),
    RevokeRole(RevokeRolePlan),
    GrantPrivileges(GrantPrivilegesPlan),
    RevokePrivileges(RevokePrivilegesPlan),
    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
    ReassignOwned(ReassignOwnedPlan),
    // The only variant whose payload is not a `*Plan` type.
    SideEffectingFunc(SideEffectingFunc),
    ValidateConnection(ValidateConnectionPlan),
    AlterRetainHistory(AlterRetainHistoryPlan),
}
215
impl Plan {
    /// Returns the set of [`PlanKind`]s listed for the given [`StatementKind`].
    ///
    /// The match has no wildcard arm, so adding a `StatementKind` variant
    /// forces this table to be updated before the crate compiles again.
    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
        match stmt {
            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
            StatementKind::AlterObjectRename => &[
                PlanKind::AlterClusterRename,
                PlanKind::AlterClusterReplicaRename,
                PlanKind::AlterItemRename,
                PlanKind::AlterSchemaRename,
                PlanKind::AlterNoop,
            ],
            StatementKind::AlterObjectSwap => &[
                PlanKind::AlterClusterSwap,
                PlanKind::AlterSchemaSwap,
                PlanKind::AlterNoop,
            ],
            StatementKind::AlterRole => &[PlanKind::AlterRole],
            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
            StatementKind::AlterSource => &[
                PlanKind::AlterNoop,
                PlanKind::AlterSource,
                PlanKind::AlterRetainHistory,
            ],
            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
            StatementKind::AlterSystemResetAll => {
                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
            }
            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
            StatementKind::AlterTableAddColumn => {
                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
            }
            StatementKind::Close => &[PlanKind::Close],
            StatementKind::Comment => &[PlanKind::Comment],
            StatementKind::Commit => &[PlanKind::CommitTransaction],
            StatementKind::Copy => &[
                PlanKind::CopyFrom,
                PlanKind::Select,
                PlanKind::Subscribe,
                PlanKind::CopyTo,
            ],
            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
            StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
            StatementKind::CreateRole => &[PlanKind::CreateRole],
            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
            StatementKind::CreateSink => &[PlanKind::CreateSink],
            // NOTE(review): `PlanKind::CreateSources` is not listed for any
            // statement kind in this table — confirm that is intentional.
            StatementKind::CreateSource | StatementKind::CreateSubsource => {
                &[PlanKind::CreateSource]
            }
            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
            StatementKind::CreateTable => &[PlanKind::CreateTable],
            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
            StatementKind::CreateType => &[PlanKind::CreateType],
            StatementKind::CreateView => &[PlanKind::CreateView],
            StatementKind::Deallocate => &[PlanKind::Deallocate],
            StatementKind::Declare => &[PlanKind::Declare],
            StatementKind::Delete => &[PlanKind::ReadThenWrite],
            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
            StatementKind::DropObjects => &[PlanKind::DropObjects],
            StatementKind::DropOwned => &[PlanKind::DropOwned],
            StatementKind::Execute => &[PlanKind::Execute],
            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
            StatementKind::ExplainAnalyze => &[PlanKind::Select],
            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
            StatementKind::Fetch => &[PlanKind::Fetch],
            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
            StatementKind::GrantRole => &[PlanKind::GrantRole],
            StatementKind::Insert => &[PlanKind::Insert],
            StatementKind::Prepare => &[PlanKind::Prepare],
            StatementKind::Raise => &[PlanKind::Raise],
            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
            StatementKind::Rollback => &[PlanKind::AbortTransaction],
            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
            StatementKind::SetVariable => &[PlanKind::SetVariable],
            StatementKind::Show => &[
                PlanKind::Select,
                PlanKind::ShowVariable,
                PlanKind::ShowCreate,
                PlanKind::ShowColumns,
                PlanKind::ShowAllVariables,
                PlanKind::InspectShard,
            ],
            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
            StatementKind::Subscribe => &[PlanKind::Subscribe],
            StatementKind::Update => &[PlanKind::ReadThenWrite],
            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
        }
    }

    /// Returns a short human-readable name for this plan.
    ///
    /// Every arm yields a string literal. For `DropObjects`, `AlterNoop` and
    /// `AlterOwner` the name depends on the plan's `object_type`; for
    /// `ReadThenWrite` it depends on the plan's `kind`. Names are not unique:
    /// `CreateSource` and `CreateSources` both map to "create source", and the
    /// three `AlterSystem*` variants all map to "alter system".
    pub fn name(&self) -> &str {
        match self {
            Plan::CreateConnection(_) => "create connection",
            Plan::CreateDatabase(_) => "create database",
            Plan::CreateSchema(_) => "create schema",
            Plan::CreateRole(_) => "create role",
            Plan::CreateCluster(_) => "create cluster",
            Plan::CreateClusterReplica(_) => "create cluster replica",
            Plan::CreateSource(_) => "create source",
            Plan::CreateSources(_) => "create source",
            Plan::CreateSecret(_) => "create secret",
            Plan::CreateSink(_) => "create sink",
            Plan::CreateTable(_) => "create table",
            Plan::CreateView(_) => "create view",
            Plan::CreateMaterializedView(_) => "create materialized view",
            Plan::CreateContinualTask(_) => "create continual task",
            Plan::CreateIndex(_) => "create index",
            Plan::CreateType(_) => "create type",
            Plan::CreateNetworkPolicy(_) => "create network policy",
            Plan::Comment(_) => "comment",
            Plan::DiscardTemp => "discard temp",
            Plan::DiscardAll => "discard all",
            // NOTE(review): roles, clusters and cluster replicas use a plural
            // noun here while every other object type is singular.
            Plan::DropObjects(plan) => match plan.object_type {
                ObjectType::Table => "drop table",
                ObjectType::View => "drop view",
                ObjectType::MaterializedView => "drop materialized view",
                ObjectType::Source => "drop source",
                ObjectType::Sink => "drop sink",
                ObjectType::Index => "drop index",
                ObjectType::Type => "drop type",
                ObjectType::Role => "drop roles",
                ObjectType::Cluster => "drop clusters",
                ObjectType::ClusterReplica => "drop cluster replicas",
                ObjectType::Secret => "drop secret",
                ObjectType::Connection => "drop connection",
                ObjectType::Database => "drop database",
                ObjectType::Schema => "drop schema",
                ObjectType::Func => "drop function",
                ObjectType::ContinualTask => "drop continual task",
                ObjectType::NetworkPolicy => "drop network policy",
            },
            Plan::DropOwned(_) => "drop owned",
            Plan::EmptyQuery => "do nothing",
            Plan::ShowAllVariables => "show all variables",
            Plan::ShowCreate(_) => "show create",
            Plan::ShowColumns(_) => "show columns",
            Plan::ShowVariable(_) => "show variable",
            Plan::InspectShard(_) => "inspect shard",
            Plan::SetVariable(_) => "set variable",
            Plan::ResetVariable(_) => "reset variable",
            Plan::SetTransaction(_) => "set transaction",
            Plan::StartTransaction(_) => "start transaction",
            Plan::CommitTransaction(_) => "commit",
            Plan::AbortTransaction(_) => "abort",
            Plan::Select(_) => "select",
            Plan::Subscribe(_) => "subscribe",
            Plan::CopyFrom(_) => "copy from",
            Plan::CopyTo(_) => "copy to",
            Plan::ExplainPlan(_) => "explain plan",
            // The only upper-case name in this table.
            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
            Plan::ExplainTimestamp(_) => "explain timestamp",
            Plan::ExplainSinkSchema(_) => "explain schema",
            Plan::Insert(_) => "insert",
            Plan::AlterNoop(plan) => match plan.object_type {
                ObjectType::Table => "alter table",
                ObjectType::View => "alter view",
                ObjectType::MaterializedView => "alter materialized view",
                ObjectType::Source => "alter source",
                ObjectType::Sink => "alter sink",
                ObjectType::Index => "alter index",
                ObjectType::Type => "alter type",
                ObjectType::Role => "alter role",
                ObjectType::Cluster => "alter cluster",
                ObjectType::ClusterReplica => "alter cluster replica",
                ObjectType::Secret => "alter secret",
                ObjectType::Connection => "alter connection",
                ObjectType::Database => "alter database",
                ObjectType::Schema => "alter schema",
                ObjectType::Func => "alter function",
                ObjectType::ContinualTask => "alter continual task",
                ObjectType::NetworkPolicy => "alter network policy",
            },
            Plan::AlterCluster(_) => "alter cluster",
            Plan::AlterClusterRename(_) => "alter cluster rename",
            Plan::AlterClusterSwap(_) => "alter cluster swap",
            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
            Plan::AlterSetCluster(_) => "alter set cluster",
            Plan::AlterConnection(_) => "alter connection",
            Plan::AlterSource(_) => "alter source",
            Plan::AlterItemRename(_) => "rename item",
            Plan::AlterSchemaRename(_) => "alter rename schema",
            Plan::AlterSchemaSwap(_) => "alter swap schema",
            Plan::AlterSecret(_) => "alter secret",
            Plan::AlterSink(_) => "alter sink",
            Plan::AlterSystemSet(_) => "alter system",
            Plan::AlterSystemReset(_) => "alter system",
            Plan::AlterSystemResetAll(_) => "alter system",
            Plan::AlterRole(_) => "alter role",
            Plan::AlterNetworkPolicy(_) => "alter network policy",
            Plan::AlterOwner(plan) => match plan.object_type {
                ObjectType::Table => "alter table owner",
                ObjectType::View => "alter view owner",
                ObjectType::MaterializedView => "alter materialized view owner",
                ObjectType::Source => "alter source owner",
                ObjectType::Sink => "alter sink owner",
                ObjectType::Index => "alter index owner",
                ObjectType::Type => "alter type owner",
                ObjectType::Role => "alter role owner",
                ObjectType::Cluster => "alter cluster owner",
                ObjectType::ClusterReplica => "alter cluster replica owner",
                ObjectType::Secret => "alter secret owner",
                ObjectType::Connection => "alter connection owner",
                ObjectType::Database => "alter database owner",
                ObjectType::Schema => "alter schema owner",
                ObjectType::Func => "alter function owner",
                ObjectType::ContinualTask => "alter continual task owner",
                ObjectType::NetworkPolicy => "alter network policy owner",
            },
            Plan::AlterTableAddColumn(_) => "alter table add column",
            Plan::Declare(_) => "declare",
            Plan::Fetch(_) => "fetch",
            Plan::Close(_) => "close",
            Plan::ReadThenWrite(plan) => match plan.kind {
                MutationKind::Insert => "insert into select",
                MutationKind::Update => "update",
                MutationKind::Delete => "delete",
            },
            Plan::Prepare(_) => "prepare",
            Plan::Execute(_) => "execute",
            Plan::Deallocate(_) => "deallocate",
            Plan::Raise(_) => "raise",
            Plan::GrantRole(_) => "grant role",
            Plan::RevokeRole(_) => "revoke role",
            Plan::GrantPrivileges(_) => "grant privilege",
            Plan::RevokePrivileges(_) => "revoke privilege",
            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
            Plan::ReassignOwned(_) => "reassign owned",
            Plan::SideEffectingFunc(_) => "side effecting func",
            Plan::ValidateConnection(_) => "validate connection",
            Plan::AlterRetainHistory(_) => "alter retain history",
        }
    }

    /// Returns `true` only for the plan variants explicitly listed below.
    ///
    /// This is an allow-list: the trailing wildcard arm returns `false`, so a
    /// newly added `Plan` variant is rejected until it is listed here.
    pub fn allowed_in_read_only(&self) -> bool {
        match self {
            Plan::SetVariable(_) => true,
            Plan::ResetVariable(_) => true,
            Plan::SetTransaction(_) => true,
            Plan::StartTransaction(_) => true,
            Plan::CommitTransaction(_) => true,
            Plan::AbortTransaction(_) => true,
            Plan::Select(_) => true,
            Plan::EmptyQuery => true,
            Plan::ShowAllVariables => true,
            Plan::ShowCreate(_) => true,
            Plan::ShowColumns(_) => true,
            Plan::ShowVariable(_) => true,
            Plan::InspectShard(_) => true,
            Plan::Subscribe(_) => true,
            Plan::CopyTo(_) => true,
            Plan::ExplainPlan(_) => true,
            Plan::ExplainPushdown(_) => true,
            Plan::ExplainTimestamp(_) => true,
            Plan::ExplainSinkSchema(_) => true,
            Plan::ValidateConnection(_) => true,
            // Everything not listed above is denied.
            _ => false,
        }
    }
}
504
/// Payload of [`Plan::StartTransaction`].
///
/// Both settings are optional; `None` means the field was left unspecified
/// by whoever built the plan.
#[derive(Debug)]
pub struct StartTransactionPlan {
    pub access: Option<TransactionAccessMode>,
    pub isolation_level: Option<TransactionIsolationLevel>,
}
510
/// Distinguishes the two kinds of transaction recorded on
/// [`CommitTransactionPlan`] and [`AbortTransactionPlan`].
#[derive(Debug)]
pub enum TransactionType {
    Explicit,
    Implicit,
}

impl TransactionType {
    /// Returns `true` exactly when `self` is [`TransactionType::Explicit`].
    pub fn is_explicit(&self) -> bool {
        match self {
            Self::Explicit => true,
            Self::Implicit => false,
        }
    }

    /// Returns `true` exactly when `self` is [`TransactionType::Implicit`].
    pub fn is_implicit(&self) -> bool {
        match self {
            Self::Implicit => true,
            Self::Explicit => false,
        }
    }
}
526
/// Payload of [`Plan::CommitTransaction`].
#[derive(Debug)]
pub struct CommitTransactionPlan {
    /// Whether the transaction being committed is explicit or implicit.
    pub transaction_type: TransactionType,
}
531
/// Payload of [`Plan::AbortTransaction`].
#[derive(Debug)]
pub struct AbortTransactionPlan {
    /// Whether the transaction being aborted is explicit or implicit.
    pub transaction_type: TransactionType,
}
536
/// Payload of [`Plan::CreateDatabase`].
#[derive(Debug)]
pub struct CreateDatabasePlan {
    pub name: String,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
}
542
/// Payload of [`Plan::CreateSchema`].
#[derive(Debug)]
pub struct CreateSchemaPlan {
    /// The already-resolved database specifier for the new schema.
    pub database_spec: ResolvedDatabaseSpecifier,
    pub schema_name: String,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
}
549
/// Payload of [`Plan::CreateRole`].
#[derive(Debug)]
pub struct CreateRolePlan {
    pub name: String,
    /// Role attributes in their raw (`RoleAttributesRaw`) form.
    pub attributes: RoleAttributesRaw,
}
555
/// Payload of [`Plan::CreateCluster`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterPlan {
    pub name: String,
    /// Managed or unmanaged configuration; see [`CreateClusterVariant`].
    pub variant: CreateClusterVariant,
    pub workload_class: Option<String>,
}
562
/// The two shapes a [`CreateClusterPlan`] can take, each with its own
/// configuration payload.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum CreateClusterVariant {
    Managed(CreateClusterManagedPlan),
    Unmanaged(CreateClusterUnmanagedPlan),
}
568
/// Configuration carried by [`CreateClusterVariant::Unmanaged`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterUnmanagedPlan {
    /// Pairs of a `String` and the matching [`ReplicaConfig`].
    // NOTE(review): the string is presumably the replica name — confirm.
    pub replicas: Vec<(String, ReplicaConfig)>,
}
573
/// Configuration carried by [`CreateClusterVariant::Managed`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterManagedPlan {
    pub replication_factor: u32,
    pub size: String,
    pub availability_zones: Vec<String>,
    pub compute: ComputeReplicaConfig,
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// See [`ClusterSchedule`]; its `Default` is `Manual`.
    pub schedule: ClusterSchedule,
}
583
/// Payload of [`Plan::CreateClusterReplica`].
#[derive(Debug)]
pub struct CreateClusterReplicaPlan {
    pub cluster_id: ClusterId,
    pub name: String,
    pub config: ReplicaConfig,
}
590
/// Introspection settings stored in [`ComputeReplicaConfig::introspection`].
///
/// The type is `Copy` and totally ordered via its derives.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq)]
pub struct ComputeReplicaIntrospectionConfig {
    // NOTE(review): presumably toggles extra debugging introspection — confirm.
    pub debugging: bool,
    // NOTE(review): presumably the introspection period — confirm.
    pub interval: Duration,
}
599
/// Compute-related replica configuration, embedded in [`ReplicaConfig`] and
/// [`CreateClusterManagedPlan`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComputeReplicaConfig {
    /// Introspection settings, or `None` when no introspection config is set.
    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
}
604
/// Configuration of a single replica, in one of two forms.
///
/// Both forms carry a [`ComputeReplicaConfig`]. `Unorchestrated` additionally
/// lists explicit controller addresses, while `Orchestrated` describes the
/// replica by size and placement.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicaConfig {
    Unorchestrated {
        storagectl_addrs: Vec<String>,
        computectl_addrs: Vec<String>,
        compute: ComputeReplicaConfig,
    },
    Orchestrated {
        size: String,
        availability_zone: Option<String>,
        compute: ComputeReplicaConfig,
        internal: bool,
        billed_as: Option<String>,
    },
}
620
/// Scheduling policy stored in [`CreateClusterManagedPlan::schedule`].
///
/// The `Default` implementation in this file yields `Manual`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterSchedule {
    Manual,
    // NOTE(review): presumably ties the cluster to refresh schedules, with an
    // estimate of hydration time — confirm against the scheduling code.
    Refresh { hydration_time_estimate: Duration },
}
630
631impl Default for ClusterSchedule {
632 fn default() -> Self {
633 ClusterSchedule::Manual
635 }
636}
637
/// Payload of [`Plan::CreateSource`]; also embedded in
/// [`CreateSourcePlanBundle`].
#[derive(Debug)]
pub struct CreateSourcePlan {
    pub name: QualifiedItemName,
    pub source: Source,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
    pub timeline: Timeline,
    /// The target cluster, if one is set.
    pub in_cluster: Option<ClusterId>,
}
647
/// A list of [`SourceReference`]s together with an `updated_at` value.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReferences {
    // NOTE(review): unit/epoch of this integer is not visible here — confirm.
    pub updated_at: u64,
    pub references: Vec<SourceReference>,
}
653
/// One entry of [`SourceReferences::references`]: a name, an optional
/// namespace, and a list of column names.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReference {
    pub name: String,
    pub namespace: Option<String>,
    pub columns: Vec<String>,
}
662
/// A [`CreateSourcePlan`] bundled with its identifiers and resolved
/// dependencies; [`Plan::CreateSources`] carries a `Vec` of these.
#[derive(Debug)]
pub struct CreateSourcePlanBundle {
    pub item_id: CatalogItemId,
    pub global_id: GlobalId,
    pub plan: CreateSourcePlan,
    pub resolved_ids: ResolvedIds,
    /// Source references, if any are available for this source.
    pub available_source_references: Option<SourceReferences>,
}
679
/// Payload of [`Plan::CreateConnection`].
#[derive(Debug)]
pub struct CreateConnectionPlan {
    pub name: QualifiedItemName,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
    pub connection: Connection,
    // NOTE(review): presumably requests validation at creation — confirm.
    pub validate: bool,
}
687
/// Payload of [`Plan::ValidateConnection`].
#[derive(Debug)]
pub struct ValidateConnectionPlan {
    pub id: CatalogItemId,
    /// The storage-layer connection, parameterized by `ReferencedConnection`.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
}
695
/// Payload of [`Plan::CreateSecret`].
#[derive(Debug)]
pub struct CreateSecretPlan {
    pub name: QualifiedItemName,
    pub secret: Secret,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
}
702
/// Payload of [`Plan::CreateSink`].
///
/// Unlike [`CreateSourcePlan::in_cluster`], the cluster here is mandatory.
#[derive(Debug)]
pub struct CreateSinkPlan {
    pub name: QualifiedItemName,
    pub sink: Sink,
    pub with_snapshot: bool,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
    pub in_cluster: ClusterId,
}
711
/// Payload of [`Plan::CreateTable`].
#[derive(Debug)]
pub struct CreateTablePlan {
    pub name: QualifiedItemName,
    pub table: Table,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
}
718
/// Payload of [`Plan::CreateView`]; also embedded in
/// [`ExplaineeStatement::CreateView`].
#[derive(Debug, Clone)]
pub struct CreateViewPlan {
    pub name: QualifiedItemName,
    pub view: View,
    // NOTE(review): presumably the existing item to replace, if any — confirm.
    pub replace: Option<CatalogItemId>,
    // NOTE(review): presumably items to drop as part of a replace — confirm.
    pub drop_ids: Vec<CatalogItemId>,
    pub if_not_exists: bool,
    pub ambiguous_columns: bool,
}
732
/// Payload of [`Plan::CreateMaterializedView`]; also embedded in
/// [`ExplaineeStatement::CreateMaterializedView`].
#[derive(Debug, Clone)]
pub struct CreateMaterializedViewPlan {
    pub name: QualifiedItemName,
    pub materialized_view: MaterializedView,
    // NOTE(review): presumably the existing item to replace, if any — confirm.
    pub replace: Option<CatalogItemId>,
    // NOTE(review): presumably items to drop as part of a replace — confirm.
    pub drop_ids: Vec<CatalogItemId>,
    pub if_not_exists: bool,
    pub ambiguous_columns: bool,
}
746
/// Payload of [`Plan::CreateContinualTask`].
///
/// The task body is stored as a [`MaterializedView`] in `continual_task`.
#[derive(Debug, Clone)]
pub struct CreateContinualTaskPlan {
    pub name: QualifiedItemName,
    /// A local placeholder id, when one is in use.
    pub placeholder_id: Option<mz_expr::LocalId>,
    pub desc: RelationDesc,
    pub input_id: GlobalId,
    pub with_snapshot: bool,
    pub continual_task: MaterializedView,
}
760
/// Payload of [`Plan::CreateNetworkPolicy`]: a policy name and its rules.
#[derive(Debug, Clone)]
pub struct CreateNetworkPolicyPlan {
    pub name: String,
    pub rules: Vec<NetworkPolicyRule>,
}
766
/// Payload of [`Plan::AlterNetworkPolicy`].
///
/// Same shape as [`CreateNetworkPolicyPlan`] plus the id of the policy.
#[derive(Debug, Clone)]
pub struct AlterNetworkPolicyPlan {
    pub id: NetworkPolicyId,
    pub name: String,
    pub rules: Vec<NetworkPolicyRule>,
}
773
/// Payload of [`Plan::CreateIndex`]; also embedded in
/// [`ExplaineeStatement::CreateIndex`].
#[derive(Debug, Clone)]
pub struct CreateIndexPlan {
    pub name: QualifiedItemName,
    pub index: Index,
    // NOTE(review): presumably mirrors an `IF NOT EXISTS` clause — confirm.
    pub if_not_exists: bool,
}
780
/// Payload of [`Plan::CreateType`].
#[derive(Debug)]
pub struct CreateTypePlan {
    pub name: QualifiedItemName,
    pub typ: Type,
}
786
/// Payload of [`Plan::DropObjects`].
///
/// `object_type` selects the string returned by [`Plan::name`] for this plan.
#[derive(Debug)]
pub struct DropObjectsPlan {
    // NOTE(review): presumably the ids named directly in the statement, as
    // opposed to the full set in `drop_ids` — confirm.
    pub referenced_ids: Vec<ObjectId>,
    pub drop_ids: Vec<ObjectId>,
    pub object_type: ObjectType,
}
797
/// Payload of [`Plan::DropOwned`].
#[derive(Debug)]
pub struct DropOwnedPlan {
    pub role_ids: Vec<RoleId>,
    pub drop_ids: Vec<ObjectId>,
    /// Pairs of an object and the ACL item associated with it.
    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
    /// Pairs of a default-privilege object and its ACL item.
    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
}
809
/// Payload of [`Plan::ShowVariable`]: the name of the variable.
#[derive(Debug)]
pub struct ShowVariablePlan {
    pub name: String,
}
814
/// Payload of [`Plan::InspectShard`].
#[derive(Debug)]
pub struct InspectShardPlan {
    pub id: GlobalId,
}
820
/// Payload of [`Plan::SetVariable`].
#[derive(Debug)]
pub struct SetVariablePlan {
    pub name: String,
    /// Either the default or an explicit list of values; see [`VariableValue`].
    pub value: VariableValue,
    // NOTE(review): presumably corresponds to `SET LOCAL` — confirm.
    pub local: bool,
}
827
/// The value part of a [`SetVariablePlan`]: either `Default` or an explicit
/// list of string values.
#[derive(Debug)]
pub enum VariableValue {
    Default,
    Values(Vec<String>),
}
833
/// Payload of [`Plan::ResetVariable`]: the name of the variable.
#[derive(Debug)]
pub struct ResetVariablePlan {
    pub name: String,
}
838
/// Payload of [`Plan::SetTransaction`].
#[derive(Debug)]
pub struct SetTransactionPlan {
    pub local: bool,
    pub modes: Vec<TransactionMode>,
}
844
/// Payload of [`Plan::Select`]; also embedded in [`ShowColumnsPlan`],
/// [`CopyToPlan`] and [`ExplaineeStatement::Select`].
#[derive(Clone, Debug)]
pub struct SelectPlan {
    /// The `SELECT` statement AST, if any; [`SelectPlan::immediate`] sets
    /// this to `None`.
    pub select: Option<Box<SelectStatement<Aug>>>,
    /// The relation expression to evaluate.
    pub source: HirRelationExpr,
    pub when: QueryWhen,
    pub finishing: RowSetFinishing,
    /// Output format when copying results, or `None`.
    pub copy_to: Option<CopyFormat>,
}
860
861impl SelectPlan {
862 pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
863 let arity = typ.arity();
864 SelectPlan {
865 select: None,
866 source: HirRelationExpr::Constant { rows, typ },
867 when: QueryWhen::Immediately,
868 finishing: RowSetFinishing::trivial(arity),
869 copy_to: None,
870 }
871 }
872}
873
/// Output shape selected for a [`SubscribePlan`].
///
/// Every variant except `Diffs` carries a list of [`ColumnOrder`]s.
#[derive(Debug)]
pub enum SubscribeOutput {
    Diffs,
    WithinTimestampOrderBy {
        order_by: Vec<ColumnOrder>,
    },
    EnvelopeUpsert {
        order_by_keys: Vec<ColumnOrder>,
    },
    EnvelopeDebezium {
        order_by_keys: Vec<ColumnOrder>,
    },
}
890
/// Payload of [`Plan::Subscribe`].
#[derive(Debug)]
pub struct SubscribePlan {
    /// What is subscribed to: an existing id or a query.
    pub from: SubscribeFrom,
    pub with_snapshot: bool,
    pub when: QueryWhen,
    /// Optional upper timestamp bound.
    pub up_to: Option<Timestamp>,
    pub copy_to: Option<CopyFormat>,
    pub emit_progress: bool,
    pub output: SubscribeOutput,
}
901
/// The thing a [`SubscribePlan`] subscribes to.
#[derive(Debug, Clone)]
pub enum SubscribeFrom {
    /// An existing object, identified by its [`GlobalId`].
    Id(GlobalId),
    /// A query given as a MIR expression together with its relation
    /// description.
    Query {
        expr: MirRelationExpr,
        desc: RelationDesc,
    },
}
912
913impl SubscribeFrom {
914 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
915 match self {
916 SubscribeFrom::Id(id) => BTreeSet::from([*id]),
917 SubscribeFrom::Query { expr, .. } => expr.depends_on(),
918 }
919 }
920
921 pub fn contains_temporal(&self) -> bool {
922 match self {
923 SubscribeFrom::Id(_) => false,
924 SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
925 }
926 }
927}
928
/// Payload of [`Plan::ShowCreate`]: an object id and a [`Row`].
#[derive(Debug)]
pub struct ShowCreatePlan {
    pub id: ObjectId,
    pub row: Row,
}
934
/// Payload of [`Plan::ShowColumns`]; wraps a [`SelectPlan`].
#[derive(Debug)]
pub struct ShowColumnsPlan {
    pub id: CatalogItemId,
    pub select_plan: SelectPlan,
    pub new_resolved_ids: ResolvedIds,
}
941
/// Payload of [`Plan::CopyFrom`].
#[derive(Debug)]
pub struct CopyFromPlan {
    pub target_id: CatalogItemId,
    pub target_name: String,
    /// Where the data comes from; see [`CopyFromSource`].
    pub source: CopyFromSource,
    pub columns: Vec<ColumnIndex>,
    pub source_desc: RelationDesc,
    pub mfp: MapFilterProject,
    pub params: CopyFormatParams<'static>,
    /// Optional file selection; see [`CopyFromFilter`].
    pub filter: Option<CopyFromFilter>,
}
963
/// The data source of a [`CopyFromPlan`].
#[derive(Debug)]
pub enum CopyFromSource {
    Stdin,
    /// A URL given as a scalar expression.
    Url(HirScalarExpr),
    /// An S3 location: a URI expression plus the AWS connection and its
    /// catalog id.
    AwsS3 {
        uri: HirScalarExpr,
        connection: AwsConnection,
        connection_id: CatalogItemId,
    },
}
982
/// Optional filter on a [`CopyFromPlan`]: either an explicit list of strings
/// (`Files`) or a single pattern string (`Pattern`).
#[derive(Debug)]
pub enum CopyFromFilter {
    Files(Vec<String>),
    Pattern(String),
}
988
/// Payload of [`Plan::CopyTo`].
#[derive(Debug, Clone)]
pub struct CopyToPlan {
    /// The query whose results are copied.
    pub select_plan: SelectPlan,
    pub desc: RelationDesc,
    /// The destination, as a scalar expression.
    pub to: HirScalarExpr,
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
    pub connection_id: CatalogItemId,
    pub format: S3SinkFormat,
    // NOTE(review): unit is not visible here; presumably bytes — confirm.
    pub max_file_size: u64,
}
1002
/// Payload of [`Plan::ExplainPlan`]: which stage to explain, in which format
/// and with which config, and what is being explained.
#[derive(Clone, Debug)]
pub struct ExplainPlanPlan {
    pub stage: ExplainStage,
    pub format: ExplainFormat,
    pub config: ExplainConfig,
    pub explainee: Explainee,
}
1010
/// The subject of an [`ExplainPlanPlan`] or [`ExplainPushdownPlan`].
///
/// Six variants refer to a catalog item by id; `Statement` carries a full
/// [`ExplaineeStatement`] instead.
// NOTE(review): the `Replan*` variants presumably request re-planning of the
// existing item rather than reuse of a stored plan — confirm.
#[derive(Clone, Debug)]
pub enum Explainee {
    View(CatalogItemId),
    MaterializedView(CatalogItemId),
    Index(CatalogItemId),
    ReplanView(CatalogItemId),
    ReplanMaterializedView(CatalogItemId),
    ReplanIndex(CatalogItemId),
    Statement(ExplaineeStatement),
}
1029
/// A statement being explained, carried by [`Explainee::Statement`].
///
/// Every variant has a `broken` flag and the plan of the statement. The
/// `EnumKind` derive with `#[enum_kind(ExplaineeStatementKind)]` generates
/// the data-less companion enum `ExplaineeStatementKind`.
#[derive(Clone, Debug, EnumKind)]
#[enum_kind(ExplaineeStatementKind)]
pub enum ExplaineeStatement {
    Select {
        broken: bool,
        plan: plan::SelectPlan,
        desc: RelationDesc,
    },
    CreateView {
        broken: bool,
        plan: plan::CreateViewPlan,
    },
    CreateMaterializedView {
        broken: bool,
        plan: plan::CreateMaterializedViewPlan,
    },
    CreateIndex {
        broken: bool,
        plan: plan::CreateIndexPlan,
    },
}
1060
1061impl ExplaineeStatement {
1062 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1063 match self {
1064 Self::Select { plan, .. } => plan.source.depends_on(),
1065 Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1066 Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1067 Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1068 }
1069 }
1070
1071 pub fn broken(&self) -> bool {
1082 match self {
1083 Self::Select { broken, .. } => *broken,
1084 Self::CreateView { broken, .. } => *broken,
1085 Self::CreateMaterializedView { broken, .. } => *broken,
1086 Self::CreateIndex { broken, .. } => *broken,
1087 }
1088 }
1089}
1090
1091impl ExplaineeStatementKind {
1092 pub fn supports(&self, stage: &ExplainStage) -> bool {
1093 use ExplainStage::*;
1094 match self {
1095 Self::Select => true,
1096 Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1097 Self::CreateMaterializedView => true,
1098 Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1099 }
1100 }
1101}
1102
1103impl std::fmt::Display for ExplaineeStatementKind {
1104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1105 match self {
1106 Self::Select => write!(f, "SELECT"),
1107 Self::CreateView => write!(f, "CREATE VIEW"),
1108 Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1109 Self::CreateIndex => write!(f, "CREATE INDEX"),
1110 }
1111 }
1112}
1113
1114#[derive(Clone, Debug)]
1115pub struct ExplainPushdownPlan {
1116 pub explainee: Explainee,
1117}
1118
1119#[derive(Clone, Debug)]
1120pub struct ExplainTimestampPlan {
1121 pub format: ExplainFormat,
1122 pub raw_plan: HirRelationExpr,
1123 pub when: QueryWhen,
1124}
1125
1126#[derive(Debug)]
1127pub struct ExplainSinkSchemaPlan {
1128 pub sink_from: GlobalId,
1129 pub json_schema: String,
1130}
1131
1132#[derive(Debug)]
1133pub struct SendDiffsPlan {
1134 pub id: CatalogItemId,
1135 pub updates: Vec<(Row, Diff)>,
1136 pub kind: MutationKind,
1137 pub returning: Vec<(Row, NonZeroUsize)>,
1138 pub max_result_size: u64,
1139}
1140
1141#[derive(Debug)]
1142pub struct InsertPlan {
1143 pub id: CatalogItemId,
1144 pub values: HirRelationExpr,
1145 pub returning: Vec<mz_expr::MirScalarExpr>,
1146}
1147
1148#[derive(Debug)]
1149pub struct ReadThenWritePlan {
1150 pub id: CatalogItemId,
1151 pub selection: HirRelationExpr,
1152 pub finishing: RowSetFinishing,
1153 pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
1154 pub kind: MutationKind,
1155 pub returning: Vec<mz_expr::MirScalarExpr>,
1156}
1157
1158#[derive(Debug)]
1160pub struct AlterNoopPlan {
1161 pub object_type: ObjectType,
1162}
1163
1164#[derive(Debug)]
1165pub struct AlterSetClusterPlan {
1166 pub id: CatalogItemId,
1167 pub set_cluster: ClusterId,
1168}
1169
1170#[derive(Debug)]
1171pub struct AlterRetainHistoryPlan {
1172 pub id: CatalogItemId,
1173 pub value: Option<Value>,
1174 pub window: CompactionWindow,
1175 pub object_type: ObjectType,
1176}
1177
/// Tri-state describing what an `ALTER` should do with one option.
///
/// The payload type defaults to `String` when no type argument is given.
#[derive(Debug, Clone)]
pub enum AlterOptionParameter<T = String> {
    /// Set the option to the contained value.
    Set(T),
    /// Reset the option.
    Reset,
    /// Leave the option as it is.
    Unchanged,
}
1185
1186#[derive(Debug)]
1187pub enum AlterConnectionAction {
1188 RotateKeys,
1189 AlterOptions {
1190 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
1191 drop_options: BTreeSet<ConnectionOptionName>,
1192 validate: bool,
1193 },
1194}
1195
1196#[derive(Debug)]
1197pub struct AlterConnectionPlan {
1198 pub id: CatalogItemId,
1199 pub action: AlterConnectionAction,
1200}
1201
1202#[derive(Debug)]
1203pub enum AlterSourceAction {
1204 AddSubsourceExports {
1205 subsources: Vec<CreateSourcePlanBundle>,
1206 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1207 },
1208 RefreshReferences {
1209 references: SourceReferences,
1210 },
1211}
1212
1213#[derive(Debug)]
1214pub struct AlterSourcePlan {
1215 pub item_id: CatalogItemId,
1216 pub ingestion_id: GlobalId,
1217 pub action: AlterSourceAction,
1218}
1219
1220#[derive(Debug, Clone)]
1221pub struct AlterSinkPlan {
1222 pub item_id: CatalogItemId,
1223 pub global_id: GlobalId,
1224 pub sink: Sink,
1225 pub with_snapshot: bool,
1226 pub in_cluster: ClusterId,
1227}
1228
1229#[derive(Debug, Clone)]
1230pub struct AlterClusterPlan {
1231 pub id: ClusterId,
1232 pub name: String,
1233 pub options: PlanClusterOption,
1234 pub strategy: AlterClusterPlanStrategy,
1235}
1236
1237#[derive(Debug)]
1238pub struct AlterClusterRenamePlan {
1239 pub id: ClusterId,
1240 pub name: String,
1241 pub to_name: String,
1242}
1243
1244#[derive(Debug)]
1245pub struct AlterClusterReplicaRenamePlan {
1246 pub cluster_id: ClusterId,
1247 pub replica_id: ReplicaId,
1248 pub name: QualifiedReplica,
1249 pub to_name: String,
1250}
1251
1252#[derive(Debug)]
1253pub struct AlterItemRenamePlan {
1254 pub id: CatalogItemId,
1255 pub current_full_name: FullItemName,
1256 pub to_name: String,
1257 pub object_type: ObjectType,
1258}
1259
1260#[derive(Debug)]
1261pub struct AlterSchemaRenamePlan {
1262 pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1263 pub new_schema_name: String,
1264}
1265
1266#[derive(Debug)]
1267pub struct AlterSchemaSwapPlan {
1268 pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1269 pub schema_a_name: String,
1270 pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1271 pub schema_b_name: String,
1272 pub name_temp: String,
1273}
1274
1275#[derive(Debug)]
1276pub struct AlterClusterSwapPlan {
1277 pub id_a: ClusterId,
1278 pub id_b: ClusterId,
1279 pub name_a: String,
1280 pub name_b: String,
1281 pub name_temp: String,
1282}
1283
1284#[derive(Debug)]
1285pub struct AlterSecretPlan {
1286 pub id: CatalogItemId,
1287 pub secret_as: MirScalarExpr,
1288}
1289
1290#[derive(Debug)]
1291pub struct AlterSystemSetPlan {
1292 pub name: String,
1293 pub value: VariableValue,
1294}
1295
/// Plan for resetting a single named system variable.
#[derive(Debug)]
pub struct AlterSystemResetPlan {
    /// Name of the variable to reset.
    pub name: String,
}
1300
/// Plan for resetting all system variables; it carries no data.
#[derive(Debug)]
pub struct AlterSystemResetAllPlan {}
1303
1304#[derive(Debug)]
1305pub struct AlterRolePlan {
1306 pub id: RoleId,
1307 pub name: String,
1308 pub option: PlannedAlterRoleOption,
1309}
1310
1311#[derive(Debug)]
1312pub struct AlterOwnerPlan {
1313 pub id: ObjectId,
1314 pub object_type: ObjectType,
1315 pub new_owner: RoleId,
1316}
1317
1318#[derive(Debug)]
1319pub struct AlterTablePlan {
1320 pub relation_id: CatalogItemId,
1321 pub column_name: ColumnName,
1322 pub column_type: SqlColumnType,
1323 pub raw_sql_type: RawDataType,
1324}
1325
1326#[derive(Debug)]
1327pub struct DeclarePlan {
1328 pub name: String,
1329 pub stmt: Statement<Raw>,
1330 pub sql: String,
1331 pub params: Params,
1332}
1333
1334#[derive(Debug)]
1335pub struct FetchPlan {
1336 pub name: String,
1337 pub count: Option<FetchDirection>,
1338 pub timeout: ExecuteTimeout,
1339}
1340
/// Plan for closing a named object.
// NOTE(review): `name` presumably refers to a cursor — confirm.
#[derive(Debug)]
pub struct ClosePlan {
    /// The name to close.
    pub name: String,
}
1345
1346#[derive(Debug)]
1347pub struct PreparePlan {
1348 pub name: String,
1349 pub stmt: Statement<Raw>,
1350 pub sql: String,
1351 pub desc: StatementDesc,
1352}
1353
1354#[derive(Debug)]
1355pub struct ExecutePlan {
1356 pub name: String,
1357 pub params: Params,
1358}
1359
/// Plan for deallocating a statement by optional name.
#[derive(Debug)]
pub struct DeallocatePlan {
    /// The name to deallocate, if one was given.
    // NOTE(review): `None` presumably means "deallocate all" — confirm.
    pub name: Option<String>,
}
1364
1365#[derive(Debug)]
1366pub struct RaisePlan {
1367 pub severity: NoticeSeverity,
1368}
1369
1370#[derive(Debug)]
1371pub struct GrantRolePlan {
1372 pub role_ids: Vec<RoleId>,
1374 pub member_ids: Vec<RoleId>,
1376 pub grantor_id: RoleId,
1378}
1379
1380#[derive(Debug)]
1381pub struct RevokeRolePlan {
1382 pub role_ids: Vec<RoleId>,
1384 pub member_ids: Vec<RoleId>,
1386 pub grantor_id: RoleId,
1388}
1389
1390#[derive(Debug)]
1391pub struct UpdatePrivilege {
1392 pub acl_mode: AclMode,
1394 pub target_id: SystemObjectId,
1396 pub grantor: RoleId,
1398}
1399
1400#[derive(Debug)]
1401pub struct GrantPrivilegesPlan {
1402 pub update_privileges: Vec<UpdatePrivilege>,
1404 pub grantees: Vec<RoleId>,
1406}
1407
1408#[derive(Debug)]
1409pub struct RevokePrivilegesPlan {
1410 pub update_privileges: Vec<UpdatePrivilege>,
1412 pub revokees: Vec<RoleId>,
1414}
1415#[derive(Debug)]
1416pub struct AlterDefaultPrivilegesPlan {
1417 pub privilege_objects: Vec<DefaultPrivilegeObject>,
1419 pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
1421 pub is_grant: bool,
1423}
1424
1425#[derive(Debug)]
1426pub struct ReassignOwnedPlan {
1427 pub old_roles: Vec<RoleId>,
1429 pub new_role: RoleId,
1431 pub reassign_ids: Vec<ObjectId>,
1433}
1434
1435#[derive(Debug)]
1436pub struct CommentPlan {
1437 pub object_id: CommentObjectId,
1439 pub sub_component: Option<usize>,
1443 pub comment: Option<String>,
1445}
1446
1447#[derive(Clone, Debug)]
1448pub enum TableDataSource {
1449 TableWrites { defaults: Vec<Expr<Aug>> },
1451
1452 DataSource {
1455 desc: DataSourceDesc,
1456 timeline: Timeline,
1457 },
1458}
1459
1460#[derive(Clone, Debug)]
1461pub struct Table {
1462 pub create_sql: String,
1463 pub desc: VersionedRelationDesc,
1464 pub temporary: bool,
1465 pub compaction_window: Option<CompactionWindow>,
1466 pub data_source: TableDataSource,
1467}
1468
1469#[derive(Clone, Debug)]
1470pub struct Source {
1471 pub create_sql: String,
1472 pub data_source: DataSourceDesc,
1473 pub desc: RelationDesc,
1474 pub compaction_window: Option<CompactionWindow>,
1475}
1476
1477#[derive(Debug, Clone)]
1478pub enum DataSourceDesc {
1479 Ingestion(SourceDesc<ReferencedConnection>),
1481 OldSyntaxIngestion {
1483 desc: SourceDesc<ReferencedConnection>,
1484 progress_subsource: CatalogItemId,
1487 data_config: SourceExportDataConfig<ReferencedConnection>,
1488 details: SourceExportDetails,
1489 },
1490 IngestionExport {
1493 ingestion_id: CatalogItemId,
1494 external_reference: UnresolvedItemName,
1495 details: SourceExportDetails,
1496 data_config: SourceExportDataConfig<ReferencedConnection>,
1497 },
1498 Progress,
1500 Webhook {
1502 validate_using: Option<WebhookValidation>,
1503 body_format: WebhookBodyFormat,
1504 headers: WebhookHeaders,
1505 cluster_id: Option<StorageInstanceId>,
1507 },
1508}
1509
1510#[derive(Clone, Debug, Serialize)]
1511pub struct WebhookValidation {
1512 pub expression: MirScalarExpr,
1514 pub relation_desc: RelationDesc,
1516 pub bodies: Vec<(usize, bool)>,
1518 pub headers: Vec<(usize, bool)>,
1520 pub secrets: Vec<WebhookValidationSecret>,
1522}
1523
1524impl WebhookValidation {
1525 const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1526
1527 pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1532 let WebhookValidation {
1533 expression,
1534 relation_desc,
1535 ..
1536 } = self;
1537
1538 let mut expression_ = expression.clone();
1540 let desc_ = relation_desc.clone();
1541 let reduce_task = mz_ore::task::spawn_blocking(
1542 || "webhook-validation-reduce",
1543 move || {
1544 expression_.reduce(&desc_.typ().column_types);
1545 expression_
1546 },
1547 );
1548
1549 match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1550 Ok(Ok(reduced_expr)) => {
1551 *expression = reduced_expr;
1552 Ok(())
1553 }
1554 Ok(Err(_)) => Err("joining task"),
1555 Err(_) => Err("timeout"),
1556 }
1557 }
1558}
1559
1560#[derive(Clone, Debug, Default, Serialize)]
1561pub struct WebhookHeaders {
1562 pub header_column: Option<WebhookHeaderFilters>,
1564 pub mapped_headers: BTreeMap<usize, (String, bool)>,
1566}
1567
1568impl WebhookHeaders {
1569 pub fn num_columns(&self) -> usize {
1571 let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1572 let mapped_headers = self.mapped_headers.len();
1573
1574 header_column + mapped_headers
1575 }
1576}
1577
1578#[derive(Clone, Debug, Default, Serialize)]
1579pub struct WebhookHeaderFilters {
1580 pub block: BTreeSet<String>,
1581 pub allow: BTreeSet<String>,
1582}
1583
1584#[derive(Copy, Clone, Debug, Serialize, Arbitrary)]
1585pub enum WebhookBodyFormat {
1586 Json { array: bool },
1587 Bytes,
1588 Text,
1589}
1590
1591impl From<WebhookBodyFormat> for SqlScalarType {
1592 fn from(value: WebhookBodyFormat) -> Self {
1593 match value {
1594 WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1595 WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1596 WebhookBodyFormat::Text => SqlScalarType::String,
1597 }
1598 }
1599}
1600
1601#[derive(Clone, Debug, Serialize)]
1602pub struct WebhookValidationSecret {
1603 pub id: CatalogItemId,
1605 pub column_idx: usize,
1607 pub use_bytes: bool,
1609}
1610
1611#[derive(Clone, Debug)]
1612pub struct Connection {
1613 pub create_sql: String,
1614 pub details: ConnectionDetails,
1615}
1616
1617#[derive(Clone, Debug, Serialize)]
1618pub enum ConnectionDetails {
1619 Kafka(KafkaConnection<ReferencedConnection>),
1620 Csr(CsrConnection<ReferencedConnection>),
1621 Postgres(PostgresConnection<ReferencedConnection>),
1622 Ssh {
1623 connection: SshConnection,
1624 key_1: SshKey,
1625 key_2: SshKey,
1626 },
1627 Aws(AwsConnection),
1628 AwsPrivatelink(AwsPrivatelinkConnection),
1629 MySql(MySqlConnection<ReferencedConnection>),
1630 SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
1631 IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
1632}
1633
1634impl ConnectionDetails {
1635 pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1636 match self {
1637 ConnectionDetails::Kafka(c) => {
1638 mz_storage_types::connections::Connection::Kafka(c.clone())
1639 }
1640 ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1641 ConnectionDetails::Postgres(c) => {
1642 mz_storage_types::connections::Connection::Postgres(c.clone())
1643 }
1644 ConnectionDetails::Ssh { connection, .. } => {
1645 mz_storage_types::connections::Connection::Ssh(connection.clone())
1646 }
1647 ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1648 ConnectionDetails::AwsPrivatelink(c) => {
1649 mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1650 }
1651 ConnectionDetails::MySql(c) => {
1652 mz_storage_types::connections::Connection::MySql(c.clone())
1653 }
1654 ConnectionDetails::SqlServer(c) => {
1655 mz_storage_types::connections::Connection::SqlServer(c.clone())
1656 }
1657 ConnectionDetails::IcebergCatalog(c) => {
1658 mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1659 }
1660 }
1661 }
1662}
1663
1664#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1665pub struct NetworkPolicyRule {
1666 pub name: String,
1667 pub action: NetworkPolicyRuleAction,
1668 pub address: PolicyAddress,
1669 pub direction: NetworkPolicyRuleDirection,
1670}
1671
1672#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1673pub enum NetworkPolicyRuleAction {
1674 Allow,
1675}
1676
1677impl std::fmt::Display for NetworkPolicyRuleAction {
1678 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1679 match self {
1680 Self::Allow => write!(f, "allow"),
1681 }
1682 }
1683}
1684impl TryFrom<&str> for NetworkPolicyRuleAction {
1685 type Error = PlanError;
1686 fn try_from(value: &str) -> Result<Self, Self::Error> {
1687 match value.to_uppercase().as_str() {
1688 "ALLOW" => Ok(Self::Allow),
1689 _ => Err(PlanError::Unstructured(
1690 "Allow is the only valid option".into(),
1691 )),
1692 }
1693 }
1694}
1695
1696#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1697pub enum NetworkPolicyRuleDirection {
1698 Ingress,
1699}
1700impl std::fmt::Display for NetworkPolicyRuleDirection {
1701 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1702 match self {
1703 Self::Ingress => write!(f, "ingress"),
1704 }
1705 }
1706}
1707impl TryFrom<&str> for NetworkPolicyRuleDirection {
1708 type Error = PlanError;
1709 fn try_from(value: &str) -> Result<Self, Self::Error> {
1710 match value.to_uppercase().as_str() {
1711 "INGRESS" => Ok(Self::Ingress),
1712 _ => Err(PlanError::Unstructured(
1713 "Ingress is the only valid option".into(),
1714 )),
1715 }
1716 }
1717}
1718
1719#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1720pub struct PolicyAddress(pub IpNet);
1721impl std::fmt::Display for PolicyAddress {
1722 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1723 write!(f, "{}", &self.0.to_string())
1724 }
1725}
1726impl From<String> for PolicyAddress {
1727 fn from(value: String) -> Self {
1728 Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1729 }
1730}
1731impl TryFrom<&str> for PolicyAddress {
1732 type Error = PlanError;
1733 fn try_from(value: &str) -> Result<Self, Self::Error> {
1734 let net = IpNet::from_str(value)
1735 .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1736 Ok(Self(net))
1737 }
1738}
1739
1740impl Serialize for PolicyAddress {
1741 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1742 where
1743 S: serde::Serializer,
1744 {
1745 serializer.serialize_str(&format!("{}", &self.0))
1746 }
1747}
1748
1749#[derive(Clone, Debug, Serialize)]
1750pub enum SshKey {
1751 PublicOnly(String),
1752 Both(SshKeyPair),
1753}
1754
1755impl SshKey {
1756 pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1757 match self {
1758 SshKey::PublicOnly(_) => None,
1759 SshKey::Both(key_pair) => Some(key_pair),
1760 }
1761 }
1762
1763 pub fn public_key(&self) -> String {
1764 match self {
1765 SshKey::PublicOnly(s) => s.into(),
1766 SshKey::Both(p) => p.ssh_public_key(),
1767 }
1768 }
1769}
1770
1771#[derive(Clone, Debug)]
1772pub struct Secret {
1773 pub create_sql: String,
1774 pub secret_as: MirScalarExpr,
1775}
1776
1777#[derive(Clone, Debug)]
1778pub struct Sink {
1779 pub create_sql: String,
1781 pub from: GlobalId,
1783 pub connection: StorageSinkConnection<ReferencedConnection>,
1785 pub envelope: SinkEnvelope,
1787 pub version: u64,
1788}
1789
1790#[derive(Clone, Debug)]
1791pub struct View {
1792 pub create_sql: String,
1794 pub expr: HirRelationExpr,
1796 pub dependencies: DependencyIds,
1798 pub column_names: Vec<ColumnName>,
1800 pub temporary: bool,
1802}
1803
1804#[derive(Clone, Debug)]
1805pub struct MaterializedView {
1806 pub create_sql: String,
1808 pub expr: HirRelationExpr,
1810 pub dependencies: DependencyIds,
1812 pub column_names: Vec<ColumnName>,
1814 pub cluster_id: ClusterId,
1816 pub non_null_assertions: Vec<usize>,
1817 pub compaction_window: Option<CompactionWindow>,
1818 pub refresh_schedule: Option<RefreshSchedule>,
1819 pub as_of: Option<Timestamp>,
1820}
1821
1822#[derive(Clone, Debug)]
1823pub struct Index {
1824 pub create_sql: String,
1826 pub on: GlobalId,
1828 pub keys: Vec<mz_expr::MirScalarExpr>,
1829 pub compaction_window: Option<CompactionWindow>,
1830 pub cluster_id: ClusterId,
1831}
1832
1833#[derive(Clone, Debug)]
1834pub struct Type {
1835 pub create_sql: String,
1836 pub inner: CatalogType<IdReference>,
1837}
1838
1839#[derive(Deserialize, Clone, Debug, PartialEq)]
1841pub enum QueryWhen {
1842 Immediately,
1845 FreshestTableWrite,
1848 AtTimestamp(Timestamp),
1853 AtLeastTimestamp(Timestamp),
1856}
1857
1858impl QueryWhen {
1859 pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1861 match self {
1862 QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1863 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1864 }
1865 }
1866 pub fn constrains_upper(&self) -> bool {
1870 match self {
1871 QueryWhen::AtTimestamp(_) => true,
1872 QueryWhen::AtLeastTimestamp(_)
1873 | QueryWhen::Immediately
1874 | QueryWhen::FreshestTableWrite => false,
1875 }
1876 }
1877 pub fn advance_to_since(&self) -> bool {
1879 match self {
1880 QueryWhen::Immediately
1881 | QueryWhen::AtLeastTimestamp(_)
1882 | QueryWhen::FreshestTableWrite => true,
1883 QueryWhen::AtTimestamp(_) => false,
1884 }
1885 }
1886 pub fn can_advance_to_upper(&self) -> bool {
1888 match self {
1889 QueryWhen::Immediately => true,
1890 QueryWhen::FreshestTableWrite
1891 | QueryWhen::AtTimestamp(_)
1892 | QueryWhen::AtLeastTimestamp(_) => false,
1893 }
1894 }
1895
1896 pub fn can_advance_to_timeline_ts(&self) -> bool {
1898 match self {
1899 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1900 QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1901 }
1902 }
1903 pub fn must_advance_to_timeline_ts(&self) -> bool {
1905 match self {
1906 QueryWhen::FreshestTableWrite => true,
1907 QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1908 false
1909 }
1910 }
1911 }
1912 pub fn is_transactional(&self) -> bool {
1914 match self {
1915 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1916 QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1917 }
1918 }
1919}
1920
/// The kind of data mutation a plan performs.
#[derive(Debug, Copy, Clone)]
pub enum MutationKind {
    /// Rows are inserted.
    Insert,
    /// Rows are updated.
    Update,
    /// Rows are deleted.
    Delete,
}
1927
/// Data format of a `COPY`.
///
/// The derived ordering follows declaration order.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyFormat {
    /// Text format.
    Text,
    /// CSV format.
    Csv,
    /// Binary format.
    Binary,
    /// Parquet format.
    Parquet,
}
1935
/// Timeout behavior of an execution.
#[derive(Debug, Copy, Clone)]
pub enum ExecuteTimeout {
    /// No timeout.
    None,
    /// A timeout given in (fractional) seconds.
    Seconds(f64),
    /// Wait once.
    // NOTE(review): exact waiting semantics are defined by the consumer — confirm.
    WaitOnce,
}
1942
1943#[derive(Clone, Debug)]
1944pub enum IndexOption {
1945 RetainHistory(CompactionWindow),
1947}
1948
1949#[derive(Clone, Debug)]
1950pub enum TableOption {
1951 RetainHistory(CompactionWindow),
1953}
1954
1955#[derive(Clone, Debug)]
1956pub struct PlanClusterOption {
1957 pub availability_zones: AlterOptionParameter<Vec<String>>,
1958 pub introspection_debugging: AlterOptionParameter<bool>,
1959 pub introspection_interval: AlterOptionParameter<OptionalDuration>,
1960 pub managed: AlterOptionParameter<bool>,
1961 pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
1962 pub replication_factor: AlterOptionParameter<u32>,
1963 pub size: AlterOptionParameter,
1964 pub schedule: AlterOptionParameter<ClusterSchedule>,
1965 pub workload_class: AlterOptionParameter<Option<String>>,
1966}
1967
1968impl Default for PlanClusterOption {
1969 fn default() -> Self {
1970 Self {
1971 availability_zones: AlterOptionParameter::Unchanged,
1972 introspection_debugging: AlterOptionParameter::Unchanged,
1973 introspection_interval: AlterOptionParameter::Unchanged,
1974 managed: AlterOptionParameter::Unchanged,
1975 replicas: AlterOptionParameter::Unchanged,
1976 replication_factor: AlterOptionParameter::Unchanged,
1977 size: AlterOptionParameter::Unchanged,
1978 schedule: AlterOptionParameter::Unchanged,
1979 workload_class: AlterOptionParameter::Unchanged,
1980 }
1981 }
1982}
1983
1984#[derive(Clone, Debug, PartialEq, Eq)]
1985pub enum AlterClusterPlanStrategy {
1986 None,
1987 For(Duration),
1988 UntilReady {
1989 on_timeout: OnTimeoutAction,
1990 timeout: Duration,
1991 },
1992}
1993
/// What to do when an "until ready" wait times out.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OnTimeoutAction {
    /// Commit (parsed from `COMMIT`).
    Commit,
    /// Roll back (parsed from `ROLLBACK`).
    Rollback,
}
1999
2000impl Default for OnTimeoutAction {
2001 fn default() -> Self {
2002 Self::Commit
2003 }
2004}
2005
2006impl TryFrom<&str> for OnTimeoutAction {
2007 type Error = PlanError;
2008 fn try_from(value: &str) -> Result<Self, Self::Error> {
2009 match value.to_uppercase().as_str() {
2010 "COMMIT" => Ok(Self::Commit),
2011 "ROLLBACK" => Ok(Self::Rollback),
2012 _ => Err(PlanError::Unstructured(
2013 "Valid options are COMMIT, ROLLBACK".into(),
2014 )),
2015 }
2016 }
2017}
2018
2019impl AlterClusterPlanStrategy {
2020 pub fn is_none(&self) -> bool {
2021 matches!(self, Self::None)
2022 }
2023 pub fn is_some(&self) -> bool {
2024 !matches!(self, Self::None)
2025 }
2026}
2027
2028impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2029 type Error = PlanError;
2030
2031 fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2032 Ok(match value.wait {
2033 Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2034 Some(ClusterAlterOptionValue::UntilReady(options)) => {
2035 let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2036 Self::UntilReady {
2037 timeout: match extracted.timeout {
2038 Some(d) => d,
2039 None => Err(PlanError::UntilReadyTimeoutRequired)?,
2040 },
2041 on_timeout: match extracted.on_timeout {
2042 Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2043 PlanError::InvalidOptionValue {
2044 option_name: "ON TIMEOUT".into(),
2045 err: Box::new(e),
2046 }
2047 })?,
2048 None => OnTimeoutAction::default(),
2049 },
2050 }
2051 }
2052 None => Self::None,
2053 })
2054 }
2055}
2056
2057#[derive(Debug, Clone)]
2059pub struct Params {
2060 pub datums: Row,
2062 pub execute_types: Vec<SqlScalarType>,
2064 pub expected_types: Vec<SqlScalarType>,
2066}
2067
2068impl Params {
2069 pub fn empty() -> Params {
2071 Params {
2072 datums: Row::pack_slice(&[]),
2073 execute_types: vec![],
2074 expected_types: vec![],
2075 }
2076 }
2077}
2078
2079#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Copy)]
2081pub struct PlanContext {
2082 pub wall_time: DateTime<Utc>,
2083 pub ignore_if_exists_errors: bool,
2084}
2085
2086impl PlanContext {
2087 pub fn new(wall_time: DateTime<Utc>) -> Self {
2088 Self {
2089 wall_time,
2090 ignore_if_exists_errors: false,
2091 }
2092 }
2093
2094 pub fn zero() -> Self {
2098 PlanContext {
2099 wall_time: now::to_datetime(NOW_ZERO()),
2100 ignore_if_exists_errors: false,
2101 }
2102 }
2103
2104 pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2105 self.ignore_if_exists_errors = value;
2106 self
2107 }
2108}