mz_persist_types/lib.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types for the persist crate.
11
12#![warn(missing_docs)]
13#![warn(
14 clippy::cast_possible_truncation,
15 clippy::cast_precision_loss,
16 clippy::cast_sign_loss
17)]
18
19use std::fmt::Debug;
20
21use bytes::{BufMut, Bytes};
22use mz_ore::url::SensitiveUrl;
23use mz_proto::{RustType, TryFromProtoError};
24use proptest_derive::Arbitrary;
25use serde::{Deserialize, Serialize};
26use uuid::Uuid;
27
28use crate::columnar::Schema;
29
30pub mod arrow;
31pub mod codec_impls;
32pub mod columnar;
33pub mod parquet;
34pub mod part;
35pub mod schema;
36pub mod stats;
37pub mod timestamp;
38pub mod txn;
39
40/// Encoding and decoding operations for a type usable as a persisted key or
41/// value.
42pub trait Codec: Default + Sized + PartialEq + 'static {
43 /// The type of the associated schema for [Self].
44 ///
45 /// This is a separate type because Row is not self-describing. For Row, you
46 /// need a RelationDesc to determine the types of any columns that are
47 /// Datum::Null.
48 type Schema: Schema<Self> + PartialEq;
49
50 /// Name of the codec.
51 ///
52 /// This name is stored for the key and value when a stream is first created
53 /// and the same key and value codec must be used for that stream afterward.
54 fn codec_name() -> String;
55 /// Encode a key or value for permanent storage.
56 ///
57 /// This must perfectly round-trip Self through [Codec::decode]. If the
58 /// encode function for this codec ever changes, decode must be able to
59 /// handle bytes output by all previous versions of encode.
60 fn encode<B>(&self, buf: &mut B)
61 where
62 B: BufMut;
63
64 /// Encode a key or value to a Vec.
65 ///
66 /// This is a convenience function for calling [Self::encode] with a fresh
67 /// `Vec` each time. Reuse an allocation when performance matters!
68 fn encode_to_vec(&self) -> Vec<u8> {
69 let mut buf = vec![];
70 self.encode(&mut buf);
71 buf
72 }
73
74 /// Decode a key or value previous encoded with this codec's
75 /// [Codec::encode].
76 ///
77 /// This must perfectly round-trip Self through [Codec::encode]. If the
78 /// encode function for this codec ever changes, decode must be able to
79 /// handle bytes output by all previous versions of encode.
80 ///
81 /// It should also gracefully handle data encoded by future versions of
82 /// encode (likely with an error).
83 //
84 // TODO: Mechanically, this could return a ref to the original bytes
85 // without any copies, see if we can make the types work out for that.
86 fn decode<'a>(buf: &'a [u8], schema: &Self::Schema) -> Result<Self, String>;
87
88 /// A type used with [Self::decode_from] for allocation reuse. Set to `()`
89 /// if unnecessary.
90 type Storage: Default + Send + Sync;
91 /// An alternate form of [Self::decode] which enables amortizing allocs.
92 ///
93 /// First, instead of returning `Self`, it takes `&mut Self` as a parameter,
94 /// allowing for reuse of the internal `Row`/`SourceData` allocations.
95 ///
96 /// Second, it adds a `type Storage` to `Codec` and also passes it in as
97 /// `&mut Option<Self::Storage>`, allowing for reuse of
98 /// `ProtoRow`/`ProtoSourceData` allocations. If `Some`, this impl may
99 /// attempt to reuse allocations with it. It may also leave allocations in
100 /// it for use in future calls to `decode_from`.
101 ///
102 /// A default implementation is provided for `Codec` impls that can't take
103 /// advantage of this.
104 fn decode_from<'a>(
105 &mut self,
106 buf: &'a [u8],
107 _storage: &mut Option<Self::Storage>,
108 schema: &Self::Schema,
109 ) -> Result<(), String> {
110 *self = Self::decode(buf, schema)?;
111 Ok(())
112 }
113
114 /// Checks that the given value matches the provided schema.
115 ///
116 /// A no-op default implementation is provided for convenience.
117 fn validate(_val: &Self, _schema: &Self::Schema) -> Result<(), String> {
118 Ok(())
119 }
120
121 /// Encode a schema for permanent storage.
122 ///
123 /// This must perfectly round-trip the schema through [Self::decode_schema].
124 /// If the encode_schema function ever changes, decode_schema must be able
125 /// to handle bytes output by all previous versions of encode_schema.
126 ///
127 /// TODO: Move this to instead be a new trait that is required by
128 /// Self::Schema?
129 fn encode_schema(schema: &Self::Schema) -> Bytes;
130
131 /// Decode a schema previous encoded with this codec's
132 /// [Self::encode_schema].
133 ///
134 /// This must perfectly round-trip the schema through [Self::encode_schema].
135 /// If the encode_schema function ever changes, decode_schema must be able
136 /// to handle bytes output by all previous versions of encode_schema.
137 fn decode_schema(buf: &Bytes) -> Self::Schema;
138}
139
/// Encoding and decoding operations for a type usable as a persisted timestamp
/// or diff.
///
/// Values are always encoded to exactly 8 bytes.
pub trait Codec64: Sized + Clone + Debug + 'static {
    /// Name of the codec.
    ///
    /// This name is stored for the timestamp and diff when a stream is first
    /// created and the same timestamp and diff codec must be used for that
    /// stream afterward.
    fn codec_name() -> String;

    /// Encode a timestamp or diff for permanent storage.
    ///
    /// This must perfectly round-trip Self through [Codec64::decode]. If the
    /// encode function for this codec ever changes, decode must be able to
    /// handle bytes output by all previous versions of encode.
    fn encode(&self) -> [u8; 8];

    /// Decode a timestamp or diff previously encoded with this codec's
    /// [Codec64::encode].
    ///
    /// This must perfectly round-trip Self through [Codec64::encode]. If the
    /// encode function for this codec ever changes, decode must be able to
    /// handle bytes output by all previous versions of encode.
    fn decode(buf: [u8; 8]) -> Self;
}
165
166/// A location in s3, other cloud storage, or otherwise "durable storage" used
167/// by persist.
168///
169/// This structure can be durably written down or transmitted for use by other
170/// processes. This location can contain any number of persist shards.
171#[derive(Arbitrary, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
172pub struct PersistLocation {
173 /// Uri string that identifies the blob store.
174 pub blob_uri: SensitiveUrl,
175
176 /// Uri string that identifies the consensus system.
177 pub consensus_uri: SensitiveUrl,
178}
179
180impl PersistLocation {
181 /// Returns a PersistLocation indicating in-mem blob and consensus.
182 pub fn new_in_mem() -> Self {
183 PersistLocation {
184 blob_uri: "mem://".parse().unwrap(),
185 consensus_uri: "mem://".parse().unwrap(),
186 }
187 }
188}
189
190/// An opaque identifier for a persist durable TVC (aka shard).
191///
192/// The [std::string::ToString::to_string] format of this may be stored durably
193/// or otherwise used as an interchange format. It can be parsed back using
194/// [str::parse] or [std::str::FromStr::from_str].
195#[derive(
196 Arbitrary, Clone, Copy, Default, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize,
197)]
198#[serde(try_from = "String", into = "String")]
199pub struct ShardId([u8; 16]);
200
201impl std::fmt::Display for ShardId {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 write!(f, "s{}", Uuid::from_bytes(self.0))
204 }
205}
206
207impl std::fmt::Debug for ShardId {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 write!(f, "ShardId({})", Uuid::from_bytes(self.0))
210 }
211}
212
213impl std::str::FromStr for ShardId {
214 type Err = String;
215
216 fn from_str(s: &str) -> Result<Self, Self::Err> {
217 let uuid_encoded = match s.strip_prefix('s') {
218 Some(x) => x,
219 None => return Err(format!("invalid ShardId {}: incorrect prefix", s)),
220 };
221 let uuid = Uuid::parse_str(uuid_encoded)
222 .map_err(|err| format!("invalid ShardId {}: {}", s, err))?;
223 Ok(ShardId(*uuid.as_bytes()))
224 }
225}
226
227impl From<ShardId> for String {
228 fn from(shard_id: ShardId) -> Self {
229 shard_id.to_string()
230 }
231}
232
233impl TryFrom<String> for ShardId {
234 type Error = String;
235
236 fn try_from(s: String) -> Result<Self, Self::Error> {
237 s.parse()
238 }
239}
240
241impl ShardId {
242 /// Returns a random [ShardId] that is reasonably likely to have never been
243 /// generated before.
244 pub fn new() -> Self {
245 ShardId(Uuid::new_v4().as_bytes().to_owned())
246 }
247}
248
249impl RustType<String> for ShardId {
250 fn into_proto(&self) -> String {
251 self.to_string()
252 }
253
254 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
255 match proto.parse() {
256 Ok(x) => Ok(x),
257 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
258 }
259 }
260}
261
/// An opaque fencing token used in compare_and_downgrade_since.
///
/// Tokens must be comparable for equality and cloneable, and must provide a
/// well-known starting value.
pub trait Opaque: Clone + PartialEq + Sized + 'static {
    /// The token value in effect before any compare_and_downgrade_since call
    /// has succeeded.
    fn initial() -> Self;
}
268
/// Advance a timestamp by the least amount possible such that
/// `ts.less_than(ts.step_forward())` is true.
///
/// TODO: Unify this with repr's TimestampManipulation. Or, ideally, get rid of
/// it entirely by making txn-wal methods take an `advance_to` argument.
pub trait StepForward {
    /// Returns the timestamp reached by advancing `self` by the least amount
    /// possible such that `ts.less_than(ts.step_forward())` is true.
    ///
    /// # Panics
    ///
    /// Implementations panic if no such timestamp exists.
    fn step_forward(&self) -> Self;
}
279
280impl StepForward for u64 {
281 fn step_forward(&self) -> Self {
282 self.checked_add(1).unwrap()
283 }
284}