1use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{
41 CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing,
42};
43use mz_ore::now::{self, NOW_ZERO};
44use mz_pgcopy::CopyFormatParams;
45use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
46use mz_repr::explain::{ExplainConfig, ExplainFormat};
47use mz_repr::network_policy_id::NetworkPolicyId;
48use mz_repr::optimize::OptimizerFeatureOverrides;
49use mz_repr::refresh_schedule::RefreshSchedule;
50use mz_repr::role_id::RoleId;
51use mz_repr::{
52 CatalogItemId, ColumnIndex, ColumnName, ColumnType, Diff, GlobalId, RelationDesc, Row,
53 ScalarType, Timestamp, VersionedRelationDesc,
54};
55use mz_sql_parser::ast::{
56 AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
57 RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
58 Value, WithOptionValue,
59};
60use mz_ssh_util::keys::SshKeyPair;
61use mz_storage_types::connections::aws::AwsConnection;
62use mz_storage_types::connections::inline::ReferencedConnection;
63use mz_storage_types::connections::{
64 AwsPrivatelinkConnection, CsrConnection, KafkaConnection, MySqlConnection, PostgresConnection,
65 SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70 SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76 ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77 TransactionAccessMode,
78};
79use crate::catalog::{
80 CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81 RoleAttributes,
82};
83use crate::names::{
84 Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110 AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111 WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119 AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120 PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123 StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124 resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
/// A planned statement: each variant carries the payload describing one kind of
/// operation.
///
/// `#[derive(EnumKind)]` together with `#[enum_kind(PlanKind)]` generates the
/// field-less companion enum `PlanKind`, which `Plan::generated_from` uses to
/// describe which plans a statement kind can produce.
#[derive(Debug, EnumKind)]
#[enum_kind(PlanKind)]
pub enum Plan {
    CreateConnection(CreateConnectionPlan),
    CreateDatabase(CreateDatabasePlan),
    CreateSchema(CreateSchemaPlan),
    CreateRole(CreateRolePlan),
    CreateCluster(CreateClusterPlan),
    CreateClusterReplica(CreateClusterReplicaPlan),
    CreateSource(CreateSourcePlan),
    // Several source plans bundled together, each with its own ids.
    CreateSources(Vec<CreateSourcePlanBundle>),
    CreateSecret(CreateSecretPlan),
    CreateSink(CreateSinkPlan),
    CreateTable(CreateTablePlan),
    CreateView(CreateViewPlan),
    CreateMaterializedView(CreateMaterializedViewPlan),
    CreateContinualTask(CreateContinualTaskPlan),
    CreateNetworkPolicy(CreateNetworkPolicyPlan),
    CreateIndex(CreateIndexPlan),
    CreateType(CreateTypePlan),
    Comment(CommentPlan),
    // Payload-free variants: the variant itself is the whole plan.
    DiscardTemp,
    DiscardAll,
    DropObjects(DropObjectsPlan),
    DropOwned(DropOwnedPlan),
    EmptyQuery,
    ShowAllVariables,
    ShowCreate(ShowCreatePlan),
    ShowColumns(ShowColumnsPlan),
    ShowVariable(ShowVariablePlan),
    InspectShard(InspectShardPlan),
    SetVariable(SetVariablePlan),
    ResetVariable(ResetVariablePlan),
    SetTransaction(SetTransactionPlan),
    StartTransaction(StartTransactionPlan),
    CommitTransaction(CommitTransactionPlan),
    AbortTransaction(AbortTransactionPlan),
    Select(SelectPlan),
    Subscribe(SubscribePlan),
    CopyFrom(CopyFromPlan),
    CopyTo(CopyToPlan),
    ExplainPlan(ExplainPlanPlan),
    ExplainPushdown(ExplainPushdownPlan),
    ExplainTimestamp(ExplainTimestampPlan),
    ExplainSinkSchema(ExplainSinkSchemaPlan),
    Insert(InsertPlan),
    AlterCluster(AlterClusterPlan),
    AlterClusterSwap(AlterClusterSwapPlan),
    // An `ALTER` that only records the object type it targeted
    // (see `AlterNoopPlan`).
    AlterNoop(AlterNoopPlan),
    AlterSetCluster(AlterSetClusterPlan),
    AlterConnection(AlterConnectionPlan),
    AlterSource(AlterSourcePlan),
    AlterClusterRename(AlterClusterRenamePlan),
    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
    AlterItemRename(AlterItemRenamePlan),
    AlterSchemaRename(AlterSchemaRenamePlan),
    AlterSchemaSwap(AlterSchemaSwapPlan),
    AlterSecret(AlterSecretPlan),
    AlterSink(AlterSinkPlan),
    AlterSystemSet(AlterSystemSetPlan),
    AlterSystemReset(AlterSystemResetPlan),
    AlterSystemResetAll(AlterSystemResetAllPlan),
    AlterRole(AlterRolePlan),
    AlterOwner(AlterOwnerPlan),
    AlterTableAddColumn(AlterTablePlan),
    AlterNetworkPolicy(AlterNetworkPolicyPlan),
    Declare(DeclarePlan),
    Fetch(FetchPlan),
    Close(ClosePlan),
    // Used for `DELETE` and `UPDATE` statements (see `Plan::generated_from`).
    ReadThenWrite(ReadThenWritePlan),
    Prepare(PreparePlan),
    Execute(ExecutePlan),
    Deallocate(DeallocatePlan),
    Raise(RaisePlan),
    GrantRole(GrantRolePlan),
    RevokeRole(RevokeRolePlan),
    GrantPrivileges(GrantPrivilegesPlan),
    RevokePrivileges(RevokePrivilegesPlan),
    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
    ReassignOwned(ReassignOwnedPlan),
    SideEffectingFunc(SideEffectingFunc),
    ValidateConnection(ValidateConnectionPlan),
    AlterRetainHistory(AlterRetainHistoryPlan),
}
215
216impl Plan {
217 pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
220 match stmt {
221 StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
222 StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
223 StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
224 StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
225 StatementKind::AlterObjectRename => &[
226 PlanKind::AlterClusterRename,
227 PlanKind::AlterClusterReplicaRename,
228 PlanKind::AlterItemRename,
229 PlanKind::AlterSchemaRename,
230 PlanKind::AlterNoop,
231 ],
232 StatementKind::AlterObjectSwap => &[
233 PlanKind::AlterClusterSwap,
234 PlanKind::AlterSchemaSwap,
235 PlanKind::AlterNoop,
236 ],
237 StatementKind::AlterRole => &[PlanKind::AlterRole],
238 StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
239 StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
240 StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
241 StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
242 StatementKind::AlterSource => &[
243 PlanKind::AlterNoop,
244 PlanKind::AlterSource,
245 PlanKind::AlterRetainHistory,
246 ],
247 StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
248 StatementKind::AlterSystemResetAll => {
249 &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
250 }
251 StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
252 StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
253 StatementKind::AlterTableAddColumn => {
254 &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
255 }
256 StatementKind::Close => &[PlanKind::Close],
257 StatementKind::Comment => &[PlanKind::Comment],
258 StatementKind::Commit => &[PlanKind::CommitTransaction],
259 StatementKind::Copy => &[
260 PlanKind::CopyFrom,
261 PlanKind::Select,
262 PlanKind::Subscribe,
263 PlanKind::CopyTo,
264 ],
265 StatementKind::CreateCluster => &[PlanKind::CreateCluster],
266 StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
267 StatementKind::CreateConnection => &[PlanKind::CreateConnection],
268 StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
269 StatementKind::CreateIndex => &[PlanKind::CreateIndex],
270 StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
271 StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
272 StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
273 StatementKind::CreateRole => &[PlanKind::CreateRole],
274 StatementKind::CreateSchema => &[PlanKind::CreateSchema],
275 StatementKind::CreateSecret => &[PlanKind::CreateSecret],
276 StatementKind::CreateSink => &[PlanKind::CreateSink],
277 StatementKind::CreateSource | StatementKind::CreateSubsource => {
278 &[PlanKind::CreateSource]
279 }
280 StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
281 StatementKind::CreateTable => &[PlanKind::CreateTable],
282 StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
283 StatementKind::CreateType => &[PlanKind::CreateType],
284 StatementKind::CreateView => &[PlanKind::CreateView],
285 StatementKind::Deallocate => &[PlanKind::Deallocate],
286 StatementKind::Declare => &[PlanKind::Declare],
287 StatementKind::Delete => &[PlanKind::ReadThenWrite],
288 StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
289 StatementKind::DropObjects => &[PlanKind::DropObjects],
290 StatementKind::DropOwned => &[PlanKind::DropOwned],
291 StatementKind::Execute => &[PlanKind::Execute],
292 StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
293 StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
294 StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
295 StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
296 StatementKind::Fetch => &[PlanKind::Fetch],
297 StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
298 StatementKind::GrantRole => &[PlanKind::GrantRole],
299 StatementKind::Insert => &[PlanKind::Insert],
300 StatementKind::Prepare => &[PlanKind::Prepare],
301 StatementKind::Raise => &[PlanKind::Raise],
302 StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
303 StatementKind::ResetVariable => &[PlanKind::ResetVariable],
304 StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
305 StatementKind::RevokeRole => &[PlanKind::RevokeRole],
306 StatementKind::Rollback => &[PlanKind::AbortTransaction],
307 StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
308 StatementKind::SetTransaction => &[PlanKind::SetTransaction],
309 StatementKind::SetVariable => &[PlanKind::SetVariable],
310 StatementKind::Show => &[
311 PlanKind::Select,
312 PlanKind::ShowVariable,
313 PlanKind::ShowCreate,
314 PlanKind::ShowColumns,
315 PlanKind::ShowAllVariables,
316 PlanKind::InspectShard,
317 ],
318 StatementKind::StartTransaction => &[PlanKind::StartTransaction],
319 StatementKind::Subscribe => &[PlanKind::Subscribe],
320 StatementKind::Update => &[PlanKind::ReadThenWrite],
321 StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
322 StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
323 }
324 }
325
326 pub fn name(&self) -> &str {
328 match self {
329 Plan::CreateConnection(_) => "create connection",
330 Plan::CreateDatabase(_) => "create database",
331 Plan::CreateSchema(_) => "create schema",
332 Plan::CreateRole(_) => "create role",
333 Plan::CreateCluster(_) => "create cluster",
334 Plan::CreateClusterReplica(_) => "create cluster replica",
335 Plan::CreateSource(_) => "create source",
336 Plan::CreateSources(_) => "create source",
337 Plan::CreateSecret(_) => "create secret",
338 Plan::CreateSink(_) => "create sink",
339 Plan::CreateTable(_) => "create table",
340 Plan::CreateView(_) => "create view",
341 Plan::CreateMaterializedView(_) => "create materialized view",
342 Plan::CreateContinualTask(_) => "create continual task",
343 Plan::CreateIndex(_) => "create index",
344 Plan::CreateType(_) => "create type",
345 Plan::CreateNetworkPolicy(_) => "create network policy",
346 Plan::Comment(_) => "comment",
347 Plan::DiscardTemp => "discard temp",
348 Plan::DiscardAll => "discard all",
349 Plan::DropObjects(plan) => match plan.object_type {
350 ObjectType::Table => "drop table",
351 ObjectType::View => "drop view",
352 ObjectType::MaterializedView => "drop materialized view",
353 ObjectType::Source => "drop source",
354 ObjectType::Sink => "drop sink",
355 ObjectType::Index => "drop index",
356 ObjectType::Type => "drop type",
357 ObjectType::Role => "drop roles",
358 ObjectType::Cluster => "drop clusters",
359 ObjectType::ClusterReplica => "drop cluster replicas",
360 ObjectType::Secret => "drop secret",
361 ObjectType::Connection => "drop connection",
362 ObjectType::Database => "drop database",
363 ObjectType::Schema => "drop schema",
364 ObjectType::Func => "drop function",
365 ObjectType::ContinualTask => "drop continual task",
366 ObjectType::NetworkPolicy => "drop network policy",
367 },
368 Plan::DropOwned(_) => "drop owned",
369 Plan::EmptyQuery => "do nothing",
370 Plan::ShowAllVariables => "show all variables",
371 Plan::ShowCreate(_) => "show create",
372 Plan::ShowColumns(_) => "show columns",
373 Plan::ShowVariable(_) => "show variable",
374 Plan::InspectShard(_) => "inspect shard",
375 Plan::SetVariable(_) => "set variable",
376 Plan::ResetVariable(_) => "reset variable",
377 Plan::SetTransaction(_) => "set transaction",
378 Plan::StartTransaction(_) => "start transaction",
379 Plan::CommitTransaction(_) => "commit",
380 Plan::AbortTransaction(_) => "abort",
381 Plan::Select(_) => "select",
382 Plan::Subscribe(_) => "subscribe",
383 Plan::CopyFrom(_) => "copy from",
384 Plan::CopyTo(_) => "copy to",
385 Plan::ExplainPlan(_) => "explain plan",
386 Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
387 Plan::ExplainTimestamp(_) => "explain timestamp",
388 Plan::ExplainSinkSchema(_) => "explain schema",
389 Plan::Insert(_) => "insert",
390 Plan::AlterNoop(plan) => match plan.object_type {
391 ObjectType::Table => "alter table",
392 ObjectType::View => "alter view",
393 ObjectType::MaterializedView => "alter materialized view",
394 ObjectType::Source => "alter source",
395 ObjectType::Sink => "alter sink",
396 ObjectType::Index => "alter index",
397 ObjectType::Type => "alter type",
398 ObjectType::Role => "alter role",
399 ObjectType::Cluster => "alter cluster",
400 ObjectType::ClusterReplica => "alter cluster replica",
401 ObjectType::Secret => "alter secret",
402 ObjectType::Connection => "alter connection",
403 ObjectType::Database => "alter database",
404 ObjectType::Schema => "alter schema",
405 ObjectType::Func => "alter function",
406 ObjectType::ContinualTask => "alter continual task",
407 ObjectType::NetworkPolicy => "alter network policy",
408 },
409 Plan::AlterCluster(_) => "alter cluster",
410 Plan::AlterClusterRename(_) => "alter cluster rename",
411 Plan::AlterClusterSwap(_) => "alter cluster swap",
412 Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
413 Plan::AlterSetCluster(_) => "alter set cluster",
414 Plan::AlterConnection(_) => "alter connection",
415 Plan::AlterSource(_) => "alter source",
416 Plan::AlterItemRename(_) => "rename item",
417 Plan::AlterSchemaRename(_) => "alter rename schema",
418 Plan::AlterSchemaSwap(_) => "alter swap schema",
419 Plan::AlterSecret(_) => "alter secret",
420 Plan::AlterSink(_) => "alter sink",
421 Plan::AlterSystemSet(_) => "alter system",
422 Plan::AlterSystemReset(_) => "alter system",
423 Plan::AlterSystemResetAll(_) => "alter system",
424 Plan::AlterRole(_) => "alter role",
425 Plan::AlterNetworkPolicy(_) => "alter network policy",
426 Plan::AlterOwner(plan) => match plan.object_type {
427 ObjectType::Table => "alter table owner",
428 ObjectType::View => "alter view owner",
429 ObjectType::MaterializedView => "alter materialized view owner",
430 ObjectType::Source => "alter source owner",
431 ObjectType::Sink => "alter sink owner",
432 ObjectType::Index => "alter index owner",
433 ObjectType::Type => "alter type owner",
434 ObjectType::Role => "alter role owner",
435 ObjectType::Cluster => "alter cluster owner",
436 ObjectType::ClusterReplica => "alter cluster replica owner",
437 ObjectType::Secret => "alter secret owner",
438 ObjectType::Connection => "alter connection owner",
439 ObjectType::Database => "alter database owner",
440 ObjectType::Schema => "alter schema owner",
441 ObjectType::Func => "alter function owner",
442 ObjectType::ContinualTask => "alter continual task owner",
443 ObjectType::NetworkPolicy => "alter network policy owner",
444 },
445 Plan::AlterTableAddColumn(_) => "alter table add column",
446 Plan::Declare(_) => "declare",
447 Plan::Fetch(_) => "fetch",
448 Plan::Close(_) => "close",
449 Plan::ReadThenWrite(plan) => match plan.kind {
450 MutationKind::Insert => "insert into select",
451 MutationKind::Update => "update",
452 MutationKind::Delete => "delete",
453 },
454 Plan::Prepare(_) => "prepare",
455 Plan::Execute(_) => "execute",
456 Plan::Deallocate(_) => "deallocate",
457 Plan::Raise(_) => "raise",
458 Plan::GrantRole(_) => "grant role",
459 Plan::RevokeRole(_) => "revoke role",
460 Plan::GrantPrivileges(_) => "grant privilege",
461 Plan::RevokePrivileges(_) => "revoke privilege",
462 Plan::AlterDefaultPrivileges(_) => "alter default privileges",
463 Plan::ReassignOwned(_) => "reassign owned",
464 Plan::SideEffectingFunc(_) => "side effecting func",
465 Plan::ValidateConnection(_) => "validate connection",
466 Plan::AlterRetainHistory(_) => "alter retain history",
467 }
468 }
469
470 pub fn allowed_in_read_only(&self) -> bool {
476 match self {
477 Plan::SetVariable(_) => true,
480 Plan::ResetVariable(_) => true,
481 Plan::SetTransaction(_) => true,
482 Plan::StartTransaction(_) => true,
483 Plan::CommitTransaction(_) => true,
484 Plan::AbortTransaction(_) => true,
485 Plan::Select(_) => true,
486 Plan::EmptyQuery => true,
487 Plan::ShowAllVariables => true,
488 Plan::ShowCreate(_) => true,
489 Plan::ShowColumns(_) => true,
490 Plan::ShowVariable(_) => true,
491 Plan::InspectShard(_) => true,
492 Plan::Subscribe(_) => true,
493 Plan::CopyTo(_) => true,
494 Plan::ExplainPlan(_) => true,
495 Plan::ExplainPushdown(_) => true,
496 Plan::ExplainTimestamp(_) => true,
497 Plan::ExplainSinkSchema(_) => true,
498 Plan::ValidateConnection(_) => true,
499 _ => false,
500 }
501 }
502}
503
/// Payload of [`Plan::StartTransaction`].
#[derive(Debug)]
pub struct StartTransactionPlan {
    /// Access mode requested for the transaction, or `None` if none was given.
    pub access: Option<TransactionAccessMode>,
    /// Isolation level requested for the transaction, or `None` if none was given.
    pub isolation_level: Option<TransactionIsolationLevel>,
}
509
/// Distinguishes the two ways a transaction can come about.
#[derive(Debug)]
pub enum TransactionType {
    Explicit,
    Implicit,
}

impl TransactionType {
    /// Returns `true` only for [`TransactionType::Explicit`].
    pub fn is_explicit(&self) -> bool {
        match self {
            TransactionType::Explicit => true,
            TransactionType::Implicit => false,
        }
    }

    /// Returns `true` only for [`TransactionType::Implicit`].
    pub fn is_implicit(&self) -> bool {
        match self {
            TransactionType::Implicit => true,
            TransactionType::Explicit => false,
        }
    }
}
525
/// Payload of [`Plan::CommitTransaction`].
#[derive(Debug)]
pub struct CommitTransactionPlan {
    /// Whether the transaction being committed is explicit or implicit.
    pub transaction_type: TransactionType,
}
530
/// Payload of [`Plan::AbortTransaction`].
#[derive(Debug)]
pub struct AbortTransactionPlan {
    /// Whether the transaction being aborted is explicit or implicit.
    pub transaction_type: TransactionType,
}
535
/// Payload of [`Plan::CreateDatabase`].
#[derive(Debug)]
pub struct CreateDatabasePlan {
    /// Name of the database.
    pub name: String,
    /// Presumably mirrors an `IF NOT EXISTS` clause on the statement — confirm
    /// against the planner that fills this in.
    pub if_not_exists: bool,
}
541
/// Payload of [`Plan::CreateSchema`].
#[derive(Debug)]
pub struct CreateSchemaPlan {
    /// Resolved specifier of the database the schema belongs to.
    pub database_spec: ResolvedDatabaseSpecifier,
    /// Name of the schema.
    pub schema_name: String,
    /// Presumably mirrors an `IF NOT EXISTS` clause on the statement — confirm.
    pub if_not_exists: bool,
}
548
/// Payload of [`Plan::CreateRole`].
#[derive(Debug)]
pub struct CreateRolePlan {
    /// Name of the role.
    pub name: String,
    /// Attributes attached to the role.
    pub attributes: RoleAttributes,
}
554
/// Payload of [`Plan::CreateCluster`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterPlan {
    /// Name of the cluster.
    pub name: String,
    /// Managed or unmanaged configuration of the cluster.
    pub variant: CreateClusterVariant,
    /// Optional workload class label; its interpretation is not visible in this file.
    pub workload_class: Option<String>,
}
561
/// The two shapes a [`CreateClusterPlan`] can take.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum CreateClusterVariant {
    /// Cluster described by a replication factor, size and related settings.
    Managed(CreateClusterManagedPlan),
    /// Cluster described by an explicit list of named replicas.
    Unmanaged(CreateClusterUnmanagedPlan),
}
567
/// Configuration carried by [`CreateClusterVariant::Unmanaged`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterUnmanagedPlan {
    /// `(replica name, replica configuration)` pairs.
    pub replicas: Vec<(String, ReplicaConfig)>,
}
572
/// Configuration carried by [`CreateClusterVariant::Managed`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterManagedPlan {
    /// Replication factor of the cluster (presumably the number of replicas — confirm).
    pub replication_factor: u32,
    /// Size name, kept as a free-form string here.
    pub size: String,
    /// Availability zones associated with the cluster.
    pub availability_zones: Vec<String>,
    /// Compute-specific replica configuration.
    pub compute: ComputeReplicaConfig,
    /// Disk flag; its exact effect is decided outside this file.
    pub disk: bool,
    /// Optimizer feature overrides attached to the cluster.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Scheduling policy for the cluster.
    pub schedule: ClusterSchedule,
}
583
/// Payload of [`Plan::CreateClusterReplica`].
#[derive(Debug)]
pub struct CreateClusterReplicaPlan {
    /// Cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// Name of the replica.
    pub name: String,
    /// Configuration of the replica.
    pub config: ReplicaConfig,
}
590
/// Introspection settings for a compute replica.
///
/// NOTE: the derived `PartialOrd`/`Ord` compare fields in declaration order, so
/// reordering the fields would change comparison results.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq)]
pub struct ComputeReplicaIntrospectionConfig {
    /// Debugging flag for introspection; exact semantics are defined by the consumer.
    pub debugging: bool,
    /// Introspection interval.
    pub interval: Duration,
}
599
/// Compute-specific configuration of a replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComputeReplicaConfig {
    /// Introspection settings; `None` presumably means introspection is
    /// disabled — confirm with the consumer.
    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
}
604
/// Configuration of a single cluster replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicaConfig {
    /// Replica described by explicit network addresses and a worker count.
    Unorchestrated {
        /// Storage control addresses.
        storagectl_addrs: Vec<String>,
        /// Storage addresses.
        storage_addrs: Vec<String>,
        /// Compute control addresses.
        computectl_addrs: Vec<String>,
        /// Compute addresses.
        compute_addrs: Vec<String>,
        /// Number of workers.
        workers: usize,
        /// Compute-specific configuration.
        compute: ComputeReplicaConfig,
    },
    /// Replica described by a size name and placement options.
    Orchestrated {
        /// Size name, kept as a free-form string here.
        size: String,
        /// Availability zone, if one was chosen.
        availability_zone: Option<String>,
        /// Compute-specific configuration.
        compute: ComputeReplicaConfig,
        /// Disk flag; its exact effect is decided outside this file.
        disk: bool,
        /// Internal flag; semantics are defined by the consumer.
        internal: bool,
        /// Optional billing override; semantics are defined by the consumer.
        billed_as: Option<String>,
    },
}
624
625#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
626pub enum ClusterSchedule {
627 Manual,
629 Refresh { hydration_time_estimate: Duration },
633}
634
635impl Default for ClusterSchedule {
636 fn default() -> Self {
637 ClusterSchedule::Manual
639 }
640}
641
/// Payload of [`Plan::CreateSource`]; also embedded in [`CreateSourcePlanBundle`].
#[derive(Debug)]
pub struct CreateSourcePlan {
    /// Qualified name of the source.
    pub name: QualifiedItemName,
    /// Description of the source.
    pub source: Source,
    /// Presumably mirrors an `IF NOT EXISTS` clause on the statement — confirm.
    pub if_not_exists: bool,
    /// Timeline of the source.
    pub timeline: Timeline,
    /// Cluster to place the source in, when one is specified.
    pub in_cluster: Option<ClusterId>,
}
651
/// A set of [`SourceReference`]s together with the time they were last updated.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReferences {
    /// Last-update timestamp; unit and epoch are not established in this file — confirm.
    pub updated_at: u64,
    /// The individual references.
    pub references: Vec<SourceReference>,
}
657
/// One entry of a [`SourceReferences`] collection.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReference {
    /// Name of the referenced object.
    pub name: String,
    /// Namespace of the referenced object, if it has one.
    pub namespace: Option<String>,
    /// Column names of the referenced object.
    pub columns: Vec<String>,
}
666
/// A [`CreateSourcePlan`] packaged with the ids and references that accompany it.
///
/// [`Plan::CreateSources`] carries a `Vec` of these.
#[derive(Debug)]
pub struct CreateSourcePlanBundle {
    /// Catalog item id associated with the source.
    pub item_id: CatalogItemId,
    /// Global id associated with the source.
    pub global_id: GlobalId,
    /// The source plan itself.
    pub plan: CreateSourcePlan,
    /// Ids resolved while planning the source.
    pub resolved_ids: ResolvedIds,
    /// Source references available for this source, when known.
    pub available_source_references: Option<SourceReferences>,
}
683
/// Payload of [`Plan::CreateConnection`].
#[derive(Debug)]
pub struct CreateConnectionPlan {
    /// Qualified name of the connection.
    pub name: QualifiedItemName,
    /// Presumably mirrors an `IF NOT EXISTS` clause on the statement — confirm.
    pub if_not_exists: bool,
    /// Description of the connection.
    pub connection: Connection,
    /// Whether validation of the connection was requested.
    pub validate: bool,
}
691
/// Payload of [`Plan::ValidateConnection`].
#[derive(Debug)]
pub struct ValidateConnectionPlan {
    /// Catalog item id of the connection.
    pub id: CatalogItemId,
    /// The connection itself, in its `ReferencedConnection` form.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
}
699
/// Payload of [`Plan::CreateSecret`].
#[derive(Debug)]
pub struct CreateSecretPlan {
    /// Qualified name of the secret.
    pub name: QualifiedItemName,
    /// Description of the secret.
    pub secret: Secret,
    /// Presumably mirrors an `IF NOT EXISTS` clause on the statement — confirm.
    pub if_not_exists: bool,
}
706
/// Payload of [`Plan::CreateSink`].
#[derive(Debug)]
pub struct CreateSinkPlan {
    /// Qualified name of the sink.
    pub name: QualifiedItemName,
    /// Description of the sink.
    pub sink: Sink,
    /// Snapshot flag; presumably controls whether existing data is emitted — confirm.
    pub with_snapshot: bool,
    /// Presumably mirrors an `IF NOT EXISTS` clause on the statement — confirm.
    pub if_not_exists: bool,
    /// Cluster the sink is placed in (always present, unlike for sources).
    pub in_cluster: ClusterId,
}
715
/// Payload of [`Plan::CreateTable`].
#[derive(Debug)]
pub struct CreateTablePlan {
    /// Qualified name of the table.
    pub name: QualifiedItemName,
    /// Description of the table.
    pub table: Table,
    /// Presumably mirrors an `IF NOT EXISTS` clause on the statement — confirm.
    pub if_not_exists: bool,
}
722
/// Payload of [`Plan::CreateView`]; also embedded in
/// [`ExplaineeStatement::CreateView`].
#[derive(Debug, Clone)]
pub struct CreateViewPlan {
    /// Qualified name of the view.
    pub name: QualifiedItemName,
    /// Description of the view.
    pub view: View,
    /// Catalog item this view replaces, if any.
    pub replace: Option<CatalogItemId>,
    /// Catalog items to drop as part of the operation (presumably `replace`
    /// and its dependents — confirm).
    pub drop_ids: Vec<CatalogItemId>,
    /// Presumably mirrors an `IF NOT EXISTS` clause on the statement — confirm.
    pub if_not_exists: bool,
    /// Flag about column ambiguity in the definition; exact meaning is set by
    /// the planner — confirm.
    pub ambiguous_columns: bool,
}
736
/// Payload of [`Plan::CreateMaterializedView`]; also embedded in
/// [`ExplaineeStatement::CreateMaterializedView`].
#[derive(Debug, Clone)]
pub struct CreateMaterializedViewPlan {
    /// Qualified name of the materialized view.
    pub name: QualifiedItemName,
    /// Description of the materialized view.
    pub materialized_view: MaterializedView,
    /// Catalog item this materialized view replaces, if any.
    pub replace: Option<CatalogItemId>,
    /// Catalog items to drop as part of the operation (presumably `replace`
    /// and its dependents — confirm).
    pub drop_ids: Vec<CatalogItemId>,
    /// Presumably mirrors an `IF NOT EXISTS` clause on the statement — confirm.
    pub if_not_exists: bool,
    /// Flag about column ambiguity in the definition; exact meaning is set by
    /// the planner — confirm.
    pub ambiguous_columns: bool,
}
750
/// Payload of [`Plan::CreateContinualTask`].
#[derive(Debug, Clone)]
pub struct CreateContinualTaskPlan {
    /// Qualified name of the continual task.
    pub name: QualifiedItemName,
    /// Optional local placeholder id; what it stands in for is determined by
    /// the planner — confirm.
    pub placeholder_id: Option<mz_expr::LocalId>,
    /// Relation description of the task's output.
    pub desc: RelationDesc,
    /// Global id of the task's input.
    pub input_id: GlobalId,
    /// Snapshot flag; presumably controls whether existing input data is
    /// processed — confirm.
    pub with_snapshot: bool,
    /// The task definition, represented with the [`MaterializedView`] type.
    pub continual_task: MaterializedView,
}
764
/// Payload of [`Plan::CreateNetworkPolicy`].
#[derive(Debug, Clone)]
pub struct CreateNetworkPolicyPlan {
    /// Name of the network policy.
    pub name: String,
    /// Rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
}
770
/// Payload of [`Plan::AlterNetworkPolicy`].
#[derive(Debug, Clone)]
pub struct AlterNetworkPolicyPlan {
    /// Id of the network policy being altered.
    pub id: NetworkPolicyId,
    /// Name of the network policy.
    pub name: String,
    /// Rules for the policy (presumably the full replacement set — confirm).
    pub rules: Vec<NetworkPolicyRule>,
}
777
/// Payload of [`Plan::CreateIndex`]; also embedded in
/// [`ExplaineeStatement::CreateIndex`].
#[derive(Debug, Clone)]
pub struct CreateIndexPlan {
    /// Qualified name of the index.
    pub name: QualifiedItemName,
    /// Description of the index.
    pub index: Index,
    /// Presumably mirrors an `IF NOT EXISTS` clause on the statement — confirm.
    pub if_not_exists: bool,
}
784
/// Payload of [`Plan::CreateType`].
#[derive(Debug)]
pub struct CreateTypePlan {
    /// Qualified name of the type.
    pub name: QualifiedItemName,
    /// Description of the type.
    pub typ: Type,
}
790
/// Payload of [`Plan::DropObjects`].
#[derive(Debug)]
pub struct DropObjectsPlan {
    /// Objects referenced by the statement (presumably those named explicitly — confirm).
    pub referenced_ids: Vec<ObjectId>,
    /// Objects to drop (presumably `referenced_ids` plus dependents — confirm).
    pub drop_ids: Vec<ObjectId>,
    /// Type of object being dropped; `Plan::name` uses it to pick the label.
    pub object_type: ObjectType,
}
801
/// Payload of [`Plan::DropOwned`].
#[derive(Debug)]
pub struct DropOwnedPlan {
    /// Roles whose owned objects are affected.
    pub role_ids: Vec<RoleId>,
    /// Objects to drop.
    pub drop_ids: Vec<ObjectId>,
    /// Privileges to revoke, each paired with the object it applies to.
    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
    /// Default privileges to revoke, each paired with its default-privilege object.
    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
}
813
/// Payload of [`Plan::ShowVariable`].
#[derive(Debug)]
pub struct ShowVariablePlan {
    /// Name of the variable to show.
    pub name: String,
}
818
/// Payload of [`Plan::InspectShard`].
#[derive(Debug)]
pub struct InspectShardPlan {
    /// Global id identifying what to inspect (presumably the collection backed
    /// by the shard — confirm).
    pub id: GlobalId,
}
824
/// Payload of [`Plan::SetVariable`].
#[derive(Debug)]
pub struct SetVariablePlan {
    /// Name of the variable to set.
    pub name: String,
    /// New value: either the default or an explicit list of values.
    pub value: VariableValue,
    /// Presumably mirrors `SET LOCAL` scoping — confirm.
    pub local: bool,
}
831
/// Value assigned by a [`SetVariablePlan`].
#[derive(Debug)]
pub enum VariableValue {
    /// Use the variable's default.
    Default,
    /// Use the given values, kept as strings.
    Values(Vec<String>),
}
837
/// Payload of [`Plan::ResetVariable`].
#[derive(Debug)]
pub struct ResetVariablePlan {
    /// Name of the variable to reset.
    pub name: String,
}
842
/// Payload of [`Plan::SetTransaction`].
#[derive(Debug)]
pub struct SetTransactionPlan {
    /// Scope flag; presumably distinguishes a transaction-local change from a
    /// session-wide one — confirm.
    pub local: bool,
    /// Transaction modes to apply.
    pub modes: Vec<TransactionMode>,
}
848
/// Payload of [`Plan::Select`]; also embedded in [`ShowColumnsPlan`],
/// [`CopyToPlan`] and [`ExplaineeStatement::Select`].
#[derive(Clone, Debug)]
pub struct SelectPlan {
    /// The originating `SELECT` statement, when available (boxed, so the
    /// statement is stored on the heap).
    pub select: Option<Box<SelectStatement<Aug>>>,
    /// The query as a HIR relation expression.
    pub source: HirRelationExpr,
    /// When the query should be evaluated.
    pub when: QueryWhen,
    /// Finishing applied to the result rows.
    pub finishing: RowSetFinishing,
    /// Copy format, set when the result is to be emitted in a `COPY` format.
    pub copy_to: Option<CopyFormat>,
}
864
/// Output shape requested for a [`SubscribePlan`].
#[derive(Debug)]
pub enum SubscribeOutput {
    /// Plain diffs; carries no extra configuration.
    Diffs,
    /// Output ordered within each timestamp by the given columns.
    WithinTimestampOrderBy {
        /// Column orderings to apply.
        order_by: Vec<ColumnOrder>,
    },
    /// Upsert-envelope output.
    EnvelopeUpsert {
        /// Orderings over the key columns.
        order_by_keys: Vec<ColumnOrder>,
    },
    /// Debezium-envelope output.
    EnvelopeDebezium {
        /// Orderings over the key columns.
        order_by_keys: Vec<ColumnOrder>,
    },
}
881
/// Payload of [`Plan::Subscribe`].
#[derive(Debug)]
pub struct SubscribePlan {
    /// What is being subscribed to: an existing object or an ad-hoc query.
    pub from: SubscribeFrom,
    /// Snapshot flag; presumably controls whether existing data is emitted
    /// first — confirm.
    pub with_snapshot: bool,
    /// When the subscription should start reading.
    pub when: QueryWhen,
    /// Optional upper bound expression for the subscription.
    pub up_to: Option<MirScalarExpr>,
    /// Copy format, set when the output is to be emitted in a `COPY` format.
    pub copy_to: Option<CopyFormat>,
    /// Whether progress information should be emitted.
    pub emit_progress: bool,
    /// Requested output shape.
    pub output: SubscribeOutput,
}
892
/// The thing a [`SubscribePlan`] subscribes to.
#[derive(Debug, Clone)]
pub enum SubscribeFrom {
    /// An existing object, identified by its global id.
    Id(GlobalId),
    /// An ad-hoc query.
    Query {
        /// The query as a MIR relation expression.
        expr: MirRelationExpr,
        /// Relation description of the query's output.
        desc: RelationDesc,
    },
}
903
904impl SubscribeFrom {
905 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
906 match self {
907 SubscribeFrom::Id(id) => BTreeSet::from([*id]),
908 SubscribeFrom::Query { expr, .. } => expr.depends_on(),
909 }
910 }
911
912 pub fn contains_temporal(&self) -> bool {
913 match self {
914 SubscribeFrom::Id(_) => false,
915 SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
916 }
917 }
918}
919
/// Payload of [`Plan::ShowCreate`].
#[derive(Debug)]
pub struct ShowCreatePlan {
    /// Object the statement refers to.
    pub id: ObjectId,
    /// Row associated with the plan (presumably the precomputed result — confirm).
    pub row: Row,
}
925
/// Payload of [`Plan::ShowColumns`].
#[derive(Debug)]
pub struct ShowColumnsPlan {
    /// Catalog item whose columns are shown.
    pub id: CatalogItemId,
    /// The select plan that produces the column listing.
    pub select_plan: SelectPlan,
    /// Resolved ids that accompany `select_plan`.
    pub new_resolved_ids: ResolvedIds,
}
932
/// Payload of [`Plan::CopyFrom`].
#[derive(Debug)]
pub struct CopyFromPlan {
    /// Catalog item the data is copied into.
    pub id: CatalogItemId,
    /// Where the data comes from.
    pub source: CopyFromSource,
    /// Column indexes involved in the copy (presumably the target columns in
    /// the order the input provides them — confirm).
    pub columns: Vec<ColumnIndex>,
    /// Relation description of the incoming data.
    pub source_desc: RelationDesc,
    /// Map/filter/project applied to the data.
    pub mfp: MapFilterProject,
    /// Format parameters used to decode the input.
    pub params: CopyFormatParams<'static>,
    /// Optional restriction on which inputs are read.
    pub filter: Option<CopyFromFilter>,
}
952
/// Source of the data for a [`CopyFromPlan`].
#[derive(Debug)]
pub enum CopyFromSource {
    /// Data arrives on standard input.
    Stdin,
    /// Data is fetched from a URL given by a scalar expression.
    Url(HirScalarExpr),
    /// Data is read from AWS S3.
    AwsS3 {
        /// Expression yielding the URI to read.
        uri: HirScalarExpr,
        /// AWS connection details.
        connection: AwsConnection,
        /// Catalog item id of the connection.
        connection_id: CatalogItemId,
    },
}
971
/// Restricts which inputs a [`CopyFromPlan`] reads.
#[derive(Debug)]
pub enum CopyFromFilter {
    /// An explicit list of files.
    Files(Vec<String>),
    /// A pattern, kept as a string; the pattern syntax is defined by the consumer.
    Pattern(String),
}
977
/// Payload of [`Plan::CopyTo`].
#[derive(Debug, Clone)]
pub struct CopyToPlan {
    /// The select plan producing the rows to copy.
    pub select_plan: SelectPlan,
    /// Relation description of those rows.
    pub desc: RelationDesc,
    /// Expression yielding the destination.
    pub to: HirScalarExpr,
    /// Connection used to reach the destination, in its `ReferencedConnection` form.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
    /// Catalog item id of that connection.
    pub connection_id: CatalogItemId,
    /// Output file format.
    pub format: S3SinkFormat,
    /// Maximum size of an output file (presumably in bytes — confirm).
    pub max_file_size: u64,
}
991
/// Payload of [`Plan::ExplainPlan`].
#[derive(Clone, Debug)]
pub struct ExplainPlanPlan {
    /// Stage to explain.
    pub stage: ExplainStage,
    /// Output format of the explanation.
    pub format: ExplainFormat,
    /// Configuration of the explanation.
    pub config: ExplainConfig,
    /// The object or statement being explained.
    pub explainee: Explainee,
}
999
/// The subject of an `EXPLAIN`: an existing catalog item or a statement.
#[derive(Clone, Debug)]
pub enum Explainee {
    /// An existing view.
    View(CatalogItemId),
    /// An existing materialized view.
    MaterializedView(CatalogItemId),
    /// An existing index.
    Index(CatalogItemId),
    /// An existing view, to be re-planned.
    ReplanView(CatalogItemId),
    /// An existing materialized view, to be re-planned.
    ReplanMaterializedView(CatalogItemId),
    /// An existing index, to be re-planned.
    ReplanIndex(CatalogItemId),
    /// A statement supplied with the `EXPLAIN`.
    Statement(ExplaineeStatement),
}
1018
/// A statement that can be explained, carrying its already-built plan.
///
/// `#[derive(EnumKind)]` with `#[enum_kind(ExplaineeStatementKind)]` generates
/// the field-less companion enum `ExplaineeStatementKind`.
#[derive(Clone, Debug, EnumKind)]
#[enum_kind(ExplaineeStatementKind)]
pub enum ExplaineeStatement {
    /// A `SELECT` statement.
    Select {
        /// The `broken` flag, returned by `ExplaineeStatement::broken`.
        broken: bool,
        /// Plan of the statement.
        plan: plan::SelectPlan,
        /// Relation description of the statement's result.
        desc: RelationDesc,
    },
    /// A `CREATE VIEW` statement.
    CreateView {
        /// The `broken` flag, returned by `ExplaineeStatement::broken`.
        broken: bool,
        /// Plan of the statement.
        plan: plan::CreateViewPlan,
    },
    /// A `CREATE MATERIALIZED VIEW` statement.
    CreateMaterializedView {
        /// The `broken` flag, returned by `ExplaineeStatement::broken`.
        broken: bool,
        /// Plan of the statement.
        plan: plan::CreateMaterializedViewPlan,
    },
    /// A `CREATE INDEX` statement.
    CreateIndex {
        /// The `broken` flag, returned by `ExplaineeStatement::broken`.
        broken: bool,
        /// Plan of the statement.
        plan: plan::CreateIndexPlan,
    },
}
1049
1050impl ExplaineeStatement {
1051 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1052 match self {
1053 Self::Select { plan, .. } => plan.source.depends_on(),
1054 Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1055 Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1056 Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1057 }
1058 }
1059
1060 pub fn broken(&self) -> bool {
1071 match self {
1072 Self::Select { broken, .. } => *broken,
1073 Self::CreateView { broken, .. } => *broken,
1074 Self::CreateMaterializedView { broken, .. } => *broken,
1075 Self::CreateIndex { broken, .. } => *broken,
1076 }
1077 }
1078}
1079
1080impl ExplaineeStatementKind {
1081 pub fn supports(&self, stage: &ExplainStage) -> bool {
1082 use ExplainStage::*;
1083 match self {
1084 Self::Select => true,
1085 Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1086 Self::CreateMaterializedView => true,
1087 Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1088 }
1089 }
1090}
1091
1092impl std::fmt::Display for ExplaineeStatementKind {
1093 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1094 match self {
1095 Self::Select => write!(f, "SELECT"),
1096 Self::CreateView => write!(f, "CREATE VIEW"),
1097 Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1098 Self::CreateIndex => write!(f, "CREATE INDEX"),
1099 }
1100 }
1101}
1102
/// Payload of [`Plan::ExplainPushdown`].
#[derive(Clone, Debug)]
pub struct ExplainPushdownPlan {
    /// The object or statement being explained.
    pub explainee: Explainee,
}
1107
/// Plan payload for explaining timestamp selection of a query.
#[derive(Clone, Debug)]
pub struct ExplainTimestampPlan {
    /// Output format requested for the explanation.
    pub format: ExplainFormat,
    /// The query, as an HIR relation expression.
    pub raw_plan: HirRelationExpr,
    /// When the query is to be evaluated (see [`QueryWhen`]).
    pub when: QueryWhen,
}
1114
/// Plan payload for explaining the schema of a sink.
#[derive(Debug)]
pub struct ExplainSinkSchemaPlan {
    /// Id of the collection the sink reads from.
    pub sink_from: GlobalId,
    /// The schema, carried as a JSON string.
    // NOTE(review): presumably already rendered at planning time — confirm.
    pub json_schema: String,
}
1120
/// Plan payload carrying a concrete set of row updates for a catalog item.
#[derive(Debug)]
pub struct SendDiffsPlan {
    /// The catalog item the updates apply to.
    pub id: CatalogItemId,
    /// The rows to apply, each paired with its [`Diff`].
    pub updates: Vec<(Row, Diff)>,
    /// Which kind of mutation produced these updates.
    pub kind: MutationKind,
    /// Rows to hand back to the client, each with a non-zero count.
    // NOTE(review): presumably the `RETURNING` output and its multiplicity — confirm.
    pub returning: Vec<(Row, NonZeroUsize)>,
    /// Upper bound on the result size.
    // NOTE(review): unit is not visible here (presumably bytes) — confirm.
    pub max_result_size: u64,
}
1129
/// Plan payload for inserting rows into a catalog item.
#[derive(Debug)]
pub struct InsertPlan {
    /// The catalog item receiving the rows.
    pub id: CatalogItemId,
    /// Relation expression producing the rows to insert.
    pub values: HirRelationExpr,
    /// Scalar expressions evaluated for the returned output.
    // NOTE(review): presumably the `RETURNING` clause — confirm.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1136
/// Plan payload for a mutation that first reads a selection and then writes
/// based on it.
#[derive(Debug)]
pub struct ReadThenWritePlan {
    /// The catalog item being mutated.
    pub id: CatalogItemId,
    /// Relation expression selecting the rows to operate on.
    pub selection: HirRelationExpr,
    /// Finishing (ordering/limit/projection) applied to the selection.
    pub finishing: RowSetFinishing,
    /// New-value expressions keyed by `usize`.
    // NOTE(review): key is presumably the column index being assigned — confirm.
    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
    /// Which kind of mutation this is.
    pub kind: MutationKind,
    /// Scalar expressions evaluated for the returned output.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1146
/// Plan payload for an `ALTER` that performs no change.
// NOTE(review): presumably emitted when the target of `ALTER ... IF EXISTS` is absent — confirm.
#[derive(Debug)]
pub struct AlterNoopPlan {
    /// Type of the object named in the statement.
    pub object_type: ObjectType,
}
1152
/// Plan payload for changing the cluster associated with a catalog item.
#[derive(Debug)]
pub struct AlterSetClusterPlan {
    /// The catalog item being altered.
    pub id: CatalogItemId,
    /// The cluster to set.
    pub set_cluster: ClusterId,
}
1158
/// Plan payload for altering the retain-history setting of a catalog item.
#[derive(Debug)]
pub struct AlterRetainHistoryPlan {
    /// The catalog item being altered.
    pub id: CatalogItemId,
    /// The option value as an AST [`Value`], if one was given.
    // NOTE(review): `None` presumably corresponds to a reset — confirm.
    pub value: Option<Value>,
    /// The compaction window to apply.
    pub window: CompactionWindow,
    /// Type of the object being altered.
    pub object_type: ObjectType,
}
1166
/// Requested change to a single alterable option of type `T`
/// (`String` when no type is given).
#[derive(Debug, Clone)]
pub enum AlterOptionParameter<T = String> {
    /// Set the option to the contained value.
    Set(T),
    /// Reset the option.
    // NOTE(review): presumably back to its default value — confirm.
    Reset,
    /// Leave the option as it is.
    Unchanged,
}
1174
/// The action an `ALTER CONNECTION` plan performs.
#[derive(Debug)]
pub enum AlterConnectionAction {
    /// Rotate the connection's keys.
    RotateKeys,
    /// Change the connection's options.
    AlterOptions {
        /// Options to set, keyed by name, each with an optional value.
        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
        /// Names of options to drop.
        drop_options: BTreeSet<ConnectionOptionName>,
        /// Validation flag.
        // NOTE(review): presumably whether to validate the altered connection — confirm.
        validate: bool,
    },
}
1184
/// Plan payload for altering a connection.
#[derive(Debug)]
pub struct AlterConnectionPlan {
    /// The catalog item (connection) being altered.
    pub id: CatalogItemId,
    /// What to do to it.
    pub action: AlterConnectionAction,
}
1190
/// The action an `ALTER SOURCE` plan performs.
#[derive(Debug)]
pub enum AlterSourceAction {
    /// Add subsource exports to the source.
    AddSubsourceExports {
        /// Plan bundles for the subsources to create.
        subsources: Vec<CreateSourcePlanBundle>,
        /// Options given with the request.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    },
    /// Refresh the source's references.
    RefreshReferences {
        /// The references to record.
        references: SourceReferences,
    },
}
1201
/// Plan payload for altering a source.
#[derive(Debug)]
pub struct AlterSourcePlan {
    /// Catalog item id of the source.
    pub item_id: CatalogItemId,
    /// Global id of the ingestion.
    // NOTE(review): presumably the same source identified by `item_id` — confirm.
    pub ingestion_id: GlobalId,
    /// What to do to it.
    pub action: AlterSourceAction,
}
1208
/// Plan payload for altering a sink.
#[derive(Debug, Clone)]
pub struct AlterSinkPlan {
    /// Catalog item id of the sink.
    pub item_id: CatalogItemId,
    /// Global id of the sink.
    pub global_id: GlobalId,
    /// The sink definition to use.
    // NOTE(review): presumably the full post-alter definition — confirm.
    pub sink: Sink,
    /// Snapshot flag.
    // NOTE(review): presumably whether to emit a snapshot of the new input — confirm.
    pub with_snapshot: bool,
    /// The cluster the sink is in.
    pub in_cluster: ClusterId,
}
1217
/// Plan payload for altering a cluster's options.
#[derive(Debug, Clone)]
pub struct AlterClusterPlan {
    /// The cluster being altered.
    pub id: ClusterId,
    /// The cluster's name.
    pub name: String,
    /// Per-option change requests.
    pub options: PlanClusterOption,
    /// How the alteration is to be carried out (see [`AlterClusterPlanStrategy`]).
    pub strategy: AlterClusterPlanStrategy,
}
1225
/// Plan payload for renaming a cluster.
#[derive(Debug)]
pub struct AlterClusterRenamePlan {
    /// The cluster being renamed.
    pub id: ClusterId,
    /// The cluster's current name.
    pub name: String,
    /// The name to rename it to.
    pub to_name: String,
}
1232
/// Plan payload for renaming a cluster replica.
#[derive(Debug)]
pub struct AlterClusterReplicaRenamePlan {
    /// The cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The replica being renamed.
    pub replica_id: ReplicaId,
    /// The replica's current qualified name.
    pub name: QualifiedReplica,
    /// The name to rename it to.
    pub to_name: String,
}
1240
/// Plan payload for renaming a catalog item.
#[derive(Debug)]
pub struct AlterItemRenamePlan {
    /// The catalog item being renamed.
    pub id: CatalogItemId,
    /// The item's current fully qualified name.
    pub current_full_name: FullItemName,
    /// The name to rename it to.
    pub to_name: String,
    /// Type of the object being renamed.
    pub object_type: ObjectType,
}
1248
/// Plan payload for renaming a schema.
#[derive(Debug)]
pub struct AlterSchemaRenamePlan {
    /// Database and schema specifiers identifying the schema to rename.
    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// The schema's new name.
    pub new_schema_name: String,
}
1254
/// Plan payload for swapping two schemas.
#[derive(Debug)]
pub struct AlterSchemaSwapPlan {
    /// Database and schema specifiers of the first schema.
    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of the first schema.
    pub schema_a_name: String,
    /// Database and schema specifiers of the second schema.
    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of the second schema.
    pub schema_b_name: String,
    /// A temporary name.
    // NOTE(review): presumably used as the intermediate name during the swap — confirm.
    pub name_temp: String,
}
1263
/// Plan payload for swapping two clusters.
#[derive(Debug)]
pub struct AlterClusterSwapPlan {
    /// Id of the first cluster.
    pub id_a: ClusterId,
    /// Id of the second cluster.
    pub id_b: ClusterId,
    /// Name of the first cluster.
    pub name_a: String,
    /// Name of the second cluster.
    pub name_b: String,
    /// A temporary name.
    // NOTE(review): presumably used as the intermediate name during the swap — confirm.
    pub name_temp: String,
}
1272
/// Plan payload for altering a secret.
#[derive(Debug)]
pub struct AlterSecretPlan {
    /// The catalog item (secret) being altered.
    pub id: CatalogItemId,
    /// Scalar expression producing the secret's new contents.
    pub secret_as: MirScalarExpr,
}
1278
/// Plan payload for setting a system variable.
#[derive(Debug)]
pub struct AlterSystemSetPlan {
    /// Name of the variable.
    pub name: String,
    /// The value to set.
    pub value: VariableValue,
}
1284
/// Plan payload for resetting a single system variable.
#[derive(Debug)]
pub struct AlterSystemResetPlan {
    /// Name of the variable to reset.
    pub name: String,
}
1289
/// Plan payload for resetting all system variables; carries no data.
#[derive(Debug)]
pub struct AlterSystemResetAllPlan {}
1292
/// Plan payload for altering a role.
#[derive(Debug)]
pub struct AlterRolePlan {
    /// The role being altered.
    pub id: RoleId,
    /// The role's name.
    pub name: String,
    /// The planned alteration.
    pub option: PlannedAlterRoleOption,
}
1299
/// Plan payload for changing the owner of an object.
#[derive(Debug)]
pub struct AlterOwnerPlan {
    /// The object whose owner changes.
    pub id: ObjectId,
    /// Type of that object.
    pub object_type: ObjectType,
    /// The role that becomes the owner.
    pub new_owner: RoleId,
}
1306
/// Plan payload for altering a table, described by a single column.
// NOTE(review): presumably `ALTER TABLE ... ADD COLUMN` — confirm.
#[derive(Debug)]
pub struct AlterTablePlan {
    /// The catalog item (relation) being altered.
    pub relation_id: CatalogItemId,
    /// Name of the column.
    pub column_name: ColumnName,
    /// Resolved type of the column.
    pub column_type: ColumnType,
    /// The column's type as written in SQL, unresolved.
    pub raw_sql_type: RawDataType,
}
1314
/// Plan payload for declaring a named statement together with its parameters.
// NOTE(review): presumably SQL `DECLARE` (cursor) — confirm.
#[derive(Debug)]
pub struct DeclarePlan {
    /// The declared name.
    pub name: String,
    /// The unresolved statement.
    pub stmt: Statement<Raw>,
    /// SQL text stored with the statement.
    pub sql: String,
    /// Parameter values and their types.
    pub params: Params,
}
1322
/// Plan payload for fetching from a named object.
// NOTE(review): presumably SQL `FETCH` on a cursor — confirm.
#[derive(Debug)]
pub struct FetchPlan {
    /// Name to fetch from.
    pub name: String,
    /// Requested fetch direction/count, if one was given.
    pub count: Option<FetchDirection>,
    /// Timeout behavior for the fetch.
    pub timeout: ExecuteTimeout,
}
1329
/// Plan payload for closing a named object.
// NOTE(review): presumably SQL `CLOSE` on a cursor — confirm.
#[derive(Debug)]
pub struct ClosePlan {
    /// Name to close.
    pub name: String,
}
1334
/// Plan payload for preparing a named statement.
#[derive(Debug)]
pub struct PreparePlan {
    /// Name of the prepared statement.
    pub name: String,
    /// The unresolved statement.
    pub stmt: Statement<Raw>,
    /// SQL text stored with the statement.
    pub sql: String,
    /// Description of the statement.
    pub desc: StatementDesc,
}
1342
/// Plan payload for executing a named statement.
#[derive(Debug)]
pub struct ExecutePlan {
    /// Name of the statement to execute.
    pub name: String,
    /// Parameter values and their types.
    pub params: Params,
}
1348
/// Plan payload for deallocating prepared statements.
#[derive(Debug)]
pub struct DeallocatePlan {
    /// Name of the statement to deallocate, if any.
    // NOTE(review): `None` presumably means "deallocate all" — confirm.
    pub name: Option<String>,
}
1353
/// Plan payload for raising a notice.
#[derive(Debug)]
pub struct RaisePlan {
    /// Severity of the notice.
    pub severity: NoticeSeverity,
}
1358
/// Plan payload for granting role membership.
#[derive(Debug)]
pub struct GrantRolePlan {
    /// The roles being granted.
    pub role_ids: Vec<RoleId>,
    /// The roles receiving membership.
    pub member_ids: Vec<RoleId>,
    /// The role recorded as grantor.
    pub grantor_id: RoleId,
}
1368
/// Plan payload for revoking role membership.
#[derive(Debug)]
pub struct RevokeRolePlan {
    /// The roles being revoked.
    pub role_ids: Vec<RoleId>,
    /// The roles losing membership.
    pub member_ids: Vec<RoleId>,
    /// The role recorded as grantor.
    // NOTE(review): presumably the grantor of the membership being removed — confirm.
    pub grantor_id: RoleId,
}
1378
/// A single privilege change: an ACL mode on a target object, attributed to a
/// grantor.
#[derive(Debug)]
pub struct UpdatePrivilege {
    /// The privileges (ACL mode bits) concerned.
    pub acl_mode: AclMode,
    /// The object the privileges are on.
    pub target_id: SystemObjectId,
    /// The role recorded as grantor.
    pub grantor: RoleId,
}
1388
/// Plan payload for granting privileges.
#[derive(Debug)]
pub struct GrantPrivilegesPlan {
    /// The privilege changes to apply.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles receiving the privileges.
    pub grantees: Vec<RoleId>,
}
1396
/// Plan payload for revoking privileges.
#[derive(Debug)]
pub struct RevokePrivilegesPlan {
    /// The privilege changes to apply.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles losing the privileges.
    pub revokees: Vec<RoleId>,
}
/// Plan payload for altering default privileges.
#[derive(Debug)]
pub struct AlterDefaultPrivilegesPlan {
    /// The default-privilege objects concerned.
    pub privilege_objects: Vec<DefaultPrivilegeObject>,
    /// The ACL items to apply to those objects.
    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
    /// Grant/revoke selector.
    // NOTE(review): presumably `true` = grant, `false` = revoke — confirm.
    pub is_grant: bool,
}
1413
/// Plan payload for reassigning owned objects from a set of roles to another
/// role.
#[derive(Debug)]
pub struct ReassignOwnedPlan {
    /// The roles giving up ownership.
    pub old_roles: Vec<RoleId>,
    /// The role taking ownership.
    pub new_role: RoleId,
    /// The objects to reassign.
    pub reassign_ids: Vec<ObjectId>,
}
1423
/// Plan payload for setting or clearing a comment on an object.
#[derive(Debug)]
pub struct CommentPlan {
    /// The object the comment applies to.
    pub object_id: CommentObjectId,
    /// Optional sub-component of the object.
    // NOTE(review): presumably a column position within the object — confirm,
    // including whether it is 0- or 1-based.
    pub sub_component: Option<usize>,
    /// The comment text.
    // NOTE(review): `None` presumably removes the comment — confirm.
    pub comment: Option<String>,
}
1435
/// Where a [`Table`]'s data comes from.
#[derive(Clone, Debug)]
pub enum TableDataSource {
    /// The table is written to directly; `defaults` holds one expression per
    /// default value.
    // NOTE(review): presumably one default per column, in column order — confirm.
    TableWrites { defaults: Vec<Expr<Aug>> },

    /// The table is backed by a data source.
    DataSource {
        /// Description of the data source.
        desc: DataSourceDesc,
        /// The timeline associated with the data source.
        timeline: Timeline,
    },
}
1448
/// Planned definition of a table.
#[derive(Clone, Debug)]
pub struct Table {
    /// The `CREATE` SQL text for the table.
    pub create_sql: String,
    /// Versioned relation description of the table.
    pub desc: VersionedRelationDesc,
    /// Whether the table is temporary.
    pub temporary: bool,
    /// Compaction window, if one is set.
    pub compaction_window: Option<CompactionWindow>,
    /// Where the table's data comes from.
    pub data_source: TableDataSource,
}
1457
/// Planned definition of a source.
#[derive(Clone, Debug)]
pub struct Source {
    /// The `CREATE` SQL text for the source.
    pub create_sql: String,
    /// Description of the source's data.
    pub data_source: DataSourceDesc,
    /// Relation description of the source.
    pub desc: RelationDesc,
    /// Compaction window, if one is set.
    pub compaction_window: Option<CompactionWindow>,
}
1465
/// The kinds of data source a [`Source`] or [`Table`] can be backed by.
#[derive(Debug, Clone)]
pub enum DataSourceDesc {
    /// An ingestion (see [`Ingestion`]).
    Ingestion(Ingestion),
    /// An export of an existing ingestion.
    IngestionExport {
        /// Catalog item id of the ingestion this export belongs to.
        ingestion_id: CatalogItemId,
        /// Name of the referenced external object.
        external_reference: UnresolvedItemName,
        /// Details of the export.
        details: SourceExportDetails,
        /// Data configuration of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// A progress collection; carries no data of its own.
    // NOTE(review): presumably the progress subsource of an ingestion — confirm.
    Progress,
    /// A webhook source.
    Webhook {
        /// Optional validation applied to requests.
        validate_using: Option<WebhookValidation>,
        /// Format of the request body.
        body_format: WebhookBodyFormat,
        /// Which headers are included and how.
        headers: WebhookHeaders,
        /// Optional storage instance id.
        // NOTE(review): when this is `None` vs. `Some` is not visible here — confirm.
        cluster_id: Option<StorageInstanceId>,
    },
}
1489
/// An ingestion: a source description plus the id of its progress subsource.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Ingestion {
    /// Description of the source being ingested.
    pub desc: SourceDesc<ReferencedConnection>,
    /// Catalog item id of the progress subsource.
    pub progress_subsource: CatalogItemId,
}
1495
/// Validation attached to a webhook source.
#[derive(Clone, Debug, Serialize)]
pub struct WebhookValidation {
    /// The validation expression.
    pub expression: MirScalarExpr,
    /// Relation description the expression is typed against; its column
    /// types are what `reduce_expression` passes to `reduce`.
    pub relation_desc: RelationDesc,
    /// `(usize, bool)` pairs for the request body.
    // NOTE(review): presumably (column index, provide-as-bytes) — confirm.
    pub bodies: Vec<(usize, bool)>,
    /// `(usize, bool)` pairs for the request headers.
    // NOTE(review): presumably (column index, provide-as-bytes) — confirm.
    pub headers: Vec<(usize, bool)>,
    /// Secrets made available to the expression.
    pub secrets: Vec<WebhookValidationSecret>,
}
1509
1510impl WebhookValidation {
1511 const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1512
1513 pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1518 let WebhookValidation {
1519 expression,
1520 relation_desc,
1521 ..
1522 } = self;
1523
1524 let mut expression_ = expression.clone();
1526 let desc_ = relation_desc.clone();
1527 let reduce_task = mz_ore::task::spawn_blocking(
1528 || "webhook-validation-reduce",
1529 move || {
1530 expression_.reduce(&desc_.typ().column_types);
1531 expression_
1532 },
1533 );
1534
1535 match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1536 Ok(Ok(reduced_expr)) => {
1537 *expression = reduced_expr;
1538 Ok(())
1539 }
1540 Ok(Err(_)) => Err("joining task"),
1541 Err(_) => Err("timeout"),
1542 }
1543 }
1544}
1545
/// Describes which request headers a webhook source includes.
#[derive(Clone, Debug, Default, Serialize)]
pub struct WebhookHeaders {
    /// When present, a single headers column is included, with these filters.
    pub header_column: Option<WebhookHeaderFilters>,
    /// Individually mapped headers, keyed by `usize`.
    // NOTE(review): presumably column index -> (header name, provide-as-bytes) — confirm.
    pub mapped_headers: BTreeMap<usize, (String, bool)>,
}
1553
1554impl WebhookHeaders {
1555 pub fn num_columns(&self) -> usize {
1557 let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1558 let mapped_headers = self.mapped_headers.len();
1559
1560 header_column + mapped_headers
1561 }
1562}
1563
/// Block and allow sets of header names for a webhook's headers column.
// NOTE(review): precedence between `block` and `allow` is not visible here — confirm.
#[derive(Clone, Debug, Default, Serialize)]
pub struct WebhookHeaderFilters {
    /// Header names in the block set.
    pub block: BTreeSet<String>,
    /// Header names in the allow set.
    pub allow: BTreeSet<String>,
}
1569
/// Format of a webhook request body.
///
/// Converts into a [`ScalarType`]: `Json` becomes `Jsonb`, `Bytes` becomes
/// `Bytes`, `Text` becomes `String`.
#[derive(Copy, Clone, Debug, Serialize, Arbitrary)]
pub enum WebhookBodyFormat {
    /// JSON body.
    // NOTE(review): `array` presumably controls expanding a top-level JSON array — confirm.
    Json { array: bool },
    /// Raw bytes.
    Bytes,
    /// Text.
    Text,
}
1576
1577impl From<WebhookBodyFormat> for ScalarType {
1578 fn from(value: WebhookBodyFormat) -> Self {
1579 match value {
1580 WebhookBodyFormat::Json { .. } => ScalarType::Jsonb,
1581 WebhookBodyFormat::Bytes => ScalarType::Bytes,
1582 WebhookBodyFormat::Text => ScalarType::String,
1583 }
1584 }
1585}
1586
/// A secret referenced by a webhook validation expression.
#[derive(Clone, Debug, Serialize)]
pub struct WebhookValidationSecret {
    /// Catalog item id of the secret.
    pub id: CatalogItemId,
    /// Column index associated with the secret.
    // NOTE(review): presumably the column the validation expression reads it from — confirm.
    pub column_idx: usize,
    /// Bytes/text selector.
    // NOTE(review): presumably `true` = provide the secret as bytes — confirm.
    pub use_bytes: bool,
}
1596
/// Planned definition of a connection.
#[derive(Clone, Debug)]
pub struct Connection {
    /// The `CREATE` SQL text for the connection.
    pub create_sql: String,
    /// Type-specific details of the connection.
    pub details: ConnectionDetails,
}
1602
/// Type-specific details of a [`Connection`], one variant per connection type.
#[derive(Clone, Debug, Serialize)]
pub enum ConnectionDetails {
    /// A Kafka connection.
    Kafka(KafkaConnection<ReferencedConnection>),
    /// A CSR connection.
    Csr(CsrConnection<ReferencedConnection>),
    /// A PostgreSQL connection.
    Postgres(PostgresConnection<ReferencedConnection>),
    /// An SSH connection together with two keys.
    // NOTE(review): two keys presumably exist to support key rotation — confirm.
    Ssh {
        /// The SSH connection itself.
        connection: SshConnection,
        /// First key.
        key_1: SshKey,
        /// Second key.
        key_2: SshKey,
    },
    /// An AWS connection.
    Aws(AwsConnection),
    /// An AWS PrivateLink connection.
    AwsPrivatelink(AwsPrivatelinkConnection),
    /// A MySQL connection.
    MySql(MySqlConnection<ReferencedConnection>),
    /// A SQL Server connection.
    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
}
1618
1619impl ConnectionDetails {
1620 pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1621 match self {
1622 ConnectionDetails::Kafka(c) => {
1623 mz_storage_types::connections::Connection::Kafka(c.clone())
1624 }
1625 ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1626 ConnectionDetails::Postgres(c) => {
1627 mz_storage_types::connections::Connection::Postgres(c.clone())
1628 }
1629 ConnectionDetails::Ssh { connection, .. } => {
1630 mz_storage_types::connections::Connection::Ssh(connection.clone())
1631 }
1632 ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1633 ConnectionDetails::AwsPrivatelink(c) => {
1634 mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1635 }
1636 ConnectionDetails::MySql(c) => {
1637 mz_storage_types::connections::Connection::MySql(c.clone())
1638 }
1639 ConnectionDetails::SqlServer(c) => {
1640 mz_storage_types::connections::Connection::SqlServer(c.clone())
1641 }
1642 }
1643 }
1644}
1645
/// A single rule of a network policy.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct NetworkPolicyRule {
    /// Name of the rule.
    pub name: String,
    /// What the rule does with matching traffic.
    pub action: NetworkPolicyRuleAction,
    /// The network address (CIDR) the rule applies to.
    pub address: PolicyAddress,
    /// Traffic direction the rule applies to.
    pub direction: NetworkPolicyRuleDirection,
}
1653
/// Action of a [`NetworkPolicyRule`]. `Allow` is currently the only variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleAction {
    /// Allow the traffic; displayed as `allow`.
    Allow,
}
1658
1659impl std::fmt::Display for NetworkPolicyRuleAction {
1660 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1661 match self {
1662 Self::Allow => write!(f, "allow"),
1663 }
1664 }
1665}
1666impl TryFrom<&str> for NetworkPolicyRuleAction {
1667 type Error = PlanError;
1668 fn try_from(value: &str) -> Result<Self, Self::Error> {
1669 match value.to_uppercase().as_str() {
1670 "ALLOW" => Ok(Self::Allow),
1671 _ => Err(PlanError::Unstructured(
1672 "Allow is the only valid option".into(),
1673 )),
1674 }
1675 }
1676}
1677
/// Direction of a [`NetworkPolicyRule`]. `Ingress` is currently the only variant.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleDirection {
    /// Incoming traffic; displayed as `ingress`.
    Ingress,
}
1682impl std::fmt::Display for NetworkPolicyRuleDirection {
1683 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1684 match self {
1685 Self::Ingress => write!(f, "ingress"),
1686 }
1687 }
1688}
1689impl TryFrom<&str> for NetworkPolicyRuleDirection {
1690 type Error = PlanError;
1691 fn try_from(value: &str) -> Result<Self, Self::Error> {
1692 match value.to_uppercase().as_str() {
1693 "INGRESS" => Ok(Self::Ingress),
1694 _ => Err(PlanError::Unstructured(
1695 "Ingress is the only valid option".into(),
1696 )),
1697 }
1698 }
1699}
1700
/// Address of a [`NetworkPolicyRule`]: a newtype around [`IpNet`].
///
/// Displayed and serialized as the wrapped network's string form.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct PolicyAddress(pub IpNet);
1703impl std::fmt::Display for PolicyAddress {
1704 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1705 write!(f, "{}", &self.0.to_string())
1706 }
1707}
1708impl From<String> for PolicyAddress {
1709 fn from(value: String) -> Self {
1710 Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1711 }
1712}
1713impl TryFrom<&str> for PolicyAddress {
1714 type Error = PlanError;
1715 fn try_from(value: &str) -> Result<Self, Self::Error> {
1716 let net = IpNet::from_str(value)
1717 .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1718 Ok(Self(net))
1719 }
1720}
1721
1722impl Serialize for PolicyAddress {
1723 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1724 where
1725 S: serde::Serializer,
1726 {
1727 serializer.serialize_str(&format!("{}", &self.0))
1728 }
1729}
1730
/// An SSH key, held either as just the public key or as a full key pair.
#[derive(Clone, Debug, Serialize)]
pub enum SshKey {
    /// Only the public key, as a string.
    PublicOnly(String),
    /// The full key pair.
    Both(SshKeyPair),
}
1736
1737impl SshKey {
1738 pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1739 match self {
1740 SshKey::PublicOnly(_) => None,
1741 SshKey::Both(key_pair) => Some(key_pair),
1742 }
1743 }
1744
1745 pub fn public_key(&self) -> String {
1746 match self {
1747 SshKey::PublicOnly(s) => s.into(),
1748 SshKey::Both(p) => p.ssh_public_key(),
1749 }
1750 }
1751}
1752
/// Planned definition of a secret.
#[derive(Clone, Debug)]
pub struct Secret {
    /// The `CREATE` SQL text for the secret.
    pub create_sql: String,
    /// Scalar expression producing the secret's contents.
    pub secret_as: MirScalarExpr,
}
1758
/// Planned definition of a sink.
#[derive(Clone, Debug)]
pub struct Sink {
    /// The `CREATE` SQL text for the sink.
    pub create_sql: String,
    /// Id of the collection the sink reads from.
    pub from: GlobalId,
    /// The sink's storage connection.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// The envelope the sink uses.
    pub envelope: SinkEnvelope,
    /// Version number of the sink.
    // NOTE(review): presumably incremented when the sink is altered — confirm.
    pub version: u64,
}
1771
/// Planned definition of a view.
#[derive(Clone, Debug)]
pub struct View {
    /// The `CREATE` SQL text for the view.
    pub create_sql: String,
    /// The view's query, as an HIR relation expression.
    pub expr: HirRelationExpr,
    /// Ids the view depends on.
    pub dependencies: DependencyIds,
    /// Names of the view's columns.
    pub column_names: Vec<ColumnName>,
    /// Whether the view is temporary.
    pub temporary: bool,
}
1785
/// Planned definition of a materialized view.
#[derive(Clone, Debug)]
pub struct MaterializedView {
    /// The `CREATE` SQL text for the materialized view.
    pub create_sql: String,
    /// The view's query, as an HIR relation expression.
    pub expr: HirRelationExpr,
    /// Ids the materialized view depends on.
    pub dependencies: DependencyIds,
    /// Names of the view's columns.
    pub column_names: Vec<ColumnName>,
    /// The cluster the materialized view is placed on.
    pub cluster_id: ClusterId,
    /// Non-null assertions, as `usize` values.
    // NOTE(review): presumably indexes of columns asserted to be non-null — confirm.
    pub non_null_assertions: Vec<usize>,
    /// Compaction window, if one is set.
    pub compaction_window: Option<CompactionWindow>,
    /// Refresh schedule, if one is set.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// Explicit as-of timestamp, if one is set.
    pub as_of: Option<Timestamp>,
}
1803
/// Planned definition of an index.
#[derive(Clone, Debug)]
pub struct Index {
    /// The `CREATE` SQL text for the index.
    pub create_sql: String,
    /// Id of the collection the index is built on.
    pub on: GlobalId,
    /// The index's key expressions.
    pub keys: Vec<mz_expr::MirScalarExpr>,
    /// Compaction window, if one is set.
    pub compaction_window: Option<CompactionWindow>,
    /// The cluster the index is placed on.
    pub cluster_id: ClusterId,
}
1814
/// Planned definition of a user-defined type.
#[derive(Clone, Debug)]
pub struct Type {
    /// The `CREATE` SQL text for the type.
    pub create_sql: String,
    /// The catalog type definition, referring to other types by id.
    pub inner: CatalogType<IdReference>,
}
1820
/// Specifies when a query should be evaluated.
///
/// The methods on this type spell out what each variant implies for
/// timestamp selection.
#[derive(Deserialize, Clone, Debug, PartialEq)]
pub enum QueryWhen {
    /// No timestamp was requested; the only variant for which
    /// `can_advance_to_upper` is true.
    Immediately,
    /// No timestamp was requested; the only variant for which
    /// `must_advance_to_timeline_ts` is true.
    // NOTE(review): presumably guarantees the latest table writes are observed — confirm.
    FreshestTableWrite,
    /// Evaluate at the timestamp the expression yields; the only variant that
    /// `constrains_upper`.
    AtTimestamp(MirScalarExpr),
    /// Evaluate at a timestamp at least as large as the one the expression
    /// yields.
    AtLeastTimestamp(MirScalarExpr),
}
1839
1840impl QueryWhen {
1841 pub fn advance_to_timestamp(&self) -> Option<MirScalarExpr> {
1843 match self {
1844 QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1845 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1846 }
1847 }
1848 pub fn constrains_upper(&self) -> bool {
1852 match self {
1853 QueryWhen::AtTimestamp(_) => true,
1854 QueryWhen::AtLeastTimestamp(_)
1855 | QueryWhen::Immediately
1856 | QueryWhen::FreshestTableWrite => false,
1857 }
1858 }
1859 pub fn advance_to_since(&self) -> bool {
1861 match self {
1862 QueryWhen::Immediately
1863 | QueryWhen::AtLeastTimestamp(_)
1864 | QueryWhen::FreshestTableWrite => true,
1865 QueryWhen::AtTimestamp(_) => false,
1866 }
1867 }
1868 pub fn can_advance_to_upper(&self) -> bool {
1870 match self {
1871 QueryWhen::Immediately => true,
1872 QueryWhen::FreshestTableWrite
1873 | QueryWhen::AtTimestamp(_)
1874 | QueryWhen::AtLeastTimestamp(_) => false,
1875 }
1876 }
1877
1878 pub fn can_advance_to_timeline_ts(&self) -> bool {
1880 match self {
1881 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1882 QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1883 }
1884 }
1885 pub fn must_advance_to_timeline_ts(&self) -> bool {
1887 match self {
1888 QueryWhen::FreshestTableWrite => true,
1889 QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1890 false
1891 }
1892 }
1893 }
1894 pub fn is_transactional(&self) -> bool {
1896 match self {
1897 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1898 QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1899 }
1900 }
1901}
1902
/// The kind of data mutation a plan performs.
#[derive(Debug, Copy, Clone)]
pub enum MutationKind {
    /// Rows are inserted.
    Insert,
    /// Rows are updated.
    Update,
    /// Rows are deleted.
    Delete,
}
1909
/// Data formats available for copy operations.
///
/// The derived ordering follows declaration order.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyFormat {
    /// Text format.
    Text,
    /// CSV format.
    Csv,
    /// Binary format.
    Binary,
    /// Parquet format.
    Parquet,
}
1917
/// Timeout behavior for an execution (used by [`FetchPlan`]).
#[derive(Debug, Copy, Clone)]
pub enum ExecuteTimeout {
    /// No timeout.
    None,
    /// Time out after the given number of seconds.
    Seconds(f64),
    /// Wait once.
    // NOTE(review): exact semantics (e.g. wait for one batch of results) are
    // not visible here — confirm.
    WaitOnce,
}
1924
/// An option that can be applied to an index.
#[derive(Clone, Debug)]
pub enum IndexOption {
    /// Retain history for the given compaction window.
    RetainHistory(CompactionWindow),
}
1930
/// An option that can be applied to a table.
#[derive(Clone, Debug)]
pub enum TableOption {
    /// Retain history for the given compaction window.
    RetainHistory(CompactionWindow),
}
1936
/// Per-option change requests for altering a cluster.
///
/// Every field is an [`AlterOptionParameter`]; the `Default` implementation
/// sets all of them to `Unchanged`.
#[derive(Clone, Debug)]
pub struct PlanClusterOption {
    /// Change to the list of availability zones.
    pub availability_zones: AlterOptionParameter<Vec<String>>,
    /// Change to the introspection-debugging flag.
    pub introspection_debugging: AlterOptionParameter<bool>,
    /// Change to the introspection interval.
    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
    /// Change to the managed flag.
    pub managed: AlterOptionParameter<bool>,
    /// Change to the replicas, given as (name, config) pairs.
    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
    /// Change to the replication factor.
    pub replication_factor: AlterOptionParameter<u32>,
    /// Change to the size, as a string.
    pub size: AlterOptionParameter,
    /// Change to the disk flag.
    pub disk: AlterOptionParameter<bool>,
    /// Change to the schedule.
    pub schedule: AlterOptionParameter<ClusterSchedule>,
    /// Change to the workload class, which may itself be `None`.
    pub workload_class: AlterOptionParameter<Option<String>>,
}
1950
1951impl Default for PlanClusterOption {
1952 fn default() -> Self {
1953 Self {
1954 availability_zones: AlterOptionParameter::Unchanged,
1955 introspection_debugging: AlterOptionParameter::Unchanged,
1956 introspection_interval: AlterOptionParameter::Unchanged,
1957 managed: AlterOptionParameter::Unchanged,
1958 replicas: AlterOptionParameter::Unchanged,
1959 replication_factor: AlterOptionParameter::Unchanged,
1960 size: AlterOptionParameter::Unchanged,
1961 disk: AlterOptionParameter::Unchanged,
1962 schedule: AlterOptionParameter::Unchanged,
1963 workload_class: AlterOptionParameter::Unchanged,
1964 }
1965 }
1966}
1967
/// Strategy attached to an [`AlterClusterPlan`], derived from the statement's
/// wait option.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AlterClusterPlanStrategy {
    /// No wait option was given.
    None,
    /// Wait for the given duration.
    For(Duration),
    /// Wait until ready, bounded by a timeout.
    UntilReady {
        /// What to do when the timeout elapses.
        on_timeout: OnTimeoutAction,
        /// How long to wait at most.
        timeout: Duration,
    },
}
1977
/// What to do when an `UntilReady` alter-cluster strategy reaches its timeout.
///
/// The default is [`OnTimeoutAction::Commit`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum OnTimeoutAction {
    /// Commit when the timeout elapses.
    #[default]
    Commit,
    /// Roll back when the timeout elapses.
    Rollback,
}
1989
1990impl TryFrom<&str> for OnTimeoutAction {
1991 type Error = PlanError;
1992 fn try_from(value: &str) -> Result<Self, Self::Error> {
1993 match value.to_uppercase().as_str() {
1994 "COMMIT" => Ok(Self::Commit),
1995 "ROLLBACK" => Ok(Self::Rollback),
1996 _ => Err(PlanError::Unstructured(
1997 "Valid options are COMMIT, ROLLBACK".into(),
1998 )),
1999 }
2000 }
2001}
2002
2003impl AlterClusterPlanStrategy {
2004 pub fn is_none(&self) -> bool {
2005 matches!(self, Self::None)
2006 }
2007 pub fn is_some(&self) -> bool {
2008 !matches!(self, Self::None)
2009 }
2010}
2011
2012impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2013 type Error = PlanError;
2014
2015 fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2016 Ok(match value.wait {
2017 Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2018 Some(ClusterAlterOptionValue::UntilReady(options)) => {
2019 let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2020 Self::UntilReady {
2021 timeout: match extracted.timeout {
2022 Some(d) => d,
2023 None => Err(PlanError::UntilReadyTimeoutRequired)?,
2024 },
2025 on_timeout: match extracted.on_timeout {
2026 Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2027 PlanError::InvalidOptionValue {
2028 option_name: "ON TIMEOUT".into(),
2029 err: Box::new(e),
2030 }
2031 })?,
2032 None => OnTimeoutAction::default(),
2033 },
2034 }
2035 }
2036 None => Self::None,
2037 })
2038 }
2039}
2040
/// Parameter values for a statement, together with their types.
#[derive(Debug, Clone)]
pub struct Params {
    /// The parameter values, packed into a single [`Row`].
    pub datums: Row,
    /// The scalar types of the parameters.
    // NOTE(review): presumably one entry per datum, in the same order — confirm.
    pub types: Vec<ScalarType>,
}
2047
2048impl Params {
2049 pub fn empty() -> Params {
2051 Params {
2052 datums: Row::pack_slice(&[]),
2053 types: vec![],
2054 }
2055 }
2056}
2057
/// Context supplied to planning.
#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Copy)]
pub struct PlanContext {
    /// The wall-clock time associated with planning.
    pub wall_time: DateTime<Utc>,
    /// Ignore-if-exists flag; `false` unless set through
    /// `with_ignore_if_exists_errors`.
    // NOTE(review): presumably suppresses "already exists" errors for
    // `IF NOT EXISTS` statements — confirm.
    pub ignore_if_exists_errors: bool,
}
2064
2065impl PlanContext {
2066 pub fn new(wall_time: DateTime<Utc>) -> Self {
2067 Self {
2068 wall_time,
2069 ignore_if_exists_errors: false,
2070 }
2071 }
2072
2073 pub fn zero() -> Self {
2077 PlanContext {
2078 wall_time: now::to_datetime(NOW_ZERO()),
2079 ignore_if_exists_errors: false,
2080 }
2081 }
2082
2083 pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2084 self.ignore_if_exists_errors = value;
2085 self
2086 }
2087}