use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, LazyLock};
use chrono::{DateTime, Utc};
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_compute_client::logging::LogVariant;
use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
use mz_ore::collections::CollectionExt;
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::role_id::RoleId;
use mz_repr::{
CatalogItemId, Diff, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
Timestamp,
};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{Expr, Raw, Statement, UnresolvedItemName, Value, WithOptionValue};
use mz_sql::catalog::{
CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogTypeDetails,
DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, RoleAttributes, RoleMembership,
RoleVars, SystemObjectType,
};
use mz_sql::names::{
Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
};
use mz_sql::plan::{
ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
HirRelationExpr, Ingestion as PlanIngestion, NetworkPolicyRule, PlanError, WebhookBodyFormat,
WebhookHeaders, WebhookValidation,
};
use mz_sql::rbac;
use mz_sql::session::vars::OwnedVarInput;
use mz_storage_client::controller::IntrospectionType;
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::sinks::{SinkEnvelope, SinkPartitionStrategy, StorageSinkConnection};
use mz_storage_types::sources::{
GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
SourceExportDetails, Timeline,
};
use serde::ser::SerializeSeq;
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;
use tracing::debug;
use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
use crate::durable;
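/// Extends [`From`] with the ability to update an existing value in place
/// from a `T`, rather than constructing a fresh one. Implementations refresh
/// the durable fields while leaving in-memory-only state (such as the lookup
/// maps on [`Database`] and [`Schema`]) untouched.
///
/// ```ignore
/// // Sketch: hydrate once, then refresh in place on later updates.
/// let mut db = Database::from(durable_db);
/// db.update_from(newer_durable_db); // schema maps are preserved
/// ```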
pub trait UpdateFrom<T>: From<T> {
fn update_from(&mut self, from: T);
}
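/// In-memory representation of a database and the schemas it contains.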
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
pub name: String,
pub id: DatabaseId,
pub oid: u32,
#[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
pub schemas_by_id: BTreeMap<SchemaId, Schema>,
pub schemas_by_name: BTreeMap<String, SchemaId>,
pub owner_id: RoleId,
pub privileges: PrivilegeMap,
}
impl From<Database> for durable::Database {
fn from(database: Database) -> durable::Database {
durable::Database {
id: database.id,
oid: database.oid,
name: database.name,
owner_id: database.owner_id,
privileges: database.privileges.into_all_values().collect(),
}
}
}
impl From<durable::Database> for Database {
fn from(
durable::Database {
id,
oid,
name,
owner_id,
privileges,
}: durable::Database,
) -> Database {
Database {
id,
oid,
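            // The schema maps start empty; they are populated separately as
            // schemas are loaded into the catalog.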
schemas_by_id: BTreeMap::new(),
schemas_by_name: BTreeMap::new(),
name,
owner_id,
privileges: PrivilegeMap::from_mz_acl_items(privileges),
}
}
}
impl UpdateFrom<durable::Database> for Database {
fn update_from(
&mut self,
durable::Database {
id,
oid,
name,
owner_id,
privileges,
}: durable::Database,
) {
self.id = id;
self.oid = oid;
self.name = name;
self.owner_id = owner_id;
self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
}
}
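/// In-memory representation of a schema, tracking the items, functions, and
/// types it contains by name.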
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
pub name: QualifiedSchemaName,
pub id: SchemaSpecifier,
pub oid: u32,
pub items: BTreeMap<String, CatalogItemId>,
pub functions: BTreeMap<String, CatalogItemId>,
pub types: BTreeMap<String, CatalogItemId>,
pub owner_id: RoleId,
pub privileges: PrivilegeMap,
}
impl From<Schema> for durable::Schema {
fn from(schema: Schema) -> durable::Schema {
durable::Schema {
id: schema.id.into(),
oid: schema.oid,
name: schema.name.schema,
database_id: schema.name.database.id(),
owner_id: schema.owner_id,
privileges: schema.privileges.into_all_values().collect(),
}
}
}
impl From<durable::Schema> for Schema {
fn from(
durable::Schema {
id,
oid,
name,
database_id,
owner_id,
privileges,
}: durable::Schema,
) -> Schema {
Schema {
name: QualifiedSchemaName {
database: database_id.into(),
schema: name,
},
id: id.into(),
oid,
items: BTreeMap::new(),
functions: BTreeMap::new(),
types: BTreeMap::new(),
owner_id,
privileges: PrivilegeMap::from_mz_acl_items(privileges),
}
}
}
impl UpdateFrom<durable::Schema> for Schema {
fn update_from(
&mut self,
durable::Schema {
id,
oid,
name,
database_id,
owner_id,
privileges,
}: durable::Schema,
) {
self.name = QualifiedSchemaName {
database: database_id.into(),
schema: name,
};
self.id = id.into();
self.oid = oid;
self.owner_id = owner_id;
self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
}
}
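/// In-memory representation of a role, including its attributes, role
/// memberships, and per-role variable defaults.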
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
pub name: String,
pub id: RoleId,
pub oid: u32,
pub attributes: RoleAttributes,
pub membership: RoleMembership,
pub vars: RoleVars,
}
impl Role {
pub fn is_user(&self) -> bool {
self.id.is_user()
}
pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
}
}
impl From<Role> for durable::Role {
fn from(role: Role) -> durable::Role {
durable::Role {
id: role.id,
oid: role.oid,
name: role.name,
attributes: role.attributes,
membership: role.membership,
vars: role.vars,
}
}
}
impl From<durable::Role> for Role {
fn from(
durable::Role {
id,
oid,
name,
attributes,
membership,
vars,
}: durable::Role,
) -> Self {
Role {
name,
id,
oid,
attributes,
membership,
vars,
}
}
}
impl UpdateFrom<durable::Role> for Role {
fn update_from(
&mut self,
durable::Role {
id,
oid,
name,
attributes,
membership,
vars,
}: durable::Role,
) {
self.id = id;
self.oid = oid;
self.name = name;
self.attributes = attributes;
self.membership = membership;
self.vars = vars;
}
}
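/// In-memory representation of a cluster and its replicas.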
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
pub name: String,
pub id: ClusterId,
pub config: ClusterConfig,
#[serde(skip)]
pub log_indexes: BTreeMap<LogVariant, GlobalId>,
pub bound_objects: BTreeSet<CatalogItemId>,
pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
#[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
pub owner_id: RoleId,
pub privileges: PrivilegeMap,
}
impl Cluster {
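    /// Returns the role of this cluster, determined by comparing its name
    /// against the built-in system clusters.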
pub fn role(&self) -> ClusterRole {
if self.name == MZ_SYSTEM_CLUSTER.name {
ClusterRole::SystemCritical
} else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
ClusterRole::System
} else {
ClusterRole::User
}
}
pub fn is_managed(&self) -> bool {
matches!(self.config.variant, ClusterVariant::Managed { .. })
}
pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
self.replicas().filter(|r| !r.config.location.internal())
}
pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
self.replicas_by_id_.values()
}
pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
self.replicas_by_id_.get(&replica_id)
}
pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
self.replica_id_by_name_.get(name).copied()
}
pub fn availability_zones(&self) -> Option<&[String]> {
match &self.config.variant {
ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
ClusterVariant::Unmanaged => None,
}
}
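    /// Reconstructs a `CreateClusterPlan` from this cluster's configuration,
    /// e.g. to support `SHOW CREATE`. Unmanaged clusters cannot be
    /// round-tripped this way and return an error.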
pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
let name = self.name.clone();
let variant = match &self.config.variant {
ClusterVariant::Managed(ClusterVariantManaged {
size,
availability_zones,
logging,
replication_factor,
disk,
optimizer_feature_overrides,
schedule,
}) => {
let introspection = match logging {
ReplicaLogging {
log_logging,
interval: Some(interval),
} => Some(ComputeReplicaIntrospectionConfig {
debugging: *log_logging,
interval: interval.clone(),
}),
ReplicaLogging {
log_logging: _,
interval: None,
} => None,
};
let compute = ComputeReplicaConfig { introspection };
CreateClusterVariant::Managed(CreateClusterManagedPlan {
replication_factor: replication_factor.clone(),
size: size.clone(),
availability_zones: availability_zones.clone(),
compute,
disk: disk.clone(),
optimizer_feature_overrides: optimizer_feature_overrides.clone(),
schedule: schedule.clone(),
})
}
ClusterVariant::Unmanaged => {
return Err(PlanError::Unsupported {
feature: "SHOW CREATE for unmanaged clusters".to_string(),
discussion_no: None,
});
}
};
let workload_class = self.config.workload_class.clone();
Ok(CreateClusterPlan {
name,
variant,
workload_class,
})
}
}
impl From<Cluster> for durable::Cluster {
fn from(cluster: Cluster) -> durable::Cluster {
durable::Cluster {
id: cluster.id,
name: cluster.name,
owner_id: cluster.owner_id,
privileges: cluster.privileges.into_all_values().collect(),
config: cluster.config.into(),
}
}
}
impl From<durable::Cluster> for Cluster {
fn from(
durable::Cluster {
id,
name,
owner_id,
privileges,
config,
}: durable::Cluster,
) -> Self {
Cluster {
name: name.clone(),
id,
bound_objects: BTreeSet::new(),
log_indexes: BTreeMap::new(),
replica_id_by_name_: BTreeMap::new(),
replicas_by_id_: BTreeMap::new(),
owner_id,
privileges: PrivilegeMap::from_mz_acl_items(privileges),
config: config.into(),
}
}
}
impl UpdateFrom<durable::Cluster> for Cluster {
fn update_from(
&mut self,
durable::Cluster {
id,
name,
owner_id,
privileges,
config,
}: durable::Cluster,
) {
self.id = id;
self.name = name;
self.owner_id = owner_id;
self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
self.config = config.into();
}
}
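/// In-memory representation of a single replica of a cluster.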
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
pub name: String,
pub cluster_id: ClusterId,
pub replica_id: ReplicaId,
pub config: ReplicaConfig,
pub owner_id: RoleId,
}
impl From<ClusterReplica> for durable::ClusterReplica {
fn from(replica: ClusterReplica) -> durable::ClusterReplica {
durable::ClusterReplica {
cluster_id: replica.cluster_id,
replica_id: replica.replica_id,
name: replica.name,
config: replica.config.into(),
owner_id: replica.owner_id,
}
}
}
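/// The last known status of a single process within a cluster replica, and
/// when that status was observed.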
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
pub status: ClusterStatus,
pub time: DateTime<Utc>,
}
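/// The upstream objects (e.g. tables) a source can reference, along with
/// when the set was last refreshed.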
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
pub updated_at: u64,
pub references: Vec<SourceReference>,
}
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
pub name: String,
pub namespace: Option<String>,
pub columns: Vec<String>,
}
impl From<SourceReference> for durable::SourceReference {
fn from(source_reference: SourceReference) -> durable::SourceReference {
durable::SourceReference {
name: source_reference.name,
namespace: source_reference.namespace,
columns: source_reference.columns,
}
}
}
impl SourceReferences {
pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
durable::SourceReferences {
source_id,
updated_at: self.updated_at,
references: self.references.into_iter().map(Into::into).collect(),
}
}
}
impl From<durable::SourceReference> for SourceReference {
fn from(source_reference: durable::SourceReference) -> SourceReference {
SourceReference {
name: source_reference.name,
namespace: source_reference.namespace,
columns: source_reference.columns,
}
}
}
impl From<durable::SourceReferences> for SourceReferences {
fn from(source_references: durable::SourceReferences) -> SourceReferences {
SourceReferences {
updated_at: source_references.updated_at,
            references: source_references
                .references
                .into_iter()
                .map(Into::into)
                .collect(),
}
}
}
impl From<mz_sql::plan::SourceReference> for SourceReference {
fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
SourceReference {
name: source_reference.name,
namespace: source_reference.namespace,
columns: source_reference.columns,
}
}
}
impl From<mz_sql::plan::SourceReferences> for SourceReferences {
fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
SourceReferences {
updated_at: source_references.updated_at,
            references: source_references
                .references
                .into_iter()
                .map(Into::into)
                .collect(),
}
}
}
impl From<SourceReferences> for mz_sql::plan::SourceReferences {
fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
mz_sql::plan::SourceReferences {
updated_at: source_references.updated_at,
            references: source_references
                .references
                .into_iter()
                .map(Into::into)
                .collect(),
}
}
}
impl From<SourceReference> for mz_sql::plan::SourceReference {
fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
mz_sql::plan::SourceReference {
name: source_reference.name,
namespace: source_reference.namespace,
columns: source_reference.columns,
}
}
}
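/// A catalog item together with the metadata the catalog tracks for it:
/// identity, name, dependency edges, ownership, and privileges.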
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
pub item: CatalogItem,
#[serde(skip)]
pub referenced_by: Vec<CatalogItemId>,
#[serde(skip)]
pub used_by: Vec<CatalogItemId>,
pub id: CatalogItemId,
pub oid: u32,
pub name: QualifiedItemName,
pub owner_id: RoleId,
pub privileges: PrivilegeMap,
}
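/// A [`CatalogEntry`] pinned to a specific relation version, which determines
/// the [`GlobalId`] reported for versioned items like tables.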
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
entry: CatalogEntry,
    version: RelationVersionSelector,
}
impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
fn desc(&self, name: &FullItemName) -> Result<Cow<RelationDesc>, SqlCatalogError> {
self.entry.desc(name)
}
fn global_id(&self) -> GlobalId {
match self.entry.item() {
CatalogItem::Source(source) => source.global_id,
CatalogItem::Log(log) => log.global_id,
CatalogItem::View(view) => view.global_id,
CatalogItem::MaterializedView(mv) => mv.global_id,
CatalogItem::Sink(sink) => sink.global_id,
CatalogItem::Index(index) => index.global_id,
CatalogItem::Type(ty) => ty.global_id,
CatalogItem::Func(func) => func.global_id,
CatalogItem::Secret(secret) => secret.global_id,
CatalogItem::Connection(conn) => conn.global_id,
CatalogItem::ContinualTask(ct) => ct.global_id,
            CatalogItem::Table(table) => match self.version {
                RelationVersionSelector::Latest => {
                    let (_version, gid) = table
                        .collections
                        .last_key_value()
                        .expect("at least one version");
                    *gid
                }
                RelationVersionSelector::Specific(version) => *table
                    .collections
                    .get(&version)
                    .expect("catalog corruption, missing version!"),
            },
}
}
}
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
fn name(&self) -> &QualifiedItemName {
self.entry.name()
}
fn id(&self) -> CatalogItemId {
self.entry.id()
}
fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
Box::new(self.entry.global_ids())
}
fn oid(&self) -> u32 {
self.entry.oid()
}
fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
self.entry.func()
}
fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
self.entry.source_desc()
}
fn connection(
&self,
) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
{
mz_sql::catalog::CatalogItem::connection(&self.entry)
}
fn create_sql(&self) -> &str {
self.entry.create_sql()
}
fn item_type(&self) -> SqlCatalogItemType {
self.entry.item_type()
}
fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
self.entry.index_details()
}
fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
self.entry.writable_table_details()
}
fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
self.entry.type_details()
}
fn references(&self) -> &ResolvedIds {
self.entry.references()
}
fn uses(&self) -> BTreeSet<CatalogItemId> {
self.entry.uses()
}
fn referenced_by(&self) -> &[CatalogItemId] {
self.entry.referenced_by()
}
fn used_by(&self) -> &[CatalogItemId] {
self.entry.used_by()
}
fn subsource_details(
&self,
) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
self.entry.subsource_details()
}
fn source_export_details(
&self,
) -> Option<(
CatalogItemId,
&UnresolvedItemName,
&SourceExportDetails,
&SourceExportDataConfig<ReferencedConnection>,
)> {
self.entry.source_export_details()
}
fn is_progress_source(&self) -> bool {
self.entry.is_progress_source()
}
fn progress_id(&self) -> Option<CatalogItemId> {
self.entry.progress_id()
}
fn owner_id(&self) -> RoleId {
*self.entry.owner_id()
}
fn privileges(&self) -> &PrivilegeMap {
self.entry.privileges()
}
fn cluster_id(&self) -> Option<ClusterId> {
self.entry.item().cluster_id()
}
fn at_version(
&self,
version: RelationVersionSelector,
) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
self.entry.at_version(version)
}
}
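/// The concrete payload of a catalog entry, one variant per item type.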
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
Table(Table),
Source(Source),
Log(Log),
View(View),
MaterializedView(MaterializedView),
Sink(Sink),
Index(Index),
Type(Type),
Func(Func),
Secret(Secret),
Connection(Connection),
ContinualTask(ContinualTask),
}
impl From<CatalogEntry> for durable::Item {
fn from(entry: CatalogEntry) -> durable::Item {
let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
durable::Item {
id: entry.id,
oid: entry.oid,
global_id,
schema_id: entry.name.qualifiers.schema_spec.into(),
name: entry.name.item,
create_sql,
owner_id: entry.owner_id,
privileges: entry.privileges.into_all_values().collect(),
extra_versions,
}
}
}
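/// In-memory representation of a table. Each version of a table is backed by
/// its own storage collection, keyed here by [`RelationVersion`].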
#[derive(Debug, Clone, Serialize)]
pub struct Table {
pub create_sql: Option<String>,
pub desc: RelationDesc,
#[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
pub collections: BTreeMap<RelationVersion, GlobalId>,
#[serde(skip)]
pub conn_id: Option<ConnectionId>,
pub resolved_ids: ResolvedIds,
pub custom_logical_compaction_window: Option<CompactionWindow>,
pub is_retained_metrics_object: bool,
pub data_source: TableDataSource,
}
impl Table {
pub fn timeline(&self) -> Timeline {
match &self.data_source {
TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
TableDataSource::DataSource { timeline, .. } => timeline.clone(),
}
}
pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
self.collections.values().copied()
}
    pub fn global_id_writes(&self) -> GlobalId {
        *self
            .collections
            .last_key_value()
            .expect("at least one version of a table")
            .1
    }
pub fn collection_descs(&self) -> impl Iterator<Item = (GlobalId, RelationDesc)> + '_ {
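        // For now a table's one `desc` applies to its one collection; tables
        // with multiple relation versions would need a desc per version.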
assert_eq!(self.collections.len(), 1);
self.collections
.values()
.map(|gid| (*gid, self.desc.clone()))
}
}
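/// Where a table's data comes from: direct user writes, or an upstream data
/// source (e.g. a source export).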
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
TableWrites {
#[serde(skip)]
defaults: Vec<Expr<Aug>>,
},
DataSource {
desc: DataSourceDesc,
timeline: Timeline,
},
}
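/// Describes how a source produces its data.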
#[derive(Debug, Clone, Serialize)]
pub enum DataSourceDesc {
Ingestion {
ingestion_desc: PlanIngestion,
cluster_id: ClusterId,
},
IngestionExport {
ingestion_id: CatalogItemId,
external_reference: UnresolvedItemName,
details: SourceExportDetails,
data_config: SourceExportDataConfig<ReferencedConnection>,
},
Introspection(IntrospectionType),
Progress,
Webhook {
validate_using: Option<WebhookValidation>,
body_format: WebhookBodyFormat,
headers: WebhookHeaders,
cluster_id: ClusterId,
},
}
impl DataSourceDesc {
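    /// Returns the names of the key and value formats of this data source,
    /// if it has any.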
pub fn formats(&self) -> (Option<&str>, Option<&str>) {
match &self {
DataSourceDesc::Ingestion { ingestion_desc, .. } => {
match &ingestion_desc.desc.primary_export {
Some(export) => match export.encoding.as_ref() {
Some(encoding) => match &encoding.key {
Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
None => (None, Some(encoding.value.type_())),
},
None => (None, None),
},
None => (None, None),
}
}
DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
Some(encoding) => match &encoding.key {
Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
None => (None, Some(encoding.value.type_())),
},
None => (None, None),
},
DataSourceDesc::Introspection(_)
| DataSourceDesc::Webhook { .. }
| DataSourceDesc::Progress => (None, None),
}
}
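    /// Returns the name of the envelope used by this data source, if any.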
pub fn envelope(&self) -> Option<&str> {
fn envelope_string(envelope: &SourceEnvelope) -> &str {
match envelope {
SourceEnvelope::None(_) => "none",
SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
"debezium"
}
mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
"upsert-value-err-inline"
}
},
                SourceEnvelope::CdcV2 => "materialize",
}
}
match self {
DataSourceDesc::Ingestion { ingestion_desc, .. } => ingestion_desc
.desc
.primary_export
.as_ref()
.map(|export| envelope_string(&export.envelope)),
DataSourceDesc::IngestionExport { data_config, .. } => {
Some(envelope_string(&data_config.envelope))
}
DataSourceDesc::Introspection(_)
| DataSourceDesc::Webhook { .. }
| DataSourceDesc::Progress => None,
}
}
}
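/// In-memory representation of a source.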
#[derive(Debug, Clone, Serialize)]
pub struct Source {
pub create_sql: Option<String>,
pub global_id: GlobalId,
#[serde(skip)]
pub data_source: DataSourceDesc,
pub desc: RelationDesc,
pub timeline: Timeline,
pub resolved_ids: ResolvedIds,
pub custom_logical_compaction_window: Option<CompactionWindow>,
pub is_retained_metrics_object: bool,
}
impl Source {
pub fn new(
plan: CreateSourcePlan,
global_id: GlobalId,
resolved_ids: ResolvedIds,
custom_logical_compaction_window: Option<CompactionWindow>,
is_retained_metrics_object: bool,
) -> Source {
Source {
create_sql: Some(plan.source.create_sql),
data_source: match plan.source.data_source {
mz_sql::plan::DataSourceDesc::Ingestion(ingestion_desc) => {
DataSourceDesc::Ingestion {
ingestion_desc,
cluster_id: plan
.in_cluster
.expect("ingestion-based sources must be given a cluster ID"),
}
}
mz_sql::plan::DataSourceDesc::Progress => {
assert!(
plan.in_cluster.is_none(),
"subsources must not have a host config or cluster_id defined"
);
DataSourceDesc::Progress
}
mz_sql::plan::DataSourceDesc::IngestionExport {
ingestion_id,
external_reference,
details,
data_config,
} => {
assert!(
plan.in_cluster.is_none(),
"subsources must not have a host config or cluster_id defined"
);
DataSourceDesc::IngestionExport {
ingestion_id,
external_reference,
details,
data_config,
}
}
mz_sql::plan::DataSourceDesc::Webhook {
validate_using,
body_format,
headers,
} => DataSourceDesc::Webhook {
validate_using,
body_format,
headers,
cluster_id: plan
.in_cluster
.expect("webhook sources must be given a cluster ID"),
},
},
desc: plan.source.desc,
global_id,
timeline: plan.timeline,
resolved_ids,
custom_logical_compaction_window: plan
.source
.compaction_window
.or(custom_logical_compaction_window),
is_retained_metrics_object,
}
}
pub fn source_type(&self) -> &str {
match &self.data_source {
DataSourceDesc::Ingestion { ingestion_desc, .. } => {
ingestion_desc.desc.connection.name()
}
DataSourceDesc::Progress => "progress",
DataSourceDesc::IngestionExport { .. } => "subsource",
DataSourceDesc::Introspection(_) => "source",
DataSourceDesc::Webhook { .. } => "webhook",
}
}
pub fn connection_id(&self) -> Option<CatalogItemId> {
match &self.data_source {
DataSourceDesc::Ingestion { ingestion_desc, .. } => {
ingestion_desc.desc.connection.connection_id()
}
DataSourceDesc::IngestionExport { .. }
| DataSourceDesc::Introspection(_)
| DataSourceDesc::Webhook { .. }
| DataSourceDesc::Progress => None,
}
}
pub fn global_id(&self) -> GlobalId {
self.global_id
}
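    /// Returns the number of persist shards attributable to this source
    /// itself, excluding shards provisioned for its subsources or exports:
    /// multi-output sources like Postgres and MySQL report 0 here, while
    /// their ingestion exports each report 1.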
pub fn user_controllable_persist_shard_count(&self) -> i64 {
match &self.data_source {
DataSourceDesc::Ingestion { ingestion_desc, .. } => {
match &ingestion_desc.desc.connection {
GenericSourceConnection::Postgres(_) | GenericSourceConnection::MySql(_) => 0,
GenericSourceConnection::LoadGenerator(lg) => {
if lg.load_generator.views().is_empty() {
1
} else {
0
}
}
GenericSourceConnection::Kafka(_) => 1,
}
}
DataSourceDesc::IngestionExport { .. } => 1,
DataSourceDesc::Webhook { .. } => 1,
DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => 0,
}
}
}
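/// In-memory representation of a built-in introspection log source.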
#[derive(Debug, Clone, Serialize)]
pub struct Log {
pub variant: LogVariant,
pub global_id: GlobalId,
}
impl Log {
pub fn global_id(&self) -> GlobalId {
self.global_id
}
}
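/// In-memory representation of a sink.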
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
pub create_sql: String,
pub global_id: GlobalId,
pub from: GlobalId,
pub connection: StorageSinkConnection<ReferencedConnection>,
pub envelope: SinkEnvelope,
pub partition_strategy: SinkPartitionStrategy,
pub with_snapshot: bool,
pub version: u64,
pub resolved_ids: ResolvedIds,
pub cluster_id: ClusterId,
}
impl Sink {
pub fn sink_type(&self) -> &str {
self.connection.name()
}
pub fn envelope(&self) -> Option<&str> {
match &self.envelope {
SinkEnvelope::Debezium => Some("debezium"),
SinkEnvelope::Upsert => Some("upsert"),
}
}
pub fn combined_format(&self) -> Cow<'_, str> {
let StorageSinkConnection::Kafka(connection) = &self.connection;
connection.format.get_format_name()
}
pub fn formats(&self) -> (Option<&str>, &str) {
let StorageSinkConnection::Kafka(connection) = &self.connection;
let key_format = connection
.format
.key_format
.as_ref()
.map(|format| format.get_format_name());
let value_format = connection.format.value_format.get_format_name();
(key_format, value_format)
}
pub fn connection_id(&self) -> Option<CatalogItemId> {
self.connection.connection_id()
}
pub fn global_id(&self) -> GlobalId {
self.global_id
}
}
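/// In-memory representation of a view.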
#[derive(Debug, Clone, Serialize)]
pub struct View {
pub create_sql: String,
pub global_id: GlobalId,
pub raw_expr: Arc<HirRelationExpr>,
pub optimized_expr: Arc<OptimizedMirRelationExpr>,
pub desc: RelationDesc,
pub conn_id: Option<ConnectionId>,
pub resolved_ids: ResolvedIds,
pub dependencies: DependencyIds,
}
impl View {
pub fn global_id(&self) -> GlobalId {
self.global_id
}
}
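/// In-memory representation of a materialized view.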
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
pub create_sql: String,
pub global_id: GlobalId,
pub raw_expr: Arc<HirRelationExpr>,
pub optimized_expr: Arc<OptimizedMirRelationExpr>,
pub desc: RelationDesc,
pub resolved_ids: ResolvedIds,
pub dependencies: DependencyIds,
pub cluster_id: ClusterId,
pub non_null_assertions: Vec<usize>,
pub custom_logical_compaction_window: Option<CompactionWindow>,
pub refresh_schedule: Option<RefreshSchedule>,
pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
impl MaterializedView {
pub fn global_id(&self) -> GlobalId {
self.global_id
}
}
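/// In-memory representation of an index.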
#[derive(Debug, Clone, Serialize)]
pub struct Index {
pub create_sql: String,
pub global_id: GlobalId,
pub on: GlobalId,
pub keys: Arc<[MirScalarExpr]>,
pub conn_id: Option<ConnectionId>,
pub resolved_ids: ResolvedIds,
pub cluster_id: ClusterId,
pub custom_logical_compaction_window: Option<CompactionWindow>,
pub is_retained_metrics_object: bool,
}
impl Index {
pub fn global_id(&self) -> GlobalId {
self.global_id
}
}
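/// In-memory representation of a type.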
#[derive(Debug, Clone, Serialize)]
pub struct Type {
pub create_sql: Option<String>,
pub global_id: GlobalId,
#[serde(skip)]
pub details: CatalogTypeDetails<IdReference>,
pub desc: Option<RelationDesc>,
pub resolved_ids: ResolvedIds,
}
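/// In-memory representation of a function, backed by a static entry in the
/// built-in function catalog.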
#[derive(Debug, Clone, Serialize)]
pub struct Func {
#[serde(skip)]
pub inner: &'static mz_sql::func::Func,
pub global_id: GlobalId,
}
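/// In-memory representation of a secret.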
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
pub create_sql: String,
pub global_id: GlobalId,
}
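/// In-memory representation of a connection to an external system.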
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
pub create_sql: String,
pub global_id: GlobalId,
pub details: ConnectionDetails,
pub resolved_ids: ResolvedIds,
}
impl Connection {
pub fn global_id(&self) -> GlobalId {
self.global_id
}
}
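/// In-memory representation of a continual task.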
#[derive(Debug, Clone, Serialize)]
pub struct ContinualTask {
pub create_sql: String,
pub global_id: GlobalId,
pub input_id: GlobalId,
pub with_snapshot: bool,
pub raw_expr: Arc<HirRelationExpr>,
pub desc: RelationDesc,
pub resolved_ids: ResolvedIds,
pub dependencies: DependencyIds,
pub cluster_id: ClusterId,
pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
impl ContinualTask {
pub fn global_id(&self) -> GlobalId {
self.global_id
}
}
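/// In-memory representation of a network policy and its rules.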
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
pub name: String,
pub id: NetworkPolicyId,
pub oid: u32,
pub rules: Vec<NetworkPolicyRule>,
pub owner_id: RoleId,
pub privileges: PrivilegeMap,
}
impl From<NetworkPolicy> for durable::NetworkPolicy {
fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
durable::NetworkPolicy {
id: policy.id,
oid: policy.oid,
name: policy.name,
rules: policy.rules,
owner_id: policy.owner_id,
privileges: policy.privileges.into_all_values().collect(),
}
}
}
impl From<durable::NetworkPolicy> for NetworkPolicy {
fn from(
durable::NetworkPolicy {
id,
oid,
name,
rules,
owner_id,
privileges,
}: durable::NetworkPolicy,
) -> Self {
NetworkPolicy {
id,
oid,
name,
rules,
owner_id,
privileges: PrivilegeMap::from_mz_acl_items(privileges),
}
}
}
impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
fn update_from(
&mut self,
durable::NetworkPolicy {
id,
oid,
name,
rules,
owner_id,
privileges,
}: durable::NetworkPolicy,
) {
self.id = id;
self.oid = oid;
self.name = name;
self.rules = rules;
self.owner_id = owner_id;
self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
}
}
impl CatalogItem {
pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
match self {
CatalogItem::Table(_) => mz_sql::catalog::CatalogItemType::Table,
CatalogItem::Source(_) => mz_sql::catalog::CatalogItemType::Source,
CatalogItem::Log(_) => mz_sql::catalog::CatalogItemType::Source,
CatalogItem::Sink(_) => mz_sql::catalog::CatalogItemType::Sink,
CatalogItem::View(_) => mz_sql::catalog::CatalogItemType::View,
CatalogItem::MaterializedView(_) => mz_sql::catalog::CatalogItemType::MaterializedView,
CatalogItem::Index(_) => mz_sql::catalog::CatalogItemType::Index,
CatalogItem::Type(_) => mz_sql::catalog::CatalogItemType::Type,
CatalogItem::Func(_) => mz_sql::catalog::CatalogItemType::Func,
CatalogItem::Secret(_) => mz_sql::catalog::CatalogItemType::Secret,
CatalogItem::Connection(_) => mz_sql::catalog::CatalogItemType::Connection,
CatalogItem::ContinualTask(_) => mz_sql::catalog::CatalogItemType::ContinualTask,
}
}
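    /// Returns every [`GlobalId`] associated with this item. Only tables can
    /// have more than one, one per relation version.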
pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
let gid = match self {
CatalogItem::Source(source) => source.global_id,
CatalogItem::Log(log) => log.global_id,
CatalogItem::Sink(sink) => sink.global_id,
CatalogItem::View(view) => view.global_id,
CatalogItem::MaterializedView(mv) => mv.global_id,
CatalogItem::ContinualTask(ct) => ct.global_id,
CatalogItem::Index(index) => index.global_id,
CatalogItem::Func(func) => func.global_id,
CatalogItem::Type(ty) => ty.global_id,
CatalogItem::Secret(secret) => secret.global_id,
CatalogItem::Connection(conn) => conn.global_id,
CatalogItem::Table(table) => {
return itertools::Either::Left(table.collections.values().copied());
}
};
itertools::Either::Right(std::iter::once(gid))
}
pub fn latest_global_id(&self) -> GlobalId {
match self {
CatalogItem::Source(source) => source.global_id,
CatalogItem::Log(log) => log.global_id,
CatalogItem::Sink(sink) => sink.global_id,
CatalogItem::View(view) => view.global_id,
CatalogItem::MaterializedView(mv) => mv.global_id,
CatalogItem::ContinualTask(ct) => ct.global_id,
CatalogItem::Index(index) => index.global_id,
CatalogItem::Func(func) => func.global_id,
CatalogItem::Type(ty) => ty.global_id,
CatalogItem::Secret(secret) => secret.global_id,
CatalogItem::Connection(conn) => conn.global_id,
CatalogItem::Table(table) => table.global_id_writes(),
}
}
pub fn is_storage_collection(&self) -> bool {
match self {
CatalogItem::Table(_)
| CatalogItem::Source(_)
| CatalogItem::MaterializedView(_)
| CatalogItem::ContinualTask(_) => true,
CatalogItem::Log(_)
| CatalogItem::Sink(_)
| CatalogItem::View(_)
| CatalogItem::Index(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_) => false,
}
}
    pub fn desc(&self, name: &FullItemName) -> Result<Cow<RelationDesc>, SqlCatalogError> {
        self.desc_opt()
            .ok_or_else(|| SqlCatalogError::InvalidDependency {
                name: name.to_string(),
                typ: self.typ(),
            })
    }
pub fn desc_opt(&self) -> Option<Cow<RelationDesc>> {
match &self {
CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
CatalogItem::Table(tbl) => Some(Cow::Borrowed(&tbl.desc)),
CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
CatalogItem::MaterializedView(mview) => Some(Cow::Borrowed(&mview.desc)),
CatalogItem::Type(typ) => typ.desc.as_ref().map(Cow::Borrowed),
CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
CatalogItem::Func(_)
| CatalogItem::Index(_)
| CatalogItem::Sink(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_) => None,
}
}
pub fn func(
&self,
entry: &CatalogEntry,
) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
match &self {
CatalogItem::Func(func) => Ok(func.inner),
_ => Err(SqlCatalogError::UnexpectedType {
name: entry.name().item.to_string(),
actual_type: entry.item_type(),
expected_type: CatalogItemType::Func,
}),
}
}
pub fn source_desc(
&self,
entry: &CatalogEntry,
) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
match &self {
CatalogItem::Source(source) => match &source.data_source {
DataSourceDesc::Ingestion { ingestion_desc, .. } => Ok(Some(&ingestion_desc.desc)),
DataSourceDesc::IngestionExport { .. }
| DataSourceDesc::Introspection(_)
| DataSourceDesc::Webhook { .. }
| DataSourceDesc::Progress => Ok(None),
},
_ => Err(SqlCatalogError::UnexpectedType {
name: entry.name().item.to_string(),
actual_type: entry.item_type(),
expected_type: CatalogItemType::Source,
}),
}
}
pub fn is_progress_source(&self) -> bool {
matches!(
self,
CatalogItem::Source(Source {
data_source: DataSourceDesc::Progress,
..
})
)
}
pub fn references(&self) -> &ResolvedIds {
static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
match self {
CatalogItem::Func(_) => &*EMPTY,
CatalogItem::Index(idx) => &idx.resolved_ids,
CatalogItem::Sink(sink) => &sink.resolved_ids,
CatalogItem::Source(source) => &source.resolved_ids,
CatalogItem::Log(_) => &*EMPTY,
CatalogItem::Table(table) => &table.resolved_ids,
CatalogItem::Type(typ) => &typ.resolved_ids,
CatalogItem::View(view) => &view.resolved_ids,
CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
CatalogItem::Secret(_) => &*EMPTY,
CatalogItem::Connection(connection) => &connection.resolved_ids,
CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
}
}
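    /// Returns the full set of items this item depends on: its direct
    /// references plus any additional dependencies recorded at planning time.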
pub fn uses(&self) -> BTreeSet<CatalogItemId> {
let mut uses: BTreeSet<_> = self.references().items().copied().collect();
match self {
CatalogItem::Func(_) => {}
CatalogItem::Index(_) => {}
CatalogItem::Sink(_) => {}
CatalogItem::Source(_) => {}
CatalogItem::Log(_) => {}
CatalogItem::Table(_) => {}
CatalogItem::Type(_) => {}
CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
CatalogItem::MaterializedView(mview) => {
uses.extend(mview.dependencies.0.iter().copied())
}
CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
CatalogItem::Secret(_) => {}
CatalogItem::Connection(_) => {}
}
uses
}
pub fn conn_id(&self) -> Option<&ConnectionId> {
match self {
CatalogItem::View(view) => view.conn_id.as_ref(),
CatalogItem::Index(index) => index.conn_id.as_ref(),
CatalogItem::Table(table) => table.conn_id.as_ref(),
CatalogItem::Log(_)
| CatalogItem::Source(_)
| CatalogItem::Sink(_)
| CatalogItem::MaterializedView(_)
| CatalogItem::Secret(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Connection(_)
| CatalogItem::ContinualTask(_) => None,
}
}
pub fn is_temporary(&self) -> bool {
self.conn_id().is_some()
}
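    /// Returns a copy of this item with references to the given schema in its
    /// create SQL rewritten to use `new_schema_name`.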
pub fn rename_schema_refs(
&self,
database_name: &str,
cur_schema_name: &str,
new_schema_name: &str,
) -> Result<CatalogItem, (String, String)> {
let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
let mut create_stmt = mz_sql::parse::parse(&create_sql)
.expect("invalid create sql persisted to catalog")
.into_element()
.ast;
mz_sql::ast::transform::create_stmt_rename_schema_refs(
&mut create_stmt,
database_name,
cur_schema_name,
new_schema_name,
)?;
Ok(create_stmt.to_ast_string_stable())
};
match self {
CatalogItem::Table(i) => {
let mut i = i.clone();
i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
Ok(CatalogItem::Table(i))
}
CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
CatalogItem::Source(i) => {
let mut i = i.clone();
i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
Ok(CatalogItem::Source(i))
}
CatalogItem::Sink(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql)?;
Ok(CatalogItem::Sink(i))
}
CatalogItem::View(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql)?;
Ok(CatalogItem::View(i))
}
CatalogItem::MaterializedView(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql)?;
Ok(CatalogItem::MaterializedView(i))
}
CatalogItem::Index(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql)?;
Ok(CatalogItem::Index(i))
}
CatalogItem::Secret(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql)?;
Ok(CatalogItem::Secret(i))
}
CatalogItem::Connection(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql)?;
Ok(CatalogItem::Connection(i))
}
CatalogItem::Type(i) => {
let mut i = i.clone();
i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
Ok(CatalogItem::Type(i))
}
CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
CatalogItem::ContinualTask(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql)?;
Ok(CatalogItem::ContinualTask(i))
}
}
}
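    /// Returns a copy of this item with references to `from` in its create
    /// SQL rewritten to `to_item_name`; when `rename_self` is set, the item's
    /// own name is rewritten as well.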
pub fn rename_item_refs(
&self,
from: FullItemName,
to_item_name: String,
rename_self: bool,
) -> Result<CatalogItem, String> {
let do_rewrite = |create_sql: String| -> Result<String, String> {
let mut create_stmt = mz_sql::parse::parse(&create_sql)
.expect("invalid create sql persisted to catalog")
.into_element()
.ast;
if rename_self {
mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
}
mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
Ok(create_stmt.to_ast_string_stable())
};
match self {
CatalogItem::Table(i) => {
let mut i = i.clone();
i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
Ok(CatalogItem::Table(i))
}
CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
CatalogItem::Source(i) => {
let mut i = i.clone();
i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
Ok(CatalogItem::Source(i))
}
CatalogItem::Sink(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql)?;
Ok(CatalogItem::Sink(i))
}
CatalogItem::View(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql)?;
Ok(CatalogItem::View(i))
}
CatalogItem::MaterializedView(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql)?;
Ok(CatalogItem::MaterializedView(i))
}
CatalogItem::Index(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql)?;
Ok(CatalogItem::Index(i))
}
CatalogItem::Secret(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql)?;
Ok(CatalogItem::Secret(i))
}
CatalogItem::Func(_) | CatalogItem::Type(_) => {
unreachable!("{}s cannot be renamed", self.typ())
}
CatalogItem::Connection(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql)?;
Ok(CatalogItem::Connection(i))
}
CatalogItem::ContinualTask(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql)?;
Ok(CatalogItem::ContinualTask(i))
}
}
}
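    /// Adds, replaces, or removes the `RETAIN HISTORY` option in this item's
    /// create SQL and updates the in-memory compaction window to `window`.
    /// Returns the previous option value, or `Err` if the item type does not
    /// support `RETAIN HISTORY`.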
pub fn update_retain_history(
&mut self,
value: Option<Value>,
window: CompactionWindow,
) -> Result<Option<WithOptionValue<Raw>>, ()> {
let update = |ast: &mut Statement<Raw>| {
macro_rules! update_retain_history {
( $stmt:ident, $opt:ident, $name:ident ) => {{
let pos = $stmt
.with_options
.iter()
.rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
if let Some(value) = value {
let next = mz_sql_parser::ast::$opt {
name: mz_sql_parser::ast::$name::RetainHistory,
value: Some(WithOptionValue::RetainHistoryFor(value)),
};
if let Some(idx) = pos {
let previous = $stmt.with_options[idx].clone();
$stmt.with_options[idx] = next;
previous.value
} else {
$stmt.with_options.push(next);
None
}
                } else if let Some(idx) = pos {
                    $stmt.with_options.swap_remove(idx).value
                } else {
                    None
                }
}};
}
let previous = match ast {
Statement::CreateTable(ref mut stmt) => {
update_retain_history!(stmt, TableOption, TableOptionName)
}
Statement::CreateIndex(ref mut stmt) => {
update_retain_history!(stmt, IndexOption, IndexOptionName)
}
Statement::CreateSource(ref mut stmt) => {
update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
}
Statement::CreateMaterializedView(ref mut stmt) => {
update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
}
_ => {
return Err(());
}
};
Ok(previous)
};
let res = self.update_sql(update)?;
let cw = self
.custom_logical_compaction_window_mut()
.expect("item must have compaction window");
*cw = Some(window);
Ok(res)
}
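    /// Parses this item's create SQL, applies `f` to the statement, and
    /// stores the re-serialized result. Returns `Err` if the item has no
    /// create SQL (built-in logs and functions) or if `f` fails.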
pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
where
F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
{
let create_sql = match self {
CatalogItem::Table(Table { create_sql, .. })
| CatalogItem::Type(Type { create_sql, .. })
| CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
CatalogItem::Sink(Sink { create_sql, .. })
| CatalogItem::View(View { create_sql, .. })
| CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
| CatalogItem::Index(Index { create_sql, .. })
| CatalogItem::Secret(Secret { create_sql, .. })
| CatalogItem::Connection(Connection { create_sql, .. })
| CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
CatalogItem::Func(_) | CatalogItem::Log(_) => None,
};
let Some(create_sql) = create_sql else {
return Err(());
};
let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
.expect("non-system items must be parseable")
.into_element()
.ast;
debug!("rewrite: {}", ast.to_ast_string_redacted());
let t = f(&mut ast)?;
*create_sql = ast.to_ast_string_stable();
debug!("rewrote: {}", ast.to_ast_string_redacted());
Ok(t)
}
pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
match self {
CatalogItem::Index(index) => Some(index.cluster_id),
CatalogItem::Table(_)
| CatalogItem::Source(_)
| CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::MaterializedView(_)
| CatalogItem::Sink(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_)
| CatalogItem::ContinualTask(_) => None,
}
}
pub fn cluster_id(&self) -> Option<ClusterId> {
match self {
CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
CatalogItem::Index(index) => Some(index.cluster_id),
CatalogItem::Source(source) => match &source.data_source {
DataSourceDesc::Ingestion { cluster_id, .. } => Some(*cluster_id),
DataSourceDesc::IngestionExport { .. } => None,
DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => None,
},
CatalogItem::Sink(sink) => Some(sink.cluster_id),
CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
CatalogItem::Table(_)
| CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_) => None,
}
}
pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
match self {
CatalogItem::Table(table) => table.custom_logical_compaction_window,
CatalogItem::Source(source) => source.custom_logical_compaction_window,
CatalogItem::Index(index) => index.custom_logical_compaction_window,
CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::Sink(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_)
| CatalogItem::ContinualTask(_) => None,
}
}
pub fn custom_logical_compaction_window_mut(
&mut self,
) -> Option<&mut Option<CompactionWindow>> {
let cw = match self {
CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::Sink(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_)
| CatalogItem::ContinualTask(_) => return None,
};
Some(cw)
}
pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
let custom_logical_compaction_window = match self {
CatalogItem::Table(_)
| CatalogItem::Source(_)
| CatalogItem::Index(_)
| CatalogItem::MaterializedView(_)
| CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::Sink(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_) => return None,
};
Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
}
pub fn is_retained_metrics_object(&self) -> bool {
match self {
CatalogItem::Table(table) => table.is_retained_metrics_object,
CatalogItem::Source(source) => source.is_retained_metrics_object,
CatalogItem::Index(index) => index.is_retained_metrics_object,
CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::MaterializedView(_)
| CatalogItem::Sink(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_)
| CatalogItem::ContinualTask(_) => false,
}
}
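    /// Serializes this item into its create SQL, its primary [`GlobalId`],
    /// and, for tables, the [`GlobalId`]s of any additional relation
    /// versions.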
pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
match self {
CatalogItem::Table(table) => {
let create_sql = table
.create_sql
.clone()
.expect("builtin tables cannot be serialized");
let mut collections = table.collections.clone();
let global_id = collections
.remove(&RelationVersion::root())
.expect("at least one version");
(create_sql, global_id, collections)
}
CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
CatalogItem::Source(source) => {
assert!(
!matches!(source.data_source, DataSourceDesc::Introspection(_)),
"cannot serialize introspection/builtin sources",
);
let create_sql = source
.create_sql
.clone()
.expect("builtin sources cannot be serialized");
(create_sql, source.global_id, BTreeMap::new())
}
CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
CatalogItem::MaterializedView(mview) => {
(mview.create_sql.clone(), mview.global_id, BTreeMap::new())
}
CatalogItem::Index(index) => {
(index.create_sql.clone(), index.global_id, BTreeMap::new())
}
CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
CatalogItem::Type(typ) => {
let create_sql = typ
.create_sql
.clone()
.expect("builtin types cannot be serialized");
(create_sql, typ.global_id, BTreeMap::new())
}
CatalogItem::Secret(secret) => {
(secret.create_sql.clone(), secret.global_id, BTreeMap::new())
}
CatalogItem::Connection(connection) => (
connection.create_sql.clone(),
connection.global_id,
BTreeMap::new(),
),
CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
CatalogItem::ContinualTask(ct) => {
(ct.create_sql.clone(), ct.global_id, BTreeMap::new())
}
}
}
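    /// Consuming variant of [`CatalogItem::to_serialized`] that avoids
    /// cloning the create SQL and collection map.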
pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
match self {
CatalogItem::Table(mut table) => {
let create_sql = table
.create_sql
.expect("builtin tables cannot be serialized");
let global_id = table
.collections
.remove(&RelationVersion::root())
.expect("at least one version");
(create_sql, global_id, table.collections)
}
CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
CatalogItem::Source(source) => {
assert!(
!matches!(source.data_source, DataSourceDesc::Introspection(_)),
"cannot serialize introspection/builtin sources",
);
let create_sql = source
.create_sql
.expect("builtin sources cannot be serialized");
(create_sql, source.global_id, BTreeMap::new())
}
CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
CatalogItem::MaterializedView(mview) => {
(mview.create_sql, mview.global_id, BTreeMap::new())
}
CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
CatalogItem::Type(typ) => {
let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
(create_sql, typ.global_id, BTreeMap::new())
}
CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
CatalogItem::Connection(connection) => {
(connection.create_sql, connection.global_id, BTreeMap::new())
}
CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
}
}
}
impl CatalogEntry {
pub fn desc(&self, name: &FullItemName) -> Result<Cow<RelationDesc>, SqlCatalogError> {
self.item.desc(name)
}
pub fn desc_opt(&self) -> Option<Cow<RelationDesc>> {
self.item.desc_opt()
}
pub fn has_columns(&self) -> bool {
self.item.desc_opt().is_some()
}
pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
self.item.func(self)
}
pub fn index(&self) -> Option<&Index> {
match self.item() {
CatalogItem::Index(idx) => Some(idx),
_ => None,
}
}
pub fn materialized_view(&self) -> Option<&MaterializedView> {
match self.item() {
CatalogItem::MaterializedView(mv) => Some(mv),
_ => None,
}
}
pub fn table(&self) -> Option<&Table> {
match self.item() {
CatalogItem::Table(tbl) => Some(tbl),
_ => None,
}
}
pub fn source(&self) -> Option<&Source> {
match self.item() {
CatalogItem::Source(src) => Some(src),
_ => None,
}
}
pub fn sink(&self) -> Option<&Sink> {
match self.item() {
CatalogItem::Sink(sink) => Some(sink),
_ => None,
}
}
pub fn secret(&self) -> Option<&Secret> {
match self.item() {
CatalogItem::Secret(secret) => Some(secret),
_ => None,
}
}
pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
match self.item() {
CatalogItem::Connection(connection) => Ok(connection),
_ => {
let db_name = match self.name().qualifiers.database_spec {
                    ResolvedDatabaseSpecifier::Ambient => String::new(),
ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
};
Err(SqlCatalogError::UnknownConnection(format!(
"{}{}.{}",
db_name,
self.name().qualifiers.schema_spec,
self.name().item
)))
}
}
}
pub fn source_desc(
&self,
) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
self.item.source_desc(self)
}
pub fn is_connection(&self) -> bool {
matches!(self.item(), CatalogItem::Connection(_))
}
pub fn is_table(&self) -> bool {
matches!(self.item(), CatalogItem::Table(_))
}
pub fn is_source(&self) -> bool {
matches!(self.item(), CatalogItem::Source(_))
}
pub fn subsource_details(
&self,
) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
match &self.item() {
CatalogItem::Source(source) => match &source.data_source {
DataSourceDesc::IngestionExport {
ingestion_id,
external_reference,
details,
data_config: _,
} => Some((*ingestion_id, external_reference, details)),
_ => None,
},
_ => None,
}
}
pub fn source_export_details(
&self,
) -> Option<(
CatalogItemId,
&UnresolvedItemName,
&SourceExportDetails,
&SourceExportDataConfig<ReferencedConnection>,
)> {
match &self.item() {
CatalogItem::Source(source) => match &source.data_source {
DataSourceDesc::IngestionExport {
ingestion_id,
external_reference,
details,
data_config,
} => Some((*ingestion_id, external_reference, details, data_config)),
_ => None,
},
CatalogItem::Table(table) => match &table.data_source {
TableDataSource::DataSource {
desc:
DataSourceDesc::IngestionExport {
ingestion_id,
external_reference,
details,
data_config,
},
timeline: _,
} => Some((*ingestion_id, external_reference, details, data_config)),
_ => None,
},
_ => None,
}
}
pub fn is_progress_source(&self) -> bool {
self.item().is_progress_source()
}
pub fn progress_id(&self) -> Option<CatalogItemId> {
match &self.item() {
CatalogItem::Source(source) => match &source.data_source {
DataSourceDesc::Ingestion { ingestion_desc, .. } => {
Some(ingestion_desc.progress_subsource)
}
DataSourceDesc::IngestionExport { .. }
| DataSourceDesc::Introspection(_)
| DataSourceDesc::Progress
| DataSourceDesc::Webhook { .. } => None,
},
CatalogItem::Table(_)
| CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::MaterializedView(_)
| CatalogItem::Sink(_)
| CatalogItem::Index(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_)
| CatalogItem::ContinualTask(_) => None,
}
}
pub fn is_sink(&self) -> bool {
matches!(self.item(), CatalogItem::Sink(_))
}
pub fn is_materialized_view(&self) -> bool {
matches!(self.item(), CatalogItem::MaterializedView(_))
}
pub fn is_view(&self) -> bool {
matches!(self.item(), CatalogItem::View(_))
}
pub fn is_secret(&self) -> bool {
matches!(self.item(), CatalogItem::Secret(_))
}
pub fn is_introspection_source(&self) -> bool {
matches!(self.item(), CatalogItem::Log(_))
}
pub fn is_index(&self) -> bool {
matches!(self.item(), CatalogItem::Index(_))
}
pub fn is_continual_task(&self) -> bool {
matches!(self.item(), CatalogItem::ContinualTask(_))
}
pub fn is_relation(&self) -> bool {
mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
}
pub fn references(&self) -> &ResolvedIds {
self.item.references()
}
pub fn uses(&self) -> BTreeSet<CatalogItemId> {
self.item.uses()
}
pub fn item(&self) -> &CatalogItem {
&self.item
}
pub fn id(&self) -> CatalogItemId {
self.id
}
pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
self.item().global_ids()
}
pub fn latest_global_id(&self) -> GlobalId {
self.item().latest_global_id()
}
pub fn oid(&self) -> u32 {
self.oid
}
pub fn name(&self) -> &QualifiedItemName {
&self.name
}
pub fn referenced_by(&self) -> &[CatalogItemId] {
&self.referenced_by
}
pub fn used_by(&self) -> &[CatalogItemId] {
&self.used_by
}
pub fn conn_id(&self) -> Option<&ConnectionId> {
self.item.conn_id()
}
pub fn owner_id(&self) -> &RoleId {
&self.owner_id
}
pub fn privileges(&self) -> &PrivilegeMap {
&self.privileges
}
}
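/// All comments in the catalog, keyed by the object they are attached to and
/// an optional sub-component (e.g. a column) within it.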
#[derive(Debug, Clone, Default)]
pub struct CommentsMap {
map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
}
impl CommentsMap {
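    /// Sets (`Some`) or clears (`None`) the comment on an object, returning
    /// the previous comment, if any.
    ///
    /// ```ignore
    /// // Sketch: set, replace, then clear a comment (`id` is some
    /// // `CommentObjectId`).
    /// let mut comments = CommentsMap::default();
    /// assert_eq!(comments.update_comment(id, None, Some("first".into())), None);
    /// assert_eq!(
    ///     comments.update_comment(id, None, Some("second".into())),
    ///     Some("first".into())
    /// );
    /// assert_eq!(comments.update_comment(id, None, None), Some("second".into()));
    /// ```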
pub fn update_comment(
&mut self,
object_id: CommentObjectId,
sub_component: Option<usize>,
comment: Option<String>,
) -> Option<String> {
let object_comments = self.map.entry(object_id).or_default();
let (empty, prev) = if let Some(comment) = comment {
let prev = object_comments.insert(sub_component, comment);
(false, prev)
} else {
let prev = object_comments.remove(&sub_component);
(object_comments.is_empty(), prev)
};
if empty {
self.map.remove(&object_id);
}
prev
}
pub fn drop_comments(
&mut self,
object_ids: &BTreeSet<CommentObjectId>,
) -> Vec<(CommentObjectId, Option<usize>, String)> {
let mut removed_comments = Vec::new();
for object_id in object_ids {
if let Some(comments) = self.map.remove(object_id) {
let removed = comments
.into_iter()
.map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
removed_comments.extend(removed);
}
}
removed_comments
}
    pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
        self.map.iter().flat_map(|(id, comments)| {
            comments
                .iter()
                .map(|(pos, comment)| (*id, *pos, comment.as_str()))
        })
    }
pub fn get_object_comments(
&self,
object_id: CommentObjectId,
) -> Option<&BTreeMap<Option<usize>, String>> {
self.map.get(&object_id)
}
}
impl Serialize for CommentsMap {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
        let comment_count = self
            .map
            .values()
            .map(|comments| comments.len())
            .sum();
let mut seq = serializer.serialize_seq(Some(comment_count))?;
for (object_id, sub) in &self.map {
for (sub_component, comment) in sub {
seq.serialize_element(&(
format!("{object_id:?}"),
format!("{sub_component:?}"),
comment,
))?;
}
}
seq.end()
}
}
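/// Default privileges applied to objects at creation time, in the spirit of
/// PostgreSQL's `ALTER DEFAULT PRIVILEGES`.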
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
#[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
#[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);
impl Deref for RoleDefaultPrivileges {
type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for RoleDefaultPrivileges {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl DefaultPrivileges {
pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
if privilege.acl_mode.is_empty() {
return;
}
let privileges = self.privileges.entry(object).or_default();
if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
default_privilege.acl_mode |= privilege.acl_mode;
} else {
privileges.insert(privilege.grantee, privilege);
}
}
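    /// Revokes a default privilege, removing the grantee's entry once its
    /// privileges are empty and the object's entry once it has no grantees
    /// left.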
pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
if let Some(privileges) = self.privileges.get_mut(object) {
if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
default_privilege.acl_mode =
default_privilege.acl_mode.difference(privilege.acl_mode);
if default_privilege.acl_mode.is_empty() {
privileges.remove(&privilege.grantee);
}
}
if privileges.is_empty() {
self.privileges.remove(object);
}
}
}
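    /// Returns the default privileges declared on exactly `object` for
    /// `grantee`, if any.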
pub fn get_privileges_for_grantee(
&self,
object: &DefaultPrivilegeObject,
grantee: &RoleId,
) -> Option<&AclMode> {
self.privileges
.get(object)
.and_then(|privileges| privileges.get(grantee))
.map(|privilege| &privilege.acl_mode)
}
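    /// Returns an iterator over all default privileges that apply to an
    /// object of type `object_type` created by `role_id` in the given
    /// database and schema.
    ///
    /// Privileges declared at the schema, database, and global scope, for
    /// both `role_id` and the PUBLIC role, are unioned per grantee and then
    /// intersected with the privileges that are valid for `object_type`.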
pub fn get_applicable_privileges(
&self,
role_id: RoleId,
database_id: Option<DatabaseId>,
schema_id: Option<SchemaId>,
object_type: mz_sql::catalog::ObjectType,
) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
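        // Default privileges do not distinguish between kinds of relations:
        // they are all looked up under the `Table` object type.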
let privilege_object_type = if object_type.is_relation() {
mz_sql::catalog::ObjectType::Table
} else {
object_type
};
let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
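        // Collect default privileges declared at every applicable scope
        // (schema, database, and global) for both `role_id` and the PUBLIC
        // role, then union them per grantee.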
[
DefaultPrivilegeObject {
role_id,
database_id,
schema_id,
object_type: privilege_object_type,
},
DefaultPrivilegeObject {
role_id,
database_id,
schema_id: None,
object_type: privilege_object_type,
},
DefaultPrivilegeObject {
role_id,
database_id: None,
schema_id: None,
object_type: privilege_object_type,
},
DefaultPrivilegeObject {
role_id: RoleId::Public,
database_id,
schema_id,
object_type: privilege_object_type,
},
DefaultPrivilegeObject {
role_id: RoleId::Public,
database_id,
schema_id: None,
object_type: privilege_object_type,
},
DefaultPrivilegeObject {
role_id: RoleId::Public,
database_id: None,
schema_id: None,
object_type: privilege_object_type,
},
]
.into_iter()
.filter_map(|object| self.privileges.get(&object))
.flat_map(|acl_map| acl_map.values())
.fold(
BTreeMap::new(),
|mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
*accum_acl_mode |= *acl_mode;
accum
},
)
.into_iter()
.map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
.filter(|(_, acl_mode)| !acl_mode.is_empty())
.map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
grantee: *grantee,
acl_mode,
})
}
pub fn iter(
&self,
) -> impl Iterator<
Item = (
&DefaultPrivilegeObject,
impl Iterator<Item = &DefaultPrivilegeAclItem>,
),
> {
self.privileges
.iter()
.map(|(object, acl_map)| (object, acl_map.values()))
}
}
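/// The in-memory counterpart of [`durable::ClusterConfig`].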
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterConfig {
pub variant: ClusterVariant,
pub workload_class: Option<String>,
}
impl ClusterConfig {
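    /// Returns the optimizer feature overrides for this cluster; only managed
    /// clusters have them.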
pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
match &self.variant {
ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
ClusterVariant::Unmanaged => None,
}
}
}
impl From<ClusterConfig> for durable::ClusterConfig {
fn from(config: ClusterConfig) -> Self {
Self {
variant: config.variant.into(),
workload_class: config.workload_class,
}
}
}
impl From<durable::ClusterConfig> for ClusterConfig {
fn from(config: durable::ClusterConfig) -> Self {
Self {
variant: config.variant.into(),
workload_class: config.workload_class,
}
}
}
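/// The in-memory counterpart of [`durable::ClusterVariantManaged`]: the size,
/// replication, logging, and scheduling configuration of a managed cluster.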
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterVariantManaged {
pub size: String,
pub availability_zones: Vec<String>,
pub logging: ReplicaLogging,
pub replication_factor: u32,
pub disk: bool,
pub optimizer_feature_overrides: OptimizerFeatureOverrides,
pub schedule: ClusterSchedule,
}
impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
fn from(managed: ClusterVariantManaged) -> Self {
Self {
size: managed.size,
availability_zones: managed.availability_zones,
logging: managed.logging,
replication_factor: managed.replication_factor,
disk: managed.disk,
optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
schedule: managed.schedule,
}
}
}
impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
fn from(managed: durable::ClusterVariantManaged) -> Self {
Self {
size: managed.size,
availability_zones: managed.availability_zones,
logging: managed.logging,
replication_factor: managed.replication_factor,
disk: managed.disk,
optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
schedule: managed.schedule,
}
}
}
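/// Whether a cluster is managed, with replicas provisioned from a single
/// configuration, or unmanaged, with replicas defined individually.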
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterVariant {
Managed(ClusterVariantManaged),
Unmanaged,
}
impl From<ClusterVariant> for durable::ClusterVariant {
fn from(variant: ClusterVariant) -> Self {
match variant {
ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
ClusterVariant::Unmanaged => Self::Unmanaged,
}
}
}
impl From<durable::ClusterVariant> for ClusterVariant {
fn from(variant: durable::ClusterVariant) -> Self {
match variant {
durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
durable::ClusterVariant::Unmanaged => Self::Unmanaged,
}
}
}
impl mz_sql::catalog::CatalogDatabase for Database {
fn name(&self) -> &str {
&self.name
}
fn id(&self) -> DatabaseId {
self.id
}
fn has_schemas(&self) -> bool {
!self.schemas_by_name.is_empty()
}
fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
&self.schemas_by_name
}
#[allow(clippy::as_conversions)]
fn schemas(&self) -> Vec<&dyn CatalogSchema> {
self.schemas_by_id
.values()
.map(|schema| schema as &dyn CatalogSchema)
.collect()
}
fn owner_id(&self) -> RoleId {
self.owner_id
}
fn privileges(&self) -> &PrivilegeMap {
&self.privileges
}
}
impl mz_sql::catalog::CatalogSchema for Schema {
fn database(&self) -> &ResolvedDatabaseSpecifier {
&self.name.database
}
fn name(&self) -> &QualifiedSchemaName {
&self.name
}
fn id(&self) -> &SchemaSpecifier {
&self.id
}
fn has_items(&self) -> bool {
!self.items.is_empty()
}
fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
Box::new(
self.items
.values()
.chain(self.functions.values())
.chain(self.types.values())
.copied(),
)
}
fn owner_id(&self) -> RoleId {
self.owner_id
}
fn privileges(&self) -> &PrivilegeMap {
&self.privileges
}
}
impl mz_sql::catalog::CatalogRole for Role {
fn name(&self) -> &str {
&self.name
}
fn id(&self) -> RoleId {
self.id
}
fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
&self.membership.map
}
fn attributes(&self) -> &RoleAttributes {
&self.attributes
}
fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
&self.vars.map
}
}
impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
fn name(&self) -> &str {
&self.name
}
fn id(&self) -> NetworkPolicyId {
self.id
}
fn owner_id(&self) -> RoleId {
self.owner_id
}
fn privileges(&self) -> &PrivilegeMap {
&self.privileges
}
}
impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
fn name(&self) -> &str {
&self.name
}
fn id(&self) -> ClusterId {
self.id
}
fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
&self.bound_objects
}
fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
&self.replica_id_by_name_
}
#[allow(clippy::as_conversions)]
fn replicas(&self) -> Vec<&dyn CatalogClusterReplica> {
self.replicas()
.map(|replica| replica as &dyn CatalogClusterReplica)
.collect()
}
fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica {
self.replica(id).expect("catalog out of sync")
}
fn owner_id(&self) -> RoleId {
self.owner_id
}
fn privileges(&self) -> &PrivilegeMap {
&self.privileges
}
fn is_managed(&self) -> bool {
self.is_managed()
}
fn managed_size(&self) -> Option<&str> {
match &self.config.variant {
ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
_ => None,
}
}
fn schedule(&self) -> Option<&ClusterSchedule> {
match &self.config.variant {
ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
_ => None,
}
}
fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
self.try_to_plan()
}
}
impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
fn name(&self) -> &str {
&self.name
}
fn cluster_id(&self) -> ClusterId {
self.cluster_id
}
fn replica_id(&self) -> ReplicaId {
self.replica_id
}
fn owner_id(&self) -> RoleId {
self.owner_id
}
fn internal(&self) -> bool {
self.config.location.internal()
}
}
impl mz_sql::catalog::CatalogItem for CatalogEntry {
fn name(&self) -> &QualifiedItemName {
self.name()
}
fn id(&self) -> CatalogItemId {
self.id()
}
fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
Box::new(self.global_ids())
}
fn oid(&self) -> u32 {
self.oid()
}
fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
self.func()
}
fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
self.source_desc()
}
fn connection(
&self,
) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
{
Ok(self.connection()?.details.to_connection())
}
fn create_sql(&self) -> &str {
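        // Builtin items have no persisted `CREATE` statement, so render a
        // placeholder for them.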
match self.item() {
CatalogItem::Table(Table { create_sql, .. }) => {
create_sql.as_deref().unwrap_or("<builtin>")
}
CatalogItem::Source(Source { create_sql, .. }) => {
create_sql.as_deref().unwrap_or("<builtin>")
}
CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
CatalogItem::View(View { create_sql, .. }) => create_sql,
CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
CatalogItem::Index(Index { create_sql, .. }) => create_sql,
CatalogItem::Type(Type { create_sql, .. }) => {
create_sql.as_deref().unwrap_or("<builtin>")
}
CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
CatalogItem::Func(_) => "<builtin>",
CatalogItem::Log(_) => "<builtin>",
CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
}
}
fn item_type(&self) -> SqlCatalogItemType {
self.item().typ()
}
fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
Some((keys, *on))
} else {
None
}
}
fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
if let CatalogItem::Table(Table {
data_source: TableDataSource::TableWrites { defaults },
..
}) = self.item()
{
Some(defaults.as_slice())
} else {
None
}
}
fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
if let CatalogItem::Type(Type { details, .. }) = self.item() {
Some(details)
} else {
None
}
}
fn references(&self) -> &ResolvedIds {
self.references()
}
fn uses(&self) -> BTreeSet<CatalogItemId> {
self.uses()
}
fn referenced_by(&self) -> &[CatalogItemId] {
self.referenced_by()
}
fn used_by(&self) -> &[CatalogItemId] {
self.used_by()
}
fn subsource_details(
&self,
) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
self.subsource_details()
}
fn source_export_details(
&self,
) -> Option<(
CatalogItemId,
&UnresolvedItemName,
&SourceExportDetails,
&SourceExportDataConfig<ReferencedConnection>,
)> {
self.source_export_details()
}
fn is_progress_source(&self) -> bool {
self.is_progress_source()
}
fn progress_id(&self) -> Option<CatalogItemId> {
self.progress_id()
}
fn owner_id(&self) -> RoleId {
self.owner_id
}
fn privileges(&self) -> &PrivilegeMap {
&self.privileges
}
fn cluster_id(&self) -> Option<ClusterId> {
self.item().cluster_id()
}
fn at_version(
&self,
version: RelationVersionSelector,
) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
Box::new(CatalogCollectionEntry {
entry: self.clone(),
version,
})
}
}
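/// A single update to the in-memory catalog state: the object that changed,
/// the timestamp at which it changed, and whether it was added or retracted.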
#[derive(Debug)]
pub struct StateUpdate {
pub kind: StateUpdateKind,
pub ts: Timestamp,
pub diff: StateDiff,
}
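/// The kind of object affected by a [`StateUpdate`].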
#[derive(Debug, Clone)]
pub enum StateUpdateKind {
Role(durable::objects::Role),
Database(durable::objects::Database),
Schema(durable::objects::Schema),
DefaultPrivilege(durable::objects::DefaultPrivilege),
SystemPrivilege(MzAclItem),
SystemConfiguration(durable::objects::SystemConfiguration),
Cluster(durable::objects::Cluster),
NetworkPolicy(durable::objects::NetworkPolicy),
IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
ClusterReplica(durable::objects::ClusterReplica),
SourceReferences(durable::objects::SourceReferences),
SystemObjectMapping(durable::objects::SystemObjectMapping),
TemporaryItem(TemporaryItem),
Item(durable::objects::Item),
Comment(durable::objects::Comment),
AuditLog(durable::objects::AuditLog),
StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
UnfinalizedShard(durable::objects::UnfinalizedShard),
}
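/// Whether a [`StateUpdate`] adds or retracts its object.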
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StateDiff {
Retraction,
Addition,
}
impl From<StateDiff> for Diff {
fn from(diff: StateDiff) -> Self {
match diff {
StateDiff::Retraction => -1,
StateDiff::Addition => 1,
}
}
}
impl TryFrom<Diff> for StateDiff {
type Error = String;
fn try_from(diff: Diff) -> Result<Self, Self::Error> {
match diff {
-1 => Ok(Self::Retraction),
1 => Ok(Self::Addition),
diff => Err(format!("invalid diff {diff}")),
}
}
}
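/// An item in a temporary schema. Temporary items live only in memory; they
/// are never written to durable storage, which is why
/// [`StateUpdateKind::TemporaryItem`] has no [`BootstrapStateUpdateKind`]
/// counterpart.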
#[derive(Debug, Clone)]
pub struct TemporaryItem {
pub id: CatalogItemId,
pub oid: u32,
pub name: QualifiedItemName,
pub item: CatalogItem,
pub owner_id: RoleId,
pub privileges: PrivilegeMap,
}
impl From<CatalogEntry> for TemporaryItem {
fn from(entry: CatalogEntry) -> Self {
TemporaryItem {
id: entry.id,
oid: entry.oid,
name: entry.name,
item: entry.item,
owner_id: entry.owner_id,
privileges: entry.privileges,
}
}
}
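/// The subset of [`StateUpdateKind`] that can be read back from durable
/// storage during bootstrap; temporary items are excluded because they are
/// never persisted.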
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum BootstrapStateUpdateKind {
Role(durable::objects::Role),
Database(durable::objects::Database),
Schema(durable::objects::Schema),
DefaultPrivilege(durable::objects::DefaultPrivilege),
SystemPrivilege(MzAclItem),
SystemConfiguration(durable::objects::SystemConfiguration),
Cluster(durable::objects::Cluster),
NetworkPolicy(durable::objects::NetworkPolicy),
IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
ClusterReplica(durable::objects::ClusterReplica),
SourceReferences(durable::objects::SourceReferences),
SystemObjectMapping(durable::objects::SystemObjectMapping),
Item(durable::objects::Item),
Comment(durable::objects::Comment),
AuditLog(durable::objects::AuditLog),
StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
UnfinalizedShard(durable::objects::UnfinalizedShard),
}
impl From<BootstrapStateUpdateKind> for StateUpdateKind {
fn from(value: BootstrapStateUpdateKind) -> Self {
match value {
BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
StateUpdateKind::DefaultPrivilege(kind)
}
BootstrapStateUpdateKind::SystemPrivilege(kind) => {
StateUpdateKind::SystemPrivilege(kind)
}
BootstrapStateUpdateKind::SystemConfiguration(kind) => {
StateUpdateKind::SystemConfiguration(kind)
}
BootstrapStateUpdateKind::SourceReferences(kind) => {
StateUpdateKind::SourceReferences(kind)
}
BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
StateUpdateKind::IntrospectionSourceIndex(kind)
}
BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
StateUpdateKind::SystemObjectMapping(kind)
}
BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
StateUpdateKind::StorageCollectionMetadata(kind)
}
BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
StateUpdateKind::UnfinalizedShard(kind)
}
}
}
}
impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
type Error = TemporaryItem;
fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
match value {
StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
StateUpdateKind::DefaultPrivilege(kind) => {
Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
}
StateUpdateKind::SystemPrivilege(kind) => {
Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
}
StateUpdateKind::SystemConfiguration(kind) => {
Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
}
StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
StateUpdateKind::NetworkPolicy(kind) => {
Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
}
StateUpdateKind::IntrospectionSourceIndex(kind) => {
Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
}
StateUpdateKind::ClusterReplica(kind) => {
Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
}
StateUpdateKind::SourceReferences(kind) => {
Ok(BootstrapStateUpdateKind::SourceReferences(kind))
}
StateUpdateKind::SystemObjectMapping(kind) => {
Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
}
StateUpdateKind::TemporaryItem(kind) => Err(kind),
StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
StateUpdateKind::StorageCollectionMetadata(kind) => {
Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
}
StateUpdateKind::UnfinalizedShard(kind) => {
Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
}
}
}
}