use std::fmt::{Display, Formatter, Write};
use std::ops::Deref;
use std::str::FromStr;
use mz_persist::location::SeqNo;
use proptest_derive::Arbitrary;
use semver::Version;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::internal::encoding::parse_id;
use crate::{ShardId, WriterId};
/// A 16-byte identifier rendered as `p` followed by a UUID.
///
/// It forms the second component of a [`PartialBatchKey`], after the
/// [`WriterKey`].
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct PartId(pub(crate) [u8; 16]);
impl std::fmt::Display for PartId {
    /// Writes the id as the letter `p` immediately followed by the UUID built
    /// from the stored bytes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let uuid = Uuid::from_bytes(self.0);
        write!(f, "p{uuid}")
    }
}
impl std::fmt::Debug for PartId {
    /// Writes `PartId(<uuid>)`, where `<uuid>` is the UUID built from the
    /// stored bytes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let uuid = Uuid::from_bytes(self.0);
        write!(f, "PartId({uuid})")
    }
}
impl FromStr for PartId {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
parse_id('p', "PartId", s).map(PartId)
}
}
impl PartId {
pub(crate) fn new() -> Self {
PartId(*Uuid::new_v4().as_bytes())
}
}
/// The first component of a [`PartialBatchKey`]: either a writer id or an
/// encoded version string.
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
pub enum WriterKey {
// Keyed by a `WriterId`; parsed from strings whose first character is `w`.
Id(WriterId),
// Keyed by a version string; displayed with a leading `n`, which is stripped
// again when parsing.
Version(String),
}
impl WriterKey {
    /// Builds a [`WriterKey::Version`] from a semantic version.
    ///
    /// The key is the concatenation of fixed-width, zero-padded decimal
    /// fields: two digits for `major`, three for `minor`, two for `patch`.
    ///
    /// # Panics
    ///
    /// Panics if `major > 99`, `minor > 999`, or `patch > 99`, since those
    /// values would not fit their fixed-width field.
    pub fn for_version(version: &Version) -> WriterKey {
        assert!(version.major <= 99);
        assert!(version.minor <= 999);
        assert!(version.patch <= 99);
        let encoded = format!(
            "{major:02}{minor:03}{patch:02}",
            major = version.major,
            minor = version.minor,
            patch = version.patch,
        );
        WriterKey::Version(encoded)
    }
}
impl FromStr for WriterKey {
    type Err = String;

    /// Parses a writer key from its string form.
    ///
    /// * A string starting with `w` is parsed in full as a [`WriterId`].
    /// * A string starting with `n` yields [`WriterKey::Version`] holding
    ///   everything after the `n`.
    ///
    /// # Errors
    ///
    /// Returns an error for the empty string, for any other leading
    /// character, or when the `WriterId` parse fails.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Inspect the first *character* rather than slicing the first byte:
        // byte-slicing panics when the string starts with a multi-byte
        // character, whereas this path reports a regular error.
        match s.chars().next() {
            None => Err("empty version string".to_owned()),
            Some('w') => Ok(WriterKey::Id(WriterId::from_str(s)?)),
            // 'n' is a single byte in UTF-8, so index 1 is a char boundary.
            Some('n') => Ok(WriterKey::Version(s[1..].to_owned())),
            Some(c) => Err(format!("unknown prefix for version: {c}")),
        }
    }
}
impl Display for WriterKey {
    /// Formats an `Id` key via the writer id's own `Display` impl, and a
    /// `Version` key as the letter `n` followed by the version string.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let version = match self {
            WriterKey::Id(id) => return Display::fmt(id, f),
            WriterKey::Version(version) => version,
        };
        f.write_char('n')?;
        f.write_str(version)
    }
}
/// The shard-relative portion of a batch blob key, stored as a string.
///
/// `PartialBatchKey::new` builds it as `<writer_key>/<part_id>`;
/// `PartialBatchKey::complete` prepends a shard id to produce a [`BlobKey`].
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct PartialBatchKey(pub(crate) String);
/// Splits a `<writer_key>/<part_id>` string at its first `/` and parses both
/// halves.
///
/// # Errors
///
/// Returns an error if `key` contains no `/`, or if either half fails to
/// parse as a [`WriterKey`] or [`PartId`] respectively.
fn split_batch_key(key: &str) -> Result<(WriterKey, PartId), String> {
    // Build the error lazily so the success path does not allocate.
    let (writer_key, part_id) = key
        .split_once('/')
        .ok_or_else(|| "partial batch key should contain a /".to_owned())?;
    let writer_key = WriterKey::from_str(writer_key)?;
    let part_id = PartId::from_str(part_id)?;
    Ok((writer_key, part_id))
}
impl PartialBatchKey {
    /// Builds the key `<version>/<part_id>` from the `Display` forms of both
    /// arguments.
    pub fn new(version: &WriterKey, part_id: &PartId) -> Self {
        let key = format!("{version}/{part_id}");
        PartialBatchKey(key)
    }

    /// Parses the key back into its writer key and part id, or returns `None`
    /// if it does not parse.
    pub fn split(&self) -> Option<(WriterKey, PartId)> {
        match split_batch_key(self.0.as_str()) {
            Ok(parts) => Some(parts),
            Err(_) => None,
        }
    }

    /// Prefixes this key with `shard_id` and a `/` to form a full [`BlobKey`].
    pub fn complete(&self, shard_id: &ShardId) -> BlobKey {
        let full = format!("{shard_id}/{key}", key = self);
        BlobKey(full)
    }
}
impl std::fmt::Display for PartialBatchKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
/// A 16-byte identifier rendered as `r` followed by a UUID.
///
/// It forms the second component of a [`PartialRollupKey`], after the seqno.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct RollupId(pub(crate) [u8; 16]);
impl std::fmt::Display for RollupId {
    /// Writes the id as the letter `r` immediately followed by the UUID built
    /// from the stored bytes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let uuid = Uuid::from_bytes(self.0);
        write!(f, "r{uuid}")
    }
}
impl std::fmt::Debug for RollupId {
    /// Writes `RollupId(<uuid>)`, where `<uuid>` is the UUID built from the
    /// stored bytes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let uuid = Uuid::from_bytes(self.0);
        write!(f, "RollupId({uuid})")
    }
}
impl FromStr for RollupId {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
parse_id('r', "RollupId", s).map(RollupId)
}
}
impl RollupId {
pub(crate) fn new() -> Self {
RollupId(*Uuid::new_v4().as_bytes())
}
}
/// The shard-relative portion of a rollup blob key, stored as a string.
///
/// `PartialRollupKey::new` builds it as `<seqno>/<rollup_id>`;
/// `PartialRollupKey::complete` prepends a shard id to produce a [`BlobKey`].
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct PartialRollupKey(pub(crate) String);
impl PartialRollupKey {
    /// Builds the key `<seqno>/<rollup_id>` from the `Display` forms of both
    /// arguments.
    pub fn new(seqno: SeqNo, rollup_id: &RollupId) -> Self {
        let key = format!("{seqno}/{rollup_id}");
        PartialRollupKey(key)
    }

    /// Prefixes this key with `shard_id` and a `/` to form a full [`BlobKey`].
    pub fn complete(&self, shard_id: &ShardId) -> BlobKey {
        let full = format!("{shard_id}/{key}", key = self);
        BlobKey(full)
    }
}
impl std::fmt::Display for PartialRollupKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
impl Deref for PartialRollupKey {
type Target = String;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// The parsed, shard-relative part of a blob key, as returned by
/// `BlobKey::parse_ids` alongside the shard id.
#[derive(Debug, PartialEq)]
pub enum PartialBlobKey {
// A batch key: `<writer_key>/<part_id>`.
Batch(WriterKey, PartId),
// A rollup key: `<seqno>/<rollup_id>`.
Rollup(SeqNo, RollupId),
}
/// A complete blob key string: a shard id, a `/`, and a partial batch or
/// rollup key.
///
/// Built by `PartialBatchKey::complete` or `PartialRollupKey::complete`.
#[derive(Clone, Debug, PartialEq)]
pub struct BlobKey(String);
impl std::fmt::Display for BlobKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
impl Deref for BlobKey {
type Target = String;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl BlobKey {
    /// Parses a full blob key into its shard id and the shard-relative part.
    ///
    /// The text before the first `/` is parsed as a [`ShardId`]. If the
    /// remainder starts with `w` or `n` it is parsed as a batch key
    /// (`<writer_key>/<part_id>`); otherwise it is split at its first `/` and
    /// parsed as a rollup key (`<seqno>/<rollup_id>`).
    ///
    /// # Errors
    ///
    /// Returns an error if a required `/` separator is missing or if any
    /// component fails to parse.
    pub fn parse_ids(key: &str) -> Result<(ShardId, PartialBlobKey), String> {
        // Only invoked on failure (via `ok_or_else`), so the success path
        // never formats this message.
        let err = || {
            format!("invalid blob key format. expected either <shard_id>/<writer_id>/<part_id> or <shard_id>/<seqno>/<rollup_id>. got: {}", key)
        };

        let (shard, blob) = key.split_once('/').ok_or_else(err)?;
        let shard_id = ShardId::from_str(shard)?;

        let blob_key = if blob.starts_with('w') || blob.starts_with('n') {
            let (writer, part) = split_batch_key(blob)?;
            PartialBlobKey::Batch(writer, part)
        } else {
            let (seqno, rollup) = blob.split_once('/').ok_or_else(err)?;
            PartialBlobKey::Rollup(SeqNo::from_str(seqno)?, RollupId::from_str(rollup)?)
        };
        Ok((shard_id, blob_key))
    }
}
/// A string prefix of blob keys; its `Display` impl renders the prefix text.
#[derive(Debug)]
pub enum BlobKeyPrefix<'a> {
// The empty prefix.
All,
// The shard id alone.
Shard(&'a ShardId),
// `<shard_id>/<writer_key>`; only compiled for tests.
#[cfg(test)]
Writer(&'a ShardId, &'a WriterKey),
// `<shard_id>/v`; only compiled for tests.
#[cfg(test)]
Rollups(&'a ShardId),
}
impl std::fmt::Display for BlobKeyPrefix<'_> {
    /// Renders the prefix: empty for `All`, the shard id for `Shard`,
    /// `<shard>/<writer>` for `Writer`, and `<shard>/v` for `Rollups`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered: String = match self {
            BlobKeyPrefix::All => String::new(),
            BlobKeyPrefix::Shard(shard) => shard.to_string(),
            #[cfg(test)]
            BlobKeyPrefix::Writer(shard, writer) => format!("{shard}/{writer}"),
            #[cfg(test)]
            BlobKeyPrefix::Rollups(shard) => format!("{shard}/v"),
        };
        f.write_str(rendered.as_str())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;
    use semver::Version;

    /// Strategy producing versions whose components stay within the bounds
    /// asserted by `WriterKey::for_version`.
    fn gen_version() -> impl Strategy<Value = Version> {
        let components = (0u64..=99, 0u64..=999, 0u64..=99);
        components.prop_map(|(major, minor, patch)| Version::new(major, minor, patch))
    }

    /// Version-derived writer keys must order the same way as the versions
    /// they were derived from.
    #[mz_ore::test]
    fn key_ordering_compatible() {
        proptest!(|(a in gen_version(), b in gen_version())| {
            let (a_key, b_key) = (WriterKey::for_version(&a), WriterKey::for_version(&b));
            if a >= b {
                assert!(a_key >= b_key);
            }
            if a <= b {
                assert!(a_key <= b_key);
            }
        })
    }

    /// Completing a partial batch key yields `<shard>/<writer>/<part>`.
    #[mz_ore::test]
    fn partial_blob_key_completion() {
        let shard_id = ShardId::new();
        let writer_id = WriterId::new();
        let part_id = PartId::new();

        let partial_key = PartialBatchKey::new(&WriterKey::Id(writer_id.clone()), &part_id);
        let completed = partial_key.complete(&shard_id);
        let expected = BlobKey(format!("{}/{}/{}", shard_id, writer_id, part_id));
        assert_eq!(completed, expected);
    }

    /// Round-trips well-formed keys through `BlobKey::parse_ids` and checks
    /// that malformed keys are rejected.
    #[mz_ore::test]
    fn blob_key_parse() -> Result<(), String> {
        let shard_id = ShardId::new();
        let writer_id = WriterId::new();
        let part_id = PartId::new();

        // A batch key addressed by writer id.
        let by_writer = format!("{}/{}/{}", shard_id, writer_id, part_id);
        assert_eq!(
            BlobKey::parse_ids(&by_writer),
            Ok((
                shard_id,
                PartialBlobKey::Batch(WriterKey::Id(writer_id), part_id.clone())
            ))
        );

        // A batch key addressed by version.
        let version = Version::new(1, 0, 0);
        let by_version = format!(
            "{}/{}/{}",
            shard_id,
            WriterKey::for_version(&version),
            part_id
        );
        assert_eq!(
            BlobKey::parse_ids(&by_version),
            Ok((
                shard_id,
                PartialBlobKey::Batch(WriterKey::for_version(&version), part_id)
            ))
        );

        // Missing the shard component.
        let too_short = format!("{}/{}", WriterId::new(), PartId::new());
        assert!(BlobKey::parse_ids(&too_short).is_err());

        // One component too many.
        let too_long = format!(
            "{}/{}/{}/{}",
            ShardId::new(),
            WriterId::new(),
            PartId::new(),
            PartId::new()
        );
        assert!(BlobKey::parse_ids(&too_long).is_err());

        // Components that are not ids at all, and the empty key.
        assert!(BlobKey::parse_ids("abc/def/ghi").is_err());
        assert!(BlobKey::parse_ids("").is_err());

        // Valid ids in the wrong order.
        let misordered = format!("{}/{}/{}", PartId::new(), ShardId::new(), WriterId::new());
        assert!(BlobKey::parse_ids(&misordered).is_err());

        Ok(())
    }
}