use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::ops::{Deref, DerefMut};
use chrono::{DateTime, Utc};
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_compute_client::logging::LogVariant;
use mz_controller::clusters::{
ClusterRole, ClusterStatus, ProcessId, ReplicaConfig, ReplicaLogging,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::refresh_schedule::RefreshSchedule;
use mz_expr::{CollectionPlan, MirScalarExpr, OptimizedMirRelationExpr};
use mz_ore::collections::CollectionExt;
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::role_id::RoleId;
use mz_repr::{Diff, GlobalId, RelationDesc};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{Expr, Raw, Statement, UnresolvedItemName, Value, WithOptionValue};
use mz_sql::catalog::{
CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogTypeDetails,
DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, RoleAttributes, RoleMembership,
RoleVars, SystemObjectType,
};
use mz_sql::names::{
Aug, CommentObjectId, DatabaseId, FullItemName, QualifiedItemName, QualifiedSchemaName,
ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
};
use mz_sql::plan::{
ClusterSchedule, CreateSourcePlan, HirRelationExpr, Ingestion as PlanIngestion,
WebhookBodyFormat, WebhookHeaders, WebhookValidation,
};
use mz_sql::rbac;
use mz_sql::session::vars::OwnedVarInput;
use mz_storage_client::controller::IntrospectionType;
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::sinks::{KafkaSinkFormat, SinkEnvelope, StorageSinkConnection};
use mz_storage_types::sources::{
GenericSourceConnection, IngestionDescription, SourceConnection, SourceDesc, SourceEnvelope,
SourceExport, Timeline,
};
use once_cell::sync::Lazy;
use serde::ser::SerializeSeq;
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;
use tracing::debug;
use crate::builtin::{MZ_INTROSPECTION_CLUSTER, MZ_SYSTEM_CLUSTER};
use crate::durable;
/// In-memory representation of a database and its schemas.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    /// Name of the database.
    pub name: String,
    /// Stable catalog id of the database.
    pub id: DatabaseId,
    /// Object identifier (OID) of the database.
    pub oid: u32,
    /// Schemas in this database, keyed by id. Keys are stringified for
    /// serialization only.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Name → id index over `schemas_by_id`; must stay in sync with it.
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    /// Role that owns the database.
    pub owner_id: RoleId,
    /// Privileges granted on the database.
    pub privileges: PrivilegeMap,
}
impl From<Database> for durable::Database {
fn from(database: Database) -> durable::Database {
durable::Database {
id: database.id,
oid: database.oid,
name: database.name,
owner_id: database.owner_id,
privileges: database.privileges.into_all_values().collect(),
}
}
}
/// In-memory representation of a schema and the items it contains.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    /// Qualified (database-aware) name of the schema.
    pub name: QualifiedSchemaName,
    /// Specifier (ambient or id) of the schema.
    pub id: SchemaSpecifier,
    /// Object identifier (OID) of the schema.
    pub oid: u32,
    /// Non-function, non-type items in the schema, by item name.
    pub items: BTreeMap<String, GlobalId>,
    /// Functions in the schema, by name (separate namespace from `items`).
    pub functions: BTreeMap<String, GlobalId>,
    /// Types in the schema, by name (separate namespace from `items`).
    pub types: BTreeMap<String, GlobalId>,
    /// Role that owns the schema.
    pub owner_id: RoleId,
    /// Privileges granted on the schema.
    pub privileges: PrivilegeMap,
}
impl Schema {
pub fn into_durable_schema(self, database_id: Option<DatabaseId>) -> durable::Schema {
durable::Schema {
id: self.id.into(),
oid: self.oid,
name: self.name.schema,
database_id,
owner_id: self.owner_id,
privileges: self.privileges.into_all_values().collect(),
}
}
}
/// In-memory representation of a role.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    /// Name of the role.
    pub name: String,
    /// Stable catalog id of the role.
    pub id: RoleId,
    /// Object identifier (OID) of the role.
    pub oid: u32,
    /// Role attributes (e.g. whether it may create objects).
    pub attributes: RoleAttributes,
    /// The roles this role is a member of.
    pub membership: RoleMembership,
    /// Per-role session variable defaults.
    pub vars: RoleVars,
}
impl Role {
    /// Reports whether this is a user-created role (as opposed to a builtin
    /// one), as determined by its id.
    pub fn is_user(&self) -> bool {
        self.id.is_user()
    }

    /// Iterates over the role's session variable defaults as
    /// `(variable name, value)` pairs.
    pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
        self.vars
            .map
            .iter()
            .map(|(var_name, var_value)| (var_name.as_str(), var_value))
    }
}
impl From<Role> for durable::Role {
fn from(role: Role) -> durable::Role {
durable::Role {
id: role.id,
oid: role.oid,
name: role.name,
attributes: role.attributes,
membership: role.membership,
vars: role.vars,
}
}
}
/// In-memory representation of a cluster and its replicas.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    /// Name of the cluster.
    pub name: String,
    /// Stable catalog id of the cluster.
    pub id: ClusterId,
    /// Managed/unmanaged configuration of the cluster.
    pub config: ClusterConfig,
    /// One `GlobalId` per introspection log variant on this cluster
    /// (not serialized).
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Ids of compute objects installed on this cluster.
    pub bound_objects: BTreeSet<GlobalId>,
    /// Name → id index over `replicas_by_id_`; must stay in sync with it.
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas of this cluster, keyed by id. Keys are stringified for
    /// serialization only.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    /// Role that owns the cluster.
    pub owner_id: RoleId,
    /// Privileges granted on the cluster.
    pub privileges: PrivilegeMap,
}
impl Cluster {
    /// Classifies the cluster by name: the well-known system cluster is
    /// system-critical, the well-known introspection cluster is system, and
    /// everything else is a user cluster.
    pub fn role(&self) -> ClusterRole {
        if self.name == MZ_SYSTEM_CLUSTER.name {
            ClusterRole::SystemCritical
        } else if self.name == MZ_INTROSPECTION_CLUSTER.name {
            ClusterRole::System
        } else {
            ClusterRole::User
        }
    }

    /// Whether this cluster's replicas are managed (its config uses the
    /// `Managed` variant).
    pub fn is_managed(&self) -> bool {
        matches!(self.config.variant, ClusterVariant::Managed { .. })
    }

    /// Lists the non-internal replicas of this cluster.
    pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.replicas().filter(|r| !r.config.location.internal())
    }

    /// Lists all replicas of this cluster, internal ones included.
    pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.replicas_by_id_.values()
    }

    /// Looks up a replica by id.
    pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
        self.replicas_by_id_.get(&replica_id)
    }

    /// Looks up a replica by id, mutably.
    pub fn replica_mut(&mut self, replica_id: ReplicaId) -> Option<&mut ClusterReplica> {
        self.replicas_by_id_.get_mut(&replica_id)
    }

    /// Looks up a replica id by name.
    pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
        self.replica_id_by_name_.get(name).copied()
    }

    /// Inserts a new replica, updating both the by-name and by-id indexes.
    ///
    /// Panics if a replica with the same name or id already exists (the
    /// catalog would be out of sync).
    pub fn insert_replica(&mut self, replica: ClusterReplica) {
        assert!(self
            .replica_id_by_name_
            .insert(replica.name.clone(), replica.replica_id)
            .is_none());
        assert!(self
            .replicas_by_id_
            .insert(replica.replica_id, replica)
            .is_none());
    }

    /// Removes a replica, updating both the by-name and by-id indexes.
    ///
    /// Panics if the replica is missing from either index.
    pub fn remove_replica(&mut self, replica_id: ReplicaId) {
        let replica = self
            .replicas_by_id_
            .remove(&replica_id)
            .expect("catalog out of sync");
        self.replica_id_by_name_
            .remove(&replica.name)
            .expect("catalog out of sync");
        // Both indexes must shrink in lockstep.
        assert_eq!(self.replica_id_by_name_.len(), self.replicas_by_id_.len());
    }

    /// Renames the replica identified by `replica_id` to `to_name`, keeping
    /// the name index consistent.
    ///
    /// Panics if the replica does not exist or the name index is out of sync.
    pub fn rename_replica(&mut self, replica_id: ReplicaId, to_name: String) {
        let replica = self.replica_mut(replica_id).expect("Must exist");
        // Swap in the new name and recover the old one in a single step,
        // instead of `mem::take`-ing and then re-assigning.
        let old_name = std::mem::replace(&mut replica.name, to_name.clone());
        assert!(self.replica_id_by_name_.remove(&old_name).is_some());
        assert!(self
            .replica_id_by_name_
            .insert(to_name, replica_id)
            .is_none());
    }

    /// The availability zones of a managed cluster; `None` for unmanaged
    /// clusters.
    pub fn availability_zones(&self) -> Option<&[String]> {
        match &self.config.variant {
            ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
            ClusterVariant::Unmanaged => None,
        }
    }
}
impl From<Cluster> for durable::Cluster {
fn from(cluster: Cluster) -> durable::Cluster {
durable::Cluster {
id: cluster.id,
name: cluster.name,
owner_id: cluster.owner_id,
privileges: cluster.privileges.into_all_values().collect(),
config: cluster.config.into(),
}
}
}
/// In-memory representation of a single replica of a cluster.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    /// Name of the replica (unique within its cluster).
    pub name: String,
    /// Id of the cluster this replica belongs to.
    pub cluster_id: ClusterId,
    /// Stable catalog id of the replica.
    pub replica_id: ReplicaId,
    /// Location/logging configuration of the replica.
    pub config: ReplicaConfig,
    /// Last observed status of each of the replica's processes
    /// (not serialized).
    #[serde(skip)]
    pub process_status: BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
    /// Role that owns the replica.
    pub owner_id: RoleId,
}
impl ClusterReplica {
    /// Computes the overall status of the replica from its per-process
    /// statuses: `Ready` only when every process is ready (vacuously ready
    /// when there are no processes), otherwise `NotReady` carrying the first
    /// available reason.
    pub fn status(&self) -> ClusterStatus {
        let mut overall = ClusterStatus::Ready;
        for process in self.process_status.values() {
            overall = match (overall, process.status) {
                (ClusterStatus::Ready, ClusterStatus::Ready) => ClusterStatus::Ready,
                (acc, next) => {
                    let reason_of = |status: ClusterStatus| match status {
                        ClusterStatus::NotReady(reason) => reason,
                        ClusterStatus::Ready => None,
                    };
                    // Prefer the reason seen earliest in iteration order.
                    ClusterStatus::NotReady(reason_of(acc).or(reason_of(next)))
                }
            };
        }
        overall
    }
}
impl From<ClusterReplica> for durable::ClusterReplica {
fn from(replica: ClusterReplica) -> durable::ClusterReplica {
durable::ClusterReplica {
cluster_id: replica.cluster_id,
replica_id: replica.replica_id,
name: replica.name,
config: replica.config.into(),
owner_id: replica.owner_id,
}
}
}
/// The last observed status of a single replica process, with the time of
/// the observation.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    /// The observed status.
    pub status: ClusterStatus,
    /// When the status was recorded.
    pub time: DateTime<Utc>,
}
/// A named, owned entry in the catalog wrapping a [`CatalogItem`] together
/// with its identity, dependents, and privileges.
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    /// The item itself.
    pub item: CatalogItem,
    /// Ids of items that reference this entry (not serialized).
    #[serde(skip)]
    pub referenced_by: Vec<GlobalId>,
    /// Ids of items that use this entry — presumably the broader dependency
    /// relation mirroring `CatalogItem::uses` (not serialized).
    #[serde(skip)]
    pub used_by: Vec<GlobalId>,
    /// Stable catalog id of the entry.
    pub id: GlobalId,
    /// Object identifier (OID) of the entry.
    pub oid: u32,
    /// Fully qualified name of the entry.
    pub name: QualifiedItemName,
    /// Role that owns the entry.
    pub owner_id: RoleId,
    /// Privileges granted on the entry.
    pub privileges: PrivilegeMap,
}
/// The concrete kind of a catalog entry, with its kind-specific state.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    Table(Table),
    Source(Source),
    /// Builtin introspection log; presents as a source to SQL (see `typ`).
    Log(Log),
    View(View),
    MaterializedView(MaterializedView),
    Sink(Sink),
    Index(Index),
    Type(Type),
    Func(Func),
    Secret(Secret),
    Connection(Connection),
}
impl From<CatalogEntry> for durable::Item {
fn from(entry: CatalogEntry) -> durable::Item {
durable::Item {
id: entry.id,
oid: entry.oid,
schema_id: entry.name.qualifiers.schema_spec.into(),
name: entry.name.item,
create_sql: entry.item.into_serialized(),
owner_id: entry.owner_id,
privileges: entry.privileges.into_all_values().collect(),
}
}
}
/// In-memory representation of a table.
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// The `CREATE` SQL; `None` for builtin tables.
    pub create_sql: Option<String>,
    /// Relation description (column names and types).
    pub desc: RelationDesc,
    /// Column default expressions (not serialized).
    #[serde(skip)]
    pub defaults: Vec<Expr<Aug>>,
    /// Owning connection for temporary tables; `None` otherwise
    /// (not serialized).
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    /// Ids resolved while planning the table's definition.
    pub resolved_ids: ResolvedIds,
    /// User-specified compaction window, if any.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the table participates in retained-metrics handling.
    pub is_retained_metrics_object: bool,
}
impl Table {
    /// The timeline of the table. Tables always live on the epoch
    /// (wall-clock milliseconds) timeline.
    pub fn timeline(&self) -> Timeline {
        Timeline::EpochMilliseconds
    }
}
/// Describes where the data for a [`Source`] comes from.
#[derive(Debug, Clone, Serialize)]
pub enum DataSourceDesc {
    /// An ingestion running on a cluster.
    Ingestion(IngestionDescription<(), ReferencedConnection>),
    /// A subsource exporting one output of a parent ingestion.
    IngestionExport {
        /// Id of the parent ingestion.
        ingestion_id: GlobalId,
        /// The external object this export reads.
        external_reference: UnresolvedItemName,
    },
    /// A source with no dedicated data source of its own.
    Source,
    /// A builtin introspection collection.
    Introspection(IntrospectionType),
    /// The progress collection of an ingestion.
    Progress,
    /// A webhook source, fed by HTTP requests.
    Webhook {
        /// Optional request-validation expression.
        validate_using: Option<WebhookValidation>,
        /// How request bodies are decoded.
        body_format: WebhookBodyFormat,
        /// Which request headers are captured.
        headers: WebhookHeaders,
        /// Cluster the webhook runs on.
        cluster_id: ClusterId,
    },
}
impl DataSourceDesc {
    /// Builds an [`DataSourceDesc::Ingestion`] for source `id` running on
    /// cluster `instance_id`, registering one export per planned subsource
    /// plus output 0 for the source itself.
    pub fn ingestion(
        id: GlobalId,
        ingestion: PlanIngestion,
        instance_id: ClusterId,
    ) -> DataSourceDesc {
        // The primary source occupies output index 0; subsources keep the
        // indexes the planner assigned them.
        let source_exports = ingestion
            .subsource_exports
            .iter()
            .chain(std::iter::once((&id, &0)))
            .map(|(export_id, output_index)| {
                let export = SourceExport {
                    output_index: *output_index,
                    storage_metadata: (),
                };
                (*export_id, export)
            })
            .collect();
        DataSourceDesc::Ingestion(IngestionDescription {
            desc: ingestion.desc.clone(),
            ingestion_metadata: (),
            source_exports,
            instance_id,
            remap_collection_id: ingestion.progress_subsource,
        })
    }
}
/// In-memory representation of a source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// The `CREATE` SQL; `None` for builtin sources.
    pub create_sql: Option<String>,
    /// Where the source's data comes from (not serialized).
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// Relation description (column names and types).
    pub desc: RelationDesc,
    /// Timeline the source's data lives on.
    pub timeline: Timeline,
    /// Ids resolved while planning the source's definition.
    pub resolved_ids: ResolvedIds,
    /// User-specified compaction window, if any.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the source participates in retained-metrics handling.
    pub is_retained_metrics_object: bool,
}
impl Source {
    /// Creates a new `Source` from a plan.
    ///
    /// The plan's data source determines which [`DataSourceDesc`] variant is
    /// recorded. The asserts enforce that only ingestions and webhooks carry
    /// a cluster id; all other kinds must not.
    pub fn new(
        id: GlobalId,
        plan: CreateSourcePlan,
        resolved_ids: ResolvedIds,
        custom_logical_compaction_window: Option<CompactionWindow>,
        is_retained_metrics_object: bool,
    ) -> Source {
        Source {
            create_sql: Some(plan.source.create_sql),
            data_source: match plan.source.data_source {
                mz_sql::plan::DataSourceDesc::Ingestion(ingestion) => DataSourceDesc::ingestion(
                    id,
                    ingestion,
                    plan.in_cluster
                        .expect("ingestion-based sources must be given a cluster ID"),
                ),
                mz_sql::plan::DataSourceDesc::Progress => {
                    assert!(
                        plan.in_cluster.is_none(),
                        "subsources must not have a host config or cluster_id defined"
                    );
                    DataSourceDesc::Progress
                }
                mz_sql::plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                } => {
                    assert!(
                        plan.in_cluster.is_none(),
                        "subsources must not have a host config or cluster_id defined"
                    );
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                    }
                }
                mz_sql::plan::DataSourceDesc::Source => {
                    assert!(
                        plan.in_cluster.is_none(),
                        "subsources must not have a host config or cluster_id defined"
                    );
                    DataSourceDesc::Source
                }
                mz_sql::plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                } => DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id: plan
                        .in_cluster
                        .expect("webhook sources must be given a cluster ID"),
                },
            },
            desc: plan.source.desc,
            timeline: plan.timeline,
            resolved_ids,
            // An explicit window in the plan wins over the caller-supplied
            // default.
            custom_logical_compaction_window: plan
                .source
                .compaction_window
                .or(custom_logical_compaction_window),
            is_retained_metrics_object,
        }
    }
pub fn source_type(&self) -> &str {
match &self.data_source {
DataSourceDesc::Ingestion(ingestion) => ingestion.desc.connection.name(),
DataSourceDesc::Progress => "progress",
DataSourceDesc::IngestionExport { .. } => "subsource",
DataSourceDesc::Source => "subsource",
DataSourceDesc::Introspection(_) => "source",
DataSourceDesc::Webhook { .. } => "webhook",
}
}
pub fn formats(&self) -> (Option<&str>, Option<&str>) {
match &self.data_source {
DataSourceDesc::Ingestion(ingestion) => match &ingestion.desc.encoding {
Some(encoding) => match &encoding.key {
Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
None => (None, Some(encoding.value.type_())),
},
None => (None, None),
},
DataSourceDesc::Introspection(_)
| DataSourceDesc::IngestionExport { .. }
| DataSourceDesc::Webhook { .. }
| DataSourceDesc::Progress
| DataSourceDesc::Source => (None, None),
}
}
pub fn envelope(&self) -> Option<&str> {
match &self.data_source {
DataSourceDesc::Ingestion(ingestion) => match ingestion.desc.envelope() {
SourceEnvelope::None(_) => Some("none"),
SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
mz_storage_types::sources::envelope::UpsertStyle::Default(_) => Some("upsert"),
mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
Some("debezium")
}
},
SourceEnvelope::CdcV2 => {
Some("materialize")
}
},
DataSourceDesc::IngestionExport { .. }
| DataSourceDesc::Introspection(_)
| DataSourceDesc::Webhook { .. }
| DataSourceDesc::Progress
| DataSourceDesc::Source => None,
}
}
pub fn connection_id(&self) -> Option<GlobalId> {
match &self.data_source {
DataSourceDesc::Ingestion(ingestion) => ingestion.desc.connection.connection_id(),
DataSourceDesc::IngestionExport { .. }
| DataSourceDesc::Introspection(_)
| DataSourceDesc::Webhook { .. }
| DataSourceDesc::Progress
| DataSourceDesc::Source => None,
}
}
pub fn user_controllable_persist_shard_count(&self) -> i64 {
match &self.data_source {
DataSourceDesc::Ingestion(ingestion) => {
match &ingestion.desc.connection {
GenericSourceConnection::Postgres(_) | GenericSourceConnection::MySql(_) => 0,
GenericSourceConnection::LoadGenerator(lg) => {
if lg.load_generator.views().is_empty() {
1
} else {
0
}
}
GenericSourceConnection::Kafka(_) => 1,
}
}
DataSourceDesc::IngestionExport { .. } => 1,
DataSourceDesc::Webhook { .. } => 1,
DataSourceDesc::Source
| DataSourceDesc::Introspection(_)
| DataSourceDesc::Progress => 0,
}
}
}
/// In-memory representation of a builtin introspection log.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// Which introspection log this is.
    pub variant: LogVariant,
    /// Whether the log is also backed by a storage collection.
    pub has_storage_collection: bool,
}
/// In-memory representation of a sink.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// The `CREATE` SQL of the sink.
    pub create_sql: String,
    /// Id of the collection the sink reads from.
    pub from: GlobalId,
    /// The external system the sink writes to.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// How updates are encoded for the external system.
    pub envelope: SinkEnvelope,
    /// Whether the sink emits an initial snapshot.
    pub with_snapshot: bool,
    /// Ids resolved while planning the sink's definition.
    pub resolved_ids: ResolvedIds,
    /// Cluster the sink runs on.
    pub cluster_id: ClusterId,
}
impl Sink {
    /// A short name for the sink's connection kind.
    pub fn sink_type(&self) -> &str {
        self.connection.name()
    }

    /// A human-readable name for the sink's envelope.
    pub fn envelope(&self) -> Option<&str> {
        match &self.envelope {
            SinkEnvelope::Upsert => Some("upsert"),
            SinkEnvelope::Debezium => Some("debezium"),
        }
    }

    /// A human-readable name for the sink's output format.
    pub fn format(&self) -> &str {
        // Kafka is currently the only storage sink connection kind, so this
        // `let` is irrefutable.
        let StorageSinkConnection::Kafka(connection) = &self.connection;
        match &connection.format {
            KafkaSinkFormat::Json => "json",
            KafkaSinkFormat::Avro { .. } => "avro",
        }
    }

    /// The id of the catalog connection this sink uses, if any.
    pub fn connection_id(&self) -> Option<GlobalId> {
        self.connection.connection_id()
    }
}
/// In-memory representation of a view.
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// The `CREATE` SQL of the view.
    pub create_sql: String,
    /// The unoptimized, high-level expression of the view.
    pub raw_expr: HirRelationExpr,
    /// The optimized relation expression of the view.
    pub optimized_expr: OptimizedMirRelationExpr,
    /// Relation description (column names and types).
    pub desc: RelationDesc,
    /// Owning connection for temporary views; `None` otherwise.
    pub conn_id: Option<ConnectionId>,
    /// Ids resolved while planning the view's definition.
    pub resolved_ids: ResolvedIds,
}
/// In-memory representation of a materialized view.
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// The `CREATE` SQL of the materialized view.
    pub create_sql: String,
    /// The unoptimized, high-level expression.
    pub raw_expr: HirRelationExpr,
    /// The optimized relation expression.
    pub optimized_expr: OptimizedMirRelationExpr,
    /// Relation description (column names and types).
    pub desc: RelationDesc,
    /// Ids resolved while planning the definition.
    pub resolved_ids: ResolvedIds,
    /// Cluster the materialized view is maintained on.
    pub cluster_id: ClusterId,
    /// Column indexes asserted to be non-null.
    pub non_null_assertions: Vec<usize>,
    /// User-specified compaction window, if any.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Refresh schedule, if not continually refreshed.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// The as-of frontier the view was initially created at, if recorded.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
/// In-memory representation of an index.
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// The `CREATE` SQL of the index.
    pub create_sql: String,
    /// Id of the relation the index is built on.
    pub on: GlobalId,
    /// Key expressions of the index.
    pub keys: Vec<MirScalarExpr>,
    /// Owning connection for temporary indexes; `None` otherwise.
    pub conn_id: Option<ConnectionId>,
    /// Ids resolved while planning the index's definition.
    pub resolved_ids: ResolvedIds,
    /// Cluster the index is maintained on.
    pub cluster_id: ClusterId,
    /// User-specified compaction window, if any.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the index participates in retained-metrics handling.
    pub is_retained_metrics_object: bool,
}
/// In-memory representation of a type.
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// The `CREATE` SQL; `None` for builtin types.
    pub create_sql: Option<String>,
    /// Kind-specific details of the type (not serialized).
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// Relation description for record-like types, if any.
    pub desc: Option<RelationDesc>,
    /// Ids resolved while planning the type's definition.
    pub resolved_ids: ResolvedIds,
}
/// In-memory representation of a function, backed by a static definition.
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// The static function definition (not serialized).
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
}
/// In-memory representation of a secret. The secret's value lives elsewhere.
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// The `CREATE` SQL of the secret.
    pub create_sql: String,
}
/// In-memory representation of a connection.
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// The `CREATE` SQL of the connection.
    pub create_sql: String,
    /// The connection details themselves.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
    /// Ids resolved while planning the connection's definition.
    pub resolved_ids: ResolvedIds,
}
impl CatalogItem {
pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
match self {
CatalogItem::Table(_) => mz_sql::catalog::CatalogItemType::Table,
CatalogItem::Source(_) => mz_sql::catalog::CatalogItemType::Source,
CatalogItem::Log(_) => mz_sql::catalog::CatalogItemType::Source,
CatalogItem::Sink(_) => mz_sql::catalog::CatalogItemType::Sink,
CatalogItem::View(_) => mz_sql::catalog::CatalogItemType::View,
CatalogItem::MaterializedView(_) => mz_sql::catalog::CatalogItemType::MaterializedView,
CatalogItem::Index(_) => mz_sql::catalog::CatalogItemType::Index,
CatalogItem::Type(_) => mz_sql::catalog::CatalogItemType::Type,
CatalogItem::Func(_) => mz_sql::catalog::CatalogItemType::Func,
CatalogItem::Secret(_) => mz_sql::catalog::CatalogItemType::Secret,
CatalogItem::Connection(_) => mz_sql::catalog::CatalogItemType::Connection,
}
}
pub fn is_storage_collection(&self) -> bool {
match self {
CatalogItem::Table(_) | CatalogItem::Source(_) | CatalogItem::MaterializedView(_) => {
true
}
CatalogItem::Log(_)
| CatalogItem::Sink(_)
| CatalogItem::View(_)
| CatalogItem::Index(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_) => false,
}
}
pub fn desc(&self, name: &FullItemName) -> Result<Cow<RelationDesc>, SqlCatalogError> {
self.desc_opt().ok_or(SqlCatalogError::InvalidDependency {
name: name.to_string(),
typ: self.typ(),
})
}
pub fn desc_opt(&self) -> Option<Cow<RelationDesc>> {
match &self {
CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
CatalogItem::Table(tbl) => Some(Cow::Borrowed(&tbl.desc)),
CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
CatalogItem::MaterializedView(mview) => Some(Cow::Borrowed(&mview.desc)),
CatalogItem::Type(typ) => typ.desc.as_ref().map(Cow::Borrowed),
CatalogItem::Func(_)
| CatalogItem::Index(_)
| CatalogItem::Sink(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_) => None,
}
}
pub fn func(
&self,
entry: &CatalogEntry,
) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
match &self {
CatalogItem::Func(func) => Ok(func.inner),
_ => Err(SqlCatalogError::UnexpectedType {
name: entry.name().item.to_string(),
actual_type: entry.item_type(),
expected_type: CatalogItemType::Func,
}),
}
}
    /// Returns the [`SourceDesc`] if this item is an ingestion-backed source,
    /// `Ok(None)` for other source kinds, and an `UnexpectedType` error
    /// (built from `entry`'s identity) when the item is not a source at all.
    pub fn source_desc(
        &self,
        entry: &CatalogEntry,
    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        match &self {
            CatalogItem::Source(source) => match &source.data_source {
                DataSourceDesc::Ingestion(ingestion) => Ok(Some(&ingestion.desc)),
                DataSourceDesc::IngestionExport { .. }
                | DataSourceDesc::Introspection(_)
                | DataSourceDesc::Webhook { .. }
                | DataSourceDesc::Progress
                | DataSourceDesc::Source => Ok(None),
            },
            _ => Err(SqlCatalogError::UnexpectedType {
                name: entry.name().item.to_string(),
                actual_type: entry.item_type(),
                expected_type: CatalogItemType::Source,
            }),
        }
    }
    /// Whether this item is a source whose data source is a progress
    /// collection.
    pub fn is_progress_source(&self) -> bool {
        matches!(
            self,
            CatalogItem::Source(Source {
                data_source: DataSourceDesc::Progress,
                ..
            })
        )
    }
pub fn references(&self) -> &ResolvedIds {
static EMPTY: Lazy<ResolvedIds> = Lazy::new(|| ResolvedIds(BTreeSet::new()));
match self {
CatalogItem::Func(_) => &*EMPTY,
CatalogItem::Index(idx) => &idx.resolved_ids,
CatalogItem::Sink(sink) => &sink.resolved_ids,
CatalogItem::Source(source) => &source.resolved_ids,
CatalogItem::Log(_) => &*EMPTY,
CatalogItem::Table(table) => &table.resolved_ids,
CatalogItem::Type(typ) => &typ.resolved_ids,
CatalogItem::View(view) => &view.resolved_ids,
CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
CatalogItem::Secret(_) => &*EMPTY,
CatalogItem::Connection(connection) => &connection.resolved_ids,
}
}
pub fn uses(&self) -> BTreeSet<GlobalId> {
let mut uses = self.references().0.clone();
match self {
CatalogItem::Func(_) => {}
CatalogItem::Index(_) => {}
CatalogItem::Sink(_) => {}
CatalogItem::Source(_) => {}
CatalogItem::Log(_) => {}
CatalogItem::Table(_) => {}
CatalogItem::Type(_) => {}
CatalogItem::View(view) => uses.extend(view.raw_expr.depends_on()),
CatalogItem::MaterializedView(mview) => uses.extend(mview.raw_expr.depends_on()),
CatalogItem::Secret(_) => {}
CatalogItem::Connection(_) => {}
}
uses
}
pub fn conn_id(&self) -> Option<&ConnectionId> {
match self {
CatalogItem::View(view) => view.conn_id.as_ref(),
CatalogItem::Index(index) => index.conn_id.as_ref(),
CatalogItem::Table(table) => table.conn_id.as_ref(),
CatalogItem::Log(_)
| CatalogItem::Source(_)
| CatalogItem::Sink(_)
| CatalogItem::MaterializedView(_)
| CatalogItem::Secret(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Connection(_) => None,
}
}
pub fn is_temporary(&self) -> bool {
self.conn_id().is_some()
}
    /// Returns a copy of this item whose `create_sql` has references to the
    /// schema `database_name.cur_schema_name` rewritten to use
    /// `new_schema_name`.
    ///
    /// On failure, propagates the pair of strings reported by the AST
    /// transform. Items without `create_sql` (logs, functions, builtins) are
    /// returned unchanged.
    pub fn rename_schema_refs(
        &self,
        database_name: &str,
        cur_schema_name: &str,
        new_schema_name: &str,
    ) -> Result<CatalogItem, (String, String)> {
        // Re-parse the persisted SQL, rewrite schema references in the AST,
        // and re-serialize in the stable format.
        let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
            let mut create_stmt = mz_sql::parse::parse(&create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast;
            mz_sql::ast::transform::create_stmt_rename_schema_refs(
                &mut create_stmt,
                database_name,
                cur_schema_name,
                new_schema_name,
            )?;
            Ok(create_stmt.to_ast_string_stable())
        };
        match self {
            CatalogItem::Table(i) => {
                let mut i = i.clone();
                // `map(...).transpose()` leaves builtin items (create_sql of
                // `None`) untouched.
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Table(i))
            }
            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
            CatalogItem::Source(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Source(i))
            }
            CatalogItem::Sink(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Sink(i))
            }
            CatalogItem::View(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::View(i))
            }
            CatalogItem::MaterializedView(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::MaterializedView(i))
            }
            CatalogItem::Index(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Index(i))
            }
            CatalogItem::Secret(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Secret(i))
            }
            CatalogItem::Connection(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Connection(i))
            }
            CatalogItem::Type(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Type(i))
            }
            CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
        }
    }
    /// Returns a copy of this item whose `create_sql` has references to the
    /// item `from` rewritten to use `to_item_name`. When `rename_self` is
    /// true, the item's own name in its statement is rewritten as well.
    ///
    /// Panics for functions and types, which cannot be renamed.
    pub fn rename_item_refs(
        &self,
        from: FullItemName,
        to_item_name: String,
        rename_self: bool,
    ) -> Result<CatalogItem, String> {
        // Re-parse the persisted SQL, rewrite the statement (and optionally
        // its own name), and re-serialize in the stable format.
        let do_rewrite = |create_sql: String| -> Result<String, String> {
            let mut create_stmt = mz_sql::parse::parse(&create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast;
            if rename_self {
                mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
            }
            mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
            Ok(create_stmt.to_ast_string_stable())
        };
        match self {
            CatalogItem::Table(i) => {
                let mut i = i.clone();
                // `map(...).transpose()` leaves builtin items (create_sql of
                // `None`) untouched.
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Table(i))
            }
            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
            CatalogItem::Source(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Source(i))
            }
            CatalogItem::Sink(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Sink(i))
            }
            CatalogItem::View(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::View(i))
            }
            CatalogItem::MaterializedView(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::MaterializedView(i))
            }
            CatalogItem::Index(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Index(i))
            }
            CatalogItem::Secret(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Secret(i))
            }
            CatalogItem::Func(_) | CatalogItem::Type(_) => {
                unreachable!("{}s cannot be renamed", self.typ())
            }
            CatalogItem::Connection(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Connection(i))
            }
        }
    }
    /// Updates the `RETAIN HISTORY` option in this item's `create_sql` and
    /// sets the item's in-memory compaction window to `window`.
    ///
    /// A `value` of `None` removes the option; `Some` sets or replaces it.
    /// Returns the previous option value, if any. Errors (with `()`) if the
    /// statement kind does not support `RETAIN HISTORY`.
    ///
    /// Panics if the SQL rewrite succeeds but the item kind carries no
    /// compaction window slot — the two must agree.
    pub fn update_retain_history(
        &mut self,
        value: Option<Value>,
        window: CompactionWindow,
    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
        let update = |ast: &mut Statement<Raw>| {
            // Shared edit over the four supported statement kinds, which
            // differ only in their option/option-name types.
            macro_rules! update_retain_history {
                ( $stmt:ident, $opt:ident, $name:ident ) => {{
                    // `rposition`: if the option appears multiple times, the
                    // last occurrence wins.
                    let pos = $stmt
                        .with_options
                        .iter()
                        .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
                    if let Some(value) = value {
                        let next = mz_sql_parser::ast::$opt {
                            name: mz_sql_parser::ast::$name::RetainHistory,
                            value: Some(WithOptionValue::RetainHistoryFor(value)),
                        };
                        if let Some(idx) = pos {
                            let previous = $stmt.with_options[idx].clone();
                            $stmt.with_options[idx] = next;
                            previous.value
                        } else {
                            $stmt.with_options.push(next);
                            None
                        }
                    } else {
                        if let Some(idx) = pos {
                            $stmt.with_options.swap_remove(idx).value
                        } else {
                            None
                        }
                    }
                }};
            }
            let previous = match ast {
                Statement::CreateTable(ref mut stmt) => {
                    update_retain_history!(stmt, TableOption, TableOptionName)
                }
                Statement::CreateIndex(ref mut stmt) => {
                    update_retain_history!(stmt, IndexOption, IndexOptionName)
                }
                Statement::CreateSource(ref mut stmt) => {
                    update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
                }
                Statement::CreateMaterializedView(ref mut stmt) => {
                    update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
                }
                _ => {
                    return Err(());
                }
            };
            Ok(previous)
        };
        let res = self.update_sql(update)?;
        let cw = self
            .custom_logical_compaction_window_mut()
            .expect("item must have compaction window");
        *cw = Some(window);
        Ok(res)
    }
    /// Re-parses this item's `create_sql`, applies `f` to the parsed
    /// statement, and writes the re-serialized (stable) SQL back to the item.
    ///
    /// Returns `Err(())` for item kinds without `create_sql` (functions,
    /// logs, and builtins whose `create_sql` is `None`), or when `f` fails.
    pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
    where
        F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
    {
        let create_sql = match self {
            // These kinds have optional SQL (builtins have none).
            CatalogItem::Table(Table { create_sql, .. })
            | CatalogItem::Type(Type { create_sql, .. })
            | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
            // These kinds always have SQL.
            CatalogItem::Sink(Sink { create_sql, .. })
            | CatalogItem::View(View { create_sql, .. })
            | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
            | CatalogItem::Index(Index { create_sql, .. })
            | CatalogItem::Secret(Secret { create_sql, .. })
            | CatalogItem::Connection(Connection { create_sql, .. }) => Some(create_sql),
            CatalogItem::Func(_) | CatalogItem::Log(_) => None,
        };
        let Some(create_sql) = create_sql else {
            return Err(());
        };
        let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
            .expect("non-system items must be parseable")
            .into_element()
            .ast;
        debug!("rewrite: {}", ast.to_ast_string_redacted());
        let t = f(&mut ast)?;
        *create_sql = ast.to_ast_string_stable();
        debug!("rewrote: {}", ast.to_ast_string_redacted());
        Ok(t)
    }
pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
match self {
CatalogItem::Index(index) => Some(index.cluster_id),
CatalogItem::Table(_)
| CatalogItem::Source(_)
| CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::MaterializedView(_)
| CatalogItem::Sink(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_) => None,
}
}
    /// The cluster this item runs on, if any: materialized views, indexes,
    /// sinks, and cluster-hosted sources (ingestions, webhooks) have one;
    /// everything else does not.
    pub fn cluster_id(&self) -> Option<ClusterId> {
        match self {
            CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
            CatalogItem::Index(index) => Some(index.cluster_id),
            CatalogItem::Source(source) => match &source.data_source {
                DataSourceDesc::Ingestion(ingestion) => Some(ingestion.instance_id),
                // Subsources run as part of their parent ingestion.
                DataSourceDesc::IngestionExport { .. } => None,
                DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
                DataSourceDesc::Introspection(_)
                | DataSourceDesc::Progress
                | DataSourceDesc::Source => None,
            },
            CatalogItem::Sink(sink) => Some(sink.cluster_id),
            CatalogItem::Table(_)
            | CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_) => None,
        }
    }
pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
match self {
CatalogItem::Table(table) => table.custom_logical_compaction_window,
CatalogItem::Source(source) => source.custom_logical_compaction_window,
CatalogItem::Index(index) => index.custom_logical_compaction_window,
CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::Sink(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_) => None,
}
}
pub fn custom_logical_compaction_window_mut(
&mut self,
) -> Option<&mut Option<CompactionWindow>> {
let cw = match self {
CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::Sink(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_) => return None,
};
Some(cw)
}
pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
let custom_logical_compaction_window = match self {
CatalogItem::Table(_)
| CatalogItem::Source(_)
| CatalogItem::Index(_)
| CatalogItem::MaterializedView(_) => self.custom_logical_compaction_window(),
CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::Sink(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_) => return None,
};
Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
}
pub fn is_retained_metrics_object(&self) -> bool {
match self {
CatalogItem::Table(table) => table.is_retained_metrics_object,
CatalogItem::Source(source) => source.is_retained_metrics_object,
CatalogItem::Index(index) => index.is_retained_metrics_object,
CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::MaterializedView(_)
| CatalogItem::Sink(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_) => false,
}
}
    /// Serializes the item as its `create_sql`, cloning it.
    ///
    /// Panics for items that cannot be serialized: logs, functions, builtin
    /// tables/types/sources, and introspection sources.
    pub fn to_serialized(&self) -> String {
        match self {
            CatalogItem::Table(table) => table
                .create_sql
                .as_ref()
                .expect("builtin tables cannot be serialized")
                .clone(),
            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
            CatalogItem::Source(source) => {
                assert!(
                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
                    "cannot serialize introspection/builtin sources",
                );
                source
                    .create_sql
                    .as_ref()
                    .expect("builtin sources cannot be serialized")
                    .clone()
            }
            CatalogItem::View(view) => view.create_sql.clone(),
            CatalogItem::MaterializedView(mview) => mview.create_sql.clone(),
            CatalogItem::Index(index) => index.create_sql.clone(),
            CatalogItem::Sink(sink) => sink.create_sql.clone(),
            CatalogItem::Type(typ) => typ
                .create_sql
                .as_ref()
                .expect("builtin types cannot be serialized")
                .clone(),
            CatalogItem::Secret(secret) => secret.create_sql.clone(),
            CatalogItem::Connection(connection) => connection.create_sql.clone(),
            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
        }
    }
    /// Consumes the item and serializes it as its `create_sql` (the owned
    /// counterpart of [`CatalogItem::to_serialized`]).
    ///
    /// Panics for items that cannot be serialized: logs, functions, builtin
    /// tables/types/sources, and introspection sources.
    pub fn into_serialized(self) -> String {
        match self {
            CatalogItem::Table(table) => table
                .create_sql
                .expect("builtin tables cannot be serialized"),
            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
            CatalogItem::Source(source) => {
                assert!(
                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
                    "cannot serialize introspection/builtin sources",
                );
                source
                    .create_sql
                    .expect("builtin sources cannot be serialized")
            }
            CatalogItem::View(view) => view.create_sql,
            CatalogItem::MaterializedView(mview) => mview.create_sql,
            CatalogItem::Index(index) => index.create_sql,
            CatalogItem::Sink(sink) => sink.create_sql,
            CatalogItem::Type(typ) => typ.create_sql.expect("builtin types cannot be serialized"),
            CatalogItem::Secret(secret) => secret.create_sql,
            CatalogItem::Connection(connection) => connection.create_sql,
            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
        }
    }
}
impl CatalogEntry {
    /// Returns a description of the rows this entry produces, or an error
    /// (rendered with `name`) if it does not produce rows.
    pub fn desc(&self, name: &FullItemName) -> Result<Cow<RelationDesc>, SqlCatalogError> {
        self.item().desc(name)
    }

    /// Returns a description of the rows this entry produces, if it has one.
    pub fn desc_opt(&self) -> Option<Cow<RelationDesc>> {
        self.item().desc_opt()
    }

    /// Reports whether this entry has a relation description (i.e. columns).
    pub fn has_columns(&self) -> bool {
        self.item().desc_opt().is_some()
    }

    /// Returns the function backing this entry, or an error if the entry is
    /// not a function.
    pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.item().func(self)
    }

    /// Returns the index backing this entry, if it is an index.
    pub fn index(&self) -> Option<&Index> {
        if let CatalogItem::Index(index) = self.item() {
            Some(index)
        } else {
            None
        }
    }

    /// Returns the source backing this entry, if it is a source.
    pub fn source(&self) -> Option<&Source> {
        if let CatalogItem::Source(source) = self.item() {
            Some(source)
        } else {
            None
        }
    }

    /// Returns the sink backing this entry, if it is a sink.
    pub fn sink(&self) -> Option<&Sink> {
        if let CatalogItem::Sink(sink) = self.item() {
            Some(sink)
        } else {
            None
        }
    }

    /// Returns the secret backing this entry, if it is a secret.
    pub fn secret(&self) -> Option<&Secret> {
        if let CatalogItem::Secret(secret) = self.item() {
            Some(secret)
        } else {
            None
        }
    }

    /// Returns the connection backing this entry, or an `UnknownConnection`
    /// error naming the entry if it is not a connection.
    pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
        if let CatalogItem::Connection(connection) = self.item() {
            return Ok(connection);
        }
        let name = self.name();
        // Render the database qualifier; ambient objects carry none.
        let db_name = match name.qualifiers.database_spec {
            ResolvedDatabaseSpecifier::Ambient => "".to_string(),
            ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
        };
        Err(SqlCatalogError::UnknownConnection(format!(
            "{}{}.{}",
            db_name, name.qualifiers.schema_spec, name.item
        )))
    }

    /// Returns the source description for this entry, delegating to the
    /// underlying item.
    pub fn source_desc(
        &self,
    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.item().source_desc(self)
    }

    /// Reports whether this entry is a connection.
    pub fn is_connection(&self) -> bool {
        matches!(self.item(), CatalogItem::Connection(_))
    }

    /// Reports whether this entry is a table.
    pub fn is_table(&self) -> bool {
        matches!(self.item(), CatalogItem::Table(_))
    }

    /// Reports whether this entry is a source.
    pub fn is_source(&self) -> bool {
        matches!(self.item(), CatalogItem::Source(_))
    }

    /// If this entry is an ingestion export (subsource), returns the ID of
    /// its parent ingestion and the external object it references.
    pub fn subsource_details(&self) -> Option<(GlobalId, &UnresolvedItemName)> {
        if let CatalogItem::Source(source) = self.item() {
            if let DataSourceDesc::IngestionExport {
                ingestion_id,
                external_reference,
            } = &source.data_source
            {
                return Some((*ingestion_id, external_reference));
            }
        }
        None
    }

    /// Returns the output index for each export of this entry's ingestion,
    /// or an empty map if the entry is not an ingestion-backed source.
    pub fn source_exports(&self) -> BTreeMap<GlobalId, usize> {
        if let CatalogItem::Source(source) = self.item() {
            if let DataSourceDesc::Ingestion(ingestion) = &source.data_source {
                return ingestion
                    .source_exports
                    .iter()
                    .map(|(id, export)| (*id, export.output_index))
                    .collect();
            }
        }
        BTreeMap::new()
    }

    /// Reports whether this entry is a progress source.
    pub fn is_progress_source(&self) -> bool {
        self.item().is_progress_source()
    }

    /// Returns the ID of this entry's remap collection, if the entry is an
    /// ingestion-backed source; every other kind of item has none.
    pub fn progress_id(&self) -> Option<GlobalId> {
        match self.item() {
            CatalogItem::Source(source) => match &source.data_source {
                DataSourceDesc::Ingestion(ingestion) => Some(ingestion.remap_collection_id),
                DataSourceDesc::IngestionExport { .. }
                | DataSourceDesc::Introspection(_)
                | DataSourceDesc::Progress
                | DataSourceDesc::Webhook { .. }
                | DataSourceDesc::Source => None,
            },
            // Intentionally exhaustive so adding an item type forces a
            // decision here.
            CatalogItem::Table(_)
            | CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Sink(_)
            | CatalogItem::Index(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_) => None,
        }
    }

    /// Reports whether this entry is a sink.
    pub fn is_sink(&self) -> bool {
        matches!(self.item(), CatalogItem::Sink(_))
    }

    /// Reports whether this entry is a materialized view.
    pub fn is_materialized_view(&self) -> bool {
        matches!(self.item(), CatalogItem::MaterializedView(_))
    }

    /// Reports whether this entry is a view.
    pub fn is_view(&self) -> bool {
        matches!(self.item(), CatalogItem::View(_))
    }

    /// Reports whether this entry is a secret.
    pub fn is_secret(&self) -> bool {
        matches!(self.item(), CatalogItem::Secret(_))
    }

    /// Reports whether this entry is an introspection source (builtin log).
    pub fn is_introspection_source(&self) -> bool {
        matches!(self.item(), CatalogItem::Log(_))
    }

    /// Reports whether this entry is an index.
    pub fn is_index(&self) -> bool {
        matches!(self.item(), CatalogItem::Index(_))
    }

    /// Reports whether this entry's object type is relation-like.
    pub fn is_relation(&self) -> bool {
        mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
    }

    /// The resolved IDs referenced by this entry's definition.
    pub fn references(&self) -> &ResolvedIds {
        self.item().references()
    }

    /// The set of global IDs used by this entry.
    pub fn uses(&self) -> BTreeSet<GlobalId> {
        self.item().uses()
    }

    /// The catalog item behind this entry.
    pub fn item(&self) -> &CatalogItem {
        &self.item
    }

    /// This entry's globally unique ID.
    pub fn id(&self) -> GlobalId {
        self.id
    }

    /// This entry's OID.
    pub fn oid(&self) -> u32 {
        self.oid
    }

    /// This entry's fully qualified name.
    pub fn name(&self) -> &QualifiedItemName {
        &self.name
    }

    /// IDs of the items that reference this entry.
    pub fn referenced_by(&self) -> &[GlobalId] {
        &self.referenced_by
    }

    /// IDs of the items that use this entry.
    pub fn used_by(&self) -> &[GlobalId] {
        &self.used_by
    }

    /// The connection ID associated with this entry, if any.
    pub fn conn_id(&self) -> Option<&ConnectionId> {
        self.item().conn_id()
    }

    /// The role that owns this entry.
    pub fn owner_id(&self) -> &RoleId {
        &self.owner_id
    }

    /// The privileges granted on this entry.
    pub fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
/// In-memory store of object comments, keyed first by the commented object
/// and then by an optional sub-component index (`None` addresses the object
/// itself).
#[derive(Debug, Clone, Default)]
pub struct CommentsMap {
    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
}
impl CommentsMap {
    /// Sets the comment for `object_id` at `sub_component` (`None` addresses
    /// the object itself), or removes it when `comment` is `None`.
    ///
    /// Returns the previous comment at that location, if any.
    pub fn update_comment(
        &mut self,
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    ) -> Option<String> {
        let object_comments = self.map.entry(object_id).or_default();

        // Insert or remove, recording whether the per-object map went empty.
        let (empty, prev) = if let Some(comment) = comment {
            let prev = object_comments.insert(sub_component, comment);
            (false, prev)
        } else {
            let prev = object_comments.remove(&sub_component);
            (object_comments.is_empty(), prev)
        };

        // Drop the inner map once its last comment is removed so the outer
        // map does not accumulate empty entries.
        if empty {
            self.map.remove(&object_id);
        }

        prev
    }

    /// Removes and returns every comment attached to `object_id` as
    /// `(object, sub-component, comment)` tuples.
    pub fn drop_comments(
        &mut self,
        object_id: CommentObjectId,
    ) -> Vec<(CommentObjectId, Option<usize>, String)> {
        match self.map.remove(&object_id) {
            None => Vec::new(),
            Some(comments) => comments
                .into_iter()
                .map(|(sub_comp, comment)| (object_id, sub_comp, comment))
                .collect(),
        }
    }

    /// Iterates over all comments as `(object, sub-component, comment)`.
    pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
        // `flat_map` instead of `map(..).flatten()` (clippy::map_flatten).
        self.map.iter().flat_map(|(id, comments)| {
            comments
                .iter()
                .map(move |(pos, comment)| (*id, *pos, comment.as_str()))
        })
    }

    /// Returns all comments attached to `object_id`, keyed by sub-component.
    pub fn get_object_comments(
        &self,
        object_id: CommentObjectId,
    ) -> Option<&BTreeMap<Option<usize>, String>> {
        self.map.get(&object_id)
    }
}
impl Serialize for CommentsMap {
    /// Serializes the map as a flat sequence of
    /// `(debug-formatted object id, debug-formatted sub-component, comment)`
    /// tuples.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // Total number of comments across all objects, so the sequence
        // length can be declared up front.
        let comment_count: usize = self.map.values().map(|comments| comments.len()).sum();
        let mut seq = serializer.serialize_seq(Some(comment_count))?;
        for (object_id, object_comments) in &self.map {
            for (sub_component, comment) in object_comments {
                seq.serialize_element(&(
                    format!("{object_id:?}"),
                    format!("{sub_component:?}"),
                    comment,
                ))?;
            }
        }
        seq.end()
    }
}
/// Default privileges to apply to newly created objects, keyed by the scope
/// (role/database/schema/object type) they apply to.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
/// The default-privilege grants for a single [`DefaultPrivilegeObject`],
/// keyed by grantee role.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);
// Let `RoleDefaultPrivileges` be used directly as the map it wraps.
impl Deref for RoleDefaultPrivileges {
    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
// Mutable counterpart of the `Deref` impl above; forwards to the inner map.
impl DerefMut for RoleDefaultPrivileges {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
impl DefaultPrivileges {
    /// Grants `privilege` as a default privilege on `object`, merging its ACL
    /// mode into any existing grant for the same grantee.
    ///
    /// Empty grants are ignored so the map never stores no-op entries.
    pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
        if privilege.acl_mode.is_empty() {
            return;
        }

        let privileges = self.privileges.entry(object).or_default();
        // Use the entry API to avoid looking up the grantee twice
        // (clippy::map_entry).
        privileges
            .entry(privilege.grantee)
            .and_modify(|existing| existing.acl_mode |= privilege.acl_mode)
            .or_insert(privilege);
    }

    /// Revokes `privilege` from the default privileges on `object`.
    ///
    /// Entries that become empty (for the grantee, and then for the object)
    /// are removed entirely.
    pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
        if let Some(privileges) = self.privileges.get_mut(object) {
            if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
                default_privilege.acl_mode =
                    default_privilege.acl_mode.difference(privilege.acl_mode);
                // Drop the grantee entry once no privileges remain.
                if default_privilege.acl_mode.is_empty() {
                    privileges.remove(&privilege.grantee);
                }
            }
            // Drop the object entry once no grantees remain.
            if privileges.is_empty() {
                self.privileges.remove(object);
            }
        }
    }

    /// Returns the default ACL mode granted on `object` to `grantee`, if any.
    pub fn get_privileges_for_grantee(
        &self,
        object: &DefaultPrivilegeObject,
        grantee: &RoleId,
    ) -> Option<&AclMode> {
        self.privileges
            .get(object)
            .and_then(|privileges| privileges.get(grantee))
            .map(|privilege| &privilege.acl_mode)
    }

    /// Returns the default privileges that apply when `role_id` creates an
    /// object of type `object_type` in the given database/schema.
    ///
    /// Grants are accumulated across all matching scopes — exact
    /// database+schema, database-wide, and global — for both `role_id` and
    /// `RoleId::Public`, then restricted to the ACL modes valid for
    /// `object_type`.
    pub fn get_applicable_privileges(
        &self,
        role_id: RoleId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        object_type: mz_sql::catalog::ObjectType,
    ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
        // Relation-like object types all store their default privileges
        // under the `Table` object type.
        let privilege_object_type = if object_type.is_relation() {
            mz_sql::catalog::ObjectType::Table
        } else {
            object_type
        };
        let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));

        [
            DefaultPrivilegeObject {
                role_id,
                database_id,
                schema_id,
                object_type: privilege_object_type,
            },
            DefaultPrivilegeObject {
                role_id,
                database_id,
                schema_id: None,
                object_type: privilege_object_type,
            },
            DefaultPrivilegeObject {
                role_id,
                database_id: None,
                schema_id: None,
                object_type: privilege_object_type,
            },
            DefaultPrivilegeObject {
                role_id: RoleId::Public,
                database_id,
                schema_id,
                object_type: privilege_object_type,
            },
            DefaultPrivilegeObject {
                role_id: RoleId::Public,
                database_id,
                schema_id: None,
                object_type: privilege_object_type,
            },
            DefaultPrivilegeObject {
                role_id: RoleId::Public,
                database_id: None,
                schema_id: None,
                object_type: privilege_object_type,
            },
        ]
        .into_iter()
        .filter_map(|object| self.privileges.get(&object))
        .flat_map(|acl_map| acl_map.values())
        // Union the ACL modes granted to each grantee across all scopes.
        .fold(
            BTreeMap::new(),
            |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
                let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
                *accum_acl_mode |= *acl_mode;
                accum
            },
        )
        .into_iter()
        // Restrict to the modes that are valid for this object type and drop
        // grants that end up empty.
        .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
        .filter(|(_, acl_mode)| !acl_mode.is_empty())
        .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
            grantee: *grantee,
            acl_mode,
        })
    }

    /// Iterates over every default-privilege object and its grants.
    pub fn iter(
        &self,
    ) -> impl Iterator<
        Item = (
            &DefaultPrivilegeObject,
            impl Iterator<Item = &DefaultPrivilegeAclItem>,
        ),
    > {
        self.privileges
            .iter()
            .map(|(object, acl_map)| (object, acl_map.values()))
    }
}
/// In-memory configuration of a cluster.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterConfig {
    /// Managed or unmanaged configuration of the cluster.
    pub variant: ClusterVariant,
}
impl ClusterConfig {
    /// Returns the optimizer feature overrides carried by this cluster's
    /// configuration; unmanaged clusters carry none.
    pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
        match &self.variant {
            ClusterVariant::Unmanaged => None,
            ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
        }
    }
}
impl From<ClusterConfig> for durable::ClusterConfig {
fn from(config: ClusterConfig) -> Self {
Self {
variant: config.variant.into(),
}
}
}
impl From<durable::ClusterConfig> for ClusterConfig {
fn from(config: durable::ClusterConfig) -> Self {
Self {
variant: config.variant.into(),
}
}
}
/// In-memory configuration of a managed cluster.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterVariantManaged {
    /// Name of the replica size used for the cluster's replicas.
    pub size: String,
    /// Availability zones the cluster's replicas may be placed in.
    pub availability_zones: Vec<String>,
    /// Logging configuration for the cluster's replicas.
    pub logging: ReplicaLogging,
    /// Number of replicas to maintain.
    pub replication_factor: u32,
    /// Whether replicas are provisioned with a disk.
    pub disk: bool,
    /// Optimizer feature flags overriding the defaults for this cluster.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Schedule governing the cluster.
    pub schedule: ClusterSchedule,
}
impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
fn from(managed: ClusterVariantManaged) -> Self {
Self {
size: managed.size,
availability_zones: managed.availability_zones,
logging: managed.logging,
replication_factor: managed.replication_factor,
disk: managed.disk,
optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
schedule: managed.schedule,
}
}
}
impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
    /// Field-by-field conversion from the durable representation.
    fn from(managed: durable::ClusterVariantManaged) -> Self {
        // Destructure exhaustively so a new field cannot be silently dropped.
        let durable::ClusterVariantManaged {
            size,
            availability_zones,
            logging,
            replication_factor,
            disk,
            optimizer_feature_overrides,
            schedule,
        } = managed;
        Self {
            size,
            availability_zones,
            logging,
            replication_factor,
            disk,
            optimizer_feature_overrides: optimizer_feature_overrides.into(),
            schedule,
        }
    }
}
/// Whether a cluster is managed (configured via [`ClusterVariantManaged`])
/// or unmanaged.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterVariant {
    Managed(ClusterVariantManaged),
    Unmanaged,
}
impl From<ClusterVariant> for durable::ClusterVariant {
    /// Converts the in-memory variant into its durable counterpart.
    fn from(variant: ClusterVariant) -> Self {
        match variant {
            ClusterVariant::Unmanaged => Self::Unmanaged,
            ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
        }
    }
}
impl From<durable::ClusterVariant> for ClusterVariant {
    /// Converts the durable variant into its in-memory counterpart.
    fn from(variant: durable::ClusterVariant) -> Self {
        match variant {
            durable::ClusterVariant::Unmanaged => Self::Unmanaged,
            durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
        }
    }
}
// Exposes `Database` through the `mz_sql` catalog trait; all methods are
// direct field accesses.
impl mz_sql::catalog::CatalogDatabase for Database {
    fn name(&self) -> &str {
        &self.name
    }
    fn id(&self) -> DatabaseId {
        self.id
    }
    fn has_schemas(&self) -> bool {
        !self.schemas_by_name.is_empty()
    }
    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
        &self.schemas_by_name
    }
    // The `as` conversions only coerce `&Schema` references to trait
    // objects; no numeric casts occur.
    #[allow(clippy::as_conversions)]
    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
        self.schemas_by_id
            .values()
            .map(|schema| schema as &dyn CatalogSchema)
            .collect()
    }
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
// Exposes `Schema` through the `mz_sql` catalog trait.
impl mz_sql::catalog::CatalogSchema for Schema {
    fn database(&self) -> &ResolvedDatabaseSpecifier {
        &self.name.database
    }
    fn name(&self) -> &QualifiedSchemaName {
        &self.name
    }
    fn id(&self) -> &SchemaSpecifier {
        &self.id
    }
    // NOTE(review): only consults `items`, not `functions`/`types` — confirm
    // that a schema containing only functions or types should report empty.
    fn has_items(&self) -> bool {
        !self.items.is_empty()
    }
    fn item_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        // Items, functions, and types are stored in separate maps; chain all
        // three to enumerate every ID in the schema.
        Box::new(
            self.items
                .values()
                .chain(self.functions.values())
                .chain(self.types.values())
                .copied(),
        )
    }
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
// Exposes `Role` through the `mz_sql` catalog trait; all methods are direct
// field accesses.
impl mz_sql::catalog::CatalogRole for Role {
    fn name(&self) -> &str {
        &self.name
    }
    fn id(&self) -> RoleId {
        self.id
    }
    // Role-to-role map from `RoleMembership`; presumably member -> grantor,
    // confirm against `RoleMembership`'s definition.
    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
        &self.membership.map
    }
    fn attributes(&self) -> &RoleAttributes {
        &self.attributes
    }
    // Session variable defaults configured for this role.
    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
        &self.vars.map
    }
}
// Exposes `Cluster` through the `mz_sql` catalog trait. Several methods call
// inherent methods of the same name; inherent methods take resolution
// precedence, so those calls do not recurse.
impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
    fn name(&self) -> &str {
        &self.name
    }

    fn id(&self) -> ClusterId {
        self.id
    }

    fn bound_objects(&self) -> &BTreeSet<GlobalId> {
        &self.bound_objects
    }

    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
        &self.replica_id_by_name_
    }

    // The `as` conversions only coerce replica references to trait objects;
    // no numeric casts occur.
    #[allow(clippy::as_conversions)]
    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica> {
        self.replicas()
            .map(|replica| replica as &dyn CatalogClusterReplica)
            .collect()
    }

    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica {
        // The caller guarantees the replica exists; a miss means the catalog
        // has diverged from the controller.
        self.replica(id).expect("catalog out of sync")
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }

    fn is_managed(&self) -> bool {
        self.is_managed()
    }

    /// The configured replica size, present only for managed clusters.
    fn managed_size(&self) -> Option<&str> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
            ClusterVariant::Unmanaged => None,
        }
    }

    /// The configured schedule, present only for managed clusters.
    fn schedule(&self) -> Option<&ClusterSchedule> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
            ClusterVariant::Unmanaged => None,
        }
    }
}
// Exposes `ClusterReplica` through the `mz_sql` catalog trait; all methods
// are direct field accesses.
impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
    fn name(&self) -> &str {
        &self.name
    }
    fn cluster_id(&self) -> ClusterId {
        self.cluster_id
    }
    fn replica_id(&self) -> ReplicaId {
        self.replica_id
    }
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }
    // Whether the replica is internal, as reported by its location config.
    fn internal(&self) -> bool {
        self.config.location.internal()
    }
}
// Forwards the `mz_sql` catalog-item trait to the inherent methods of the
// same names on `CatalogEntry` / `CatalogItem`. Inherent methods take
// precedence over trait methods during resolution, so the `self.name()`-style
// calls below dispatch to the inherent implementations rather than recursing.
impl mz_sql::catalog::CatalogItem for CatalogEntry {
    fn name(&self) -> &QualifiedItemName {
        self.name()
    }
    fn id(&self) -> GlobalId {
        self.id()
    }
    fn oid(&self) -> u32 {
        self.oid()
    }
    fn desc(&self, name: &FullItemName) -> Result<Cow<RelationDesc>, SqlCatalogError> {
        self.desc(name)
    }
    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.func()
    }
    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.source_desc()
    }
    fn connection(
        &self,
    ) -> Result<&mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        Ok(&self.connection()?.connection)
    }
    // Builtin items have no stored SQL; report a "<builtin>" placeholder.
    fn create_sql(&self) -> &str {
        match self.item() {
            CatalogItem::Table(Table { create_sql, .. }) => {
                create_sql.as_deref().unwrap_or("<builtin>")
            }
            CatalogItem::Source(Source { create_sql, .. }) => {
                create_sql.as_deref().unwrap_or("<builtin>")
            }
            CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
            CatalogItem::View(View { create_sql, .. }) => create_sql,
            CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
            CatalogItem::Index(Index { create_sql, .. }) => create_sql,
            CatalogItem::Type(Type { create_sql, .. }) => {
                create_sql.as_deref().unwrap_or("<builtin>")
            }
            CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
            CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
            CatalogItem::Func(_) => "<builtin>",
            CatalogItem::Log(_) => "<builtin>",
        }
    }
    fn item_type(&self) -> SqlCatalogItemType {
        self.item().typ()
    }
    // The index's key expressions and the ID of the relation it is on.
    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
            Some((keys, *on))
        } else {
            None
        }
    }
    // The table's column default expressions, if this entry is a table.
    fn table_details(&self) -> Option<&[Expr<Aug>]> {
        if let CatalogItem::Table(Table { defaults, .. }) = self.item() {
            Some(defaults)
        } else {
            None
        }
    }
    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        if let CatalogItem::Type(Type { details, .. }) = self.item() {
            Some(details)
        } else {
            None
        }
    }
    fn references(&self) -> &ResolvedIds {
        self.references()
    }
    fn uses(&self) -> BTreeSet<GlobalId> {
        self.uses()
    }
    fn referenced_by(&self) -> &[GlobalId] {
        self.referenced_by()
    }
    fn used_by(&self) -> &[GlobalId] {
        self.used_by()
    }
    fn subsource_details(&self) -> Option<(GlobalId, &UnresolvedItemName)> {
        self.subsource_details()
    }
    fn source_exports(&self) -> BTreeMap<GlobalId, usize> {
        self.source_exports()
    }
    fn is_progress_source(&self) -> bool {
        self.is_progress_source()
    }
    fn progress_id(&self) -> Option<GlobalId> {
        self.progress_id()
    }
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
    fn cluster_id(&self) -> Option<ClusterId> {
        self.item().cluster_id()
    }
}
/// A single update to catalog state: a durable object together with the diff
/// (addition or retraction count) to apply for it.
#[derive(Debug)]
pub struct StateUpdate {
    /// The kind of durable object being updated.
    pub kind: StateUpdateKind,
    /// The diff to apply for the object.
    pub diff: Diff,
}
/// The durable catalog object affected by a [`StateUpdate`], one variant per
/// durable object type.
#[derive(Debug)]
pub enum StateUpdateKind {
    Role(durable::objects::Role),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    StorageUsage(durable::objects::StorageUsage),
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}