Skip to main content

mz_persist_client/internal/
paths.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::fmt::{Display, Formatter, Write};
11use std::ops::Deref;
12use std::str::FromStr;
13
14use mz_persist::location::SeqNo;
15use proptest_derive::Arbitrary;
16use semver::Version;
17use serde::{Deserialize, Serialize};
18use uuid::Uuid;
19
20use crate::internal::encoding::parse_id;
21use crate::{ShardId, WriterId};
22
23/// An opaque identifier for an individual batch of a persist durable TVC (aka
24/// shard).
25#[derive(Clone, PartialEq, Eq, Hash)]
26pub struct PartId(pub(crate) [u8; 16]);
27
28impl std::fmt::Display for PartId {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        write!(f, "p{}", Uuid::from_bytes(self.0))
31    }
32}
33
34impl std::fmt::Debug for PartId {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        write!(f, "PartId({})", Uuid::from_bytes(self.0))
37    }
38}
39
40impl FromStr for PartId {
41    type Err = String;
42
43    fn from_str(s: &str) -> Result<Self, Self::Err> {
44        parse_id("p", "PartId", s).map(PartId)
45    }
46}
47
48impl PartId {
49    pub(crate) fn new() -> Self {
50        PartId(*Uuid::new_v4().as_bytes())
51    }
52}
53
54/// A component that provides information about the writer of a blob.
55/// For older blobs, this is a UUID for the specific writer instance;
56/// for newer blobs, this is a string representing the version at which the blob was written.
57/// In either case, it's used to help determine whether a blob may eventually
58/// be linked into state, or whether it's junk that we can clean up.
59/// Note that the ordering is meaningful: all writer-id keys are considered smaller than
60/// all version keys.
61#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
62pub enum WriterKey {
63    Id(WriterId),
64    Version(String),
65}
66
67impl WriterKey {
68    /// This uses the version numbering scheme specified in [mz_build_info::BuildInfo::version_num].
69    /// (And asserts that the version isn't so large that that scheme is no longer sufficient.)
70    pub fn for_version(version: &Version) -> WriterKey {
71        assert!(version.major <= 99);
72        assert!(version.minor <= 999);
73        assert!(version.patch <= 99);
74        WriterKey::Version(format!(
75            "{:02}{:03}{:02}",
76            version.major, version.minor, version.patch
77        ))
78    }
79}
80
81impl FromStr for WriterKey {
82    type Err = String;
83
84    fn from_str(s: &str) -> Result<Self, Self::Err> {
85        if s.is_empty() {
86            return Err("empty version string".to_owned());
87        }
88
89        let key = match &s[..1] {
90            "w" => WriterKey::Id(WriterId::from_str(s)?),
91            "n" => WriterKey::Version(s[1..].to_owned()),
92            c => {
93                return Err(format!("unknown prefix for version: {c}"));
94            }
95        };
96        Ok(key)
97    }
98}
99
100impl Display for WriterKey {
101    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
102        match self {
103            WriterKey::Id(id) => id.fmt(f),
104            WriterKey::Version(s) => {
105                f.write_char('n')?;
106                f.write_str(s)
107            }
108        }
109    }
110}
111
112/// Partially encoded path used in [mz_persist::location::Blob] storage.
113/// Composed of a [WriterId] and [PartId]. Can be completed with a [ShardId] to
114/// form a full [BlobKey].
115///
116/// Used to reduce the bytes needed to refer to a blob key in memory and in
117/// persistent state, all access to blobs are always within the context of an
118/// individual shard.
119#[derive(
120    Arbitrary,
121    Clone,
122    Debug,
123    PartialEq,
124    Eq,
125    PartialOrd,
126    Ord,
127    Hash,
128    Serialize,
129    Deserialize
130)]
131pub struct PartialBatchKey(pub(crate) String);
132
133fn split_batch_key(key: &str) -> Result<(WriterKey, PartId), String> {
134    let (writer_key, part_id) = key
135        .split_once('/')
136        .ok_or_else(|| "partial batch key should contain a /".to_owned())?;
137
138    let writer_key = WriterKey::from_str(writer_key)?;
139    let part_id = PartId::from_str(part_id)?;
140    Ok((writer_key, part_id))
141}
142
143impl PartialBatchKey {
144    pub fn new(version: &WriterKey, part_id: &PartId) -> Self {
145        PartialBatchKey(format!("{}/{}", version, part_id))
146    }
147
148    pub fn split(&self) -> Option<(WriterKey, PartId)> {
149        split_batch_key(&self.0).ok()
150    }
151
152    pub fn complete(&self, shard_id: &ShardId) -> BlobKey {
153        BlobKey(format!("{}/{}", shard_id, self))
154    }
155}
156
157impl std::fmt::Display for PartialBatchKey {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        f.write_str(&self.0)
160    }
161}
162
163/// An opaque identifier for an individual blob of a persist durable TVC (aka shard).
164#[derive(Clone, PartialEq, Eq, Hash)]
165pub struct RollupId(pub(crate) [u8; 16]);
166
167impl std::fmt::Display for RollupId {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        write!(f, "r{}", Uuid::from_bytes(self.0))
170    }
171}
172
173impl std::fmt::Debug for RollupId {
174    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        write!(f, "RollupId({})", Uuid::from_bytes(self.0))
176    }
177}
178
179impl FromStr for RollupId {
180    type Err = String;
181
182    fn from_str(s: &str) -> Result<Self, Self::Err> {
183        parse_id("r", "RollupId", s).map(RollupId)
184    }
185}
186
187impl RollupId {
188    pub(crate) fn new() -> Self {
189        RollupId(*Uuid::new_v4().as_bytes())
190    }
191}
192
193/// Partially encoded path used in [mz_persist::location::Blob] storage.
194/// Composed of a [SeqNo] and [RollupId]. Can be completed with a [ShardId] to
195/// form a full [BlobKey].
196///
197/// Used to reduce the bytes needed to refer to a blob key in memory and in
198/// persistent state, all access to blobs are always within the context of an
199/// individual shard.
200#[derive(
201    Arbitrary,
202    Clone,
203    Debug,
204    PartialEq,
205    Eq,
206    PartialOrd,
207    Ord,
208    Hash,
209    Serialize,
210    Deserialize
211)]
212pub struct PartialRollupKey(pub(crate) String);
213
214impl PartialRollupKey {
215    pub fn new(seqno: SeqNo, rollup_id: &RollupId) -> Self {
216        PartialRollupKey(format!("{}/{}", seqno, rollup_id))
217    }
218
219    pub fn complete(&self, shard_id: &ShardId) -> BlobKey {
220        BlobKey(format!("{}/{}", shard_id, self))
221    }
222}
223
224impl std::fmt::Display for PartialRollupKey {
225    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226        f.write_str(&self.0)
227    }
228}
229
230impl Deref for PartialRollupKey {
231    type Target = String;
232
233    fn deref(&self) -> &Self::Target {
234        &self.0
235    }
236}
237
238/// A parsed, partial path used in [mz_persist::location::Blob] storage.
239///
240/// This enumerates all types of partial blob keys used in persist.
241#[derive(Debug, PartialEq)]
242pub enum PartialBlobKey {
243    /// A parsed [PartialBatchKey].
244    Batch(WriterKey, PartId),
245    /// A parsed [PartialRollupKey].
246    Rollup(SeqNo, RollupId),
247}
248
249/// Fully encoded path used in [mz_persist::location::Blob] storage. Composed of
250/// a [ShardId], [WriterId] and [PartId].
251///
252/// Use when directly interacting with a [mz_persist::location::Blob], otherwise
253/// use [PartialBatchKey] or [PartialRollupKey] to refer to a blob without
254/// needing to copy the [ShardId].
255#[derive(Clone, Debug, PartialEq)]
256pub struct BlobKey(String);
257
258impl std::fmt::Display for BlobKey {
259    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260        f.write_str(&self.0)
261    }
262}
263
264impl Deref for BlobKey {
265    type Target = String;
266
267    fn deref(&self) -> &Self::Target {
268        &self.0
269    }
270}
271
272impl BlobKey {
273    pub fn parse_ids(key: &str) -> Result<(ShardId, PartialBlobKey), String> {
274        let err = || {
275            format!(
276                "invalid blob key format. expected either <shard_id>/<writer_id>/<part_id> or <shard_id>/<seqno>/<rollup_id>. got: {}",
277                key
278            )
279        };
280        let (shard, blob) = key.split_once('/').ok_or_else(err)?;
281        let shard_id = ShardId::from_str(shard)?;
282
283        let blob_key = if blob.starts_with('w') | blob.starts_with('n') {
284            let (writer, part) = split_batch_key(blob)?;
285            PartialBlobKey::Batch(writer, part)
286        } else {
287            let (seqno, rollup) = blob.split_once('/').ok_or_else(err)?;
288            PartialBlobKey::Rollup(SeqNo::from_str(seqno)?, RollupId::from_str(rollup)?)
289        };
290        Ok((shard_id, blob_key))
291    }
292}
293
294/// Represents the prefix of a blob path. Used for selecting subsets of blobs
295#[derive(Debug)]
296pub enum BlobKeyPrefix<'a> {
297    /// For accessing all blobs
298    All,
299    /// Scoped to the batch and state rollup blobs of an individual shard
300    Shard(&'a ShardId),
301    /// Scoped to the batch blobs of an individual writer
302    #[cfg(test)]
303    Writer(&'a ShardId, &'a WriterKey),
304    /// Scoped to all state rollup blobs  of an individual shard
305    #[cfg(test)]
306    Rollups(&'a ShardId),
307}
308
309impl std::fmt::Display for BlobKeyPrefix<'_> {
310    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311        let s = match self {
312            BlobKeyPrefix::All => "".into(),
313            BlobKeyPrefix::Shard(shard) => format!("{}", shard),
314            #[cfg(test)]
315            BlobKeyPrefix::Writer(shard, writer) => format!("{}/{}", shard, writer),
316            #[cfg(test)]
317            BlobKeyPrefix::Rollups(shard) => format!("{}/v", shard),
318        };
319        f.write_str(&s)
320    }
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;
    use semver::Version;

    /// Generates versions within the bounds accepted by `WriterKey::for_version`.
    fn gen_version() -> impl Strategy<Value = Version> {
        (0u64..=99, 0u64..=999, 0u64..=99)
            .prop_map(|(major, minor, patch)| Version::new(major, minor, patch))
    }

    #[mz_ore::test]
    fn key_ordering_compatible() {
        // The WriterKey's ordering should never disagree with the Version ordering.
        // (Though the writer key might compare equal when the version does not.)
        proptest!(|(a in gen_version(), b in gen_version())| {
            let a_key = WriterKey::for_version(&a);
            let b_key = WriterKey::for_version(&b);
            if a >= b {
                assert!(a_key >= b_key);
            }
            if a <= b {
                assert!(a_key <= b_key);
            }
        })
    }

    #[mz_ore::test]
    fn partial_blob_key_completion() {
        let (shard_id, writer_id, part_id) = (ShardId::new(), WriterId::new(), PartId::new());
        let partial_key = PartialBatchKey::new(&WriterKey::Id(writer_id.clone()), &part_id);
        // The partial key must round-trip back into its components.
        assert_eq!(
            partial_key.split(),
            Some((WriterKey::Id(writer_id.clone()), part_id.clone()))
        );
        assert_eq!(
            partial_key.complete(&shard_id),
            BlobKey(format!("{}/{}/{}", shard_id, writer_id, part_id))
        );
    }

    #[mz_ore::test]
    fn partial_rollup_key_completion() {
        // A completed rollup key must parse back into the same shard, seqno and
        // rollup id.
        let (shard_id, rollup_id) = (ShardId::new(), RollupId::new());
        let partial_key = PartialRollupKey::new(SeqNo(5), &rollup_id);
        assert_eq!(
            BlobKey::parse_ids(&partial_key.complete(&shard_id)),
            Ok((shard_id, PartialBlobKey::Rollup(SeqNo(5), rollup_id)))
        );
    }

    #[mz_ore::test]
    fn blob_key_parse() -> Result<(), String> {
        let (shard_id, writer_id, part_id) = (ShardId::new(), WriterId::new(), PartId::new());

        // can parse full blob key
        assert_eq!(
            BlobKey::parse_ids(&format!("{}/{}/{}", shard_id, writer_id, part_id)),
            Ok((
                shard_id,
                PartialBlobKey::Batch(WriterKey::Id(writer_id), part_id.clone())
            ))
        );

        let version = Version::new(1, 0, 0);
        assert_eq!(
            BlobKey::parse_ids(&format!(
                "{}/{}/{}",
                shard_id,
                WriterKey::for_version(&version),
                part_id
            )),
            Ok((
                shard_id,
                PartialBlobKey::Batch(WriterKey::for_version(&version), part_id)
            ))
        );

        // fails on invalid blob key formats
        assert!(matches!(
            BlobKey::parse_ids(&format!("{}/{}", WriterId::new(), PartId::new())),
            Err(_)
        ));
        assert!(matches!(
            BlobKey::parse_ids(&format!(
                "{}/{}/{}/{}",
                ShardId::new(),
                WriterId::new(),
                PartId::new(),
                PartId::new()
            )),
            Err(_)
        ));
        assert!(matches!(BlobKey::parse_ids("abc/def/ghi"), Err(_)));
        assert!(matches!(BlobKey::parse_ids(""), Err(_)));

        // fails if shard/writer/part id are in the wrong spots
        assert!(matches!(
            BlobKey::parse_ids(&format!(
                "{}/{}/{}",
                PartId::new(),
                ShardId::new(),
                WriterId::new()
            )),
            Err(_)
        ));

        Ok(())
    }
}