mz_persist_types/lib.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types for the persist crate.
11
12#![warn(missing_docs)]
13#![warn(
14 clippy::cast_possible_truncation,
15 clippy::cast_precision_loss,
16 clippy::cast_sign_loss
17)]
18
19use bytes::{BufMut, Bytes};
20use mz_ore::url::SensitiveUrl;
21use mz_proto::{RustType, TryFromProtoError};
22use proptest_derive::Arbitrary;
23use serde::{Deserialize, Serialize};
24use uuid::Uuid;
25
26use crate::columnar::Schema;
27
28pub mod arrow;
29pub mod codec_impls;
30pub mod columnar;
31pub mod parquet;
32pub mod part;
33pub mod schema;
34pub mod stats;
35pub mod timestamp;
36pub mod txn;
37
38/// Encoding and decoding operations for a type usable as a persisted key or
39/// value.
40pub trait Codec: Default + Sized + PartialEq + 'static {
41 /// The type of the associated schema for [Self].
42 ///
43 /// This is a separate type because Row is not self-describing. For Row, you
44 /// need a RelationDesc to determine the types of any columns that are
45 /// Datum::Null.
46 type Schema: Schema<Self> + PartialEq;
47
48 /// Name of the codec.
49 ///
50 /// This name is stored for the key and value when a stream is first created
51 /// and the same key and value codec must be used for that stream afterward.
52 fn codec_name() -> String;
53 /// Encode a key or value for permanent storage.
54 ///
55 /// This must perfectly round-trip Self through [Codec::decode]. If the
56 /// encode function for this codec ever changes, decode must be able to
57 /// handle bytes output by all previous versions of encode.
58 fn encode<B>(&self, buf: &mut B)
59 where
60 B: BufMut;
61
62 /// Encode a key or value to a Vec.
63 ///
64 /// This is a convenience function for calling [Self::encode] with a fresh
65 /// `Vec` each time. Reuse an allocation when performance matters!
66 fn encode_to_vec(&self) -> Vec<u8> {
67 let mut buf = vec![];
68 self.encode(&mut buf);
69 buf
70 }
71
72 /// Decode a key or value previous encoded with this codec's
73 /// [Codec::encode].
74 ///
75 /// This must perfectly round-trip Self through [Codec::encode]. If the
76 /// encode function for this codec ever changes, decode must be able to
77 /// handle bytes output by all previous versions of encode.
78 ///
79 /// It should also gracefully handle data encoded by future versions of
80 /// encode (likely with an error).
81 //
82 // TODO: Mechanically, this could return a ref to the original bytes
83 // without any copies, see if we can make the types work out for that.
84 fn decode<'a>(buf: &'a [u8], schema: &Self::Schema) -> Result<Self, String>;
85
86 /// A type used with [Self::decode_from] for allocation reuse. Set to `()`
87 /// if unnecessary.
88 type Storage: Default + Send + Sync;
89 /// An alternate form of [Self::decode] which enables amortizing allocs.
90 ///
91 /// First, instead of returning `Self`, it takes `&mut Self` as a parameter,
92 /// allowing for reuse of the internal `Row`/`SourceData` allocations.
93 ///
94 /// Second, it adds a `type Storage` to `Codec` and also passes it in as
95 /// `&mut Option<Self::Storage>`, allowing for reuse of
96 /// `ProtoRow`/`ProtoSourceData` allocations. If `Some`, this impl may
97 /// attempt to reuse allocations with it. It may also leave allocations in
98 /// it for use in future calls to `decode_from`.
99 ///
100 /// A default implementation is provided for `Codec` impls that can't take
101 /// advantage of this.
102 fn decode_from<'a>(
103 &mut self,
104 buf: &'a [u8],
105 _storage: &mut Option<Self::Storage>,
106 schema: &Self::Schema,
107 ) -> Result<(), String> {
108 *self = Self::decode(buf, schema)?;
109 Ok(())
110 }
111
112 /// Checks that the given value matches the provided schema.
113 ///
114 /// A no-op default implementation is provided for convenience.
115 fn validate(_val: &Self, _schema: &Self::Schema) -> Result<(), String> {
116 Ok(())
117 }
118
119 /// Encode a schema for permanent storage.
120 ///
121 /// This must perfectly round-trip the schema through [Self::decode_schema].
122 /// If the encode_schema function ever changes, decode_schema must be able
123 /// to handle bytes output by all previous versions of encode_schema.
124 ///
125 /// TODO: Move this to instead be a new trait that is required by
126 /// Self::Schema?
127 fn encode_schema(schema: &Self::Schema) -> Bytes;
128
129 /// Decode a schema previous encoded with this codec's
130 /// [Self::encode_schema].
131 ///
132 /// This must perfectly round-trip the schema through [Self::encode_schema].
133 /// If the encode_schema function ever changes, decode_schema must be able
134 /// to handle bytes output by all previous versions of encode_schema.
135 fn decode_schema(buf: &Bytes) -> Self::Schema;
136}
137
/// Encoding and decoding operations for a type that can be persisted as a
/// timestamp or a diff.
pub trait Codec64: Sized + Clone + 'static {
    /// The name of this codec.
    ///
    /// The name is recorded for both timestamp and diff when a stream is
    /// first created; the same timestamp and diff codecs must be used for
    /// that stream from then on.
    fn codec_name() -> String;

    /// Encodes a timestamp or diff into eight bytes for permanent storage.
    ///
    /// The output must round-trip exactly through [Codec64::decode]. Should
    /// the encoding ever change, decode must keep accepting the bytes emitted
    /// by every earlier version of encode.
    fn encode(&self) -> [u8; 8];

    /// Decodes a timestamp or diff previously encoded with this codec's
    /// [Codec64::encode].
    ///
    /// The result must round-trip exactly through [Codec64::encode]. Should
    /// the encoding ever change, decode must keep accepting the bytes emitted
    /// by every earlier version of encode.
    fn decode(buf: [u8; 8]) -> Self;
}
163
164/// A location in s3, other cloud storage, or otherwise "durable storage" used
165/// by persist.
166///
167/// This structure can be durably written down or transmitted for use by other
168/// processes. This location can contain any number of persist shards.
169#[derive(Arbitrary, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
170pub struct PersistLocation {
171 /// Uri string that identifies the blob store.
172 pub blob_uri: SensitiveUrl,
173
174 /// Uri string that identifies the consensus system.
175 pub consensus_uri: SensitiveUrl,
176}
177
178impl PersistLocation {
179 /// Returns a PersistLocation indicating in-mem blob and consensus.
180 pub fn new_in_mem() -> Self {
181 PersistLocation {
182 blob_uri: "mem://".parse().unwrap(),
183 consensus_uri: "mem://".parse().unwrap(),
184 }
185 }
186}
187
188/// An opaque identifier for a persist durable TVC (aka shard).
189///
190/// The [std::string::ToString::to_string] format of this may be stored durably
191/// or otherwise used as an interchange format. It can be parsed back using
192/// [str::parse] or [std::str::FromStr::from_str].
193#[derive(
194 Arbitrary, Clone, Copy, Default, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize,
195)]
196#[serde(try_from = "String", into = "String")]
197pub struct ShardId([u8; 16]);
198
199impl std::fmt::Display for ShardId {
200 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201 write!(f, "s{}", Uuid::from_bytes(self.0))
202 }
203}
204
205impl std::fmt::Debug for ShardId {
206 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207 write!(f, "ShardId({})", Uuid::from_bytes(self.0))
208 }
209}
210
211impl std::str::FromStr for ShardId {
212 type Err = String;
213
214 fn from_str(s: &str) -> Result<Self, Self::Err> {
215 let uuid_encoded = match s.strip_prefix('s') {
216 Some(x) => x,
217 None => return Err(format!("invalid ShardId {}: incorrect prefix", s)),
218 };
219 let uuid = Uuid::parse_str(uuid_encoded)
220 .map_err(|err| format!("invalid ShardId {}: {}", s, err))?;
221 Ok(ShardId(*uuid.as_bytes()))
222 }
223}
224
225impl From<ShardId> for String {
226 fn from(shard_id: ShardId) -> Self {
227 shard_id.to_string()
228 }
229}
230
231impl TryFrom<String> for ShardId {
232 type Error = String;
233
234 fn try_from(s: String) -> Result<Self, Self::Error> {
235 s.parse()
236 }
237}
238
239impl ShardId {
240 /// Returns a random [ShardId] that is reasonably likely to have never been
241 /// generated before.
242 pub fn new() -> Self {
243 ShardId(Uuid::new_v4().as_bytes().to_owned())
244 }
245}
246
247impl RustType<String> for ShardId {
248 fn into_proto(&self) -> String {
249 self.to_string()
250 }
251
252 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
253 match proto.parse() {
254 Ok(x) => Ok(x),
255 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
256 }
257 }
258}
259
/// An opaque fencing token used in compare_and_downgrade_since.
pub trait Opaque: PartialEq + Clone + Sized + 'static {
    /// The token value in effect before any compare_and_downgrade_since call
    /// has succeeded.
    fn initial() -> Self;
}
266
/// Advance a timestamp by the least amount possible such that
/// `ts.less_than(ts.step_forward())` is true.
///
/// TODO: Unify this with repr's TimestampManipulation. Or, ideally, get rid of
/// it entirely by making txn-wal methods take an `advance_to` argument.
pub trait StepForward {
    /// Advance a timestamp by the least amount possible such that
    /// `ts.less_than(ts.step_forward())` is true.
    ///
    /// # Panics
    ///
    /// Panics if unable to do so.
    fn step_forward(&self) -> Self;
}

impl StepForward for u64 {
    /// Returns `self + 1`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is `u64::MAX`, since no larger `u64` exists.
    fn step_forward(&self) -> Self {
        // `checked_add` makes the overflow explicit; the message tells the
        // caller which invariant was broken.
        self.checked_add(1)
            .expect("cannot step forward from u64::MAX: no larger u64 exists")
    }
}