mz_persist_types/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types for the persist crate.
11
12#![warn(missing_docs)]
13#![warn(
14    clippy::cast_possible_truncation,
15    clippy::cast_precision_loss,
16    clippy::cast_sign_loss
17)]
18
19use bytes::{BufMut, Bytes};
20use mz_ore::url::SensitiveUrl;
21use mz_proto::{RustType, TryFromProtoError};
22use proptest_derive::Arbitrary;
23use serde::{Deserialize, Serialize};
24use uuid::Uuid;
25
26use crate::columnar::Schema;
27
28pub mod arrow;
29pub mod codec_impls;
30pub mod columnar;
31pub mod parquet;
32pub mod part;
33pub mod schema;
34pub mod stats;
35pub mod timestamp;
36pub mod txn;
37
38/// Encoding and decoding operations for a type usable as a persisted key or
39/// value.
40pub trait Codec: Default + Sized + PartialEq + 'static {
41    /// The type of the associated schema for [Self].
42    ///
43    /// This is a separate type because Row is not self-describing. For Row, you
44    /// need a RelationDesc to determine the types of any columns that are
45    /// Datum::Null.
46    type Schema: Schema<Self> + PartialEq;
47
48    /// Name of the codec.
49    ///
50    /// This name is stored for the key and value when a stream is first created
51    /// and the same key and value codec must be used for that stream afterward.
52    fn codec_name() -> String;
53    /// Encode a key or value for permanent storage.
54    ///
55    /// This must perfectly round-trip Self through [Codec::decode]. If the
56    /// encode function for this codec ever changes, decode must be able to
57    /// handle bytes output by all previous versions of encode.
58    fn encode<B>(&self, buf: &mut B)
59    where
60        B: BufMut;
61
62    /// Encode a key or value to a Vec.
63    ///
64    /// This is a convenience function for calling [Self::encode] with a fresh
65    /// `Vec` each time. Reuse an allocation when performance matters!
66    fn encode_to_vec(&self) -> Vec<u8> {
67        let mut buf = vec![];
68        self.encode(&mut buf);
69        buf
70    }
71
72    /// Decode a key or value previous encoded with this codec's
73    /// [Codec::encode].
74    ///
75    /// This must perfectly round-trip Self through [Codec::encode]. If the
76    /// encode function for this codec ever changes, decode must be able to
77    /// handle bytes output by all previous versions of encode.
78    ///
79    /// It should also gracefully handle data encoded by future versions of
80    /// encode (likely with an error).
81    //
82    // TODO: Mechanically, this could return a ref to the original bytes
83    // without any copies, see if we can make the types work out for that.
84    fn decode<'a>(buf: &'a [u8], schema: &Self::Schema) -> Result<Self, String>;
85
86    /// A type used with [Self::decode_from] for allocation reuse. Set to `()`
87    /// if unnecessary.
88    type Storage: Default + Send + Sync;
89    /// An alternate form of [Self::decode] which enables amortizing allocs.
90    ///
91    /// First, instead of returning `Self`, it takes `&mut Self` as a parameter,
92    /// allowing for reuse of the internal `Row`/`SourceData` allocations.
93    ///
94    /// Second, it adds a `type Storage` to `Codec` and also passes it in as
95    /// `&mut Option<Self::Storage>`, allowing for reuse of
96    /// `ProtoRow`/`ProtoSourceData` allocations. If `Some`, this impl may
97    /// attempt to reuse allocations with it. It may also leave allocations in
98    /// it for use in future calls to `decode_from`.
99    ///
100    /// A default implementation is provided for `Codec` impls that can't take
101    /// advantage of this.
102    fn decode_from<'a>(
103        &mut self,
104        buf: &'a [u8],
105        _storage: &mut Option<Self::Storage>,
106        schema: &Self::Schema,
107    ) -> Result<(), String> {
108        *self = Self::decode(buf, schema)?;
109        Ok(())
110    }
111
112    /// Checks that the given value matches the provided schema.
113    ///
114    /// A no-op default implementation is provided for convenience.
115    fn validate(_val: &Self, _schema: &Self::Schema) -> Result<(), String> {
116        Ok(())
117    }
118
119    /// Encode a schema for permanent storage.
120    ///
121    /// This must perfectly round-trip the schema through [Self::decode_schema].
122    /// If the encode_schema function ever changes, decode_schema must be able
123    /// to handle bytes output by all previous versions of encode_schema.
124    ///
125    /// TODO: Move this to instead be a new trait that is required by
126    /// Self::Schema?
127    fn encode_schema(schema: &Self::Schema) -> Bytes;
128
129    /// Decode a schema previous encoded with this codec's
130    /// [Self::encode_schema].
131    ///
132    /// This must perfectly round-trip the schema through [Self::encode_schema].
133    /// If the encode_schema function ever changes, decode_schema must be able
134    /// to handle bytes output by all previous versions of encode_schema.
135    fn decode_schema(buf: &Bytes) -> Self::Schema;
136}
137
/// Encoding and decoding operations for a type usable as a persisted timestamp
/// or diff.
///
/// Every value is encoded into exactly eight bytes.
pub trait Codec64: Sized + Clone + 'static {
    /// The codec's name.
    ///
    /// The timestamp and diff codec names are recorded when a stream is first
    /// created. Every later use of that stream must use the same timestamp
    /// and diff codecs.
    fn codec_name() -> String;

    /// Packs a timestamp or diff into eight bytes for permanent storage.
    ///
    /// Self must round-trip exactly through [Codec64::decode]. If this
    /// encoding ever changes, decode must keep accepting bytes written by
    /// every previous version of encode.
    fn encode(&self) -> [u8; 8];

    /// Unpacks a timestamp or diff previously written by [Codec64::encode].
    ///
    /// Self must round-trip exactly through [Codec64::encode]. If the
    /// encoding ever changes, decode must keep accepting bytes written by
    /// every previous version of encode.
    fn decode(buf: [u8; 8]) -> Self;
}
163
164/// A location in s3, other cloud storage, or otherwise "durable storage" used
165/// by persist.
166///
167/// This structure can be durably written down or transmitted for use by other
168/// processes. This location can contain any number of persist shards.
169#[derive(Arbitrary, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
170pub struct PersistLocation {
171    /// Uri string that identifies the blob store.
172    pub blob_uri: SensitiveUrl,
173
174    /// Uri string that identifies the consensus system.
175    pub consensus_uri: SensitiveUrl,
176}
177
178impl PersistLocation {
179    /// Returns a PersistLocation indicating in-mem blob and consensus.
180    pub fn new_in_mem() -> Self {
181        PersistLocation {
182            blob_uri: "mem://".parse().unwrap(),
183            consensus_uri: "mem://".parse().unwrap(),
184        }
185    }
186}
187
188/// An opaque identifier for a persist durable TVC (aka shard).
189///
190/// The [std::string::ToString::to_string] format of this may be stored durably
191/// or otherwise used as an interchange format. It can be parsed back using
192/// [str::parse] or [std::str::FromStr::from_str].
193#[derive(
194    Arbitrary, Clone, Copy, Default, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize,
195)]
196#[serde(try_from = "String", into = "String")]
197pub struct ShardId([u8; 16]);
198
199impl std::fmt::Display for ShardId {
200    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201        write!(f, "s{}", Uuid::from_bytes(self.0))
202    }
203}
204
205impl std::fmt::Debug for ShardId {
206    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207        write!(f, "ShardId({})", Uuid::from_bytes(self.0))
208    }
209}
210
211impl std::str::FromStr for ShardId {
212    type Err = String;
213
214    fn from_str(s: &str) -> Result<Self, Self::Err> {
215        let uuid_encoded = match s.strip_prefix('s') {
216            Some(x) => x,
217            None => return Err(format!("invalid ShardId {}: incorrect prefix", s)),
218        };
219        let uuid = Uuid::parse_str(uuid_encoded)
220            .map_err(|err| format!("invalid ShardId {}: {}", s, err))?;
221        Ok(ShardId(*uuid.as_bytes()))
222    }
223}
224
225impl From<ShardId> for String {
226    fn from(shard_id: ShardId) -> Self {
227        shard_id.to_string()
228    }
229}
230
231impl TryFrom<String> for ShardId {
232    type Error = String;
233
234    fn try_from(s: String) -> Result<Self, Self::Error> {
235        s.parse()
236    }
237}
238
239impl ShardId {
240    /// Returns a random [ShardId] that is reasonably likely to have never been
241    /// generated before.
242    pub fn new() -> Self {
243        ShardId(Uuid::new_v4().as_bytes().to_owned())
244    }
245}
246
247impl RustType<String> for ShardId {
248    fn into_proto(&self) -> String {
249        self.to_string()
250    }
251
252    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
253        match proto.parse() {
254            Ok(x) => Ok(x),
255            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
256        }
257    }
258}
259
/// An opaque fencing token used in compare_and_downgrade_since.
pub trait Opaque: Clone + PartialEq + Sized + 'static {
    /// The token's value before any compare_and_downgrade_since call has
    /// succeeded.
    fn initial() -> Self;
}
266
/// Advance a timestamp by the least amount possible such that
/// `ts.less_than(ts.step_forward())` is true.
///
/// TODO: Unify this with repr's TimestampManipulation. Or, ideally, get rid of
/// it entirely by making txn-wal methods take an `advance_to` argument.
pub trait StepForward {
    /// Advance a timestamp by the least amount possible such that
    /// `ts.less_than(ts.step_forward())` is true.
    ///
    /// # Panics
    ///
    /// Panics if no such timestamp exists (for example, when `self` is
    /// already the largest representable value).
    fn step_forward(&self) -> Self;
}

impl StepForward for u64 {
    /// Returns `self + 1`.
    ///
    /// # Panics
    ///
    /// Panics if `self == u64::MAX`, because no larger `u64` exists.
    fn step_forward(&self) -> Self {
        // Overflow here means a timestamp has reached u64::MAX, which is an
        // invariant violation rather than a recoverable error. Panic with a
        // message that says so instead of a bare `unwrap` on `None`.
        self.checked_add(1)
            .expect("cannot step_forward u64::MAX: timestamp overflow")
    }
}