1use std::fmt::{Display, Formatter, Write};
11use std::ops::Deref;
12use std::str::FromStr;
13
14use mz_persist::location::SeqNo;
15use proptest_derive::Arbitrary;
16use semver::Version;
17use serde::{Deserialize, Serialize};
18use uuid::Uuid;
19
20use crate::internal::encoding::parse_id;
21use crate::{ShardId, WriterId};
22
/// Identifier of a batch part, stored as 16 raw bytes.
///
/// `Display` renders it as `p` followed by the UUID form of the bytes, and
/// `FromStr` parses that textual form back.
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct PartId(pub(crate) [u8; 16]);
27
28impl std::fmt::Display for PartId {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 write!(f, "p{}", Uuid::from_bytes(self.0))
31 }
32}
33
34impl std::fmt::Debug for PartId {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 write!(f, "PartId({})", Uuid::from_bytes(self.0))
37 }
38}
39
40impl FromStr for PartId {
41 type Err = String;
42
43 fn from_str(s: &str) -> Result<Self, Self::Err> {
44 parse_id('p', "PartId", s).map(PartId)
45 }
46}
47
48impl PartId {
49 pub(crate) fn new() -> Self {
50 PartId(*Uuid::new_v4().as_bytes())
51 }
52}
53
54#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
62pub enum WriterKey {
63 Id(WriterId),
64 Version(String),
65}
66
67impl WriterKey {
68 pub fn for_version(version: &Version) -> WriterKey {
71 assert!(version.major <= 99);
72 assert!(version.minor <= 999);
73 assert!(version.patch <= 99);
74 WriterKey::Version(format!(
75 "{:02}{:03}{:02}",
76 version.major, version.minor, version.patch
77 ))
78 }
79}
80
81impl FromStr for WriterKey {
82 type Err = String;
83
84 fn from_str(s: &str) -> Result<Self, Self::Err> {
85 if s.is_empty() {
86 return Err("empty version string".to_owned());
87 }
88
89 let key = match &s[..1] {
90 "w" => WriterKey::Id(WriterId::from_str(s)?),
91 "n" => WriterKey::Version(s[1..].to_owned()),
92 c => {
93 return Err(format!("unknown prefix for version: {c}"));
94 }
95 };
96 Ok(key)
97 }
98}
99
100impl Display for WriterKey {
101 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
102 match self {
103 WriterKey::Id(id) => id.fmt(f),
104 WriterKey::Version(s) => {
105 f.write_char('n')?;
106 f.write_str(s)
107 }
108 }
109 }
110}
111
112#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
120pub struct PartialBatchKey(pub(crate) String);
121
122fn split_batch_key(key: &str) -> Result<(WriterKey, PartId), String> {
123 let (writer_key, part_id) = key
124 .split_once('/')
125 .ok_or_else(|| "partial batch key should contain a /".to_owned())?;
126
127 let writer_key = WriterKey::from_str(writer_key)?;
128 let part_id = PartId::from_str(part_id)?;
129 Ok((writer_key, part_id))
130}
131
132impl PartialBatchKey {
133 pub fn new(version: &WriterKey, part_id: &PartId) -> Self {
134 PartialBatchKey(format!("{}/{}", version, part_id))
135 }
136
137 pub fn split(&self) -> Option<(WriterKey, PartId)> {
138 split_batch_key(&self.0).ok()
139 }
140
141 pub fn complete(&self, shard_id: &ShardId) -> BlobKey {
142 BlobKey(format!("{}/{}", shard_id, self))
143 }
144}
145
146impl std::fmt::Display for PartialBatchKey {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 f.write_str(&self.0)
149 }
150}
151
/// Identifier of a rollup, stored as 16 raw bytes.
///
/// `Display` renders it as `r` followed by the UUID form of the bytes, and
/// `FromStr` parses that textual form back.
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct RollupId(pub(crate) [u8; 16]);
155
156impl std::fmt::Display for RollupId {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 write!(f, "r{}", Uuid::from_bytes(self.0))
159 }
160}
161
162impl std::fmt::Debug for RollupId {
163 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 write!(f, "RollupId({})", Uuid::from_bytes(self.0))
165 }
166}
167
168impl FromStr for RollupId {
169 type Err = String;
170
171 fn from_str(s: &str) -> Result<Self, Self::Err> {
172 parse_id('r', "RollupId", s).map(RollupId)
173 }
174}
175
176impl RollupId {
177 pub(crate) fn new() -> Self {
178 RollupId(*Uuid::new_v4().as_bytes())
179 }
180}
181
182#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
190pub struct PartialRollupKey(pub(crate) String);
191
192impl PartialRollupKey {
193 pub fn new(seqno: SeqNo, rollup_id: &RollupId) -> Self {
194 PartialRollupKey(format!("{}/{}", seqno, rollup_id))
195 }
196
197 pub fn complete(&self, shard_id: &ShardId) -> BlobKey {
198 BlobKey(format!("{}/{}", shard_id, self))
199 }
200}
201
202impl std::fmt::Display for PartialRollupKey {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 f.write_str(&self.0)
205 }
206}
207
208impl Deref for PartialRollupKey {
209 type Target = String;
210
211 fn deref(&self) -> &Self::Target {
212 &self.0
213 }
214}
215
216#[derive(Debug, PartialEq)]
220pub enum PartialBlobKey {
221 Batch(WriterKey, PartId),
223 Rollup(SeqNo, RollupId),
225}
226
/// A complete blob key, stored as a plain string.
///
/// Produced by `PartialBatchKey::complete` and `PartialRollupKey::complete` as
/// `<shard_id>/<partial key>`.
#[derive(Clone, Debug, PartialEq)]
pub struct BlobKey(String);
235
236impl std::fmt::Display for BlobKey {
237 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238 f.write_str(&self.0)
239 }
240}
241
242impl Deref for BlobKey {
243 type Target = String;
244
245 fn deref(&self) -> &Self::Target {
246 &self.0
247 }
248}
249
250impl BlobKey {
251 pub fn parse_ids(key: &str) -> Result<(ShardId, PartialBlobKey), String> {
252 let err = || {
253 format!(
254 "invalid blob key format. expected either <shard_id>/<writer_id>/<part_id> or <shard_id>/<seqno>/<rollup_id>. got: {}",
255 key
256 )
257 };
258 let (shard, blob) = key.split_once('/').ok_or_else(err)?;
259 let shard_id = ShardId::from_str(shard)?;
260
261 let blob_key = if blob.starts_with('w') | blob.starts_with('n') {
262 let (writer, part) = split_batch_key(blob)?;
263 PartialBlobKey::Batch(writer, part)
264 } else {
265 let (seqno, rollup) = blob.split_once('/').ok_or_else(err)?;
266 PartialBlobKey::Rollup(SeqNo::from_str(seqno)?, RollupId::from_str(rollup)?)
267 };
268 Ok((shard_id, blob_key))
269 }
270}
271
272#[derive(Debug)]
274pub enum BlobKeyPrefix<'a> {
275 All,
277 Shard(&'a ShardId),
279 #[cfg(test)]
281 Writer(&'a ShardId, &'a WriterKey),
282 #[cfg(test)]
284 Rollups(&'a ShardId),
285}
286
287impl std::fmt::Display for BlobKeyPrefix<'_> {
288 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289 let s = match self {
290 BlobKeyPrefix::All => "".into(),
291 BlobKeyPrefix::Shard(shard) => format!("{}", shard),
292 #[cfg(test)]
293 BlobKeyPrefix::Writer(shard, writer) => format!("{}/{}", shard, writer),
294 #[cfg(test)]
295 BlobKeyPrefix::Rollups(shard) => format!("{}/v", shard),
296 };
297 f.write_str(&s)
298 }
299}
300
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;
    use semver::Version;

    /// Strategy yielding versions whose components stay within the bounds that
    /// `WriterKey::for_version` asserts (major <= 99, minor <= 999, patch <= 99).
    fn gen_version() -> impl Strategy<Value = Version> {
        let components = (0u64..=99, 0u64..=999, 0u64..=99);
        components.prop_map(|(major, minor, patch)| Version::new(major, minor, patch))
    }

    /// The ordering of version keys must agree with the ordering of the
    /// versions they were built from.
    #[mz_ore::test]
    fn key_ordering_compatible() {
        proptest!(|(a in gen_version(), b in gen_version())| {
            let (a_key, b_key) = (WriterKey::for_version(&a), WriterKey::for_version(&b));
            if a >= b {
                assert!(a_key >= b_key);
            }
            if a <= b {
                assert!(a_key <= b_key);
            }
        })
    }

    /// Completing a partial batch key prepends the shard id.
    #[mz_ore::test]
    fn partial_blob_key_completion() {
        let shard_id = ShardId::new();
        let writer_id = WriterId::new();
        let part_id = PartId::new();

        let partial_key = PartialBatchKey::new(&WriterKey::Id(writer_id.clone()), &part_id);
        let expected = BlobKey(format!("{}/{}/{}", shard_id, writer_id, part_id));
        assert_eq!(partial_key.complete(&shard_id), expected);
    }

    /// `BlobKey::parse_ids` accepts well-formed batch keys and rejects malformed ones.
    #[mz_ore::test]
    fn blob_key_parse() -> Result<(), String> {
        let shard_id = ShardId::new();
        let writer_id = WriterId::new();
        let part_id = PartId::new();

        // A batch key addressed by writer id parses into its components.
        let by_writer_id = format!("{}/{}/{}", shard_id, writer_id, part_id);
        assert_eq!(
            BlobKey::parse_ids(&by_writer_id),
            Ok((
                shard_id,
                PartialBlobKey::Batch(WriterKey::Id(writer_id), part_id.clone())
            ))
        );

        // A batch key addressed by version parses into its components.
        let version = Version::new(1, 0, 0);
        let by_version = format!(
            "{}/{}/{}",
            shard_id,
            WriterKey::for_version(&version),
            part_id
        );
        assert_eq!(
            BlobKey::parse_ids(&by_version),
            Ok((
                shard_id,
                PartialBlobKey::Batch(WriterKey::for_version(&version), part_id)
            ))
        );

        // Too few components.
        let too_few = format!("{}/{}", WriterId::new(), PartId::new());
        assert!(BlobKey::parse_ids(&too_few).is_err());

        // Too many components.
        let too_many = format!(
            "{}/{}/{}/{}",
            ShardId::new(),
            WriterId::new(),
            PartId::new(),
            PartId::new()
        );
        assert!(BlobKey::parse_ids(&too_many).is_err());

        // Components that are not ids at all, and the empty string.
        assert!(BlobKey::parse_ids("abc/def/ghi").is_err());
        assert!(BlobKey::parse_ids("").is_err());

        // Valid ids supplied in the wrong order.
        let wrong_order = format!("{}/{}/{}", PartId::new(), ShardId::new(), WriterId::new());
        assert!(BlobKey::parse_ids(&wrong_order).is_err());

        Ok(())
    }
}