use std::error::Error;
use std::fmt::{self, Debug, Display};
use std::str::FromStr;
use itertools::Itertools;
use mz_ore::assert_none;
use mz_ore::url::SensitiveUrl;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::stats::PartStats;
use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
use mz_persist_types::{PersistLocation, ShardId};
use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
use mz_repr::{Datum, GlobalId, RelationDesc, Row, ScalarType};
use mz_sql_parser::ast::UnresolvedItemName;
use mz_timely_util::antichain::AntichainExt;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;
use tracing::error;
use crate::errors::DataflowError;
use crate::instances::StorageInstanceId;
use crate::sources::SourceData;
// Pull in the prost-generated protobuf types for the
// `mz_storage_types.controller` package (e.g. `ProtoCollectionMetadata`),
// emitted by the build script into `OUT_DIR`.
include!(concat!(env!("OUT_DIR"), "/mz_storage_types.controller.rs"));
/// Metadata required to locate a storage collection's data in persist.
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CollectionMetadata {
    /// The persist location (blob and consensus URIs) holding this
    /// collection's shards.
    pub persist_location: PersistLocation,
    /// The shard holding remap data, if any — NOTE(review): presumably only
    /// populated for sources that maintain a remap collection; confirm.
    pub remap_shard: Option<ShardId>,
    /// The shard holding the collection's data.
    pub data_shard: ShardId,
    /// The relation description (schema) of the data in `data_shard`.
    pub relation_desc: RelationDesc,
    /// If set, the txn-wal shard associated with this collection —
    /// NOTE(review): inferred from the name; confirm semantics at call sites.
    pub txns_shard: Option<ShardId>,
}
impl crate::AlterCompatible for CollectionMetadata {
    /// Determines whether `self` may be altered into `other`: every field
    /// except `persist_location` must be equal. On the first mismatching
    /// field, logs a warning and returns an [`AlterError`] for `id`.
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), self::AlterError> {
        if self == other {
            return Ok(());
        }
        // Destructure so that adding a field forces this check to be revisited.
        let CollectionMetadata {
            persist_location: _,
            remap_shard,
            data_shard,
            relation_desc,
            txns_shard,
        } = self;
        // Field-by-field comparison table; `persist_location` is deliberately
        // exempt from the compatibility requirement.
        let first_incompatible = [
            ("remap_shard", remap_shard == &other.remap_shard),
            ("data_shard", data_shard == &other.data_shard),
            ("relation_desc", relation_desc == &other.relation_desc),
            ("txns_shard", txns_shard == &other.txns_shard),
        ]
        .into_iter()
        .find_map(|(field, compatible)| (!compatible).then_some(field));
        match first_incompatible {
            None => Ok(()),
            Some(field) => {
                tracing::warn!(
                    "CollectionMetadata incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );
                Err(AlterError { id })
            }
        }
    }
}
impl RustType<ProtoCollectionMetadata> for CollectionMetadata {
fn into_proto(&self) -> ProtoCollectionMetadata {
ProtoCollectionMetadata {
blob_uri: self.persist_location.blob_uri.to_string_unredacted(),
consensus_uri: self.persist_location.consensus_uri.to_string_unredacted(),
data_shard: self.data_shard.to_string(),
remap_shard: self.remap_shard.map(|s| s.to_string()),
relation_desc: Some(self.relation_desc.into_proto()),
txns_shard: self.txns_shard.map(|x| x.to_string()),
}
}
fn from_proto(value: ProtoCollectionMetadata) -> Result<Self, TryFromProtoError> {
Ok(CollectionMetadata {
persist_location: PersistLocation {
blob_uri: SensitiveUrl::from_str(&value.blob_uri)?,
consensus_uri: SensitiveUrl::from_str(&value.consensus_uri)?,
},
remap_shard: value
.remap_shard
.map(|s| s.parse().map_err(TryFromProtoError::InvalidShardId))
.transpose()?,
data_shard: value
.data_shard
.parse()
.map_err(TryFromProtoError::InvalidShardId)?,
relation_desc: value
.relation_desc
.into_rust_if_some("ProtoCollectionMetadata::relation_desc")?,
txns_shard: value
.txns_shard
.map(|s| s.parse().map_err(TryFromProtoError::InvalidShardId))
.transpose()?,
})
}
}
/// The durably recorded subset of a collection's metadata — per the name;
/// currently this is only the data shard.
#[derive(Arbitrary, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)]
pub struct DurableCollectionMetadata {
    /// The shard holding the collection's data.
    pub data_shard: ShardId,
}
impl RustType<ProtoDurableCollectionMetadata> for DurableCollectionMetadata {
    /// Serializes into the proto representation; the shard id round-trips
    /// through its string form.
    fn into_proto(&self) -> ProtoDurableCollectionMetadata {
        let data_shard = self.data_shard.to_string();
        ProtoDurableCollectionMetadata { data_shard }
    }
    /// Deserializes from the proto representation, failing with
    /// `InvalidShardId` when the stored string does not parse.
    fn from_proto(value: ProtoDurableCollectionMetadata) -> Result<Self, TryFromProtoError> {
        let data_shard = value
            .data_shard
            .parse()
            .map_err(TryFromProtoError::InvalidShardId)?;
        Ok(DurableCollectionMetadata { data_shard })
    }
}
/// Errors arising from the storage layer.
#[derive(Debug)]
pub enum StorageError<T> {
    /// The source identifier was re-created after having been dropped.
    SourceIdReused(GlobalId),
    /// The sink identifier was re-created after having been dropped.
    SinkIdReused(GlobalId),
    /// The collection identifier is not present.
    IdentifierMissing(GlobalId),
    /// The collection identifier is invalid.
    IdentifierInvalid(GlobalId),
    /// An append batch contained an update at or beyond the collection's upper.
    UpdateBeyondUpper(GlobalId),
    /// A read was requested at a timestamp before the collection's since.
    ReadBeforeSince(GlobalId),
    /// The expected uppers differed from the actual uppers of these collections.
    InvalidUppers(Vec<InvalidUpper<T>>),
    /// The storage instance expected to run an ingestion is missing.
    IngestionInstanceMissing {
        storage_instance_id: StorageInstanceId,
        ingestion_id: GlobalId,
    },
    /// The storage instance expected to run an export is missing.
    ExportInstanceMissing {
        storage_instance_id: StorageInstanceId,
        export_id: GlobalId,
    },
    /// A dataflow failed to process the request.
    DataflowError(DataflowError),
    /// The requested alteration is not compatible; see [`AlterError`].
    InvalidAlter(AlterError),
    /// The described API usage is invalid.
    InvalidUsage(String),
    /// The named resource is exhausted.
    ResourceExhausted(&'static str),
    /// The named component is shutting down.
    ShuttingDown(&'static str),
    /// Storage metadata for this id already exists.
    CollectionMetadataAlreadyExists(GlobalId),
    /// The persist shard is already in use.
    PersistShardAlreadyInUse(ShardId),
    /// Persist raced when trying to evolve the schema of a shard.
    PersistSchemaEvolveRace {
        global_id: GlobalId,
        shard_id: ShardId,
        schema_id: SchemaId,
        relation_desc: RelationDesc,
    },
    /// A persist shard's schema was evolved in an invalid way.
    PersistInvalidSchemaEvolve {
        global_id: GlobalId,
        shard_id: ShardId,
    },
    /// The txn WAL shard already exists.
    TxnWalShardAlreadyExists,
    /// An ingestion is unexpectedly missing a reference to a subsource.
    MissingSubsourceReference {
        ingestion_id: GlobalId,
        reference: UnresolvedItemName,
    },
    /// A real-time-recency query timed out before the source ingested its
    /// visible frontier.
    RtrTimeout(GlobalId),
    /// The real-time source was dropped before ingesting the upstream
    /// system's visible frontier.
    RtrDropFailure(GlobalId),
    /// A generic, otherwise-uncategorized error.
    Generic(anyhow::Error),
    /// A write was attempted while in read-only mode.
    ReadOnly,
}
impl<T: Debug + Display + 'static> Error for StorageError<T> {
    /// Returns the underlying cause of this error, if any.
    ///
    /// `DataflowError` and `InvalidAlter` wrap values that implement
    /// [`Error`] and are exposed as sources; `Generic` delegates to the
    /// wrapped `anyhow::Error`. All other variants carry no cause.
    ///
    /// The match is deliberately exhaustive (no `_` arm) so adding a variant
    /// forces this method to be revisited.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Self::SourceIdReused(_) => None,
            Self::SinkIdReused(_) => None,
            Self::IdentifierMissing(_) => None,
            Self::IdentifierInvalid(_) => None,
            Self::UpdateBeyondUpper(_) => None,
            Self::ReadBeforeSince(_) => None,
            Self::InvalidUppers(_) => None,
            Self::IngestionInstanceMissing { .. } => None,
            Self::ExportInstanceMissing { .. } => None,
            Self::DataflowError(err) => Some(err),
            // Fix: previously returned `None`, dropping the wrapped
            // `AlterError` even though it implements `Error` and the
            // `Display` impl already delegates to it.
            Self::InvalidAlter(err) => Some(err),
            Self::InvalidUsage(_) => None,
            Self::ResourceExhausted(_) => None,
            Self::ShuttingDown(_) => None,
            Self::CollectionMetadataAlreadyExists(_) => None,
            Self::PersistShardAlreadyInUse(_) => None,
            Self::PersistSchemaEvolveRace { .. } => None,
            Self::PersistInvalidSchemaEvolve { .. } => None,
            Self::TxnWalShardAlreadyExists => None,
            Self::MissingSubsourceReference { .. } => None,
            Self::RtrTimeout(_) => None,
            Self::RtrDropFailure(_) => None,
            Self::Generic(err) => err.source(),
            Self::ReadOnly => None,
        }
    }
}
impl<T: fmt::Display + 'static> fmt::Display for StorageError<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("storage error: ")?;
match self {
Self::SourceIdReused(id) => write!(
f,
"source identifier was re-created after having been dropped: {id}"
),
Self::SinkIdReused(id) => write!(
f,
"sink identifier was re-created after having been dropped: {id}"
),
Self::IdentifierMissing(id) => write!(f, "collection identifier is not present: {id}"),
Self::IdentifierInvalid(id) => write!(f, "collection identifier is invalid {id}"),
Self::UpdateBeyondUpper(id) => {
write!(
f,
"append batch for {id} contained update at or beyond its upper"
)
}
Self::ReadBeforeSince(id) => {
write!(f, "read for {id} was at a timestamp before its since")
}
Self::InvalidUppers(id) => {
write!(
f,
"expected upper was different from the actual upper for: {}",
id.iter()
.map(|InvalidUpper { id, current_upper }| {
format!("(id: {}; actual upper: {})", id, current_upper.pretty())
})
.join(", ")
)
}
Self::IngestionInstanceMissing {
storage_instance_id,
ingestion_id,
} => write!(
f,
"instance {} missing for ingestion {}",
storage_instance_id, ingestion_id
),
Self::ExportInstanceMissing {
storage_instance_id,
export_id,
} => write!(
f,
"instance {} missing for export {}",
storage_instance_id, export_id
),
Self::DataflowError(_err) => write!(f, "dataflow failed to process request",),
Self::InvalidAlter(err) => std::fmt::Display::fmt(err, f),
Self::InvalidUsage(err) => write!(f, "invalid usage: {}", err),
Self::ResourceExhausted(rsc) => write!(f, "{rsc} is exhausted"),
Self::ShuttingDown(cmp) => write!(f, "{cmp} is shutting down"),
Self::CollectionMetadataAlreadyExists(key) => {
write!(f, "storage metadata for '{key}' already exists")
}
Self::PersistShardAlreadyInUse(shard) => {
write!(f, "persist shard already in use: {shard}")
}
Self::PersistSchemaEvolveRace {
global_id,
shard_id,
..
} => {
write!(f, "persist raced when trying to evolve the schema of a shard: {global_id}, {shard_id}")
}
Self::PersistInvalidSchemaEvolve {
global_id,
shard_id,
} => {
write!(
f,
"persist shard evolved in an invalid way: {global_id}, {shard_id}"
)
}
Self::TxnWalShardAlreadyExists => {
write!(f, "txn WAL already exists")
}
Self::MissingSubsourceReference {
ingestion_id,
reference,
} => write!(
f,
"ingestion {ingestion_id} unexpectedly missing reference to {}",
reference
),
Self::RtrTimeout(_) => {
write!(f, "timed out before ingesting the source's visible frontier when real-time-recency query issued")
}
Self::RtrDropFailure(_) => write!(
f,
"real-time source dropped before ingesting the upstream system's visible frontier"
),
Self::Generic(err) => std::fmt::Display::fmt(err, f),
Self::ReadOnly => write!(f, "cannot write in read-only mode"),
}
}
}
/// A mismatch between an expected frontier and a collection's actual upper,
/// reported via [`StorageError::InvalidUppers`].
#[derive(Debug, Clone)]
pub struct InvalidUpper<T> {
    /// The collection whose upper differed from the expectation.
    pub id: GlobalId,
    /// The collection's actual upper frontier.
    pub current_upper: Antichain<T>,
}
/// Error returned when an object cannot be altered in the requested way.
#[derive(Debug)]
pub struct AlterError {
    /// The id of the object that could not be altered.
    pub id: GlobalId,
}
impl Error for AlterError {}
impl fmt::Display for AlterError {
    /// Renders as "<id> cannot be altered in the requested way".
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let id = &self.id;
        write!(f, "{id} cannot be altered in the requested way")
    }
}
/// Lets `?` convert an [`AlterError`] into a [`StorageError`].
impl<T> From<AlterError> for StorageError<T> {
    fn from(err: AlterError) -> Self {
        Self::InvalidAlter(err)
    }
}
/// Lets `?` convert a [`DataflowError`] into a [`StorageError`].
impl<T> From<DataflowError> for StorageError<T> {
    fn from(err: DataflowError) -> Self {
        Self::DataflowError(err)
    }
}
/// A [`TxnsCodec`] implementation that stores txn-wal entries as `Row`s.
#[derive(Debug)]
pub struct TxnsCodecRow;
impl TxnsCodecRow {
    /// The relation description of the rows produced by `encode`:
    /// `(shard_id: String NOT NULL, ts: UInt64 NOT NULL, batch: Bytes NULL)`.
    /// The column order here must match the datum order packed in `encode`.
    pub fn desc() -> RelationDesc {
        RelationDesc::builder()
            .with_column("shard_id", ScalarType::String.nullable(false))
            .with_column("ts", ScalarType::UInt64.nullable(false))
            .with_column("batch", ScalarType::Bytes.nullable(true))
            .finish()
    }
}
impl TxnsCodec for TxnsCodecRow {
    // Keys are full source rows; the unit value carries no information.
    type Key = SourceData;
    type Val = ();
    fn schemas() -> (
        <Self::Key as mz_persist_types::Codec>::Schema,
        <Self::Val as mz_persist_types::Codec>::Schema,
    ) {
        (Self::desc(), UnitSchema)
    }
    // Packs a `TxnsEntry` into a row shaped like `Self::desc()`:
    // (shard_id string, ts, batch). `Register` entries carry a NULL batch,
    // `Append` entries carry the batch bytes. The 8 timestamp bytes are
    // interpreted as a little-endian u64.
    fn encode(e: TxnsEntry) -> (Self::Key, Self::Val) {
        let row = match &e {
            TxnsEntry::Register(data_id, ts) => Row::pack([
                Datum::from(data_id.to_string().as_str()),
                Datum::from(u64::from_le_bytes(*ts)),
                Datum::Null,
            ]),
            TxnsEntry::Append(data_id, ts, batch) => Row::pack([
                Datum::from(data_id.to_string().as_str()),
                Datum::from(u64::from_le_bytes(*ts)),
                Datum::from(batch.as_slice()),
            ]),
        };
        (SourceData(Ok(row)), ())
    }
    // Inverse of `encode`. The `expect("valid entry")`s encode the invariant
    // that rows in the txns shard were produced by `encode` above: exactly
    // three datums, a parseable shard id, and a u64 timestamp. A NULL batch
    // distinguishes `Register` from `Append`.
    fn decode(row: SourceData, _: ()) -> TxnsEntry {
        let mut datums = row.0.as_ref().expect("valid entry").iter();
        let data_id = datums.next().expect("valid entry").unwrap_str();
        let data_id = data_id.parse::<ShardId>().expect("valid entry");
        let ts = datums.next().expect("valid entry");
        let ts = u64::to_le_bytes(ts.unwrap_uint64());
        let batch = datums.next().expect("valid entry");
        assert_none!(datums.next());
        if batch.is_null() {
            TxnsEntry::Register(data_id, ts)
        } else {
            TxnsEntry::Append(data_id, ts, batch.unwrap_bytes().to_vec())
        }
    }
    // Uses per-part string stats on the `shard_id` column to decide whether a
    // part can possibly contain entries for `data_id`. Returns `None` when
    // stats are missing, or (after logging) when they have an unexpected
    // type — NOTE(review): presumably the caller treats `None` as "fetch to
    // be safe"; confirm against the `TxnsCodec` contract.
    fn should_fetch_part(data_id: &ShardId, stats: &PartStats) -> Option<bool> {
        let stats = stats
            .key
            .col("key")?
            .try_as_optional_struct()
            .map_err(|err| error!("unexpected stats type for col 'key': {}", err))
            .ok()?;
        let stats = stats
            .some
            .col("shard_id")?
            .try_as_string()
            .map_err(|err| error!("unexpected stats type for col 'shard_id': {}", err))
            .ok()?;
        let data_id_str = data_id.to_string();
        // The part may contain `data_id` iff its string form falls within the
        // column's [lower, upper] bounds.
        Some(stats.lower <= data_id_str && stats.upper >= data_id_str)
    }
}