1use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{
41 CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing,
42};
43use mz_ore::now::{self, NOW_ZERO};
44use mz_pgcopy::CopyFormatParams;
45use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
46use mz_repr::explain::{ExplainConfig, ExplainFormat};
47use mz_repr::network_policy_id::NetworkPolicyId;
48use mz_repr::optimize::OptimizerFeatureOverrides;
49use mz_repr::refresh_schedule::RefreshSchedule;
50use mz_repr::role_id::RoleId;
51use mz_repr::{
52 CatalogItemId, ColumnIndex, ColumnName, ColumnType, Diff, GlobalId, RelationDesc, RelationType,
53 Row, ScalarType, Timestamp, VersionedRelationDesc,
54};
55use mz_sql_parser::ast::{
56 AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
57 RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
58 Value, WithOptionValue,
59};
60use mz_ssh_util::keys::SshKeyPair;
61use mz_storage_types::connections::aws::AwsConnection;
62use mz_storage_types::connections::inline::ReferencedConnection;
63use mz_storage_types::connections::{
64 AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
65 MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70 SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76 ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77 TransactionAccessMode,
78};
79use crate::catalog::{
80 CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81 RoleAttributes,
82};
83use crate::names::{
84 Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110 AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111 WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119 AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120 PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123 StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124 resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
130#[derive(Debug, EnumKind)]
132#[enum_kind(PlanKind)]
133pub enum Plan {
134 CreateConnection(CreateConnectionPlan),
135 CreateDatabase(CreateDatabasePlan),
136 CreateSchema(CreateSchemaPlan),
137 CreateRole(CreateRolePlan),
138 CreateCluster(CreateClusterPlan),
139 CreateClusterReplica(CreateClusterReplicaPlan),
140 CreateSource(CreateSourcePlan),
141 CreateSources(Vec<CreateSourcePlanBundle>),
142 CreateSecret(CreateSecretPlan),
143 CreateSink(CreateSinkPlan),
144 CreateTable(CreateTablePlan),
145 CreateView(CreateViewPlan),
146 CreateMaterializedView(CreateMaterializedViewPlan),
147 CreateContinualTask(CreateContinualTaskPlan),
148 CreateNetworkPolicy(CreateNetworkPolicyPlan),
149 CreateIndex(CreateIndexPlan),
150 CreateType(CreateTypePlan),
151 Comment(CommentPlan),
152 DiscardTemp,
153 DiscardAll,
154 DropObjects(DropObjectsPlan),
155 DropOwned(DropOwnedPlan),
156 EmptyQuery,
157 ShowAllVariables,
158 ShowCreate(ShowCreatePlan),
159 ShowColumns(ShowColumnsPlan),
160 ShowVariable(ShowVariablePlan),
161 InspectShard(InspectShardPlan),
162 SetVariable(SetVariablePlan),
163 ResetVariable(ResetVariablePlan),
164 SetTransaction(SetTransactionPlan),
165 StartTransaction(StartTransactionPlan),
166 CommitTransaction(CommitTransactionPlan),
167 AbortTransaction(AbortTransactionPlan),
168 Select(SelectPlan),
169 Subscribe(SubscribePlan),
170 CopyFrom(CopyFromPlan),
171 CopyTo(CopyToPlan),
172 ExplainPlan(ExplainPlanPlan),
173 ExplainPushdown(ExplainPushdownPlan),
174 ExplainTimestamp(ExplainTimestampPlan),
175 ExplainSinkSchema(ExplainSinkSchemaPlan),
176 Insert(InsertPlan),
177 AlterCluster(AlterClusterPlan),
178 AlterClusterSwap(AlterClusterSwapPlan),
179 AlterNoop(AlterNoopPlan),
180 AlterSetCluster(AlterSetClusterPlan),
181 AlterConnection(AlterConnectionPlan),
182 AlterSource(AlterSourcePlan),
183 AlterClusterRename(AlterClusterRenamePlan),
184 AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
185 AlterItemRename(AlterItemRenamePlan),
186 AlterSchemaRename(AlterSchemaRenamePlan),
187 AlterSchemaSwap(AlterSchemaSwapPlan),
188 AlterSecret(AlterSecretPlan),
189 AlterSink(AlterSinkPlan),
190 AlterSystemSet(AlterSystemSetPlan),
191 AlterSystemReset(AlterSystemResetPlan),
192 AlterSystemResetAll(AlterSystemResetAllPlan),
193 AlterRole(AlterRolePlan),
194 AlterOwner(AlterOwnerPlan),
195 AlterTableAddColumn(AlterTablePlan),
196 AlterNetworkPolicy(AlterNetworkPolicyPlan),
197 Declare(DeclarePlan),
198 Fetch(FetchPlan),
199 Close(ClosePlan),
200 ReadThenWrite(ReadThenWritePlan),
201 Prepare(PreparePlan),
202 Execute(ExecutePlan),
203 Deallocate(DeallocatePlan),
204 Raise(RaisePlan),
205 GrantRole(GrantRolePlan),
206 RevokeRole(RevokeRolePlan),
207 GrantPrivileges(GrantPrivilegesPlan),
208 RevokePrivileges(RevokePrivilegesPlan),
209 AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
210 ReassignOwned(ReassignOwnedPlan),
211 SideEffectingFunc(SideEffectingFunc),
212 ValidateConnection(ValidateConnectionPlan),
213 AlterRetainHistory(AlterRetainHistoryPlan),
214}
215
216impl Plan {
217 pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
220 match stmt {
221 StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
222 StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
223 StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
224 StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
225 StatementKind::AlterObjectRename => &[
226 PlanKind::AlterClusterRename,
227 PlanKind::AlterClusterReplicaRename,
228 PlanKind::AlterItemRename,
229 PlanKind::AlterSchemaRename,
230 PlanKind::AlterNoop,
231 ],
232 StatementKind::AlterObjectSwap => &[
233 PlanKind::AlterClusterSwap,
234 PlanKind::AlterSchemaSwap,
235 PlanKind::AlterNoop,
236 ],
237 StatementKind::AlterRole => &[PlanKind::AlterRole],
238 StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
239 StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
240 StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
241 StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
242 StatementKind::AlterSource => &[
243 PlanKind::AlterNoop,
244 PlanKind::AlterSource,
245 PlanKind::AlterRetainHistory,
246 ],
247 StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
248 StatementKind::AlterSystemResetAll => {
249 &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
250 }
251 StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
252 StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
253 StatementKind::AlterTableAddColumn => {
254 &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
255 }
256 StatementKind::Close => &[PlanKind::Close],
257 StatementKind::Comment => &[PlanKind::Comment],
258 StatementKind::Commit => &[PlanKind::CommitTransaction],
259 StatementKind::Copy => &[
260 PlanKind::CopyFrom,
261 PlanKind::Select,
262 PlanKind::Subscribe,
263 PlanKind::CopyTo,
264 ],
265 StatementKind::CreateCluster => &[PlanKind::CreateCluster],
266 StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
267 StatementKind::CreateConnection => &[PlanKind::CreateConnection],
268 StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
269 StatementKind::CreateIndex => &[PlanKind::CreateIndex],
270 StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
271 StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
272 StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
273 StatementKind::CreateRole => &[PlanKind::CreateRole],
274 StatementKind::CreateSchema => &[PlanKind::CreateSchema],
275 StatementKind::CreateSecret => &[PlanKind::CreateSecret],
276 StatementKind::CreateSink => &[PlanKind::CreateSink],
277 StatementKind::CreateSource | StatementKind::CreateSubsource => {
278 &[PlanKind::CreateSource]
279 }
280 StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
281 StatementKind::CreateTable => &[PlanKind::CreateTable],
282 StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
283 StatementKind::CreateType => &[PlanKind::CreateType],
284 StatementKind::CreateView => &[PlanKind::CreateView],
285 StatementKind::Deallocate => &[PlanKind::Deallocate],
286 StatementKind::Declare => &[PlanKind::Declare],
287 StatementKind::Delete => &[PlanKind::ReadThenWrite],
288 StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
289 StatementKind::DropObjects => &[PlanKind::DropObjects],
290 StatementKind::DropOwned => &[PlanKind::DropOwned],
291 StatementKind::Execute => &[PlanKind::Execute],
292 StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
293 StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
294 StatementKind::ExplainAnalyze => &[PlanKind::Select],
295 StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
296 StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
297 StatementKind::Fetch => &[PlanKind::Fetch],
298 StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
299 StatementKind::GrantRole => &[PlanKind::GrantRole],
300 StatementKind::Insert => &[PlanKind::Insert],
301 StatementKind::Prepare => &[PlanKind::Prepare],
302 StatementKind::Raise => &[PlanKind::Raise],
303 StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
304 StatementKind::ResetVariable => &[PlanKind::ResetVariable],
305 StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
306 StatementKind::RevokeRole => &[PlanKind::RevokeRole],
307 StatementKind::Rollback => &[PlanKind::AbortTransaction],
308 StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
309 StatementKind::SetTransaction => &[PlanKind::SetTransaction],
310 StatementKind::SetVariable => &[PlanKind::SetVariable],
311 StatementKind::Show => &[
312 PlanKind::Select,
313 PlanKind::ShowVariable,
314 PlanKind::ShowCreate,
315 PlanKind::ShowColumns,
316 PlanKind::ShowAllVariables,
317 PlanKind::InspectShard,
318 ],
319 StatementKind::StartTransaction => &[PlanKind::StartTransaction],
320 StatementKind::Subscribe => &[PlanKind::Subscribe],
321 StatementKind::Update => &[PlanKind::ReadThenWrite],
322 StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
323 StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
324 }
325 }
326
327 pub fn name(&self) -> &str {
329 match self {
330 Plan::CreateConnection(_) => "create connection",
331 Plan::CreateDatabase(_) => "create database",
332 Plan::CreateSchema(_) => "create schema",
333 Plan::CreateRole(_) => "create role",
334 Plan::CreateCluster(_) => "create cluster",
335 Plan::CreateClusterReplica(_) => "create cluster replica",
336 Plan::CreateSource(_) => "create source",
337 Plan::CreateSources(_) => "create source",
338 Plan::CreateSecret(_) => "create secret",
339 Plan::CreateSink(_) => "create sink",
340 Plan::CreateTable(_) => "create table",
341 Plan::CreateView(_) => "create view",
342 Plan::CreateMaterializedView(_) => "create materialized view",
343 Plan::CreateContinualTask(_) => "create continual task",
344 Plan::CreateIndex(_) => "create index",
345 Plan::CreateType(_) => "create type",
346 Plan::CreateNetworkPolicy(_) => "create network policy",
347 Plan::Comment(_) => "comment",
348 Plan::DiscardTemp => "discard temp",
349 Plan::DiscardAll => "discard all",
350 Plan::DropObjects(plan) => match plan.object_type {
351 ObjectType::Table => "drop table",
352 ObjectType::View => "drop view",
353 ObjectType::MaterializedView => "drop materialized view",
354 ObjectType::Source => "drop source",
355 ObjectType::Sink => "drop sink",
356 ObjectType::Index => "drop index",
357 ObjectType::Type => "drop type",
358 ObjectType::Role => "drop roles",
359 ObjectType::Cluster => "drop clusters",
360 ObjectType::ClusterReplica => "drop cluster replicas",
361 ObjectType::Secret => "drop secret",
362 ObjectType::Connection => "drop connection",
363 ObjectType::Database => "drop database",
364 ObjectType::Schema => "drop schema",
365 ObjectType::Func => "drop function",
366 ObjectType::ContinualTask => "drop continual task",
367 ObjectType::NetworkPolicy => "drop network policy",
368 },
369 Plan::DropOwned(_) => "drop owned",
370 Plan::EmptyQuery => "do nothing",
371 Plan::ShowAllVariables => "show all variables",
372 Plan::ShowCreate(_) => "show create",
373 Plan::ShowColumns(_) => "show columns",
374 Plan::ShowVariable(_) => "show variable",
375 Plan::InspectShard(_) => "inspect shard",
376 Plan::SetVariable(_) => "set variable",
377 Plan::ResetVariable(_) => "reset variable",
378 Plan::SetTransaction(_) => "set transaction",
379 Plan::StartTransaction(_) => "start transaction",
380 Plan::CommitTransaction(_) => "commit",
381 Plan::AbortTransaction(_) => "abort",
382 Plan::Select(_) => "select",
383 Plan::Subscribe(_) => "subscribe",
384 Plan::CopyFrom(_) => "copy from",
385 Plan::CopyTo(_) => "copy to",
386 Plan::ExplainPlan(_) => "explain plan",
387 Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
388 Plan::ExplainTimestamp(_) => "explain timestamp",
389 Plan::ExplainSinkSchema(_) => "explain schema",
390 Plan::Insert(_) => "insert",
391 Plan::AlterNoop(plan) => match plan.object_type {
392 ObjectType::Table => "alter table",
393 ObjectType::View => "alter view",
394 ObjectType::MaterializedView => "alter materialized view",
395 ObjectType::Source => "alter source",
396 ObjectType::Sink => "alter sink",
397 ObjectType::Index => "alter index",
398 ObjectType::Type => "alter type",
399 ObjectType::Role => "alter role",
400 ObjectType::Cluster => "alter cluster",
401 ObjectType::ClusterReplica => "alter cluster replica",
402 ObjectType::Secret => "alter secret",
403 ObjectType::Connection => "alter connection",
404 ObjectType::Database => "alter database",
405 ObjectType::Schema => "alter schema",
406 ObjectType::Func => "alter function",
407 ObjectType::ContinualTask => "alter continual task",
408 ObjectType::NetworkPolicy => "alter network policy",
409 },
410 Plan::AlterCluster(_) => "alter cluster",
411 Plan::AlterClusterRename(_) => "alter cluster rename",
412 Plan::AlterClusterSwap(_) => "alter cluster swap",
413 Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
414 Plan::AlterSetCluster(_) => "alter set cluster",
415 Plan::AlterConnection(_) => "alter connection",
416 Plan::AlterSource(_) => "alter source",
417 Plan::AlterItemRename(_) => "rename item",
418 Plan::AlterSchemaRename(_) => "alter rename schema",
419 Plan::AlterSchemaSwap(_) => "alter swap schema",
420 Plan::AlterSecret(_) => "alter secret",
421 Plan::AlterSink(_) => "alter sink",
422 Plan::AlterSystemSet(_) => "alter system",
423 Plan::AlterSystemReset(_) => "alter system",
424 Plan::AlterSystemResetAll(_) => "alter system",
425 Plan::AlterRole(_) => "alter role",
426 Plan::AlterNetworkPolicy(_) => "alter network policy",
427 Plan::AlterOwner(plan) => match plan.object_type {
428 ObjectType::Table => "alter table owner",
429 ObjectType::View => "alter view owner",
430 ObjectType::MaterializedView => "alter materialized view owner",
431 ObjectType::Source => "alter source owner",
432 ObjectType::Sink => "alter sink owner",
433 ObjectType::Index => "alter index owner",
434 ObjectType::Type => "alter type owner",
435 ObjectType::Role => "alter role owner",
436 ObjectType::Cluster => "alter cluster owner",
437 ObjectType::ClusterReplica => "alter cluster replica owner",
438 ObjectType::Secret => "alter secret owner",
439 ObjectType::Connection => "alter connection owner",
440 ObjectType::Database => "alter database owner",
441 ObjectType::Schema => "alter schema owner",
442 ObjectType::Func => "alter function owner",
443 ObjectType::ContinualTask => "alter continual task owner",
444 ObjectType::NetworkPolicy => "alter network policy owner",
445 },
446 Plan::AlterTableAddColumn(_) => "alter table add column",
447 Plan::Declare(_) => "declare",
448 Plan::Fetch(_) => "fetch",
449 Plan::Close(_) => "close",
450 Plan::ReadThenWrite(plan) => match plan.kind {
451 MutationKind::Insert => "insert into select",
452 MutationKind::Update => "update",
453 MutationKind::Delete => "delete",
454 },
455 Plan::Prepare(_) => "prepare",
456 Plan::Execute(_) => "execute",
457 Plan::Deallocate(_) => "deallocate",
458 Plan::Raise(_) => "raise",
459 Plan::GrantRole(_) => "grant role",
460 Plan::RevokeRole(_) => "revoke role",
461 Plan::GrantPrivileges(_) => "grant privilege",
462 Plan::RevokePrivileges(_) => "revoke privilege",
463 Plan::AlterDefaultPrivileges(_) => "alter default privileges",
464 Plan::ReassignOwned(_) => "reassign owned",
465 Plan::SideEffectingFunc(_) => "side effecting func",
466 Plan::ValidateConnection(_) => "validate connection",
467 Plan::AlterRetainHistory(_) => "alter retain history",
468 }
469 }
470
471 pub fn allowed_in_read_only(&self) -> bool {
477 match self {
478 Plan::SetVariable(_) => true,
481 Plan::ResetVariable(_) => true,
482 Plan::SetTransaction(_) => true,
483 Plan::StartTransaction(_) => true,
484 Plan::CommitTransaction(_) => true,
485 Plan::AbortTransaction(_) => true,
486 Plan::Select(_) => true,
487 Plan::EmptyQuery => true,
488 Plan::ShowAllVariables => true,
489 Plan::ShowCreate(_) => true,
490 Plan::ShowColumns(_) => true,
491 Plan::ShowVariable(_) => true,
492 Plan::InspectShard(_) => true,
493 Plan::Subscribe(_) => true,
494 Plan::CopyTo(_) => true,
495 Plan::ExplainPlan(_) => true,
496 Plan::ExplainPushdown(_) => true,
497 Plan::ExplainTimestamp(_) => true,
498 Plan::ExplainSinkSchema(_) => true,
499 Plan::ValidateConnection(_) => true,
500 _ => false,
501 }
502 }
503}
504
505#[derive(Debug)]
506pub struct StartTransactionPlan {
507 pub access: Option<TransactionAccessMode>,
508 pub isolation_level: Option<TransactionIsolationLevel>,
509}
510
511#[derive(Debug)]
512pub enum TransactionType {
513 Explicit,
514 Implicit,
515}
516
517impl TransactionType {
518 pub fn is_explicit(&self) -> bool {
519 matches!(self, TransactionType::Explicit)
520 }
521
522 pub fn is_implicit(&self) -> bool {
523 matches!(self, TransactionType::Implicit)
524 }
525}
526
527#[derive(Debug)]
528pub struct CommitTransactionPlan {
529 pub transaction_type: TransactionType,
530}
531
532#[derive(Debug)]
533pub struct AbortTransactionPlan {
534 pub transaction_type: TransactionType,
535}
536
537#[derive(Debug)]
538pub struct CreateDatabasePlan {
539 pub name: String,
540 pub if_not_exists: bool,
541}
542
543#[derive(Debug)]
544pub struct CreateSchemaPlan {
545 pub database_spec: ResolvedDatabaseSpecifier,
546 pub schema_name: String,
547 pub if_not_exists: bool,
548}
549
550#[derive(Debug)]
551pub struct CreateRolePlan {
552 pub name: String,
553 pub attributes: RoleAttributes,
554}
555
556#[derive(Debug, PartialEq, Eq, Clone)]
557pub struct CreateClusterPlan {
558 pub name: String,
559 pub variant: CreateClusterVariant,
560 pub workload_class: Option<String>,
561}
562
563#[derive(Debug, PartialEq, Eq, Clone)]
564pub enum CreateClusterVariant {
565 Managed(CreateClusterManagedPlan),
566 Unmanaged(CreateClusterUnmanagedPlan),
567}
568
569#[derive(Debug, PartialEq, Eq, Clone)]
570pub struct CreateClusterUnmanagedPlan {
571 pub replicas: Vec<(String, ReplicaConfig)>,
572}
573
574#[derive(Debug, PartialEq, Eq, Clone)]
575pub struct CreateClusterManagedPlan {
576 pub replication_factor: u32,
577 pub size: String,
578 pub availability_zones: Vec<String>,
579 pub compute: ComputeReplicaConfig,
580 pub optimizer_feature_overrides: OptimizerFeatureOverrides,
581 pub schedule: ClusterSchedule,
582}
583
584#[derive(Debug)]
585pub struct CreateClusterReplicaPlan {
586 pub cluster_id: ClusterId,
587 pub name: String,
588 pub config: ReplicaConfig,
589}
590
591#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq)]
593pub struct ComputeReplicaIntrospectionConfig {
594 pub debugging: bool,
596 pub interval: Duration,
598}
599
600#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
601pub struct ComputeReplicaConfig {
602 pub introspection: Option<ComputeReplicaIntrospectionConfig>,
603}
604
605#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
606pub enum ReplicaConfig {
607 Unorchestrated {
608 storagectl_addrs: Vec<String>,
609 computectl_addrs: Vec<String>,
610 compute: ComputeReplicaConfig,
611 },
612 Orchestrated {
613 size: String,
614 availability_zone: Option<String>,
615 compute: ComputeReplicaConfig,
616 internal: bool,
617 billed_as: Option<String>,
618 },
619}
620
621#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
622pub enum ClusterSchedule {
623 Manual,
625 Refresh { hydration_time_estimate: Duration },
629}
630
631impl Default for ClusterSchedule {
632 fn default() -> Self {
633 ClusterSchedule::Manual
635 }
636}
637
638#[derive(Debug)]
639pub struct CreateSourcePlan {
640 pub name: QualifiedItemName,
641 pub source: Source,
642 pub if_not_exists: bool,
643 pub timeline: Timeline,
644 pub in_cluster: Option<ClusterId>,
646}
647
648#[derive(Clone, Debug, PartialEq, Eq)]
649pub struct SourceReferences {
650 pub updated_at: u64,
651 pub references: Vec<SourceReference>,
652}
653
654#[derive(Clone, Debug, PartialEq, Eq)]
657pub struct SourceReference {
658 pub name: String,
659 pub namespace: Option<String>,
660 pub columns: Vec<String>,
661}
662
663#[derive(Debug)]
665pub struct CreateSourcePlanBundle {
666 pub item_id: CatalogItemId,
668 pub global_id: GlobalId,
670 pub plan: CreateSourcePlan,
672 pub resolved_ids: ResolvedIds,
674 pub available_source_references: Option<SourceReferences>,
678}
679
680#[derive(Debug)]
681pub struct CreateConnectionPlan {
682 pub name: QualifiedItemName,
683 pub if_not_exists: bool,
684 pub connection: Connection,
685 pub validate: bool,
686}
687
688#[derive(Debug)]
689pub struct ValidateConnectionPlan {
690 pub id: CatalogItemId,
692 pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
694}
695
696#[derive(Debug)]
697pub struct CreateSecretPlan {
698 pub name: QualifiedItemName,
699 pub secret: Secret,
700 pub if_not_exists: bool,
701}
702
703#[derive(Debug)]
704pub struct CreateSinkPlan {
705 pub name: QualifiedItemName,
706 pub sink: Sink,
707 pub with_snapshot: bool,
708 pub if_not_exists: bool,
709 pub in_cluster: ClusterId,
710}
711
712#[derive(Debug)]
713pub struct CreateTablePlan {
714 pub name: QualifiedItemName,
715 pub table: Table,
716 pub if_not_exists: bool,
717}
718
719#[derive(Debug, Clone)]
720pub struct CreateViewPlan {
721 pub name: QualifiedItemName,
722 pub view: View,
723 pub replace: Option<CatalogItemId>,
725 pub drop_ids: Vec<CatalogItemId>,
727 pub if_not_exists: bool,
728 pub ambiguous_columns: bool,
731}
732
733#[derive(Debug, Clone)]
734pub struct CreateMaterializedViewPlan {
735 pub name: QualifiedItemName,
736 pub materialized_view: MaterializedView,
737 pub replace: Option<CatalogItemId>,
739 pub drop_ids: Vec<CatalogItemId>,
741 pub if_not_exists: bool,
742 pub ambiguous_columns: bool,
745}
746
747#[derive(Debug, Clone)]
748pub struct CreateContinualTaskPlan {
749 pub name: QualifiedItemName,
750 pub placeholder_id: Option<mz_expr::LocalId>,
753 pub desc: RelationDesc,
754 pub input_id: GlobalId,
756 pub with_snapshot: bool,
757 pub continual_task: MaterializedView,
759}
760
761#[derive(Debug, Clone)]
762pub struct CreateNetworkPolicyPlan {
763 pub name: String,
764 pub rules: Vec<NetworkPolicyRule>,
765}
766
767#[derive(Debug, Clone)]
768pub struct AlterNetworkPolicyPlan {
769 pub id: NetworkPolicyId,
770 pub name: String,
771 pub rules: Vec<NetworkPolicyRule>,
772}
773
774#[derive(Debug, Clone)]
775pub struct CreateIndexPlan {
776 pub name: QualifiedItemName,
777 pub index: Index,
778 pub if_not_exists: bool,
779}
780
781#[derive(Debug)]
782pub struct CreateTypePlan {
783 pub name: QualifiedItemName,
784 pub typ: Type,
785}
786
787#[derive(Debug)]
788pub struct DropObjectsPlan {
789 pub referenced_ids: Vec<ObjectId>,
791 pub drop_ids: Vec<ObjectId>,
793 pub object_type: ObjectType,
796}
797
798#[derive(Debug)]
799pub struct DropOwnedPlan {
800 pub role_ids: Vec<RoleId>,
802 pub drop_ids: Vec<ObjectId>,
804 pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
806 pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
808}
809
810#[derive(Debug)]
811pub struct ShowVariablePlan {
812 pub name: String,
813}
814
815#[derive(Debug)]
816pub struct InspectShardPlan {
817 pub id: GlobalId,
819}
820
821#[derive(Debug)]
822pub struct SetVariablePlan {
823 pub name: String,
824 pub value: VariableValue,
825 pub local: bool,
826}
827
828#[derive(Debug)]
829pub enum VariableValue {
830 Default,
831 Values(Vec<String>),
832}
833
834#[derive(Debug)]
835pub struct ResetVariablePlan {
836 pub name: String,
837}
838
839#[derive(Debug)]
840pub struct SetTransactionPlan {
841 pub local: bool,
842 pub modes: Vec<TransactionMode>,
843}
844
845#[derive(Clone, Debug)]
847pub struct SelectPlan {
848 pub select: Option<Box<SelectStatement<Aug>>>,
851 pub source: HirRelationExpr,
853 pub when: QueryWhen,
855 pub finishing: RowSetFinishing,
857 pub copy_to: Option<CopyFormat>,
859}
860
861impl SelectPlan {
862 pub fn immediate(rows: Vec<Row>, typ: RelationType) -> Self {
863 let arity = typ.arity();
864 SelectPlan {
865 select: None,
866 source: HirRelationExpr::Constant { rows, typ },
867 when: QueryWhen::Immediately,
868 finishing: RowSetFinishing::trivial(arity),
869 copy_to: None,
870 }
871 }
872}
873
874#[derive(Debug)]
875pub enum SubscribeOutput {
876 Diffs,
877 WithinTimestampOrderBy {
878 order_by: Vec<ColumnOrder>,
880 },
881 EnvelopeUpsert {
882 order_by_keys: Vec<ColumnOrder>,
884 },
885 EnvelopeDebezium {
886 order_by_keys: Vec<ColumnOrder>,
888 },
889}
890
891#[derive(Debug)]
892pub struct SubscribePlan {
893 pub from: SubscribeFrom,
894 pub with_snapshot: bool,
895 pub when: QueryWhen,
896 pub up_to: Option<MirScalarExpr>,
897 pub copy_to: Option<CopyFormat>,
898 pub emit_progress: bool,
899 pub output: SubscribeOutput,
900}
901
902#[derive(Debug, Clone)]
903pub enum SubscribeFrom {
904 Id(GlobalId),
906 Query {
908 expr: MirRelationExpr,
909 desc: RelationDesc,
910 },
911}
912
913impl SubscribeFrom {
914 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
915 match self {
916 SubscribeFrom::Id(id) => BTreeSet::from([*id]),
917 SubscribeFrom::Query { expr, .. } => expr.depends_on(),
918 }
919 }
920
921 pub fn contains_temporal(&self) -> bool {
922 match self {
923 SubscribeFrom::Id(_) => false,
924 SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
925 }
926 }
927}
928
929#[derive(Debug)]
930pub struct ShowCreatePlan {
931 pub id: ObjectId,
932 pub row: Row,
933}
934
935#[derive(Debug)]
936pub struct ShowColumnsPlan {
937 pub id: CatalogItemId,
938 pub select_plan: SelectPlan,
939 pub new_resolved_ids: ResolvedIds,
940}
941
942#[derive(Debug)]
943pub struct CopyFromPlan {
944 pub id: CatalogItemId,
946 pub source: CopyFromSource,
948 pub columns: Vec<ColumnIndex>,
952 pub source_desc: RelationDesc,
954 pub mfp: MapFilterProject,
956 pub params: CopyFormatParams<'static>,
958 pub filter: Option<CopyFromFilter>,
960}
961
962#[derive(Debug)]
963pub enum CopyFromSource {
964 Stdin,
966 Url(HirScalarExpr),
970 AwsS3 {
972 uri: HirScalarExpr,
974 connection: AwsConnection,
976 connection_id: CatalogItemId,
978 },
979}
980
981#[derive(Debug)]
982pub enum CopyFromFilter {
983 Files(Vec<String>),
984 Pattern(String),
985}
986
987#[derive(Debug, Clone)]
988pub struct CopyToPlan {
989 pub select_plan: SelectPlan,
991 pub desc: RelationDesc,
992 pub to: HirScalarExpr,
994 pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
995 pub connection_id: CatalogItemId,
997 pub format: S3SinkFormat,
998 pub max_file_size: u64,
999}
1000
1001#[derive(Clone, Debug)]
1002pub struct ExplainPlanPlan {
1003 pub stage: ExplainStage,
1004 pub format: ExplainFormat,
1005 pub config: ExplainConfig,
1006 pub explainee: Explainee,
1007}
1008
1009#[derive(Clone, Debug)]
1011pub enum Explainee {
1012 View(CatalogItemId),
1014 MaterializedView(CatalogItemId),
1016 Index(CatalogItemId),
1018 ReplanView(CatalogItemId),
1020 ReplanMaterializedView(CatalogItemId),
1022 ReplanIndex(CatalogItemId),
1024 Statement(ExplaineeStatement),
1026}
1027
1028#[derive(Clone, Debug, EnumKind)]
1030#[enum_kind(ExplaineeStatementKind)]
1031pub enum ExplaineeStatement {
1032 Select {
1034 broken: bool,
1036 plan: plan::SelectPlan,
1037 desc: RelationDesc,
1038 },
1039 CreateView {
1041 broken: bool,
1043 plan: plan::CreateViewPlan,
1044 },
1045 CreateMaterializedView {
1047 broken: bool,
1049 plan: plan::CreateMaterializedViewPlan,
1050 },
1051 CreateIndex {
1053 broken: bool,
1055 plan: plan::CreateIndexPlan,
1056 },
1057}
1058
1059impl ExplaineeStatement {
1060 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1061 match self {
1062 Self::Select { plan, .. } => plan.source.depends_on(),
1063 Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1064 Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1065 Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1066 }
1067 }
1068
1069 pub fn broken(&self) -> bool {
1080 match self {
1081 Self::Select { broken, .. } => *broken,
1082 Self::CreateView { broken, .. } => *broken,
1083 Self::CreateMaterializedView { broken, .. } => *broken,
1084 Self::CreateIndex { broken, .. } => *broken,
1085 }
1086 }
1087}
1088
1089impl ExplaineeStatementKind {
1090 pub fn supports(&self, stage: &ExplainStage) -> bool {
1091 use ExplainStage::*;
1092 match self {
1093 Self::Select => true,
1094 Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1095 Self::CreateMaterializedView => true,
1096 Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1097 }
1098 }
1099}
1100
1101impl std::fmt::Display for ExplaineeStatementKind {
1102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1103 match self {
1104 Self::Select => write!(f, "SELECT"),
1105 Self::CreateView => write!(f, "CREATE VIEW"),
1106 Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1107 Self::CreateIndex => write!(f, "CREATE INDEX"),
1108 }
1109 }
1110}
1111
1112#[derive(Clone, Debug)]
1113pub struct ExplainPushdownPlan {
1114 pub explainee: Explainee,
1115}
1116
1117#[derive(Clone, Debug)]
1118pub struct ExplainTimestampPlan {
1119 pub format: ExplainFormat,
1120 pub raw_plan: HirRelationExpr,
1121 pub when: QueryWhen,
1122}
1123
1124#[derive(Debug)]
1125pub struct ExplainSinkSchemaPlan {
1126 pub sink_from: GlobalId,
1127 pub json_schema: String,
1128}
1129
1130#[derive(Debug)]
1131pub struct SendDiffsPlan {
1132 pub id: CatalogItemId,
1133 pub updates: Vec<(Row, Diff)>,
1134 pub kind: MutationKind,
1135 pub returning: Vec<(Row, NonZeroUsize)>,
1136 pub max_result_size: u64,
1137}
1138
1139#[derive(Debug)]
1140pub struct InsertPlan {
1141 pub id: CatalogItemId,
1142 pub values: HirRelationExpr,
1143 pub returning: Vec<mz_expr::MirScalarExpr>,
1144}
1145
1146#[derive(Debug)]
1147pub struct ReadThenWritePlan {
1148 pub id: CatalogItemId,
1149 pub selection: HirRelationExpr,
1150 pub finishing: RowSetFinishing,
1151 pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
1152 pub kind: MutationKind,
1153 pub returning: Vec<mz_expr::MirScalarExpr>,
1154}
1155
1156#[derive(Debug)]
1158pub struct AlterNoopPlan {
1159 pub object_type: ObjectType,
1160}
1161
1162#[derive(Debug)]
1163pub struct AlterSetClusterPlan {
1164 pub id: CatalogItemId,
1165 pub set_cluster: ClusterId,
1166}
1167
1168#[derive(Debug)]
1169pub struct AlterRetainHistoryPlan {
1170 pub id: CatalogItemId,
1171 pub value: Option<Value>,
1172 pub window: CompactionWindow,
1173 pub object_type: ObjectType,
1174}
1175
1176#[derive(Debug, Clone)]
1177
1178pub enum AlterOptionParameter<T = String> {
1179 Set(T),
1180 Reset,
1181 Unchanged,
1182}
1183
1184#[derive(Debug)]
1185pub enum AlterConnectionAction {
1186 RotateKeys,
1187 AlterOptions {
1188 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
1189 drop_options: BTreeSet<ConnectionOptionName>,
1190 validate: bool,
1191 },
1192}
1193
1194#[derive(Debug)]
1195pub struct AlterConnectionPlan {
1196 pub id: CatalogItemId,
1197 pub action: AlterConnectionAction,
1198}
1199
1200#[derive(Debug)]
1201pub enum AlterSourceAction {
1202 AddSubsourceExports {
1203 subsources: Vec<CreateSourcePlanBundle>,
1204 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1205 },
1206 RefreshReferences {
1207 references: SourceReferences,
1208 },
1209}
1210
1211#[derive(Debug)]
1212pub struct AlterSourcePlan {
1213 pub item_id: CatalogItemId,
1214 pub ingestion_id: GlobalId,
1215 pub action: AlterSourceAction,
1216}
1217
1218#[derive(Debug, Clone)]
1219pub struct AlterSinkPlan {
1220 pub item_id: CatalogItemId,
1221 pub global_id: GlobalId,
1222 pub sink: Sink,
1223 pub with_snapshot: bool,
1224 pub in_cluster: ClusterId,
1225}
1226
1227#[derive(Debug, Clone)]
1228pub struct AlterClusterPlan {
1229 pub id: ClusterId,
1230 pub name: String,
1231 pub options: PlanClusterOption,
1232 pub strategy: AlterClusterPlanStrategy,
1233}
1234
1235#[derive(Debug)]
1236pub struct AlterClusterRenamePlan {
1237 pub id: ClusterId,
1238 pub name: String,
1239 pub to_name: String,
1240}
1241
1242#[derive(Debug)]
1243pub struct AlterClusterReplicaRenamePlan {
1244 pub cluster_id: ClusterId,
1245 pub replica_id: ReplicaId,
1246 pub name: QualifiedReplica,
1247 pub to_name: String,
1248}
1249
1250#[derive(Debug)]
1251pub struct AlterItemRenamePlan {
1252 pub id: CatalogItemId,
1253 pub current_full_name: FullItemName,
1254 pub to_name: String,
1255 pub object_type: ObjectType,
1256}
1257
1258#[derive(Debug)]
1259pub struct AlterSchemaRenamePlan {
1260 pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1261 pub new_schema_name: String,
1262}
1263
1264#[derive(Debug)]
1265pub struct AlterSchemaSwapPlan {
1266 pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1267 pub schema_a_name: String,
1268 pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1269 pub schema_b_name: String,
1270 pub name_temp: String,
1271}
1272
1273#[derive(Debug)]
1274pub struct AlterClusterSwapPlan {
1275 pub id_a: ClusterId,
1276 pub id_b: ClusterId,
1277 pub name_a: String,
1278 pub name_b: String,
1279 pub name_temp: String,
1280}
1281
1282#[derive(Debug)]
1283pub struct AlterSecretPlan {
1284 pub id: CatalogItemId,
1285 pub secret_as: MirScalarExpr,
1286}
1287
1288#[derive(Debug)]
1289pub struct AlterSystemSetPlan {
1290 pub name: String,
1291 pub value: VariableValue,
1292}
1293
1294#[derive(Debug)]
1295pub struct AlterSystemResetPlan {
1296 pub name: String,
1297}
1298
1299#[derive(Debug)]
1300pub struct AlterSystemResetAllPlan {}
1301
1302#[derive(Debug)]
1303pub struct AlterRolePlan {
1304 pub id: RoleId,
1305 pub name: String,
1306 pub option: PlannedAlterRoleOption,
1307}
1308
1309#[derive(Debug)]
1310pub struct AlterOwnerPlan {
1311 pub id: ObjectId,
1312 pub object_type: ObjectType,
1313 pub new_owner: RoleId,
1314}
1315
1316#[derive(Debug)]
1317pub struct AlterTablePlan {
1318 pub relation_id: CatalogItemId,
1319 pub column_name: ColumnName,
1320 pub column_type: ColumnType,
1321 pub raw_sql_type: RawDataType,
1322}
1323
1324#[derive(Debug)]
1325pub struct DeclarePlan {
1326 pub name: String,
1327 pub stmt: Statement<Raw>,
1328 pub sql: String,
1329 pub params: Params,
1330}
1331
1332#[derive(Debug)]
1333pub struct FetchPlan {
1334 pub name: String,
1335 pub count: Option<FetchDirection>,
1336 pub timeout: ExecuteTimeout,
1337}
1338
1339#[derive(Debug)]
1340pub struct ClosePlan {
1341 pub name: String,
1342}
1343
1344#[derive(Debug)]
1345pub struct PreparePlan {
1346 pub name: String,
1347 pub stmt: Statement<Raw>,
1348 pub sql: String,
1349 pub desc: StatementDesc,
1350}
1351
1352#[derive(Debug)]
1353pub struct ExecutePlan {
1354 pub name: String,
1355 pub params: Params,
1356}
1357
1358#[derive(Debug)]
1359pub struct DeallocatePlan {
1360 pub name: Option<String>,
1361}
1362
1363#[derive(Debug)]
1364pub struct RaisePlan {
1365 pub severity: NoticeSeverity,
1366}
1367
1368#[derive(Debug)]
1369pub struct GrantRolePlan {
1370 pub role_ids: Vec<RoleId>,
1372 pub member_ids: Vec<RoleId>,
1374 pub grantor_id: RoleId,
1376}
1377
1378#[derive(Debug)]
1379pub struct RevokeRolePlan {
1380 pub role_ids: Vec<RoleId>,
1382 pub member_ids: Vec<RoleId>,
1384 pub grantor_id: RoleId,
1386}
1387
1388#[derive(Debug)]
1389pub struct UpdatePrivilege {
1390 pub acl_mode: AclMode,
1392 pub target_id: SystemObjectId,
1394 pub grantor: RoleId,
1396}
1397
1398#[derive(Debug)]
1399pub struct GrantPrivilegesPlan {
1400 pub update_privileges: Vec<UpdatePrivilege>,
1402 pub grantees: Vec<RoleId>,
1404}
1405
1406#[derive(Debug)]
1407pub struct RevokePrivilegesPlan {
1408 pub update_privileges: Vec<UpdatePrivilege>,
1410 pub revokees: Vec<RoleId>,
1412}
1413#[derive(Debug)]
1414pub struct AlterDefaultPrivilegesPlan {
1415 pub privilege_objects: Vec<DefaultPrivilegeObject>,
1417 pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
1419 pub is_grant: bool,
1421}
1422
1423#[derive(Debug)]
1424pub struct ReassignOwnedPlan {
1425 pub old_roles: Vec<RoleId>,
1427 pub new_role: RoleId,
1429 pub reassign_ids: Vec<ObjectId>,
1431}
1432
1433#[derive(Debug)]
1434pub struct CommentPlan {
1435 pub object_id: CommentObjectId,
1437 pub sub_component: Option<usize>,
1441 pub comment: Option<String>,
1443}
1444
1445#[derive(Clone, Debug)]
1446pub enum TableDataSource {
1447 TableWrites { defaults: Vec<Expr<Aug>> },
1449
1450 DataSource {
1453 desc: DataSourceDesc,
1454 timeline: Timeline,
1455 },
1456}
1457
1458#[derive(Clone, Debug)]
1459pub struct Table {
1460 pub create_sql: String,
1461 pub desc: VersionedRelationDesc,
1462 pub temporary: bool,
1463 pub compaction_window: Option<CompactionWindow>,
1464 pub data_source: TableDataSource,
1465}
1466
1467#[derive(Clone, Debug)]
1468pub struct Source {
1469 pub create_sql: String,
1470 pub data_source: DataSourceDesc,
1471 pub desc: RelationDesc,
1472 pub compaction_window: Option<CompactionWindow>,
1473}
1474
1475#[derive(Debug, Clone)]
1476pub enum DataSourceDesc {
1477 Ingestion(Ingestion),
1479 IngestionExport {
1482 ingestion_id: CatalogItemId,
1483 external_reference: UnresolvedItemName,
1484 details: SourceExportDetails,
1485 data_config: SourceExportDataConfig<ReferencedConnection>,
1486 },
1487 Progress,
1489 Webhook {
1491 validate_using: Option<WebhookValidation>,
1492 body_format: WebhookBodyFormat,
1493 headers: WebhookHeaders,
1494 cluster_id: Option<StorageInstanceId>,
1496 },
1497}
1498
1499#[derive(Clone, Debug, Serialize, Deserialize)]
1500pub struct Ingestion {
1501 pub desc: SourceDesc<ReferencedConnection>,
1502 pub progress_subsource: CatalogItemId,
1503}
1504
1505#[derive(Clone, Debug, Serialize)]
1506pub struct WebhookValidation {
1507 pub expression: MirScalarExpr,
1509 pub relation_desc: RelationDesc,
1511 pub bodies: Vec<(usize, bool)>,
1513 pub headers: Vec<(usize, bool)>,
1515 pub secrets: Vec<WebhookValidationSecret>,
1517}
1518
1519impl WebhookValidation {
1520 const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1521
1522 pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1527 let WebhookValidation {
1528 expression,
1529 relation_desc,
1530 ..
1531 } = self;
1532
1533 let mut expression_ = expression.clone();
1535 let desc_ = relation_desc.clone();
1536 let reduce_task = mz_ore::task::spawn_blocking(
1537 || "webhook-validation-reduce",
1538 move || {
1539 expression_.reduce(&desc_.typ().column_types);
1540 expression_
1541 },
1542 );
1543
1544 match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1545 Ok(Ok(reduced_expr)) => {
1546 *expression = reduced_expr;
1547 Ok(())
1548 }
1549 Ok(Err(_)) => Err("joining task"),
1550 Err(_) => Err("timeout"),
1551 }
1552 }
1553}
1554
1555#[derive(Clone, Debug, Default, Serialize)]
1556pub struct WebhookHeaders {
1557 pub header_column: Option<WebhookHeaderFilters>,
1559 pub mapped_headers: BTreeMap<usize, (String, bool)>,
1561}
1562
1563impl WebhookHeaders {
1564 pub fn num_columns(&self) -> usize {
1566 let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1567 let mapped_headers = self.mapped_headers.len();
1568
1569 header_column + mapped_headers
1570 }
1571}
1572
1573#[derive(Clone, Debug, Default, Serialize)]
1574pub struct WebhookHeaderFilters {
1575 pub block: BTreeSet<String>,
1576 pub allow: BTreeSet<String>,
1577}
1578
1579#[derive(Copy, Clone, Debug, Serialize, Arbitrary)]
1580pub enum WebhookBodyFormat {
1581 Json { array: bool },
1582 Bytes,
1583 Text,
1584}
1585
1586impl From<WebhookBodyFormat> for ScalarType {
1587 fn from(value: WebhookBodyFormat) -> Self {
1588 match value {
1589 WebhookBodyFormat::Json { .. } => ScalarType::Jsonb,
1590 WebhookBodyFormat::Bytes => ScalarType::Bytes,
1591 WebhookBodyFormat::Text => ScalarType::String,
1592 }
1593 }
1594}
1595
1596#[derive(Clone, Debug, Serialize)]
1597pub struct WebhookValidationSecret {
1598 pub id: CatalogItemId,
1600 pub column_idx: usize,
1602 pub use_bytes: bool,
1604}
1605
1606#[derive(Clone, Debug)]
1607pub struct Connection {
1608 pub create_sql: String,
1609 pub details: ConnectionDetails,
1610}
1611
1612#[derive(Clone, Debug, Serialize)]
1613pub enum ConnectionDetails {
1614 Kafka(KafkaConnection<ReferencedConnection>),
1615 Csr(CsrConnection<ReferencedConnection>),
1616 Postgres(PostgresConnection<ReferencedConnection>),
1617 Ssh {
1618 connection: SshConnection,
1619 key_1: SshKey,
1620 key_2: SshKey,
1621 },
1622 Aws(AwsConnection),
1623 AwsPrivatelink(AwsPrivatelinkConnection),
1624 MySql(MySqlConnection<ReferencedConnection>),
1625 SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
1626 IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
1627}
1628
1629impl ConnectionDetails {
1630 pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1631 match self {
1632 ConnectionDetails::Kafka(c) => {
1633 mz_storage_types::connections::Connection::Kafka(c.clone())
1634 }
1635 ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1636 ConnectionDetails::Postgres(c) => {
1637 mz_storage_types::connections::Connection::Postgres(c.clone())
1638 }
1639 ConnectionDetails::Ssh { connection, .. } => {
1640 mz_storage_types::connections::Connection::Ssh(connection.clone())
1641 }
1642 ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1643 ConnectionDetails::AwsPrivatelink(c) => {
1644 mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1645 }
1646 ConnectionDetails::MySql(c) => {
1647 mz_storage_types::connections::Connection::MySql(c.clone())
1648 }
1649 ConnectionDetails::SqlServer(c) => {
1650 mz_storage_types::connections::Connection::SqlServer(c.clone())
1651 }
1652 ConnectionDetails::IcebergCatalog(c) => {
1653 mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1654 }
1655 }
1656 }
1657}
1658
1659#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1660pub struct NetworkPolicyRule {
1661 pub name: String,
1662 pub action: NetworkPolicyRuleAction,
1663 pub address: PolicyAddress,
1664 pub direction: NetworkPolicyRuleDirection,
1665}
1666
1667#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1668pub enum NetworkPolicyRuleAction {
1669 Allow,
1670}
1671
1672impl std::fmt::Display for NetworkPolicyRuleAction {
1673 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1674 match self {
1675 Self::Allow => write!(f, "allow"),
1676 }
1677 }
1678}
1679impl TryFrom<&str> for NetworkPolicyRuleAction {
1680 type Error = PlanError;
1681 fn try_from(value: &str) -> Result<Self, Self::Error> {
1682 match value.to_uppercase().as_str() {
1683 "ALLOW" => Ok(Self::Allow),
1684 _ => Err(PlanError::Unstructured(
1685 "Allow is the only valid option".into(),
1686 )),
1687 }
1688 }
1689}
1690
1691#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1692pub enum NetworkPolicyRuleDirection {
1693 Ingress,
1694}
1695impl std::fmt::Display for NetworkPolicyRuleDirection {
1696 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1697 match self {
1698 Self::Ingress => write!(f, "ingress"),
1699 }
1700 }
1701}
1702impl TryFrom<&str> for NetworkPolicyRuleDirection {
1703 type Error = PlanError;
1704 fn try_from(value: &str) -> Result<Self, Self::Error> {
1705 match value.to_uppercase().as_str() {
1706 "INGRESS" => Ok(Self::Ingress),
1707 _ => Err(PlanError::Unstructured(
1708 "Ingress is the only valid option".into(),
1709 )),
1710 }
1711 }
1712}
1713
1714#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1715pub struct PolicyAddress(pub IpNet);
1716impl std::fmt::Display for PolicyAddress {
1717 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1718 write!(f, "{}", &self.0.to_string())
1719 }
1720}
1721impl From<String> for PolicyAddress {
1722 fn from(value: String) -> Self {
1723 Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1724 }
1725}
1726impl TryFrom<&str> for PolicyAddress {
1727 type Error = PlanError;
1728 fn try_from(value: &str) -> Result<Self, Self::Error> {
1729 let net = IpNet::from_str(value)
1730 .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1731 Ok(Self(net))
1732 }
1733}
1734
1735impl Serialize for PolicyAddress {
1736 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1737 where
1738 S: serde::Serializer,
1739 {
1740 serializer.serialize_str(&format!("{}", &self.0))
1741 }
1742}
1743
1744#[derive(Clone, Debug, Serialize)]
1745pub enum SshKey {
1746 PublicOnly(String),
1747 Both(SshKeyPair),
1748}
1749
1750impl SshKey {
1751 pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1752 match self {
1753 SshKey::PublicOnly(_) => None,
1754 SshKey::Both(key_pair) => Some(key_pair),
1755 }
1756 }
1757
1758 pub fn public_key(&self) -> String {
1759 match self {
1760 SshKey::PublicOnly(s) => s.into(),
1761 SshKey::Both(p) => p.ssh_public_key(),
1762 }
1763 }
1764}
1765
1766#[derive(Clone, Debug)]
1767pub struct Secret {
1768 pub create_sql: String,
1769 pub secret_as: MirScalarExpr,
1770}
1771
1772#[derive(Clone, Debug)]
1773pub struct Sink {
1774 pub create_sql: String,
1776 pub from: GlobalId,
1778 pub connection: StorageSinkConnection<ReferencedConnection>,
1780 pub envelope: SinkEnvelope,
1782 pub version: u64,
1783}
1784
1785#[derive(Clone, Debug)]
1786pub struct View {
1787 pub create_sql: String,
1789 pub expr: HirRelationExpr,
1791 pub dependencies: DependencyIds,
1793 pub column_names: Vec<ColumnName>,
1795 pub temporary: bool,
1797}
1798
1799#[derive(Clone, Debug)]
1800pub struct MaterializedView {
1801 pub create_sql: String,
1803 pub expr: HirRelationExpr,
1805 pub dependencies: DependencyIds,
1807 pub column_names: Vec<ColumnName>,
1809 pub cluster_id: ClusterId,
1811 pub non_null_assertions: Vec<usize>,
1812 pub compaction_window: Option<CompactionWindow>,
1813 pub refresh_schedule: Option<RefreshSchedule>,
1814 pub as_of: Option<Timestamp>,
1815}
1816
1817#[derive(Clone, Debug)]
1818pub struct Index {
1819 pub create_sql: String,
1821 pub on: GlobalId,
1823 pub keys: Vec<mz_expr::MirScalarExpr>,
1824 pub compaction_window: Option<CompactionWindow>,
1825 pub cluster_id: ClusterId,
1826}
1827
1828#[derive(Clone, Debug)]
1829pub struct Type {
1830 pub create_sql: String,
1831 pub inner: CatalogType<IdReference>,
1832}
1833
1834#[derive(Deserialize, Clone, Debug, PartialEq)]
1836pub enum QueryWhen {
1837 Immediately,
1840 FreshestTableWrite,
1843 AtTimestamp(MirScalarExpr),
1848 AtLeastTimestamp(MirScalarExpr),
1851}
1852
1853impl QueryWhen {
1854 pub fn advance_to_timestamp(&self) -> Option<MirScalarExpr> {
1856 match self {
1857 QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1858 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1859 }
1860 }
1861 pub fn constrains_upper(&self) -> bool {
1865 match self {
1866 QueryWhen::AtTimestamp(_) => true,
1867 QueryWhen::AtLeastTimestamp(_)
1868 | QueryWhen::Immediately
1869 | QueryWhen::FreshestTableWrite => false,
1870 }
1871 }
1872 pub fn advance_to_since(&self) -> bool {
1874 match self {
1875 QueryWhen::Immediately
1876 | QueryWhen::AtLeastTimestamp(_)
1877 | QueryWhen::FreshestTableWrite => true,
1878 QueryWhen::AtTimestamp(_) => false,
1879 }
1880 }
1881 pub fn can_advance_to_upper(&self) -> bool {
1883 match self {
1884 QueryWhen::Immediately => true,
1885 QueryWhen::FreshestTableWrite
1886 | QueryWhen::AtTimestamp(_)
1887 | QueryWhen::AtLeastTimestamp(_) => false,
1888 }
1889 }
1890
1891 pub fn can_advance_to_timeline_ts(&self) -> bool {
1893 match self {
1894 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1895 QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1896 }
1897 }
1898 pub fn must_advance_to_timeline_ts(&self) -> bool {
1900 match self {
1901 QueryWhen::FreshestTableWrite => true,
1902 QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1903 false
1904 }
1905 }
1906 }
1907 pub fn is_transactional(&self) -> bool {
1909 match self {
1910 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1911 QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1912 }
1913 }
1914}
1915
1916#[derive(Debug, Copy, Clone)]
1917pub enum MutationKind {
1918 Insert,
1919 Update,
1920 Delete,
1921}
1922
1923#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
1924pub enum CopyFormat {
1925 Text,
1926 Csv,
1927 Binary,
1928 Parquet,
1929}
1930
1931#[derive(Debug, Copy, Clone)]
1932pub enum ExecuteTimeout {
1933 None,
1934 Seconds(f64),
1935 WaitOnce,
1936}
1937
1938#[derive(Clone, Debug)]
1939pub enum IndexOption {
1940 RetainHistory(CompactionWindow),
1942}
1943
1944#[derive(Clone, Debug)]
1945pub enum TableOption {
1946 RetainHistory(CompactionWindow),
1948}
1949
1950#[derive(Clone, Debug)]
1951pub struct PlanClusterOption {
1952 pub availability_zones: AlterOptionParameter<Vec<String>>,
1953 pub introspection_debugging: AlterOptionParameter<bool>,
1954 pub introspection_interval: AlterOptionParameter<OptionalDuration>,
1955 pub managed: AlterOptionParameter<bool>,
1956 pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
1957 pub replication_factor: AlterOptionParameter<u32>,
1958 pub size: AlterOptionParameter,
1959 pub schedule: AlterOptionParameter<ClusterSchedule>,
1960 pub workload_class: AlterOptionParameter<Option<String>>,
1961}
1962
1963impl Default for PlanClusterOption {
1964 fn default() -> Self {
1965 Self {
1966 availability_zones: AlterOptionParameter::Unchanged,
1967 introspection_debugging: AlterOptionParameter::Unchanged,
1968 introspection_interval: AlterOptionParameter::Unchanged,
1969 managed: AlterOptionParameter::Unchanged,
1970 replicas: AlterOptionParameter::Unchanged,
1971 replication_factor: AlterOptionParameter::Unchanged,
1972 size: AlterOptionParameter::Unchanged,
1973 schedule: AlterOptionParameter::Unchanged,
1974 workload_class: AlterOptionParameter::Unchanged,
1975 }
1976 }
1977}
1978
1979#[derive(Clone, Debug, PartialEq, Eq)]
1980pub enum AlterClusterPlanStrategy {
1981 None,
1982 For(Duration),
1983 UntilReady {
1984 on_timeout: OnTimeoutAction,
1985 timeout: Duration,
1986 },
1987}
1988
1989#[derive(Clone, Debug, PartialEq, Eq)]
1990pub enum OnTimeoutAction {
1991 Commit,
1992 Rollback,
1993}
1994
1995impl Default for OnTimeoutAction {
1996 fn default() -> Self {
1997 Self::Commit
1998 }
1999}
2000
2001impl TryFrom<&str> for OnTimeoutAction {
2002 type Error = PlanError;
2003 fn try_from(value: &str) -> Result<Self, Self::Error> {
2004 match value.to_uppercase().as_str() {
2005 "COMMIT" => Ok(Self::Commit),
2006 "ROLLBACK" => Ok(Self::Rollback),
2007 _ => Err(PlanError::Unstructured(
2008 "Valid options are COMMIT, ROLLBACK".into(),
2009 )),
2010 }
2011 }
2012}
2013
2014impl AlterClusterPlanStrategy {
2015 pub fn is_none(&self) -> bool {
2016 matches!(self, Self::None)
2017 }
2018 pub fn is_some(&self) -> bool {
2019 !matches!(self, Self::None)
2020 }
2021}
2022
2023impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2024 type Error = PlanError;
2025
2026 fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2027 Ok(match value.wait {
2028 Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2029 Some(ClusterAlterOptionValue::UntilReady(options)) => {
2030 let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2031 Self::UntilReady {
2032 timeout: match extracted.timeout {
2033 Some(d) => d,
2034 None => Err(PlanError::UntilReadyTimeoutRequired)?,
2035 },
2036 on_timeout: match extracted.on_timeout {
2037 Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2038 PlanError::InvalidOptionValue {
2039 option_name: "ON TIMEOUT".into(),
2040 err: Box::new(e),
2041 }
2042 })?,
2043 None => OnTimeoutAction::default(),
2044 },
2045 }
2046 }
2047 None => Self::None,
2048 })
2049 }
2050}
2051
2052#[derive(Debug, Clone)]
2054pub struct Params {
2055 pub datums: Row,
2057 pub execute_types: Vec<ScalarType>,
2059 pub expected_types: Vec<ScalarType>,
2061}
2062
2063impl Params {
2064 pub fn empty() -> Params {
2066 Params {
2067 datums: Row::pack_slice(&[]),
2068 execute_types: vec![],
2069 expected_types: vec![],
2070 }
2071 }
2072}
2073
2074#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Copy)]
2076pub struct PlanContext {
2077 pub wall_time: DateTime<Utc>,
2078 pub ignore_if_exists_errors: bool,
2079}
2080
2081impl PlanContext {
2082 pub fn new(wall_time: DateTime<Utc>) -> Self {
2083 Self {
2084 wall_time,
2085 ignore_if_exists_errors: false,
2086 }
2087 }
2088
2089 pub fn zero() -> Self {
2093 PlanContext {
2094 wall_time: now::to_datetime(NOW_ZERO()),
2095 ignore_if_exists_errors: false,
2096 }
2097 }
2098
2099 pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2100 self.ignore_if_exists_errors = value;
2101 self
2102 }
2103}