mz_persist_client/internal/
paths.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::fmt::{Display, Formatter, Write};
11use std::ops::Deref;
12use std::str::FromStr;
13
14use mz_persist::location::SeqNo;
15use proptest_derive::Arbitrary;
16use semver::Version;
17use serde::{Deserialize, Serialize};
18use uuid::Uuid;
19
20use crate::internal::encoding::parse_id;
21use crate::{ShardId, WriterId};
22
/// An opaque identifier for an individual batch of a persist durable TVC (aka
/// shard).
///
/// Holds 16 raw bytes, which the `Display` and `Debug` impls interpret as a
/// UUID. The `Display` form is that UUID prefixed with `p`.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct PartId(pub(crate) [u8; 16]);
27
28impl std::fmt::Display for PartId {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        write!(f, "p{}", Uuid::from_bytes(self.0))
31    }
32}
33
34impl std::fmt::Debug for PartId {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        write!(f, "PartId({})", Uuid::from_bytes(self.0))
37    }
38}
39
40impl FromStr for PartId {
41    type Err = String;
42
43    fn from_str(s: &str) -> Result<Self, Self::Err> {
44        parse_id('p', "PartId", s).map(PartId)
45    }
46}
47
48impl PartId {
49    pub(crate) fn new() -> Self {
50        PartId(*Uuid::new_v4().as_bytes())
51    }
52}
53
/// A component that provides information about the writer of a blob.
/// For older blobs, this is a UUID for the specific writer instance;
/// for newer blobs, this is a string representing the version at which the blob was written.
/// In either case, it's used to help determine whether a blob may eventually
/// be linked into state, or whether it's junk that we can clean up.
/// Note that the ordering is meaningful: all writer-id keys are considered smaller than
/// all version keys.
// The derived `Ord` compares variants in declaration order before comparing
// their payloads, so `Id` must stay declared ahead of `Version`.
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
pub enum WriterKey {
    /// Key for older blobs: the id of the specific writer instance.
    Id(WriterId),
    /// Key for newer blobs: a string representing the version that wrote the
    /// blob (see [WriterKey::for_version]).
    Version(String),
}
66
67impl WriterKey {
68    /// This uses the version numbering scheme specified in [mz_build_info::BuildInfo::version_num].
69    /// (And asserts that the version isn't so large that that scheme is no longer sufficient.)
70    pub fn for_version(version: &Version) -> WriterKey {
71        assert!(version.major <= 99);
72        assert!(version.minor <= 999);
73        assert!(version.patch <= 99);
74        WriterKey::Version(format!(
75            "{:02}{:03}{:02}",
76            version.major, version.minor, version.patch
77        ))
78    }
79}
80
81impl FromStr for WriterKey {
82    type Err = String;
83
84    fn from_str(s: &str) -> Result<Self, Self::Err> {
85        if s.is_empty() {
86            return Err("empty version string".to_owned());
87        }
88
89        let key = match &s[..1] {
90            "w" => WriterKey::Id(WriterId::from_str(s)?),
91            "n" => WriterKey::Version(s[1..].to_owned()),
92            c => {
93                return Err(format!("unknown prefix for version: {c}"));
94            }
95        };
96        Ok(key)
97    }
98}
99
100impl Display for WriterKey {
101    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
102        match self {
103            WriterKey::Id(id) => id.fmt(f),
104            WriterKey::Version(s) => {
105                f.write_char('n')?;
106                f.write_str(s)
107            }
108        }
109    }
110}
111
/// Partially encoded path used in [mz_persist::location::Blob] storage.
/// Composed of a [WriterKey] and [PartId], separated by a `/`. Can be
/// completed with a [ShardId] to form a full [BlobKey].
///
/// Used to reduce the bytes needed to refer to a blob key in memory and in
/// persistent state; all accesses to blobs are always within the context of an
/// individual shard.
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct PartialBatchKey(pub(crate) String);
121
122fn split_batch_key(key: &str) -> Result<(WriterKey, PartId), String> {
123    let (writer_key, part_id) = key
124        .split_once('/')
125        .ok_or_else(|| "partial batch key should contain a /".to_owned())?;
126
127    let writer_key = WriterKey::from_str(writer_key)?;
128    let part_id = PartId::from_str(part_id)?;
129    Ok((writer_key, part_id))
130}
131
132impl PartialBatchKey {
133    pub fn new(version: &WriterKey, part_id: &PartId) -> Self {
134        PartialBatchKey(format!("{}/{}", version, part_id))
135    }
136
137    pub fn split(&self) -> Option<(WriterKey, PartId)> {
138        split_batch_key(&self.0).ok()
139    }
140
141    pub fn complete(&self, shard_id: &ShardId) -> BlobKey {
142        BlobKey(format!("{}/{}", shard_id, self))
143    }
144}
145
146impl std::fmt::Display for PartialBatchKey {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        f.write_str(&self.0)
149    }
150}
151
/// An opaque identifier for an individual blob of a persist durable TVC (aka shard).
///
/// Holds 16 raw bytes, which the `Display` and `Debug` impls interpret as a
/// UUID. The `Display` form is that UUID prefixed with `r`.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct RollupId(pub(crate) [u8; 16]);
155
156impl std::fmt::Display for RollupId {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        write!(f, "r{}", Uuid::from_bytes(self.0))
159    }
160}
161
162impl std::fmt::Debug for RollupId {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        write!(f, "RollupId({})", Uuid::from_bytes(self.0))
165    }
166}
167
168impl FromStr for RollupId {
169    type Err = String;
170
171    fn from_str(s: &str) -> Result<Self, Self::Err> {
172        parse_id('r', "RollupId", s).map(RollupId)
173    }
174}
175
176impl RollupId {
177    pub(crate) fn new() -> Self {
178        RollupId(*Uuid::new_v4().as_bytes())
179    }
180}
181
/// Partially encoded path used in [mz_persist::location::Blob] storage.
/// Composed of a [SeqNo] and [RollupId], separated by a `/`. Can be completed
/// with a [ShardId] to form a full [BlobKey].
///
/// Used to reduce the bytes needed to refer to a blob key in memory and in
/// persistent state; all accesses to blobs are always within the context of an
/// individual shard.
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct PartialRollupKey(pub(crate) String);
191
192impl PartialRollupKey {
193    pub fn new(seqno: SeqNo, rollup_id: &RollupId) -> Self {
194        PartialRollupKey(format!("{}/{}", seqno, rollup_id))
195    }
196
197    pub fn complete(&self, shard_id: &ShardId) -> BlobKey {
198        BlobKey(format!("{}/{}", shard_id, self))
199    }
200}
201
202impl std::fmt::Display for PartialRollupKey {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        f.write_str(&self.0)
205    }
206}
207
208impl Deref for PartialRollupKey {
209    type Target = String;
210
211    fn deref(&self) -> &Self::Target {
212        &self.0
213    }
214}
215
/// A parsed, partial path used in [mz_persist::location::Blob] storage.
///
/// This enumerates all types of partial blob keys used in persist. It is the
/// second element of the tuple returned by [BlobKey::parse_ids].
#[derive(Debug, PartialEq)]
pub enum PartialBlobKey {
    /// A parsed [PartialBatchKey]: the writer key and the part id.
    Batch(WriterKey, PartId),
    /// A parsed [PartialRollupKey]: the sequence number and the rollup id.
    Rollup(SeqNo, RollupId),
}
226
/// Fully encoded path used in [mz_persist::location::Blob] storage. Composed of
/// a [ShardId] followed by either a batch key ([WriterKey] and [PartId]) or a
/// rollup key ([SeqNo] and [RollupId]), with `/` between the components.
///
/// Use when directly interacting with a [mz_persist::location::Blob], otherwise
/// use [PartialBatchKey] or [PartialRollupKey] to refer to a blob without
/// needing to copy the [ShardId].
#[derive(Clone, Debug, PartialEq)]
pub struct BlobKey(String);
235
236impl std::fmt::Display for BlobKey {
237    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238        f.write_str(&self.0)
239    }
240}
241
242impl Deref for BlobKey {
243    type Target = String;
244
245    fn deref(&self) -> &Self::Target {
246        &self.0
247    }
248}
249
250impl BlobKey {
251    pub fn parse_ids(key: &str) -> Result<(ShardId, PartialBlobKey), String> {
252        let err = || {
253            format!(
254                "invalid blob key format. expected either <shard_id>/<writer_id>/<part_id> or <shard_id>/<seqno>/<rollup_id>. got: {}",
255                key
256            )
257        };
258        let (shard, blob) = key.split_once('/').ok_or_else(err)?;
259        let shard_id = ShardId::from_str(shard)?;
260
261        let blob_key = if blob.starts_with('w') | blob.starts_with('n') {
262            let (writer, part) = split_batch_key(blob)?;
263            PartialBlobKey::Batch(writer, part)
264        } else {
265            let (seqno, rollup) = blob.split_once('/').ok_or_else(err)?;
266            PartialBlobKey::Rollup(SeqNo::from_str(seqno)?, RollupId::from_str(rollup)?)
267        };
268        Ok((shard_id, blob_key))
269    }
270}
271
/// Represents the prefix of a blob path. Used for selecting subsets of blobs.
///
/// The `Display` impl renders the prefix string to match against blob keys.
#[derive(Debug)]
pub enum BlobKeyPrefix<'a> {
    /// For accessing all blobs (renders as the empty prefix)
    All,
    /// Scoped to the batch and state rollup blobs of an individual shard
    Shard(&'a ShardId),
    /// Scoped to the batch blobs of an individual writer (test builds only)
    #[cfg(test)]
    Writer(&'a ShardId, &'a WriterKey),
    /// Scoped to all state rollup blobs of an individual shard (test builds only)
    #[cfg(test)]
    Rollups(&'a ShardId),
}
286
287impl std::fmt::Display for BlobKeyPrefix<'_> {
288    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289        let s = match self {
290            BlobKeyPrefix::All => "".into(),
291            BlobKeyPrefix::Shard(shard) => format!("{}", shard),
292            #[cfg(test)]
293            BlobKeyPrefix::Writer(shard, writer) => format!("{}/{}", shard, writer),
294            #[cfg(test)]
295            BlobKeyPrefix::Rollups(shard) => format!("{}/v", shard),
296        };
297        f.write_str(&s)
298    }
299}
300
#[cfg(test)]
mod tests {
    use proptest::prelude::*;
    use semver::Version;

    use super::*;

    /// Strategy yielding versions whose components stay within the bounds
    /// that `WriterKey::for_version` asserts.
    fn gen_version() -> impl Strategy<Value = Version> {
        let major = 0u64..=99;
        let minor = 0u64..=999;
        let patch = 0u64..=99;
        (major, minor, patch).prop_map(|(major, minor, patch)| Version::new(major, minor, patch))
    }

    #[mz_ore::test]
    fn key_ordering_compatible() {
        // The WriterKey's ordering should never disagree with the Version ordering.
        // (Though the writer key might compare equal when the version does not.)
        proptest!(|(a in gen_version(), b in gen_version())| {
            let (a_key, b_key) = (WriterKey::for_version(&a), WriterKey::for_version(&b));
            if a >= b {
                assert!(a_key >= b_key);
            }
            if a <= b {
                assert!(a_key <= b_key);
            }
        })
    }

    #[mz_ore::test]
    fn partial_blob_key_completion() {
        let shard_id = ShardId::new();
        let writer_id = WriterId::new();
        let part_id = PartId::new();

        let partial_key = PartialBatchKey::new(&WriterKey::Id(writer_id.clone()), &part_id);
        let expected = BlobKey(format!("{}/{}/{}", shard_id, writer_id, part_id));
        assert_eq!(partial_key.complete(&shard_id), expected);
    }

    #[mz_ore::test]
    fn blob_key_parse() -> Result<(), String> {
        let shard_id = ShardId::new();
        let writer_id = WriterId::new();
        let part_id = PartId::new();

        // A full blob key with a writer-id key parses.
        let by_writer_id = format!("{}/{}/{}", shard_id, writer_id, part_id);
        assert_eq!(
            BlobKey::parse_ids(&by_writer_id),
            Ok((
                shard_id,
                PartialBlobKey::Batch(WriterKey::Id(writer_id), part_id.clone())
            ))
        );

        // A full blob key with a version key parses.
        let version = Version::new(1, 0, 0);
        let by_version = format!(
            "{}/{}/{}",
            shard_id,
            WriterKey::for_version(&version),
            part_id
        );
        assert_eq!(
            BlobKey::parse_ids(&by_version),
            Ok((
                shard_id,
                PartialBlobKey::Batch(WriterKey::for_version(&version), part_id)
            ))
        );

        // Invalid blob key formats are rejected.
        let too_few = format!("{}/{}", WriterId::new(), PartId::new());
        assert!(BlobKey::parse_ids(&too_few).is_err());

        let too_many = format!(
            "{}/{}/{}/{}",
            ShardId::new(),
            WriterId::new(),
            PartId::new(),
            PartId::new()
        );
        assert!(BlobKey::parse_ids(&too_many).is_err());

        assert!(BlobKey::parse_ids("abc/def/ghi").is_err());
        assert!(BlobKey::parse_ids("").is_err());

        // Shard/writer/part ids in the wrong positions are rejected.
        let misordered = format!("{}/{}/{}", PartId::new(), ShardId::new(), WriterId::new());
        assert!(BlobKey::parse_ids(&misordered).is_err());

        Ok(())
    }
}
396}