mz_persist_client/internal/
paths.rs1use std::fmt::{Display, Formatter, Write};
11use std::ops::Deref;
12use std::str::FromStr;
13
14use mz_persist::location::SeqNo;
15use proptest_derive::Arbitrary;
16use semver::Version;
17use serde::{Deserialize, Serialize};
18use uuid::Uuid;
19
20use crate::internal::encoding::parse_id;
21use crate::{ShardId, WriterId};
22
/// Identifier of a part, held as 16 raw bytes.
///
/// The bytes are interpreted as a UUID whenever the id is formatted.
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct PartId(pub(crate) [u8; 16]);
27
28impl std::fmt::Display for PartId {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 write!(f, "p{}", Uuid::from_bytes(self.0))
31 }
32}
33
34impl std::fmt::Debug for PartId {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 write!(f, "PartId({})", Uuid::from_bytes(self.0))
37 }
38}
39
40impl FromStr for PartId {
41 type Err = String;
42
43 fn from_str(s: &str) -> Result<Self, Self::Err> {
44 parse_id("p", "PartId", s).map(PartId)
45 }
46}
47
48impl PartId {
49 pub(crate) fn new() -> Self {
50 PartId(*Uuid::new_v4().as_bytes())
51 }
52}
53
54#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
62pub enum WriterKey {
63 Id(WriterId),
64 Version(String),
65}
66
67impl WriterKey {
68 pub fn for_version(version: &Version) -> WriterKey {
71 assert!(version.major <= 99);
72 assert!(version.minor <= 999);
73 assert!(version.patch <= 99);
74 WriterKey::Version(format!(
75 "{:02}{:03}{:02}",
76 version.major, version.minor, version.patch
77 ))
78 }
79}
80
81impl FromStr for WriterKey {
82 type Err = String;
83
84 fn from_str(s: &str) -> Result<Self, Self::Err> {
85 if s.is_empty() {
86 return Err("empty version string".to_owned());
87 }
88
89 let key = match &s[..1] {
90 "w" => WriterKey::Id(WriterId::from_str(s)?),
91 "n" => WriterKey::Version(s[1..].to_owned()),
92 c => {
93 return Err(format!("unknown prefix for version: {c}"));
94 }
95 };
96 Ok(key)
97 }
98}
99
100impl Display for WriterKey {
101 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
102 match self {
103 WriterKey::Id(id) => id.fmt(f),
104 WriterKey::Version(s) => {
105 f.write_char('n')?;
106 f.write_str(s)
107 }
108 }
109 }
110}
111
112#[derive(
120 Arbitrary,
121 Clone,
122 Debug,
123 PartialEq,
124 Eq,
125 PartialOrd,
126 Ord,
127 Hash,
128 Serialize,
129 Deserialize
130)]
131pub struct PartialBatchKey(pub(crate) String);
132
133fn split_batch_key(key: &str) -> Result<(WriterKey, PartId), String> {
134 let (writer_key, part_id) = key
135 .split_once('/')
136 .ok_or_else(|| "partial batch key should contain a /".to_owned())?;
137
138 let writer_key = WriterKey::from_str(writer_key)?;
139 let part_id = PartId::from_str(part_id)?;
140 Ok((writer_key, part_id))
141}
142
143impl PartialBatchKey {
144 pub fn new(version: &WriterKey, part_id: &PartId) -> Self {
145 PartialBatchKey(format!("{}/{}", version, part_id))
146 }
147
148 pub fn split(&self) -> Option<(WriterKey, PartId)> {
149 split_batch_key(&self.0).ok()
150 }
151
152 pub fn complete(&self, shard_id: &ShardId) -> BlobKey {
153 BlobKey(format!("{}/{}", shard_id, self))
154 }
155}
156
157impl std::fmt::Display for PartialBatchKey {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 f.write_str(&self.0)
160 }
161}
162
/// Identifier of a rollup, held as 16 raw bytes.
///
/// The bytes are interpreted as a UUID whenever the id is formatted.
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct RollupId(pub(crate) [u8; 16]);
166
167impl std::fmt::Display for RollupId {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 write!(f, "r{}", Uuid::from_bytes(self.0))
170 }
171}
172
173impl std::fmt::Debug for RollupId {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 write!(f, "RollupId({})", Uuid::from_bytes(self.0))
176 }
177}
178
179impl FromStr for RollupId {
180 type Err = String;
181
182 fn from_str(s: &str) -> Result<Self, Self::Err> {
183 parse_id("r", "RollupId", s).map(RollupId)
184 }
185}
186
187impl RollupId {
188 pub(crate) fn new() -> Self {
189 RollupId(*Uuid::new_v4().as_bytes())
190 }
191}
192
193#[derive(
201 Arbitrary,
202 Clone,
203 Debug,
204 PartialEq,
205 Eq,
206 PartialOrd,
207 Ord,
208 Hash,
209 Serialize,
210 Deserialize
211)]
212pub struct PartialRollupKey(pub(crate) String);
213
214impl PartialRollupKey {
215 pub fn new(seqno: SeqNo, rollup_id: &RollupId) -> Self {
216 PartialRollupKey(format!("{}/{}", seqno, rollup_id))
217 }
218
219 pub fn complete(&self, shard_id: &ShardId) -> BlobKey {
220 BlobKey(format!("{}/{}", shard_id, self))
221 }
222}
223
224impl std::fmt::Display for PartialRollupKey {
225 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226 f.write_str(&self.0)
227 }
228}
229
230impl Deref for PartialRollupKey {
231 type Target = String;
232
233 fn deref(&self) -> &Self::Target {
234 &self.0
235 }
236}
237
238#[derive(Debug, PartialEq)]
242pub enum PartialBlobKey {
243 Batch(WriterKey, PartId),
245 Rollup(SeqNo, RollupId),
247}
248
/// A complete blob key wrapping a `String`.
///
/// Within this file, values are produced by the `complete` methods on the
/// partial keys, which format `<shard_id>/<partial key>`.
#[derive(Clone, PartialEq, Debug)]
pub struct BlobKey(String);
257
258impl std::fmt::Display for BlobKey {
259 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260 f.write_str(&self.0)
261 }
262}
263
264impl Deref for BlobKey {
265 type Target = String;
266
267 fn deref(&self) -> &Self::Target {
268 &self.0
269 }
270}
271
272impl BlobKey {
273 pub fn parse_ids(key: &str) -> Result<(ShardId, PartialBlobKey), String> {
274 let err = || {
275 format!(
276 "invalid blob key format. expected either <shard_id>/<writer_id>/<part_id> or <shard_id>/<seqno>/<rollup_id>. got: {}",
277 key
278 )
279 };
280 let (shard, blob) = key.split_once('/').ok_or_else(err)?;
281 let shard_id = ShardId::from_str(shard)?;
282
283 let blob_key = if blob.starts_with('w') | blob.starts_with('n') {
284 let (writer, part) = split_batch_key(blob)?;
285 PartialBlobKey::Batch(writer, part)
286 } else {
287 let (seqno, rollup) = blob.split_once('/').ok_or_else(err)?;
288 PartialBlobKey::Rollup(SeqNo::from_str(seqno)?, RollupId::from_str(rollup)?)
289 };
290 Ok((shard_id, blob_key))
291 }
292}
293
294#[derive(Debug)]
296pub enum BlobKeyPrefix<'a> {
297 All,
299 Shard(&'a ShardId),
301 #[cfg(test)]
303 Writer(&'a ShardId, &'a WriterKey),
304 #[cfg(test)]
306 Rollups(&'a ShardId),
307}
308
309impl std::fmt::Display for BlobKeyPrefix<'_> {
310 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311 let s = match self {
312 BlobKeyPrefix::All => "".into(),
313 BlobKeyPrefix::Shard(shard) => format!("{}", shard),
314 #[cfg(test)]
315 BlobKeyPrefix::Writer(shard, writer) => format!("{}/{}", shard, writer),
316 #[cfg(test)]
317 BlobKeyPrefix::Rollups(shard) => format!("{}/v", shard),
318 };
319 f.write_str(&s)
320 }
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;
    use semver::Version;

    /// Generates versions whose components stay within the bounds asserted by
    /// `WriterKey::for_version`.
    fn gen_version() -> impl Strategy<Value = Version> {
        let major = 0u64..=99;
        let minor = 0u64..=999;
        let patch = 0u64..=99;
        (major, minor, patch).prop_map(|(major, minor, patch)| Version::new(major, minor, patch))
    }

    /// The ordering of encoded writer keys must agree with the ordering of
    /// the versions they were built from.
    #[mz_ore::test]
    fn key_ordering_compatible() {
        proptest!(|(a in gen_version(), b in gen_version())| {
            let (a_key, b_key) = (WriterKey::for_version(&a), WriterKey::for_version(&b));
            if a >= b {
                assert!(a_key >= b_key);
            }
            if a <= b {
                assert!(a_key <= b_key);
            }
        })
    }

    /// Completing a partial batch key prepends the shard id.
    #[mz_ore::test]
    fn partial_blob_key_completion() {
        let shard_id = ShardId::new();
        let writer_id = WriterId::new();
        let part_id = PartId::new();

        let partial_key = PartialBatchKey::new(&WriterKey::Id(writer_id.clone()), &part_id);
        let expected = BlobKey(format!("{}/{}/{}", shard_id, writer_id, part_id));
        assert_eq!(partial_key.complete(&shard_id), expected);
    }

    /// `BlobKey::parse_ids` accepts well-formed batch keys and rejects
    /// malformed ones.
    #[mz_ore::test]
    fn blob_key_parse() -> Result<(), String> {
        let shard_id = ShardId::new();
        let writer_id = WriterId::new();
        let part_id = PartId::new();

        // A key built from a writer id parses back into its components.
        let id_key = format!("{}/{}/{}", shard_id, writer_id, part_id);
        assert_eq!(
            BlobKey::parse_ids(&id_key),
            Ok((
                shard_id,
                PartialBlobKey::Batch(WriterKey::Id(writer_id), part_id.clone())
            ))
        );

        // A key built from a version-based writer key parses back as well.
        let version = Version::new(1, 0, 0);
        let version_key = format!(
            "{}/{}/{}",
            shard_id,
            WriterKey::for_version(&version),
            part_id
        );
        assert_eq!(
            BlobKey::parse_ids(&version_key),
            Ok((
                shard_id,
                PartialBlobKey::Batch(WriterKey::for_version(&version), part_id)
            ))
        );

        // Too few components.
        let too_short = format!("{}/{}", WriterId::new(), PartId::new());
        assert!(BlobKey::parse_ids(&too_short).is_err());

        // Too many components.
        let too_long = format!(
            "{}/{}/{}/{}",
            ShardId::new(),
            WriterId::new(),
            PartId::new(),
            PartId::new()
        );
        assert!(BlobKey::parse_ids(&too_long).is_err());

        // Components that are not ids at all, and the empty string.
        assert!(BlobKey::parse_ids("abc/def/ghi").is_err());
        assert!(BlobKey::parse_ids("").is_err());

        // Valid ids supplied in the wrong order.
        let misordered = format!("{}/{}/{}", PartId::new(), ShardId::new(), WriterId::new());
        assert!(BlobKey::parse_ids(&misordered).is_err());

        Ok(())
    }
}