Skip to main content

mz_persist_types/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types for the persist crate.
11
12#![warn(missing_docs)]
13#![warn(
14    clippy::cast_possible_truncation,
15    clippy::cast_precision_loss,
16    clippy::cast_sign_loss
17)]
18
19use std::fmt::Debug;
20
21use crate::columnar::Schema;
22use bytes::{BufMut, Bytes};
23use mz_ore::url::SensitiveUrl;
24use mz_proto::{RustType, TryFromProtoError};
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27use uuid::Uuid;
28
29pub mod arrow;
30pub mod codec_impls;
31pub mod columnar;
32pub mod parquet;
33pub mod part;
34pub mod schema;
35pub mod stats;
36pub mod timestamp;
37pub mod txn;
38
39/// Encoding and decoding operations for a type usable as a persisted key or
40/// value.
41pub trait Codec: Default + Sized + PartialEq + 'static {
42    /// The type of the associated schema for [Self].
43    ///
44    /// This is a separate type because Row is not self-describing. For Row, you
45    /// need a RelationDesc to determine the types of any columns that are
46    /// Datum::Null.
47    type Schema: Schema<Self> + PartialEq;
48
49    /// Name of the codec.
50    ///
51    /// This name is stored for the key and value when a stream is first created
52    /// and the same key and value codec must be used for that stream afterward.
53    fn codec_name() -> String;
54    /// Encode a key or value for permanent storage.
55    ///
56    /// This must perfectly round-trip Self through [Codec::decode]. If the
57    /// encode function for this codec ever changes, decode must be able to
58    /// handle bytes output by all previous versions of encode.
59    fn encode<B>(&self, buf: &mut B)
60    where
61        B: BufMut;
62
63    /// Encode a key or value to a Vec.
64    ///
65    /// This is a convenience function for calling [Self::encode] with a fresh
66    /// `Vec` each time. Reuse an allocation when performance matters!
67    fn encode_to_vec(&self) -> Vec<u8> {
68        let mut buf = vec![];
69        self.encode(&mut buf);
70        buf
71    }
72
73    /// Decode a key or value previous encoded with this codec's
74    /// [Codec::encode].
75    ///
76    /// This must perfectly round-trip Self through [Codec::encode]. If the
77    /// encode function for this codec ever changes, decode must be able to
78    /// handle bytes output by all previous versions of encode.
79    ///
80    /// It should also gracefully handle data encoded by future versions of
81    /// encode (likely with an error).
82    //
83    // TODO: Mechanically, this could return a ref to the original bytes
84    // without any copies, see if we can make the types work out for that.
85    fn decode<'a>(buf: &'a [u8], schema: &Self::Schema) -> Result<Self, String>;
86
87    /// A type used with [Self::decode_from] for allocation reuse. Set to `()`
88    /// if unnecessary.
89    type Storage: Default + Send + Sync;
90    /// An alternate form of [Self::decode] which enables amortizing allocs.
91    ///
92    /// First, instead of returning `Self`, it takes `&mut Self` as a parameter,
93    /// allowing for reuse of the internal `Row`/`SourceData` allocations.
94    ///
95    /// Second, it adds a `type Storage` to `Codec` and also passes it in as
96    /// `&mut Option<Self::Storage>`, allowing for reuse of
97    /// `ProtoRow`/`ProtoSourceData` allocations. If `Some`, this impl may
98    /// attempt to reuse allocations with it. It may also leave allocations in
99    /// it for use in future calls to `decode_from`.
100    ///
101    /// A default implementation is provided for `Codec` impls that can't take
102    /// advantage of this.
103    fn decode_from<'a>(
104        &mut self,
105        buf: &'a [u8],
106        _storage: &mut Option<Self::Storage>,
107        schema: &Self::Schema,
108    ) -> Result<(), String> {
109        *self = Self::decode(buf, schema)?;
110        Ok(())
111    }
112
113    /// Checks that the given value matches the provided schema.
114    ///
115    /// A no-op default implementation is provided for convenience.
116    fn validate(_val: &Self, _schema: &Self::Schema) -> Result<(), String> {
117        Ok(())
118    }
119
120    /// Encode a schema for permanent storage.
121    ///
122    /// This must perfectly round-trip the schema through [Self::decode_schema].
123    /// If the encode_schema function ever changes, decode_schema must be able
124    /// to handle bytes output by all previous versions of encode_schema.
125    ///
126    /// TODO: Move this to instead be a new trait that is required by
127    /// Self::Schema?
128    fn encode_schema(schema: &Self::Schema) -> Bytes;
129
130    /// Decode a schema previous encoded with this codec's
131    /// [Self::encode_schema].
132    ///
133    /// This must perfectly round-trip the schema through [Self::encode_schema].
134    /// If the encode_schema function ever changes, decode_schema must be able
135    /// to handle bytes output by all previous versions of encode_schema.
136    fn decode_schema(buf: &Bytes) -> Self::Schema;
137}
138
/// Encoding and decoding operations for a type usable as a persisted timestamp
/// or diff.
///
/// Every value is represented as exactly eight bytes (`[u8; 8]`).
pub trait Codec64: Sized + Clone + Debug + 'static {
    /// Name of the codec.
    ///
    /// This name is stored for the timestamp and diff when a stream is first
    /// created and the same timestamp and diff codec must be used for that
    /// stream afterward.
    fn codec_name() -> String;

    /// Encode a timestamp or diff for permanent storage.
    ///
    /// This must perfectly round-trip Self through [Codec64::decode]. If the
    /// encode function for this codec ever changes, decode must be able to
    /// handle bytes output by all previous versions of encode.
    fn encode(&self) -> [u8; 8];

    /// Decode a timestamp or diff previously encoded with this codec's
    /// [Codec64::encode].
    ///
    /// This must perfectly round-trip Self through [Codec64::encode]. If the
    /// encode function for this codec ever changes, decode must be able to
    /// handle bytes output by all previous versions of encode.
    ///
    /// Note that the signature is infallible: there is no way to report a
    /// decoding error to the caller.
    fn decode(buf: [u8; 8]) -> Self;
}
164
/// A location in s3, other cloud storage, or otherwise "durable storage" used
/// by persist.
///
/// This structure can be durably written down or transmitted for use by other
/// processes. This location can contain any number of persist shards.
///
/// A location is the pair of a blob store URI and a consensus system URI.
#[derive(
    Arbitrary,
    Debug,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Deserialize,
    Serialize
)]
pub struct PersistLocation {
    /// URI that identifies the blob store.
    pub blob_uri: SensitiveUrl,

    /// URI that identifies the consensus system.
    pub consensus_uri: SensitiveUrl,
}
189
190impl PersistLocation {
191    /// Returns a PersistLocation indicating in-mem blob and consensus.
192    pub fn new_in_mem() -> Self {
193        PersistLocation {
194            blob_uri: "mem://".parse().unwrap(),
195            consensus_uri: "mem://".parse().unwrap(),
196        }
197    }
198}
199
/// An opaque identifier for a persist durable TVC (aka shard).
///
/// The [std::string::ToString::to_string] format of this may be stored durably
/// or otherwise used as an interchange format. It can be parsed back using
/// [str::parse] or [std::str::FromStr::from_str].
///
/// Internally this is 16 raw bytes. Serde (de)serialization goes through the
/// `String` conversions (see the `try_from`/`into` serde attribute below), so
/// the serialized form is the same string format.
#[derive(
    Arbitrary,
    Clone,
    Copy,
    Default,
    PartialEq,
    Eq,
    Hash,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize
)]
#[serde(try_from = "String", into = "String")]
pub struct ShardId([u8; 16]);
220
221impl std::fmt::Display for ShardId {
222    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223        write!(f, "s{}", Uuid::from_bytes(self.0))
224    }
225}
226
227impl std::fmt::Debug for ShardId {
228    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229        write!(f, "ShardId({})", Uuid::from_bytes(self.0))
230    }
231}
232
233impl std::str::FromStr for ShardId {
234    type Err = String;
235
236    fn from_str(s: &str) -> Result<Self, Self::Err> {
237        let uuid_encoded = match s.strip_prefix('s') {
238            Some(x) => x,
239            None => return Err(format!("invalid ShardId {}: incorrect prefix", s)),
240        };
241        let uuid = Uuid::parse_str(uuid_encoded)
242            .map_err(|err| format!("invalid ShardId {}: {}", s, err))?;
243        Ok(ShardId(*uuid.as_bytes()))
244    }
245}
246
impl From<ShardId> for String {
    /// Converts the id into its `to_string()` representation.
    ///
    /// This conversion is named by the `into = "String"` serde attribute on
    /// `ShardId`.
    fn from(shard_id: ShardId) -> Self {
        shard_id.to_string()
    }
}
252
253impl TryFrom<String> for ShardId {
254    type Error = String;
255
256    fn try_from(s: String) -> Result<Self, Self::Error> {
257        s.parse()
258    }
259}
260
261impl ShardId {
262    /// Returns a random [ShardId] that is reasonably likely to have never been
263    /// generated before.
264    pub fn new() -> Self {
265        ShardId(Uuid::new_v4().as_bytes().to_owned())
266    }
267}
268
269impl RustType<String> for ShardId {
270    fn into_proto(&self) -> String {
271        self.to_string()
272    }
273
274    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
275        match proto.parse() {
276            Ok(x) => Ok(x),
277            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
278        }
279    }
280}
281
/// Advance a timestamp by the least amount possible such that
/// `ts.less_than(ts.step_forward())` is true.
///
/// TODO: Unify this with repr's TimestampManipulation. Or, ideally, get rid of
/// it entirely by making txn-wal methods take an `advance_to` argument.
pub trait StepForward {
    /// Advance a timestamp by the least amount possible such that
    /// `ts.less_than(ts.step_forward())` is true.
    ///
    /// # Panics
    ///
    /// Panics if unable to do so, i.e. when no greater timestamp exists.
    fn step_forward(&self) -> Self;
}

impl StepForward for u64 {
    /// Returns `self + 1`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is `u64::MAX`, since the increment would overflow.
    fn step_forward(&self) -> Self {
        // Checked rather than wrapping/saturating: silently returning a
        // timestamp that is not strictly greater would break the contract.
        self.checked_add(1)
            .expect("u64::MAX has no successor: step_forward would overflow")
    }
}