use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;
use std::str::FromStr;
use std::time::Duration;
use chrono::{DateTime, Utc};
use enum_kinds::EnumKind;
use ipnet::IpNet;
use maplit::btreeset;
use mz_adapter_types::compaction::CompactionWindow;
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::{CollectionPlan, ColumnOrder, MirRelationExpr, MirScalarExpr, RowSetFinishing};
use mz_ore::now::{self, NOW_ZERO};
use mz_pgcopy::CopyFormatParams;
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
use mz_repr::explain::{ExplainConfig, ExplainFormat};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::role_id::RoleId;
use mz_repr::{
CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, Row, ScalarType, Timestamp,
};
use mz_sql_parser::ast::{
AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName, Value,
WithOptionValue,
};
use mz_ssh_util::keys::SshKeyPair;
use mz_storage_types::connections::aws::AwsConnection;
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::connections::{
AwsPrivatelinkConnection, CsrConnection, KafkaConnection, MySqlConnection, PostgresConnection,
SshConnection,
};
use mz_storage_types::sinks::{
S3SinkFormat, SinkEnvelope, SinkPartitionStrategy, StorageSinkConnection,
};
use mz_storage_types::sources::{
SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use crate::ast::{
ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
TransactionAccessMode,
};
use crate::catalog::{
CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
RoleAttributes,
};
use crate::names::{
Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
ResolvedDataType, ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
};
pub(crate) mod error;
pub(crate) mod explain;
pub(crate) mod expr;
pub(crate) mod literal;
pub(crate) mod lowering;
pub(crate) mod notice;
pub(crate) mod plan_utils;
pub(crate) mod query;
pub(crate) mod scope;
pub(crate) mod side_effecting_func;
pub(crate) mod statement;
pub(crate) mod transform_ast;
pub(crate) mod transform_expr;
pub(crate) mod typeconv;
pub(crate) mod with_options;
use crate::plan;
use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
use crate::plan::with_options::OptionalDuration;
pub use error::PlanError;
pub use explain::normalize_subqueries;
pub use expr::{
AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
WindowExprType,
};
pub use lowering::Config as HirToMirConfig;
pub use notice::PlanNotice;
pub use query::{ExprContext, QueryContext, QueryLifetime};
pub use scope::Scope;
pub use side_effecting_func::SideEffectingFunc;
pub use statement::ddl::{
AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
PlannedAlterRoleOption, PlannedRoleVariable,
};
pub use statement::{
describe, plan, plan_copy_from, resolve_cluster_for_materialized_view, StatementClassification,
StatementContext, StatementDesc,
};
use self::statement::ddl::ClusterAlterOptionExtracted;
use self::with_options::TryFromValue;
#[derive(Debug, EnumKind)]
#[enum_kind(PlanKind)]
pub enum Plan {
CreateConnection(CreateConnectionPlan),
CreateDatabase(CreateDatabasePlan),
CreateSchema(CreateSchemaPlan),
CreateRole(CreateRolePlan),
CreateCluster(CreateClusterPlan),
CreateClusterReplica(CreateClusterReplicaPlan),
CreateSource(CreateSourcePlan),
CreateSources(Vec<CreateSourcePlanBundle>),
CreateSecret(CreateSecretPlan),
CreateSink(CreateSinkPlan),
CreateTable(CreateTablePlan),
CreateView(CreateViewPlan),
CreateMaterializedView(CreateMaterializedViewPlan),
CreateContinualTask(CreateContinualTaskPlan),
CreateNetworkPolicy(CreateNetworkPolicyPlan),
CreateIndex(CreateIndexPlan),
CreateType(CreateTypePlan),
Comment(CommentPlan),
DiscardTemp,
DiscardAll,
DropObjects(DropObjectsPlan),
DropOwned(DropOwnedPlan),
EmptyQuery,
ShowAllVariables,
ShowCreate(ShowCreatePlan),
ShowColumns(ShowColumnsPlan),
ShowVariable(ShowVariablePlan),
InspectShard(InspectShardPlan),
SetVariable(SetVariablePlan),
ResetVariable(ResetVariablePlan),
SetTransaction(SetTransactionPlan),
StartTransaction(StartTransactionPlan),
CommitTransaction(CommitTransactionPlan),
AbortTransaction(AbortTransactionPlan),
Select(SelectPlan),
Subscribe(SubscribePlan),
CopyFrom(CopyFromPlan),
CopyTo(CopyToPlan),
ExplainPlan(ExplainPlanPlan),
ExplainPushdown(ExplainPushdownPlan),
ExplainTimestamp(ExplainTimestampPlan),
ExplainSinkSchema(ExplainSinkSchemaPlan),
Insert(InsertPlan),
AlterCluster(AlterClusterPlan),
AlterClusterSwap(AlterClusterSwapPlan),
AlterNoop(AlterNoopPlan),
AlterSetCluster(AlterSetClusterPlan),
AlterConnection(AlterConnectionPlan),
AlterSource(AlterSourcePlan),
AlterClusterRename(AlterClusterRenamePlan),
AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
AlterItemRename(AlterItemRenamePlan),
AlterSchemaRename(AlterSchemaRenamePlan),
AlterSchemaSwap(AlterSchemaSwapPlan),
AlterSecret(AlterSecretPlan),
AlterSink(AlterSinkPlan),
AlterSystemSet(AlterSystemSetPlan),
AlterSystemReset(AlterSystemResetPlan),
AlterSystemResetAll(AlterSystemResetAllPlan),
AlterRole(AlterRolePlan),
AlterOwner(AlterOwnerPlan),
AlterTableAddColumn(AlterTablePlan),
AlterNetworkPolicy(AlterNetworkPolicyPlan),
Declare(DeclarePlan),
Fetch(FetchPlan),
Close(ClosePlan),
ReadThenWrite(ReadThenWritePlan),
Prepare(PreparePlan),
Execute(ExecutePlan),
Deallocate(DeallocatePlan),
Raise(RaisePlan),
GrantRole(GrantRolePlan),
RevokeRole(RevokeRolePlan),
GrantPrivileges(GrantPrivilegesPlan),
RevokePrivileges(RevokePrivilegesPlan),
AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
ReassignOwned(ReassignOwnedPlan),
SideEffectingFunc(SideEffectingFunc),
ValidateConnection(ValidateConnectionPlan),
AlterRetainHistory(AlterRetainHistoryPlan),
}
impl Plan {
pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
match stmt {
StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
StatementKind::AlterObjectRename => &[
PlanKind::AlterClusterRename,
PlanKind::AlterClusterReplicaRename,
PlanKind::AlterItemRename,
PlanKind::AlterSchemaRename,
PlanKind::AlterNoop,
],
StatementKind::AlterObjectSwap => &[
PlanKind::AlterClusterSwap,
PlanKind::AlterSchemaSwap,
PlanKind::AlterNoop,
],
StatementKind::AlterRole => &[PlanKind::AlterRole],
StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
StatementKind::AlterSource => &[
PlanKind::AlterNoop,
PlanKind::AlterSource,
PlanKind::AlterRetainHistory,
],
StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
StatementKind::AlterSystemResetAll => {
&[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
}
StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
StatementKind::AlterTableAddColumn => {
&[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
}
StatementKind::Close => &[PlanKind::Close],
StatementKind::Comment => &[PlanKind::Comment],
StatementKind::Commit => &[PlanKind::CommitTransaction],
StatementKind::Copy => &[
PlanKind::CopyFrom,
PlanKind::Select,
PlanKind::Subscribe,
PlanKind::CopyTo,
],
StatementKind::CreateCluster => &[PlanKind::CreateCluster],
StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
StatementKind::CreateConnection => &[PlanKind::CreateConnection],
StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
StatementKind::CreateIndex => &[PlanKind::CreateIndex],
StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
StatementKind::CreateRole => &[PlanKind::CreateRole],
StatementKind::CreateSchema => &[PlanKind::CreateSchema],
StatementKind::CreateSecret => &[PlanKind::CreateSecret],
StatementKind::CreateSink => &[PlanKind::CreateSink],
StatementKind::CreateSource
| StatementKind::CreateSubsource
| StatementKind::CreateWebhookSource => &[PlanKind::CreateSource],
StatementKind::CreateTable => &[PlanKind::CreateTable],
StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
StatementKind::CreateType => &[PlanKind::CreateType],
StatementKind::CreateView => &[PlanKind::CreateView],
StatementKind::Deallocate => &[PlanKind::Deallocate],
StatementKind::Declare => &[PlanKind::Declare],
StatementKind::Delete => &[PlanKind::ReadThenWrite],
StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
StatementKind::DropObjects => &[PlanKind::DropObjects],
StatementKind::DropOwned => &[PlanKind::DropOwned],
StatementKind::Execute => &[PlanKind::Execute],
StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
StatementKind::Fetch => &[PlanKind::Fetch],
StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
StatementKind::GrantRole => &[PlanKind::GrantRole],
StatementKind::Insert => &[PlanKind::Insert],
StatementKind::Prepare => &[PlanKind::Prepare],
StatementKind::Raise => &[PlanKind::Raise],
StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
StatementKind::ResetVariable => &[PlanKind::ResetVariable],
StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
StatementKind::RevokeRole => &[PlanKind::RevokeRole],
StatementKind::Rollback => &[PlanKind::AbortTransaction],
StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
StatementKind::SetTransaction => &[PlanKind::SetTransaction],
StatementKind::SetVariable => &[PlanKind::SetVariable],
StatementKind::Show => &[
PlanKind::Select,
PlanKind::ShowVariable,
PlanKind::ShowCreate,
PlanKind::ShowColumns,
PlanKind::ShowAllVariables,
PlanKind::InspectShard,
],
StatementKind::StartTransaction => &[PlanKind::StartTransaction],
StatementKind::Subscribe => &[PlanKind::Subscribe],
StatementKind::Update => &[PlanKind::ReadThenWrite],
StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
}
}
pub fn name(&self) -> &str {
match self {
Plan::CreateConnection(_) => "create connection",
Plan::CreateDatabase(_) => "create database",
Plan::CreateSchema(_) => "create schema",
Plan::CreateRole(_) => "create role",
Plan::CreateCluster(_) => "create cluster",
Plan::CreateClusterReplica(_) => "create cluster replica",
Plan::CreateSource(_) => "create source",
Plan::CreateSources(_) => "create source",
Plan::CreateSecret(_) => "create secret",
Plan::CreateSink(_) => "create sink",
Plan::CreateTable(_) => "create table",
Plan::CreateView(_) => "create view",
Plan::CreateMaterializedView(_) => "create materialized view",
Plan::CreateContinualTask(_) => "create continual task",
Plan::CreateIndex(_) => "create index",
Plan::CreateType(_) => "create type",
Plan::CreateNetworkPolicy(_) => "create network policy",
Plan::Comment(_) => "comment",
Plan::DiscardTemp => "discard temp",
Plan::DiscardAll => "discard all",
Plan::DropObjects(plan) => match plan.object_type {
ObjectType::Table => "drop table",
ObjectType::View => "drop view",
ObjectType::MaterializedView => "drop materialized view",
ObjectType::Source => "drop source",
ObjectType::Sink => "drop sink",
ObjectType::Index => "drop index",
ObjectType::Type => "drop type",
ObjectType::Role => "drop roles",
ObjectType::Cluster => "drop clusters",
ObjectType::ClusterReplica => "drop cluster replicas",
ObjectType::Secret => "drop secret",
ObjectType::Connection => "drop connection",
ObjectType::Database => "drop database",
ObjectType::Schema => "drop schema",
ObjectType::Func => "drop function",
ObjectType::ContinualTask => "drop continual task",
ObjectType::NetworkPolicy => "drop network policy",
},
Plan::DropOwned(_) => "drop owned",
Plan::EmptyQuery => "do nothing",
Plan::ShowAllVariables => "show all variables",
Plan::ShowCreate(_) => "show create",
Plan::ShowColumns(_) => "show columns",
Plan::ShowVariable(_) => "show variable",
Plan::InspectShard(_) => "inspect shard",
Plan::SetVariable(_) => "set variable",
Plan::ResetVariable(_) => "reset variable",
Plan::SetTransaction(_) => "set transaction",
Plan::StartTransaction(_) => "start transaction",
Plan::CommitTransaction(_) => "commit",
Plan::AbortTransaction(_) => "abort",
Plan::Select(_) => "select",
Plan::Subscribe(_) => "subscribe",
Plan::CopyFrom(_) => "copy from",
Plan::CopyTo(_) => "copy to",
Plan::ExplainPlan(_) => "explain plan",
Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
Plan::ExplainTimestamp(_) => "explain timestamp",
Plan::ExplainSinkSchema(_) => "explain schema",
Plan::Insert(_) => "insert",
Plan::AlterNoop(plan) => match plan.object_type {
ObjectType::Table => "alter table",
ObjectType::View => "alter view",
ObjectType::MaterializedView => "alter materialized view",
ObjectType::Source => "alter source",
ObjectType::Sink => "alter sink",
ObjectType::Index => "alter index",
ObjectType::Type => "alter type",
ObjectType::Role => "alter role",
ObjectType::Cluster => "alter cluster",
ObjectType::ClusterReplica => "alter cluster replica",
ObjectType::Secret => "alter secret",
ObjectType::Connection => "alter connection",
ObjectType::Database => "alter database",
ObjectType::Schema => "alter schema",
ObjectType::Func => "alter function",
ObjectType::ContinualTask => "alter continual task",
ObjectType::NetworkPolicy => "alter network policy",
},
Plan::AlterCluster(_) => "alter cluster",
Plan::AlterClusterRename(_) => "alter cluster rename",
Plan::AlterClusterSwap(_) => "alter cluster swap",
Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
Plan::AlterSetCluster(_) => "alter set cluster",
Plan::AlterConnection(_) => "alter connection",
Plan::AlterSource(_) => "alter source",
Plan::AlterItemRename(_) => "rename item",
Plan::AlterSchemaRename(_) => "alter rename schema",
Plan::AlterSchemaSwap(_) => "alter swap schema",
Plan::AlterSecret(_) => "alter secret",
Plan::AlterSink(_) => "alter sink",
Plan::AlterSystemSet(_) => "alter system",
Plan::AlterSystemReset(_) => "alter system",
Plan::AlterSystemResetAll(_) => "alter system",
Plan::AlterRole(_) => "alter role",
Plan::AlterNetworkPolicy(_) => "alter network policy",
Plan::AlterOwner(plan) => match plan.object_type {
ObjectType::Table => "alter table owner",
ObjectType::View => "alter view owner",
ObjectType::MaterializedView => "alter materialized view owner",
ObjectType::Source => "alter source owner",
ObjectType::Sink => "alter sink owner",
ObjectType::Index => "alter index owner",
ObjectType::Type => "alter type owner",
ObjectType::Role => "alter role owner",
ObjectType::Cluster => "alter cluster owner",
ObjectType::ClusterReplica => "alter cluster replica owner",
ObjectType::Secret => "alter secret owner",
ObjectType::Connection => "alter connection owner",
ObjectType::Database => "alter database owner",
ObjectType::Schema => "alter schema owner",
ObjectType::Func => "alter function owner",
ObjectType::ContinualTask => "alter continual task owner",
ObjectType::NetworkPolicy => "alter network policy owner",
},
Plan::AlterTableAddColumn(_) => "alter table add column",
Plan::Declare(_) => "declare",
Plan::Fetch(_) => "fetch",
Plan::Close(_) => "close",
Plan::ReadThenWrite(plan) => match plan.kind {
MutationKind::Insert => "insert into select",
MutationKind::Update => "update",
MutationKind::Delete => "delete",
},
Plan::Prepare(_) => "prepare",
Plan::Execute(_) => "execute",
Plan::Deallocate(_) => "deallocate",
Plan::Raise(_) => "raise",
Plan::GrantRole(_) => "grant role",
Plan::RevokeRole(_) => "revoke role",
Plan::GrantPrivileges(_) => "grant privilege",
Plan::RevokePrivileges(_) => "revoke privilege",
Plan::AlterDefaultPrivileges(_) => "alter default privileges",
Plan::ReassignOwned(_) => "reassign owned",
Plan::SideEffectingFunc(_) => "side effecting func",
Plan::ValidateConnection(_) => "validate connection",
Plan::AlterRetainHistory(_) => "alter retain history",
}
}
pub fn allowed_in_read_only(&self) -> bool {
match self {
Plan::SetVariable(_) => true,
Plan::ResetVariable(_) => true,
Plan::SetTransaction(_) => true,
Plan::StartTransaction(_) => true,
Plan::CommitTransaction(_) => true,
Plan::AbortTransaction(_) => true,
Plan::Select(_) => true,
Plan::EmptyQuery => true,
Plan::ShowAllVariables => true,
Plan::ShowCreate(_) => true,
Plan::ShowColumns(_) => true,
Plan::ShowVariable(_) => true,
Plan::InspectShard(_) => true,
Plan::Subscribe(_) => true,
Plan::CopyTo(_) => true,
Plan::ExplainPlan(_) => true,
Plan::ExplainPushdown(_) => true,
Plan::ExplainTimestamp(_) => true,
Plan::ExplainSinkSchema(_) => true,
Plan::ValidateConnection(_) => true,
_ => false,
}
}
}
#[derive(Debug)]
pub struct StartTransactionPlan {
pub access: Option<TransactionAccessMode>,
pub isolation_level: Option<TransactionIsolationLevel>,
}
#[derive(Debug)]
pub enum TransactionType {
Explicit,
Implicit,
}
impl TransactionType {
pub fn is_explicit(&self) -> bool {
matches!(self, TransactionType::Explicit)
}
pub fn is_implicit(&self) -> bool {
matches!(self, TransactionType::Implicit)
}
}
#[derive(Debug)]
pub struct CommitTransactionPlan {
pub transaction_type: TransactionType,
}
#[derive(Debug)]
pub struct AbortTransactionPlan {
pub transaction_type: TransactionType,
}
#[derive(Debug)]
pub struct CreateDatabasePlan {
pub name: String,
pub if_not_exists: bool,
}
#[derive(Debug)]
pub struct CreateSchemaPlan {
pub database_spec: ResolvedDatabaseSpecifier,
pub schema_name: String,
pub if_not_exists: bool,
}
#[derive(Debug)]
pub struct CreateRolePlan {
pub name: String,
pub attributes: RoleAttributes,
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterPlan {
pub name: String,
pub variant: CreateClusterVariant,
pub workload_class: Option<String>,
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum CreateClusterVariant {
Managed(CreateClusterManagedPlan),
Unmanaged(CreateClusterUnmanagedPlan),
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterUnmanagedPlan {
pub replicas: Vec<(String, ReplicaConfig)>,
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CreateClusterManagedPlan {
pub replication_factor: u32,
pub size: String,
pub availability_zones: Vec<String>,
pub compute: ComputeReplicaConfig,
pub disk: bool,
pub optimizer_feature_overrides: OptimizerFeatureOverrides,
pub schedule: ClusterSchedule,
}
#[derive(Debug)]
pub struct CreateClusterReplicaPlan {
pub cluster_id: ClusterId,
pub name: String,
pub config: ReplicaConfig,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq)]
pub struct ComputeReplicaIntrospectionConfig {
pub debugging: bool,
pub interval: Duration,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComputeReplicaConfig {
pub introspection: Option<ComputeReplicaIntrospectionConfig>,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicaConfig {
Unorchestrated {
storagectl_addrs: Vec<String>,
storage_addrs: Vec<String>,
computectl_addrs: Vec<String>,
compute_addrs: Vec<String>,
workers: usize,
compute: ComputeReplicaConfig,
},
Orchestrated {
size: String,
availability_zone: Option<String>,
compute: ComputeReplicaConfig,
disk: bool,
internal: bool,
billed_as: Option<String>,
},
}
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterSchedule {
Manual,
Refresh { hydration_time_estimate: Duration },
}
impl Default for ClusterSchedule {
fn default() -> Self {
ClusterSchedule::Manual
}
}
#[derive(Debug)]
pub struct CreateSourcePlan {
pub name: QualifiedItemName,
pub source: Source,
pub if_not_exists: bool,
pub timeline: Timeline,
pub in_cluster: Option<ClusterId>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReferences {
pub updated_at: u64,
pub references: Vec<SourceReference>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReference {
pub name: String,
pub namespace: Option<String>,
pub columns: Vec<String>,
}
#[derive(Debug)]
pub struct CreateSourcePlanBundle {
pub item_id: CatalogItemId,
pub global_id: GlobalId,
pub plan: CreateSourcePlan,
pub resolved_ids: ResolvedIds,
pub available_source_references: Option<SourceReferences>,
}
#[derive(Debug)]
pub struct CreateConnectionPlan {
pub name: QualifiedItemName,
pub if_not_exists: bool,
pub connection: Connection,
pub validate: bool,
}
#[derive(Debug)]
pub struct ValidateConnectionPlan {
pub id: CatalogItemId,
pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
}
#[derive(Debug)]
pub struct CreateSecretPlan {
pub name: QualifiedItemName,
pub secret: Secret,
pub if_not_exists: bool,
}
#[derive(Debug)]
pub struct CreateSinkPlan {
pub name: QualifiedItemName,
pub sink: Sink,
pub with_snapshot: bool,
pub if_not_exists: bool,
pub in_cluster: ClusterId,
}
#[derive(Debug)]
pub struct CreateTablePlan {
pub name: QualifiedItemName,
pub table: Table,
pub if_not_exists: bool,
}
#[derive(Debug, Clone)]
pub struct CreateViewPlan {
pub name: QualifiedItemName,
pub view: View,
pub replace: Option<CatalogItemId>,
pub drop_ids: Vec<CatalogItemId>,
pub if_not_exists: bool,
pub ambiguous_columns: bool,
}
#[derive(Debug, Clone)]
pub struct CreateMaterializedViewPlan {
pub name: QualifiedItemName,
pub materialized_view: MaterializedView,
pub replace: Option<CatalogItemId>,
pub drop_ids: Vec<CatalogItemId>,
pub if_not_exists: bool,
pub ambiguous_columns: bool,
}
#[derive(Debug, Clone)]
pub struct CreateContinualTaskPlan {
pub name: QualifiedItemName,
pub placeholder_id: Option<mz_expr::LocalId>,
pub desc: RelationDesc,
pub input_id: GlobalId,
pub with_snapshot: bool,
pub continual_task: MaterializedView,
}
#[derive(Debug, Clone)]
pub struct CreateNetworkPolicyPlan {
pub name: String,
pub rules: Vec<NetworkPolicyRule>,
}
#[derive(Debug, Clone)]
pub struct AlterNetworkPolicyPlan {
pub id: NetworkPolicyId,
pub name: String,
pub rules: Vec<NetworkPolicyRule>,
}
#[derive(Debug, Clone)]
pub struct CreateIndexPlan {
pub name: QualifiedItemName,
pub index: Index,
pub if_not_exists: bool,
}
#[derive(Debug)]
pub struct CreateTypePlan {
pub name: QualifiedItemName,
pub typ: Type,
}
#[derive(Debug)]
pub struct DropObjectsPlan {
pub referenced_ids: Vec<ObjectId>,
pub drop_ids: Vec<ObjectId>,
pub object_type: ObjectType,
}
#[derive(Debug)]
pub struct DropOwnedPlan {
pub role_ids: Vec<RoleId>,
pub drop_ids: Vec<ObjectId>,
pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
}
#[derive(Debug)]
pub struct ShowVariablePlan {
pub name: String,
}
#[derive(Debug)]
pub struct InspectShardPlan {
pub id: GlobalId,
}
#[derive(Debug)]
pub struct SetVariablePlan {
pub name: String,
pub value: VariableValue,
pub local: bool,
}
#[derive(Debug)]
pub enum VariableValue {
Default,
Values(Vec<String>),
}
#[derive(Debug)]
pub struct ResetVariablePlan {
pub name: String,
}
#[derive(Debug)]
pub struct SetTransactionPlan {
pub local: bool,
pub modes: Vec<TransactionMode>,
}
#[derive(Clone, Debug)]
pub struct SelectPlan {
pub select: Option<Box<SelectStatement<Aug>>>,
pub source: HirRelationExpr,
pub when: QueryWhen,
pub finishing: RowSetFinishing,
pub copy_to: Option<CopyFormat>,
}
#[derive(Debug)]
pub enum SubscribeOutput {
Diffs,
WithinTimestampOrderBy {
order_by: Vec<ColumnOrder>,
},
EnvelopeUpsert {
order_by_keys: Vec<ColumnOrder>,
},
EnvelopeDebezium {
order_by_keys: Vec<ColumnOrder>,
},
}
#[derive(Debug)]
pub struct SubscribePlan {
pub from: SubscribeFrom,
pub with_snapshot: bool,
pub when: QueryWhen,
pub up_to: Option<MirScalarExpr>,
pub copy_to: Option<CopyFormat>,
pub emit_progress: bool,
pub output: SubscribeOutput,
}
#[derive(Debug, Clone)]
pub enum SubscribeFrom {
Id(GlobalId),
Query {
expr: MirRelationExpr,
desc: RelationDesc,
},
}
impl SubscribeFrom {
pub fn depends_on(&self) -> BTreeSet<GlobalId> {
match self {
SubscribeFrom::Id(id) => BTreeSet::from([*id]),
SubscribeFrom::Query { expr, .. } => expr.depends_on(),
}
}
pub fn contains_temporal(&self) -> bool {
match self {
SubscribeFrom::Id(_) => false,
SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
}
}
}
#[derive(Debug)]
pub struct ShowCreatePlan {
pub id: ObjectId,
pub row: Row,
}
#[derive(Debug)]
pub struct ShowColumnsPlan {
pub id: CatalogItemId,
pub select_plan: SelectPlan,
pub new_resolved_ids: ResolvedIds,
}
#[derive(Debug)]
pub struct CopyFromPlan {
pub id: CatalogItemId,
pub columns: Vec<usize>,
pub params: CopyFormatParams<'static>,
}
#[derive(Debug, Clone)]
pub struct CopyToPlan {
pub select_plan: SelectPlan,
pub desc: RelationDesc,
pub to: HirScalarExpr,
pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
pub connection_id: CatalogItemId,
pub format: S3SinkFormat,
pub max_file_size: u64,
}
#[derive(Clone, Debug)]
pub struct ExplainPlanPlan {
pub stage: ExplainStage,
pub format: ExplainFormat,
pub config: ExplainConfig,
pub explainee: Explainee,
}
#[derive(Clone, Debug)]
pub enum Explainee {
View(CatalogItemId),
MaterializedView(CatalogItemId),
Index(CatalogItemId),
ReplanView(CatalogItemId),
ReplanMaterializedView(CatalogItemId),
ReplanIndex(CatalogItemId),
Statement(ExplaineeStatement),
}
#[derive(Clone, Debug, EnumKind)]
#[enum_kind(ExplaineeStatementKind)]
pub enum ExplaineeStatement {
Select {
broken: bool,
plan: plan::SelectPlan,
desc: RelationDesc,
},
CreateView {
broken: bool,
plan: plan::CreateViewPlan,
},
CreateMaterializedView {
broken: bool,
plan: plan::CreateMaterializedViewPlan,
},
CreateIndex {
broken: bool,
plan: plan::CreateIndexPlan,
},
}
impl ExplaineeStatement {
pub fn depends_on(&self) -> BTreeSet<GlobalId> {
match self {
Self::Select { plan, .. } => plan.source.depends_on(),
Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
}
}
pub fn broken(&self) -> bool {
match self {
Self::Select { broken, .. } => *broken,
Self::CreateView { broken, .. } => *broken,
Self::CreateMaterializedView { broken, .. } => *broken,
Self::CreateIndex { broken, .. } => *broken,
}
}
}
impl ExplaineeStatementKind {
pub fn supports(&self, stage: &ExplainStage) -> bool {
use ExplainStage::*;
match self {
Self::Select => true,
Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
Self::CreateMaterializedView => true,
Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
}
}
}
impl std::fmt::Display for ExplaineeStatementKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Select => write!(f, "SELECT"),
Self::CreateView => write!(f, "CREATE VIEW"),
Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
Self::CreateIndex => write!(f, "CREATE INDEX"),
}
}
}
#[derive(Clone, Debug)]
pub struct ExplainPushdownPlan {
pub explainee: Explainee,
}
#[derive(Clone, Debug)]
pub struct ExplainTimestampPlan {
pub format: ExplainFormat,
pub raw_plan: HirRelationExpr,
pub when: QueryWhen,
}
#[derive(Debug)]
pub struct ExplainSinkSchemaPlan {
pub sink_from: GlobalId,
pub json_schema: String,
}
#[derive(Debug)]
pub struct SendDiffsPlan {
pub id: CatalogItemId,
pub updates: Vec<(Row, Diff)>,
pub kind: MutationKind,
pub returning: Vec<(Row, NonZeroUsize)>,
pub max_result_size: u64,
}
#[derive(Debug)]
pub struct InsertPlan {
pub id: CatalogItemId,
pub values: HirRelationExpr,
pub returning: Vec<mz_expr::MirScalarExpr>,
}
#[derive(Debug)]
pub struct ReadThenWritePlan {
pub id: CatalogItemId,
pub selection: HirRelationExpr,
pub finishing: RowSetFinishing,
pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
pub kind: MutationKind,
pub returning: Vec<mz_expr::MirScalarExpr>,
}
#[derive(Debug)]
pub struct AlterNoopPlan {
pub object_type: ObjectType,
}
#[derive(Debug)]
pub struct AlterSetClusterPlan {
pub id: CatalogItemId,
pub set_cluster: ClusterId,
}
#[derive(Debug)]
pub struct AlterRetainHistoryPlan {
pub id: CatalogItemId,
pub value: Option<Value>,
pub window: CompactionWindow,
pub object_type: ObjectType,
}
#[derive(Debug, Clone)]
pub enum AlterOptionParameter<T = String> {
Set(T),
Reset,
Unchanged,
}
#[derive(Debug)]
pub enum AlterConnectionAction {
RotateKeys,
AlterOptions {
set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
drop_options: BTreeSet<ConnectionOptionName>,
validate: bool,
},
}
#[derive(Debug)]
pub struct AlterConnectionPlan {
pub id: CatalogItemId,
pub action: AlterConnectionAction,
}
#[derive(Debug)]
pub enum AlterSourceAction {
AddSubsourceExports {
subsources: Vec<CreateSourcePlanBundle>,
options: Vec<AlterSourceAddSubsourceOption<Aug>>,
},
RefreshReferences {
references: SourceReferences,
},
}
#[derive(Debug)]
pub struct AlterSourcePlan {
pub item_id: CatalogItemId,
pub ingestion_id: GlobalId,
pub action: AlterSourceAction,
}
#[derive(Debug, Clone)]
pub struct AlterSinkPlan {
pub item_id: CatalogItemId,
pub global_id: GlobalId,
pub sink: Sink,
pub with_snapshot: bool,
pub in_cluster: ClusterId,
}
#[derive(Debug, Clone)]
pub struct AlterClusterPlan {
pub id: ClusterId,
pub name: String,
pub options: PlanClusterOption,
pub strategy: AlterClusterPlanStrategy,
}
#[derive(Debug)]
pub struct AlterClusterRenamePlan {
pub id: ClusterId,
pub name: String,
pub to_name: String,
}
#[derive(Debug)]
pub struct AlterClusterReplicaRenamePlan {
pub cluster_id: ClusterId,
pub replica_id: ReplicaId,
pub name: QualifiedReplica,
pub to_name: String,
}
#[derive(Debug)]
pub struct AlterItemRenamePlan {
pub id: CatalogItemId,
pub current_full_name: FullItemName,
pub to_name: String,
pub object_type: ObjectType,
}
#[derive(Debug)]
pub struct AlterSchemaRenamePlan {
pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
pub new_schema_name: String,
}
#[derive(Debug)]
pub struct AlterSchemaSwapPlan {
pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
pub schema_a_name: String,
pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
pub schema_b_name: String,
pub name_temp: String,
}
#[derive(Debug)]
pub struct AlterClusterSwapPlan {
pub id_a: ClusterId,
pub id_b: ClusterId,
pub name_a: String,
pub name_b: String,
pub name_temp: String,
}
#[derive(Debug)]
pub struct AlterSecretPlan {
pub id: CatalogItemId,
pub secret_as: MirScalarExpr,
}
#[derive(Debug)]
pub struct AlterSystemSetPlan {
pub name: String,
pub value: VariableValue,
}
#[derive(Debug)]
pub struct AlterSystemResetPlan {
pub name: String,
}
#[derive(Debug)]
pub struct AlterSystemResetAllPlan {}
#[derive(Debug)]
pub struct AlterRolePlan {
pub id: RoleId,
pub name: String,
pub option: PlannedAlterRoleOption,
}
#[derive(Debug)]
pub struct AlterOwnerPlan {
pub id: ObjectId,
pub object_type: ObjectType,
pub new_owner: RoleId,
}
#[derive(Debug)]
pub struct AlterTablePlan {
pub relation_id: CatalogItemId,
pub column_name: ColumnName,
pub column_type: ResolvedDataType,
}
#[derive(Debug)]
pub struct DeclarePlan {
pub name: String,
pub stmt: Statement<Raw>,
pub sql: String,
pub params: Params,
}
#[derive(Debug)]
pub struct FetchPlan {
pub name: String,
pub count: Option<FetchDirection>,
pub timeout: ExecuteTimeout,
}
#[derive(Debug)]
pub struct ClosePlan {
pub name: String,
}
#[derive(Debug)]
pub struct PreparePlan {
pub name: String,
pub stmt: Statement<Raw>,
pub sql: String,
pub desc: StatementDesc,
}
#[derive(Debug)]
pub struct ExecutePlan {
pub name: String,
pub params: Params,
}
#[derive(Debug)]
pub struct DeallocatePlan {
pub name: Option<String>,
}
#[derive(Debug)]
pub struct RaisePlan {
pub severity: NoticeSeverity,
}
#[derive(Debug)]
pub struct GrantRolePlan {
pub role_ids: Vec<RoleId>,
pub member_ids: Vec<RoleId>,
pub grantor_id: RoleId,
}
#[derive(Debug)]
pub struct RevokeRolePlan {
pub role_ids: Vec<RoleId>,
pub member_ids: Vec<RoleId>,
pub grantor_id: RoleId,
}
#[derive(Debug)]
pub struct UpdatePrivilege {
pub acl_mode: AclMode,
pub target_id: SystemObjectId,
pub grantor: RoleId,
}
#[derive(Debug)]
pub struct GrantPrivilegesPlan {
pub update_privileges: Vec<UpdatePrivilege>,
pub grantees: Vec<RoleId>,
}
#[derive(Debug)]
pub struct RevokePrivilegesPlan {
pub update_privileges: Vec<UpdatePrivilege>,
pub revokees: Vec<RoleId>,
}
#[derive(Debug)]
pub struct AlterDefaultPrivilegesPlan {
pub privilege_objects: Vec<DefaultPrivilegeObject>,
pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
pub is_grant: bool,
}
#[derive(Debug)]
pub struct ReassignOwnedPlan {
pub old_roles: Vec<RoleId>,
pub new_role: RoleId,
pub reassign_ids: Vec<ObjectId>,
}
#[derive(Debug)]
pub struct CommentPlan {
pub object_id: CommentObjectId,
pub sub_component: Option<usize>,
pub comment: Option<String>,
}
#[derive(Clone, Debug)]
pub enum TableDataSource {
TableWrites { defaults: Vec<Expr<Aug>> },
DataSource {
desc: DataSourceDesc,
timeline: Timeline,
},
}
#[derive(Clone, Debug)]
pub struct Table {
pub create_sql: String,
pub desc: RelationDesc,
pub temporary: bool,
pub compaction_window: Option<CompactionWindow>,
pub data_source: TableDataSource,
}
#[derive(Clone, Debug)]
pub struct Source {
pub create_sql: String,
pub data_source: DataSourceDesc,
pub desc: RelationDesc,
pub compaction_window: Option<CompactionWindow>,
}
#[derive(Debug, Clone)]
pub enum DataSourceDesc {
Ingestion(Ingestion),
IngestionExport {
ingestion_id: CatalogItemId,
external_reference: UnresolvedItemName,
details: SourceExportDetails,
data_config: SourceExportDataConfig<ReferencedConnection>,
},
Progress,
Webhook {
validate_using: Option<WebhookValidation>,
body_format: WebhookBodyFormat,
headers: WebhookHeaders,
},
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Ingestion {
pub desc: SourceDesc<ReferencedConnection>,
pub progress_subsource: CatalogItemId,
}
#[derive(Clone, Debug, Serialize)]
pub struct WebhookValidation {
pub expression: MirScalarExpr,
pub relation_desc: RelationDesc,
pub bodies: Vec<(usize, bool)>,
pub headers: Vec<(usize, bool)>,
pub secrets: Vec<WebhookValidationSecret>,
}
impl WebhookValidation {
const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
let WebhookValidation {
expression,
relation_desc,
..
} = self;
let mut expression_ = expression.clone();
let desc_ = relation_desc.clone();
let reduce_task = mz_ore::task::spawn_blocking(
|| "webhook-validation-reduce",
move || {
expression_.reduce(&desc_.typ().column_types);
expression_
},
);
match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
Ok(Ok(reduced_expr)) => {
*expression = reduced_expr;
Ok(())
}
Ok(Err(_)) => Err("joining task"),
Err(_) => Err("timeout"),
}
}
}
#[derive(Clone, Debug, Default, Serialize)]
pub struct WebhookHeaders {
pub header_column: Option<WebhookHeaderFilters>,
pub mapped_headers: BTreeMap<usize, (String, bool)>,
}
impl WebhookHeaders {
pub fn num_columns(&self) -> usize {
let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
let mapped_headers = self.mapped_headers.len();
header_column + mapped_headers
}
}
#[derive(Clone, Debug, Default, Serialize)]
pub struct WebhookHeaderFilters {
pub block: BTreeSet<String>,
pub allow: BTreeSet<String>,
}
#[derive(Copy, Clone, Debug, Serialize, Arbitrary)]
pub enum WebhookBodyFormat {
Json { array: bool },
Bytes,
Text,
}
impl From<WebhookBodyFormat> for ScalarType {
fn from(value: WebhookBodyFormat) -> Self {
match value {
WebhookBodyFormat::Json { .. } => ScalarType::Jsonb,
WebhookBodyFormat::Bytes => ScalarType::Bytes,
WebhookBodyFormat::Text => ScalarType::String,
}
}
}
#[derive(Clone, Debug, Serialize)]
pub struct WebhookValidationSecret {
pub id: CatalogItemId,
pub column_idx: usize,
pub use_bytes: bool,
}
#[derive(Clone, Debug)]
pub struct Connection {
pub create_sql: String,
pub details: ConnectionDetails,
}
#[derive(Clone, Debug, Serialize)]
pub enum ConnectionDetails {
Kafka(KafkaConnection<ReferencedConnection>),
Csr(CsrConnection<ReferencedConnection>),
Postgres(PostgresConnection<ReferencedConnection>),
Ssh {
connection: SshConnection,
key_1: SshKey,
key_2: SshKey,
},
Aws(AwsConnection),
AwsPrivatelink(AwsPrivatelinkConnection),
MySql(MySqlConnection<ReferencedConnection>),
}
impl ConnectionDetails {
pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
match self {
ConnectionDetails::Kafka(c) => {
mz_storage_types::connections::Connection::Kafka(c.clone())
}
ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
ConnectionDetails::Postgres(c) => {
mz_storage_types::connections::Connection::Postgres(c.clone())
}
ConnectionDetails::Ssh { connection, .. } => {
mz_storage_types::connections::Connection::Ssh(connection.clone())
}
ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
ConnectionDetails::AwsPrivatelink(c) => {
mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
}
ConnectionDetails::MySql(c) => {
mz_storage_types::connections::Connection::MySql(c.clone())
}
}
}
}
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct NetworkPolicyRule {
pub name: String,
pub action: NetworkPolicyRuleAction,
pub address: PolicyAddress,
pub direction: NetworkPolicyRuleDirection,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleAction {
Allow,
}
impl std::fmt::Display for NetworkPolicyRuleAction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Allow => write!(f, "allow"),
}
}
}
impl TryFrom<&str> for NetworkPolicyRuleAction {
type Error = PlanError;
fn try_from(value: &str) -> Result<Self, Self::Error> {
match value.to_uppercase().as_str() {
"ALLOW" => Ok(Self::Allow),
_ => Err(PlanError::Unstructured(
"Allow is the only valid option".into(),
)),
}
}
}
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleDirection {
Ingress,
}
impl std::fmt::Display for NetworkPolicyRuleDirection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Ingress => write!(f, "ingress"),
}
}
}
impl TryFrom<&str> for NetworkPolicyRuleDirection {
type Error = PlanError;
fn try_from(value: &str) -> Result<Self, Self::Error> {
match value.to_uppercase().as_str() {
"INGRESS" => Ok(Self::Ingress),
_ => Err(PlanError::Unstructured(
"Ingress is the only valid option".into(),
)),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct PolicyAddress(pub IpNet);
impl std::fmt::Display for PolicyAddress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", &self.0.to_string())
}
}
impl From<String> for PolicyAddress {
fn from(value: String) -> Self {
Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
}
}
impl TryFrom<&str> for PolicyAddress {
type Error = PlanError;
fn try_from(value: &str) -> Result<Self, Self::Error> {
let net = IpNet::from_str(value)
.map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
Ok(Self(net))
}
}
impl Serialize for PolicyAddress {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&format!("{}", &self.0))
}
}
#[derive(Clone, Debug, Serialize)]
pub enum SshKey {
PublicOnly(String),
Both(SshKeyPair),
}
impl SshKey {
pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
match self {
SshKey::PublicOnly(_) => None,
SshKey::Both(key_pair) => Some(key_pair),
}
}
pub fn public_key(&self) -> String {
match self {
SshKey::PublicOnly(s) => s.into(),
SshKey::Both(p) => p.ssh_public_key(),
}
}
}
#[derive(Clone, Debug)]
pub struct Secret {
pub create_sql: String,
pub secret_as: MirScalarExpr,
}
#[derive(Clone, Debug)]
pub struct Sink {
pub create_sql: String,
pub from: GlobalId,
pub connection: StorageSinkConnection<ReferencedConnection>,
pub partition_strategy: SinkPartitionStrategy,
pub envelope: SinkEnvelope,
pub version: u64,
}
#[derive(Clone, Debug)]
pub struct View {
pub create_sql: String,
pub expr: HirRelationExpr,
pub dependencies: DependencyIds,
pub column_names: Vec<ColumnName>,
pub temporary: bool,
}
#[derive(Clone, Debug)]
pub struct MaterializedView {
pub create_sql: String,
pub expr: HirRelationExpr,
pub dependencies: DependencyIds,
pub column_names: Vec<ColumnName>,
pub cluster_id: ClusterId,
pub non_null_assertions: Vec<usize>,
pub compaction_window: Option<CompactionWindow>,
pub refresh_schedule: Option<RefreshSchedule>,
pub as_of: Option<Timestamp>,
}
#[derive(Clone, Debug)]
pub struct Index {
pub create_sql: String,
pub on: GlobalId,
pub keys: Vec<mz_expr::MirScalarExpr>,
pub compaction_window: Option<CompactionWindow>,
pub cluster_id: ClusterId,
}
#[derive(Clone, Debug)]
pub struct Type {
pub create_sql: String,
pub inner: CatalogType<IdReference>,
}
#[derive(Deserialize, Clone, Debug, PartialEq)]
pub enum QueryWhen {
Immediately,
FreshestTableWrite,
AtTimestamp(MirScalarExpr),
AtLeastTimestamp(MirScalarExpr),
}
impl QueryWhen {
pub fn advance_to_timestamp(&self) -> Option<MirScalarExpr> {
match self {
QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
}
}
pub fn advance_to_since(&self) -> bool {
match self {
QueryWhen::Immediately
| QueryWhen::AtLeastTimestamp(_)
| QueryWhen::FreshestTableWrite => true,
QueryWhen::AtTimestamp(_) => false,
}
}
pub fn can_advance_to_upper(&self) -> bool {
match self {
QueryWhen::Immediately => true,
QueryWhen::FreshestTableWrite
| QueryWhen::AtTimestamp(_)
| QueryWhen::AtLeastTimestamp(_) => false,
}
}
pub fn can_advance_to_timeline_ts(&self) -> bool {
match self {
QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
}
}
pub fn must_advance_to_timeline_ts(&self) -> bool {
match self {
QueryWhen::FreshestTableWrite => true,
QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
false
}
}
}
pub fn is_transactional(&self) -> bool {
match self {
QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
}
}
}
#[derive(Debug, Copy, Clone)]
pub enum MutationKind {
Insert,
Update,
Delete,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyFormat {
Text,
Csv,
Binary,
Parquet,
}
#[derive(Debug, Copy, Clone)]
pub enum ExecuteTimeout {
None,
Seconds(f64),
WaitOnce,
}
#[derive(Clone, Debug)]
pub enum IndexOption {
RetainHistory(CompactionWindow),
}
#[derive(Clone, Debug)]
pub enum TableOption {
RetainHistory(CompactionWindow),
}
#[derive(Clone, Debug)]
pub struct PlanClusterOption {
pub availability_zones: AlterOptionParameter<Vec<String>>,
pub introspection_debugging: AlterOptionParameter<bool>,
pub introspection_interval: AlterOptionParameter<OptionalDuration>,
pub managed: AlterOptionParameter<bool>,
pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
pub replication_factor: AlterOptionParameter<u32>,
pub size: AlterOptionParameter,
pub disk: AlterOptionParameter<bool>,
pub schedule: AlterOptionParameter<ClusterSchedule>,
pub workload_class: AlterOptionParameter<Option<String>>,
}
impl Default for PlanClusterOption {
fn default() -> Self {
Self {
availability_zones: AlterOptionParameter::Unchanged,
introspection_debugging: AlterOptionParameter::Unchanged,
introspection_interval: AlterOptionParameter::Unchanged,
managed: AlterOptionParameter::Unchanged,
replicas: AlterOptionParameter::Unchanged,
replication_factor: AlterOptionParameter::Unchanged,
size: AlterOptionParameter::Unchanged,
disk: AlterOptionParameter::Unchanged,
schedule: AlterOptionParameter::Unchanged,
workload_class: AlterOptionParameter::Unchanged,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AlterClusterPlanStrategy {
None,
For(Duration),
UntilReady {
on_timeout: OnTimeoutAction,
timeout: Duration,
},
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OnTimeoutAction {
Commit,
Rollback,
}
impl Default for OnTimeoutAction {
fn default() -> Self {
Self::Commit
}
}
impl TryFrom<&str> for OnTimeoutAction {
type Error = PlanError;
fn try_from(value: &str) -> Result<Self, Self::Error> {
match value.to_uppercase().as_str() {
"COMMIT" => Ok(Self::Commit),
"ROLLBACK" => Ok(Self::Rollback),
_ => Err(PlanError::Unstructured(
"Valid options are COMMIT, ROLLBACK".into(),
)),
}
}
}
impl AlterClusterPlanStrategy {
pub fn is_none(&self) -> bool {
matches!(self, Self::None)
}
}
impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
type Error = PlanError;
fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
Ok(match value.wait {
Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
Some(ClusterAlterOptionValue::UntilReady(options)) => {
let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
Self::UntilReady {
timeout: match extracted.timeout {
Some(d) => d,
None => Err(PlanError::UntilReadyTimeoutRequired)?,
},
on_timeout: match extracted.on_timeout {
Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
PlanError::InvalidOptionValue {
option_name: "ON TIMEOUT".into(),
err: Box::new(e),
}
})?,
None => OnTimeoutAction::default(),
},
}
}
None => Self::None,
})
}
}
#[derive(Debug, Clone)]
pub struct Params {
pub datums: Row,
pub types: Vec<ScalarType>,
}
impl Params {
pub fn empty() -> Params {
Params {
datums: Row::pack_slice(&[]),
types: vec![],
}
}
}
#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Copy)]
pub struct PlanContext {
pub wall_time: DateTime<Utc>,
pub ignore_if_exists_errors: bool,
}
impl PlanContext {
pub fn new(wall_time: DateTime<Utc>) -> Self {
Self {
wall_time,
ignore_if_exists_errors: false,
}
}
pub fn zero() -> Self {
PlanContext {
wall_time: now::to_datetime(NOW_ZERO()),
ignore_if_exists_errors: false,
}
}
pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
self.ignore_if_exists_errors = value;
self
}
}