mz_persist_types/lib.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types for the persist crate.
11
12#![warn(missing_docs)]
13#![warn(
14 clippy::cast_possible_truncation,
15 clippy::cast_precision_loss,
16 clippy::cast_sign_loss
17)]
18
19use std::fmt::Debug;
20
21use crate::columnar::Schema;
22use bytes::{BufMut, Bytes};
23use mz_ore::url::SensitiveUrl;
24use mz_proto::{RustType, TryFromProtoError};
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27use uuid::Uuid;
28
29pub mod arrow;
30pub mod codec_impls;
31pub mod columnar;
32pub mod parquet;
33pub mod part;
34pub mod schema;
35pub mod stats;
36pub mod timestamp;
37pub mod txn;
38
39/// Encoding and decoding operations for a type usable as a persisted key or
40/// value.
41pub trait Codec: Default + Sized + PartialEq + 'static {
42 /// The type of the associated schema for [Self].
43 ///
44 /// This is a separate type because Row is not self-describing. For Row, you
45 /// need a RelationDesc to determine the types of any columns that are
46 /// Datum::Null.
47 type Schema: Schema<Self> + PartialEq;
48
49 /// Name of the codec.
50 ///
51 /// This name is stored for the key and value when a stream is first created
52 /// and the same key and value codec must be used for that stream afterward.
53 fn codec_name() -> String;
54 /// Encode a key or value for permanent storage.
55 ///
56 /// This must perfectly round-trip Self through [Codec::decode]. If the
57 /// encode function for this codec ever changes, decode must be able to
58 /// handle bytes output by all previous versions of encode.
59 fn encode<B>(&self, buf: &mut B)
60 where
61 B: BufMut;
62
63 /// Encode a key or value to a Vec.
64 ///
65 /// This is a convenience function for calling [Self::encode] with a fresh
66 /// `Vec` each time. Reuse an allocation when performance matters!
67 fn encode_to_vec(&self) -> Vec<u8> {
68 let mut buf = vec![];
69 self.encode(&mut buf);
70 buf
71 }
72
73 /// Decode a key or value previous encoded with this codec's
74 /// [Codec::encode].
75 ///
76 /// This must perfectly round-trip Self through [Codec::encode]. If the
77 /// encode function for this codec ever changes, decode must be able to
78 /// handle bytes output by all previous versions of encode.
79 ///
80 /// It should also gracefully handle data encoded by future versions of
81 /// encode (likely with an error).
82 //
83 // TODO: Mechanically, this could return a ref to the original bytes
84 // without any copies, see if we can make the types work out for that.
85 fn decode<'a>(buf: &'a [u8], schema: &Self::Schema) -> Result<Self, String>;
86
87 /// A type used with [Self::decode_from] for allocation reuse. Set to `()`
88 /// if unnecessary.
89 type Storage: Default + Send + Sync;
90 /// An alternate form of [Self::decode] which enables amortizing allocs.
91 ///
92 /// First, instead of returning `Self`, it takes `&mut Self` as a parameter,
93 /// allowing for reuse of the internal `Row`/`SourceData` allocations.
94 ///
95 /// Second, it adds a `type Storage` to `Codec` and also passes it in as
96 /// `&mut Option<Self::Storage>`, allowing for reuse of
97 /// `ProtoRow`/`ProtoSourceData` allocations. If `Some`, this impl may
98 /// attempt to reuse allocations with it. It may also leave allocations in
99 /// it for use in future calls to `decode_from`.
100 ///
101 /// A default implementation is provided for `Codec` impls that can't take
102 /// advantage of this.
103 fn decode_from<'a>(
104 &mut self,
105 buf: &'a [u8],
106 _storage: &mut Option<Self::Storage>,
107 schema: &Self::Schema,
108 ) -> Result<(), String> {
109 *self = Self::decode(buf, schema)?;
110 Ok(())
111 }
112
113 /// Checks that the given value matches the provided schema.
114 ///
115 /// A no-op default implementation is provided for convenience.
116 fn validate(_val: &Self, _schema: &Self::Schema) -> Result<(), String> {
117 Ok(())
118 }
119
120 /// Encode a schema for permanent storage.
121 ///
122 /// This must perfectly round-trip the schema through [Self::decode_schema].
123 /// If the encode_schema function ever changes, decode_schema must be able
124 /// to handle bytes output by all previous versions of encode_schema.
125 ///
126 /// TODO: Move this to instead be a new trait that is required by
127 /// Self::Schema?
128 fn encode_schema(schema: &Self::Schema) -> Bytes;
129
130 /// Decode a schema previous encoded with this codec's
131 /// [Self::encode_schema].
132 ///
133 /// This must perfectly round-trip the schema through [Self::encode_schema].
134 /// If the encode_schema function ever changes, decode_schema must be able
135 /// to handle bytes output by all previous versions of encode_schema.
136 fn decode_schema(buf: &Bytes) -> Self::Schema;
137}
138
/// Encoding and decoding operations for a type that can be persisted as a
/// timestamp or a diff.
///
/// Every value is represented by exactly eight bytes.
pub trait Codec64: Sized + Clone + Debug + 'static {
    /// Name of the codec.
    ///
    /// The name is recorded for the timestamp and diff when a stream is first
    /// created, and that stream must keep using the same timestamp and diff
    /// codec from then on.
    fn codec_name() -> String;

    /// Encode a timestamp or diff for permanent storage.
    ///
    /// The output must perfectly round-trip Self through [Codec64::decode].
    /// If the encode function for this codec ever changes, decode must still
    /// be able to handle the bytes produced by every previous version of
    /// encode.
    fn encode(&self) -> [u8; 8];

    /// Decode a timestamp or diff previously encoded with this codec's
    /// [Codec64::encode].
    ///
    /// This must perfectly round-trip Self through [Codec64::encode]. If the
    /// encode function for this codec ever changes, decode must still be able
    /// to handle the bytes produced by every previous version of encode.
    fn decode(buf: [u8; 8]) -> Self;
}
164
165/// A location in s3, other cloud storage, or otherwise "durable storage" used
166/// by persist.
167///
168/// This structure can be durably written down or transmitted for use by other
169/// processes. This location can contain any number of persist shards.
170#[derive(
171 Arbitrary,
172 Debug,
173 Clone,
174 PartialEq,
175 Eq,
176 PartialOrd,
177 Ord,
178 Hash,
179 Deserialize,
180 Serialize
181)]
182pub struct PersistLocation {
183 /// Uri string that identifies the blob store.
184 pub blob_uri: SensitiveUrl,
185
186 /// Uri string that identifies the consensus system.
187 pub consensus_uri: SensitiveUrl,
188}
189
190impl PersistLocation {
191 /// Returns a PersistLocation indicating in-mem blob and consensus.
192 pub fn new_in_mem() -> Self {
193 PersistLocation {
194 blob_uri: "mem://".parse().unwrap(),
195 consensus_uri: "mem://".parse().unwrap(),
196 }
197 }
198}
199
200/// An opaque identifier for a persist durable TVC (aka shard).
201///
202/// The [std::string::ToString::to_string] format of this may be stored durably
203/// or otherwise used as an interchange format. It can be parsed back using
204/// [str::parse] or [std::str::FromStr::from_str].
205#[derive(
206 Arbitrary,
207 Clone,
208 Copy,
209 Default,
210 PartialEq,
211 Eq,
212 Hash,
213 PartialOrd,
214 Ord,
215 Serialize,
216 Deserialize
217)]
218#[serde(try_from = "String", into = "String")]
219pub struct ShardId([u8; 16]);
220
221impl std::fmt::Display for ShardId {
222 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223 write!(f, "s{}", Uuid::from_bytes(self.0))
224 }
225}
226
227impl std::fmt::Debug for ShardId {
228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 write!(f, "ShardId({})", Uuid::from_bytes(self.0))
230 }
231}
232
233impl std::str::FromStr for ShardId {
234 type Err = String;
235
236 fn from_str(s: &str) -> Result<Self, Self::Err> {
237 let uuid_encoded = match s.strip_prefix('s') {
238 Some(x) => x,
239 None => return Err(format!("invalid ShardId {}: incorrect prefix", s)),
240 };
241 let uuid = Uuid::parse_str(uuid_encoded)
242 .map_err(|err| format!("invalid ShardId {}: {}", s, err))?;
243 Ok(ShardId(*uuid.as_bytes()))
244 }
245}
246
247impl From<ShardId> for String {
248 fn from(shard_id: ShardId) -> Self {
249 shard_id.to_string()
250 }
251}
252
253impl TryFrom<String> for ShardId {
254 type Error = String;
255
256 fn try_from(s: String) -> Result<Self, Self::Error> {
257 s.parse()
258 }
259}
260
261impl ShardId {
262 /// Returns a random [ShardId] that is reasonably likely to have never been
263 /// generated before.
264 pub fn new() -> Self {
265 ShardId(Uuid::new_v4().as_bytes().to_owned())
266 }
267}
268
269impl RustType<String> for ShardId {
270 fn into_proto(&self) -> String {
271 self.to_string()
272 }
273
274 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
275 match proto.parse() {
276 Ok(x) => Ok(x),
277 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
278 }
279 }
280}
281
/// Advance a timestamp by the smallest possible amount such that
/// `ts.less_than(ts.step_forward())` holds.
///
/// TODO: Unify this with repr's TimestampManipulation. Or, ideally, get rid of
/// it entirely by making txn-wal methods take an `advance_to` argument.
pub trait StepForward {
    /// Returns this timestamp advanced by the smallest possible amount such
    /// that `ts.less_than(ts.step_forward())` holds.
    ///
    /// # Panics
    ///
    /// Panics if the timestamp cannot be advanced.
    fn step_forward(&self) -> Self;
}
292
293impl StepForward for u64 {
294 fn step_forward(&self) -> Self {
295 self.checked_add(1).unwrap()
296 }
297}