mz_persist_types/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types for the persist crate.
11
12#![warn(missing_docs)]
13#![warn(
14    clippy::cast_possible_truncation,
15    clippy::cast_precision_loss,
16    clippy::cast_sign_loss
17)]
18
19use std::fmt::Debug;
20
21use bytes::{BufMut, Bytes};
22use mz_ore::url::SensitiveUrl;
23use mz_proto::{RustType, TryFromProtoError};
24use proptest_derive::Arbitrary;
25use serde::{Deserialize, Serialize};
26use uuid::Uuid;
27
28use crate::columnar::Schema;
29
30pub mod arrow;
31pub mod codec_impls;
32pub mod columnar;
33pub mod parquet;
34pub mod part;
35pub mod schema;
36pub mod stats;
37pub mod timestamp;
38pub mod txn;
39
40/// Encoding and decoding operations for a type usable as a persisted key or
41/// value.
42pub trait Codec: Default + Sized + PartialEq + 'static {
43    /// The type of the associated schema for [Self].
44    ///
45    /// This is a separate type because Row is not self-describing. For Row, you
46    /// need a RelationDesc to determine the types of any columns that are
47    /// Datum::Null.
48    type Schema: Schema<Self> + PartialEq;
49
50    /// Name of the codec.
51    ///
52    /// This name is stored for the key and value when a stream is first created
53    /// and the same key and value codec must be used for that stream afterward.
54    fn codec_name() -> String;
55    /// Encode a key or value for permanent storage.
56    ///
57    /// This must perfectly round-trip Self through [Codec::decode]. If the
58    /// encode function for this codec ever changes, decode must be able to
59    /// handle bytes output by all previous versions of encode.
60    fn encode<B>(&self, buf: &mut B)
61    where
62        B: BufMut;
63
64    /// Encode a key or value to a Vec.
65    ///
66    /// This is a convenience function for calling [Self::encode] with a fresh
67    /// `Vec` each time. Reuse an allocation when performance matters!
68    fn encode_to_vec(&self) -> Vec<u8> {
69        let mut buf = vec![];
70        self.encode(&mut buf);
71        buf
72    }
73
74    /// Decode a key or value previous encoded with this codec's
75    /// [Codec::encode].
76    ///
77    /// This must perfectly round-trip Self through [Codec::encode]. If the
78    /// encode function for this codec ever changes, decode must be able to
79    /// handle bytes output by all previous versions of encode.
80    ///
81    /// It should also gracefully handle data encoded by future versions of
82    /// encode (likely with an error).
83    //
84    // TODO: Mechanically, this could return a ref to the original bytes
85    // without any copies, see if we can make the types work out for that.
86    fn decode<'a>(buf: &'a [u8], schema: &Self::Schema) -> Result<Self, String>;
87
88    /// A type used with [Self::decode_from] for allocation reuse. Set to `()`
89    /// if unnecessary.
90    type Storage: Default + Send + Sync;
91    /// An alternate form of [Self::decode] which enables amortizing allocs.
92    ///
93    /// First, instead of returning `Self`, it takes `&mut Self` as a parameter,
94    /// allowing for reuse of the internal `Row`/`SourceData` allocations.
95    ///
96    /// Second, it adds a `type Storage` to `Codec` and also passes it in as
97    /// `&mut Option<Self::Storage>`, allowing for reuse of
98    /// `ProtoRow`/`ProtoSourceData` allocations. If `Some`, this impl may
99    /// attempt to reuse allocations with it. It may also leave allocations in
100    /// it for use in future calls to `decode_from`.
101    ///
102    /// A default implementation is provided for `Codec` impls that can't take
103    /// advantage of this.
104    fn decode_from<'a>(
105        &mut self,
106        buf: &'a [u8],
107        _storage: &mut Option<Self::Storage>,
108        schema: &Self::Schema,
109    ) -> Result<(), String> {
110        *self = Self::decode(buf, schema)?;
111        Ok(())
112    }
113
114    /// Checks that the given value matches the provided schema.
115    ///
116    /// A no-op default implementation is provided for convenience.
117    fn validate(_val: &Self, _schema: &Self::Schema) -> Result<(), String> {
118        Ok(())
119    }
120
121    /// Encode a schema for permanent storage.
122    ///
123    /// This must perfectly round-trip the schema through [Self::decode_schema].
124    /// If the encode_schema function ever changes, decode_schema must be able
125    /// to handle bytes output by all previous versions of encode_schema.
126    ///
127    /// TODO: Move this to instead be a new trait that is required by
128    /// Self::Schema?
129    fn encode_schema(schema: &Self::Schema) -> Bytes;
130
131    /// Decode a schema previous encoded with this codec's
132    /// [Self::encode_schema].
133    ///
134    /// This must perfectly round-trip the schema through [Self::encode_schema].
135    /// If the encode_schema function ever changes, decode_schema must be able
136    /// to handle bytes output by all previous versions of encode_schema.
137    fn decode_schema(buf: &Bytes) -> Self::Schema;
138}
139
/// Encoding and decoding operations for a type usable as a persisted timestamp
/// or diff.
///
/// Values are encoded to, and decoded from, exactly eight bytes.
pub trait Codec64: Sized + Clone + Debug + 'static {
    /// Name of the codec.
    ///
    /// The name is stored for the timestamp and diff when a stream is first
    /// created, and the same timestamp and diff codec must be used for that
    /// stream afterward.
    fn codec_name() -> String;

    /// Encode a timestamp or diff for permanent storage.
    ///
    /// The output must round-trip perfectly back to Self through
    /// [Codec64::decode]. If the encode function for this codec ever changes,
    /// decode must still be able to handle bytes output by all previous
    /// versions of encode.
    fn encode(&self) -> [u8; 8];

    /// Decode a timestamp or diff previously encoded with this codec's
    /// [Codec64::encode].
    ///
    /// Decoding must round-trip perfectly with [Codec64::encode]. If the
    /// encode function for this codec ever changes, decode must still be able
    /// to handle bytes output by all previous versions of encode.
    fn decode(buf: [u8; 8]) -> Self;
}
165
166/// A location in s3, other cloud storage, or otherwise "durable storage" used
167/// by persist.
168///
169/// This structure can be durably written down or transmitted for use by other
170/// processes. This location can contain any number of persist shards.
171#[derive(Arbitrary, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
172pub struct PersistLocation {
173    /// Uri string that identifies the blob store.
174    pub blob_uri: SensitiveUrl,
175
176    /// Uri string that identifies the consensus system.
177    pub consensus_uri: SensitiveUrl,
178}
179
180impl PersistLocation {
181    /// Returns a PersistLocation indicating in-mem blob and consensus.
182    pub fn new_in_mem() -> Self {
183        PersistLocation {
184            blob_uri: "mem://".parse().unwrap(),
185            consensus_uri: "mem://".parse().unwrap(),
186        }
187    }
188}
189
190/// An opaque identifier for a persist durable TVC (aka shard).
191///
192/// The [std::string::ToString::to_string] format of this may be stored durably
193/// or otherwise used as an interchange format. It can be parsed back using
194/// [str::parse] or [std::str::FromStr::from_str].
195#[derive(
196    Arbitrary, Clone, Copy, Default, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize,
197)]
198#[serde(try_from = "String", into = "String")]
199pub struct ShardId([u8; 16]);
200
201impl std::fmt::Display for ShardId {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        write!(f, "s{}", Uuid::from_bytes(self.0))
204    }
205}
206
207impl std::fmt::Debug for ShardId {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        write!(f, "ShardId({})", Uuid::from_bytes(self.0))
210    }
211}
212
213impl std::str::FromStr for ShardId {
214    type Err = String;
215
216    fn from_str(s: &str) -> Result<Self, Self::Err> {
217        let uuid_encoded = match s.strip_prefix('s') {
218            Some(x) => x,
219            None => return Err(format!("invalid ShardId {}: incorrect prefix", s)),
220        };
221        let uuid = Uuid::parse_str(uuid_encoded)
222            .map_err(|err| format!("invalid ShardId {}: {}", s, err))?;
223        Ok(ShardId(*uuid.as_bytes()))
224    }
225}
226
227impl From<ShardId> for String {
228    fn from(shard_id: ShardId) -> Self {
229        shard_id.to_string()
230    }
231}
232
233impl TryFrom<String> for ShardId {
234    type Error = String;
235
236    fn try_from(s: String) -> Result<Self, Self::Error> {
237        s.parse()
238    }
239}
240
241impl ShardId {
242    /// Returns a random [ShardId] that is reasonably likely to have never been
243    /// generated before.
244    pub fn new() -> Self {
245        ShardId(Uuid::new_v4().as_bytes().to_owned())
246    }
247}
248
249impl RustType<String> for ShardId {
250    fn into_proto(&self) -> String {
251        self.to_string()
252    }
253
254    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
255        match proto.parse() {
256            Ok(x) => Ok(x),
257            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
258        }
259    }
260}
261
/// An opaque fencing token used in compare_and_downgrade_since.
pub trait Opaque: PartialEq + Clone + Sized + 'static {
    /// The value the opaque token has before any compare_and_downgrade_since
    /// call has been successful.
    fn initial() -> Self;
}
268
/// Advance a timestamp by the least amount possible such that
/// `ts.less_than(ts.step_forward())` is true.
///
/// TODO: Unify this with repr's TimestampManipulation. Or, ideally, get rid of
/// it entirely by making txn-wal methods take an `advance_to` argument.
pub trait StepForward {
    /// Advance a timestamp by the least amount possible such that
    /// `ts.less_than(ts.step_forward())` is true.
    ///
    /// # Panics
    ///
    /// Panics if unable to do so.
    fn step_forward(&self) -> Self;
}

impl StepForward for u64 {
    /// Returns `self + 1`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is `u64::MAX`, since no larger `u64` exists.
    fn step_forward(&self) -> Self {
        // Overflow here is a caller bug; name the violated invariant so the
        // panic is diagnosable instead of a bare `unwrap` message.
        self.checked_add(1)
            .expect("cannot step forward from u64::MAX")
    }
}