Skip to main content

mz_persist/
foundationdb.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of [Consensus] backed by FoundationDB.
11//!
//! The consensus data is stored in two FoundationDB directories under the
//! configured prefix, with the following structure:
14//! * `./keys/<key> -> ()` to track existing keys.
15//! * `./data/<key>/<seqno> -> <data>` mapping seqnos to data blobs.
16//!
17//! The current seqno for a key is determined by a reverse scan of the data
18//! entries, rather than a separate head pointer. This ensures locality between
19//! the latest data and any metadata lookups.
20
21use std::io::Write;
22
23use anyhow::anyhow;
24use async_stream::try_stream;
25use async_trait::async_trait;
26use bytes::Bytes;
27use futures_util::future::FutureExt;
28use mz_foundationdb::FdbConfig;
29use mz_foundationdb::directory::{
30    Directory, DirectoryError, DirectoryLayer, DirectoryOutput, DirectorySubspace,
31};
32use mz_foundationdb::tuple::{
33    PackError, PackResult, Subspace, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset,
34};
35use mz_foundationdb::{
36    Database, FdbBindingError, FdbError, KeySelector, RangeOption, TransactError, TransactOption,
37    Transaction,
38};
39use mz_ore::url::SensitiveUrl;
40
41use crate::error::Error;
42use crate::location::{
43    CaSResult, Consensus, Determinate, ExternalError, Indeterminate, ResultStream, SeqNo,
44    VersionedData,
45};
46
47impl From<FdbError> for ExternalError {
48    fn from(x: FdbError) -> Self {
49        if x.is_retryable() {
50            ExternalError::Indeterminate(Indeterminate::new(x.into()))
51        } else {
52            ExternalError::Determinate(Determinate::new(x.into()))
53        }
54    }
55}
56
57impl From<FdbBindingError> for ExternalError {
58    fn from(x: FdbBindingError) -> Self {
59        ExternalError::Determinate(Determinate::new(x.into()))
60    }
61}
62
63/// Configuration to connect to a FoundationDB backed implementation of [Consensus].
64#[derive(Clone, Debug)]
65pub struct FdbConsensusConfig {
66    url: SensitiveUrl,
67}
68
69impl FdbConsensusConfig {
70    /// Returns a new [FdbConsensusConfig] for use in production.
71    pub fn new(url: SensitiveUrl) -> Result<Self, Error> {
72        Ok(FdbConsensusConfig { url })
73    }
74}
75
76/// Implementation of [Consensus] over a Foundation database.
77pub struct FdbConsensus {
78    /// Subspace for data.
79    keys: DirectorySubspace,
80    /// Subspace for data.
81    data: DirectorySubspace,
82    /// The FoundationDB database handle.
83    db: Database,
84}
85
86/// An error that can occur during a FoundationDB transaction.
87/// This is either a FoundationDB error or an external error.
88enum FdbTransactError {
89    FdbError(FdbError),
90    ExternalError(ExternalError),
91}
92
93impl From<FdbError> for FdbTransactError {
94    fn from(value: FdbError) -> Self {
95        Self::FdbError(value)
96    }
97}
98
99impl From<ExternalError> for FdbTransactError {
100    fn from(value: ExternalError) -> Self {
101        Self::ExternalError(value)
102    }
103}
104
105impl From<PackError> for FdbTransactError {
106    fn from(value: PackError) -> Self {
107        ExternalError::Determinate(anyhow::Error::new(value).into()).into()
108    }
109}
110
111impl From<FdbTransactError> for ExternalError {
112    fn from(value: FdbTransactError) -> Self {
113        match value {
114            FdbTransactError::FdbError(e) => e.into(),
115            FdbTransactError::ExternalError(e) => e,
116        }
117    }
118}
119
120impl From<DirectoryError> for ExternalError {
121    fn from(e: DirectoryError) -> Self {
122        ExternalError::Determinate(anyhow!("directory error: {e:?}").into())
123    }
124}
125
126impl TransactError for FdbTransactError {
127    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
128        match self {
129            Self::FdbError(e) => Ok(e),
130            other => Err(other),
131        }
132    }
133}
134
135impl TuplePack for SeqNo {
136    fn pack<W: Write>(
137        &self,
138        w: &mut W,
139        tuple_depth: TupleDepth,
140    ) -> std::io::Result<VersionstampOffset> {
141        self.0.pack(w, tuple_depth)
142    }
143}
144
145impl<'de> TupleUnpack<'de> for SeqNo {
146    fn unpack(input: &'de [u8], tuple_depth: TupleDepth) -> PackResult<(&'de [u8], Self)> {
147        u64::unpack(input, tuple_depth).map(|(rem, v)| (rem, SeqNo(v)))
148    }
149}
150
151impl std::fmt::Debug for FdbConsensus {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        f.debug_struct("FdbConsensus")
154            .field("keys", &self.keys)
155            .field("data", &self.data)
156            .finish_non_exhaustive()
157    }
158}
159
160impl FdbConsensus {
161    /// Open a FoundationDB [Consensus] instance with `config`.
162    pub async fn open(config: FdbConsensusConfig) -> Result<Self, ExternalError> {
163        let fdb_config =
164            FdbConfig::parse(&config.url).map_err(|e| ExternalError::Determinate(e.into()))?;
165
166        mz_foundationdb::init_network();
167
168        let db = Database::new(None)?;
169        let directory = DirectoryLayer::default();
170        let keys_path: Vec<_> = fdb_config
171            .prefix
172            .iter()
173            .cloned()
174            .chain(std::iter::once("keys".to_owned()))
175            .collect();
176        let keys = Self::open_directory(&db, &directory, &keys_path).await?;
177        let data_path: Vec<_> = fdb_config
178            .prefix
179            .into_iter()
180            .chain(std::iter::once("data".to_owned()))
181            .collect();
182        let data = Self::open_directory(&db, &directory, &data_path).await?;
183        Ok(FdbConsensus { keys, data, db })
184    }
185
186    /// Opens (or creates) a directory at the specified path. Errors if the
187    /// directory is a partition, or cannot be opened for another reason..
188    async fn open_directory(
189        db: &Database,
190        directory: &DirectoryLayer,
191        path: &[String],
192    ) -> Result<DirectorySubspace, ExternalError> {
193        let directory = db
194            .run(async |trx, _maybe_commited| {
195                Ok(directory.create_or_open(&trx, path, None, None).await)
196            })
197            .await??;
198        match directory {
199            DirectoryOutput::DirectorySubspace(subspace) => Ok(subspace),
200            DirectoryOutput::DirectoryPartition(_partition) => Err(ExternalError::from(anyhow!(
201                "consensus data cannot be a partition"
202            ))),
203        }
204    }
205
206    /// Returns the latest entry for a key by reverse scanning the data entries.
207    ///
208    /// If `snapshot` is true, uses snapshot reads which don't create conflict
209    /// ranges. Use snapshot=true for read-only `head()` calls, and snapshot=false
210    /// for `compare_and_set()` where we need conflict detection.
211    async fn head_trx(
212        &self,
213        trx: &Transaction,
214        data_key: &Subspace,
215        snapshot: bool,
216    ) -> Result<Option<VersionedData>, FdbTransactError> {
217        let mut range = RangeOption::from(data_key).rev();
218        range.limit = Some(1);
219        range.mode = mz_foundationdb::options::StreamingMode::Exact;
220        let values = trx.get_range(&range, 1, snapshot).await?;
221        if let Some(kv) = values.first() {
222            let seqno = data_key.unpack(kv.key())?;
223            Ok(Some(VersionedData {
224                seqno,
225                data: Bytes::from(kv.value().to_vec()),
226            }))
227        } else {
228            Ok(None)
229        }
230    }
231    async fn compare_and_set_trx(
232        &self,
233        trx: &Transaction,
234        data_key: &Subspace,
235        expected: &Option<SeqNo>,
236        new: &VersionedData,
237        key: &str,
238    ) -> Result<CaSResult, FdbTransactError> {
239        // Use non-snapshot read to create conflict ranges for concurrent writes.
240        let current = self.head_trx(trx, data_key, false).await?;
241        let current_seqno = current.map(|v| v.seqno);
242
243        if expected != &current_seqno {
244            return Ok(CaSResult::ExpectationMismatch);
245        }
246
247        if expected.is_none() {
248            // If expected is `None`, it's a new key which we need to register in the keys directory.
249            let key = self.keys.pack(&key);
250            trx.set(&key, &[]);
251        }
252
253        let data_seqno_key = data_key.pack(&new.seqno);
254        trx.set(&data_seqno_key, new.data.as_ref());
255        Ok(CaSResult::Committed)
256    }
257
258    async fn scan_trx(
259        &self,
260        trx: &Transaction,
261        data_key: &Subspace,
262        from: &SeqNo,
263        limit: &usize,
264        entries: &mut Vec<VersionedData>,
265    ) -> Result<(), FdbTransactError> {
266        let seqno_start = data_key.pack(&from);
267        let seqno_end = data_key.pack(&SeqNo::maximum());
268
269        let mut range = RangeOption::from(seqno_start..=seqno_end);
270        range.limit = Some(*limit);
271
272        entries.clear();
273
274        loop {
275            let output = trx.get_range(&range, 1, false).await?;
276            entries.reserve(output.len());
277            for key_value in &output {
278                let seqno = data_key.unpack(key_value.key())?;
279                entries.push(VersionedData {
280                    seqno,
281                    data: Bytes::from(key_value.value().to_vec()),
282                });
283            }
284
285            if let Some(next_range) = range.next_range(&output) {
286                range = next_range;
287            } else {
288                break;
289            }
290        }
291        Ok(())
292    }
293
294    async fn truncate_trx(
295        &self,
296        trx: &Transaction,
297        data_key: &Subspace,
298        until: &SeqNo,
299    ) -> Result<(), FdbTransactError> {
300        // Snapshot read is fine here - truncate is idempotent and the validation
301        // only gets more permissive if a concurrent CaS increases the seqno.
302        let current = self.head_trx(trx, data_key, true).await?;
303        if let Some(current) = current {
304            if current.seqno < *until {
305                return Err(ExternalError::Determinate(
306                    anyhow!("upper bound too high for truncate: {until}").into(),
307                )
308                .into());
309            }
310        } else {
311            return Err(ExternalError::Determinate(anyhow!("no entries for key").into()).into());
312        }
313        let key_space_start = data_key.pack(&SeqNo::minimum());
314        let key_space_end = data_key.pack(&until);
315
316        trx.clear_range(&key_space_start, &key_space_end);
317        Ok(())
318    }
319}
320
321#[async_trait]
322impl Consensus for FdbConsensus {
323    fn list_keys(&self) -> ResultStream<'_, String> {
324        Box::pin(try_stream! {
325            let keys: Vec<String> = self
326                .db
327                .run(async |trx, _maybe_commited| {
328                    let mut range = RangeOption::from(self.keys.range());
329                    let mut keys = Vec::new();
330                    loop {
331                        let values = trx.get_range(&range, 1, false).await?;
332                        for value in &values {
333                            let key: String = self.keys.unpack(value.key())
334                                .map_err(FdbBindingError::PackError)?;
335                            keys.push(key);
336                        }
337                        if let Some(last) = values.last() {
338                            range.begin = KeySelector::first_greater_than(last.key().to_vec());
339                        } else {
340                            break;
341                        }
342                    }
343                    Ok(keys)
344                }).await?;
345
346            for shard in keys {
347                yield shard;
348            }
349        })
350    }
351
352    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
353        let data_key = self.data.subspace(&key);
354
355        let ok = self
356            .db
357            .transact_boxed(
358                &data_key,
359                // Use snapshot read - we don't need strict consistency for head().
360                |trx, data_key| self.head_trx(trx, data_key, true).boxed(),
361                TransactOption::default(),
362            )
363            .await?;
364        Ok(ok)
365    }
366
367    async fn compare_and_set(
368        &self,
369        key: &str,
370        expected: Option<SeqNo>,
371        new: VersionedData,
372    ) -> Result<CaSResult, ExternalError> {
373        if let Some(expected) = expected {
374            if new.seqno <= expected {
375                return Err(Error::from(
376                    format!("new seqno must be strictly greater than expected. Got new: {:?} expected: {:?}",
377                            new.seqno, expected)).into());
378            }
379        }
380        if new.seqno.0 > i64::MAX.try_into().expect("i64::MAX known to fit in u64") {
381            return Err(ExternalError::from(anyhow!(
382                "sequence numbers must fit within [0, i64::MAX], received: {:?}",
383                new.seqno
384            )));
385        }
386
387        let data_key = self.data.subspace(&key);
388
389        let ok = self
390            .db
391            .transact_boxed(
392                (expected, &new, &*key),
393                |trx, (expected, new, key)| {
394                    self.compare_and_set_trx(trx, &data_key, expected, new, key)
395                        .boxed()
396                },
397                TransactOption::default(),
398            )
399            .await?;
400        Ok(ok)
401    }
402
403    async fn scan(
404        &self,
405        key: &str,
406        from: SeqNo,
407        limit: usize,
408    ) -> Result<Vec<VersionedData>, ExternalError> {
409        let data_key = self.data.subspace(&key);
410        let mut entries = Vec::new();
411        self.db
412            .transact_boxed(
413                (&data_key, from, limit, &mut entries),
414                |trx, (data_key, from, limit, entries)| {
415                    self.scan_trx(trx, data_key, from, limit, entries).boxed()
416                },
417                TransactOption::default(),
418            )
419            .await?;
420
421        entries.sort_by_key(|e| e.seqno);
422        Ok(entries)
423    }
424
425    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
426        let data_key = self.data.subspace(&key);
427
428        self.db
429            .transact_boxed(
430                (&data_key, seqno),
431                |trx, (data_key, seqno)| self.truncate_trx(trx, data_key, seqno).boxed(),
432                TransactOption::idempotent(),
433            )
434            .await?;
435        Ok(None)
436    }
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    use mz_foundationdb::directory::Directory;
    use uuid::Uuid;

    use crate::location::tests::consensus_impl_test;

    /// Removes the `keys` and `data` directories used by `consensus`,
    /// together with all their contents.
    ///
    /// The directories are not recreated here. A later [FdbConsensus::open]
    /// creates them again, because it uses `create_or_open`.
    ///
    /// ONLY FOR TESTING
    async fn drop_and_recreate(consensus: &FdbConsensus) -> Result<(), ExternalError> {
        consensus
            .db
            .run(async |trx, _maybe_committed| {
                consensus.keys.remove(&trx, &[]).await?;
                consensus.data.remove(&trx, &[]).await?;
                Ok(())
            })
            .await?;
        Ok(())
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    #[ignore] // TODO: Reenable when https://github.com/MaterializeInc/database-issues/issues/10076 is fixed
    async fn fdb_consensus() -> Result<(), ExternalError> {
        let config = FdbConsensusConfig::new(
            std::str::FromStr::from_str("foundationdb:?prefix=test/consensus").unwrap(),
        )?;

        {
            let fdb = FdbConsensus::open(config.clone()).await?;
            drop_and_recreate(&fdb).await?;
        }

        consensus_impl_test(|| FdbConsensus::open(config.clone())).await?;

        // and now verify the implementation-specific `drop_and_recreate` works as intended
        let consensus = FdbConsensus::open(config.clone()).await?;
        let key = Uuid::new_v4().to_string();
        let mut state = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("abc"),
        };

        assert_eq!(
            consensus.compare_and_set(&key, None, state.clone()).await,
            Ok(CaSResult::Committed),
        );
        state.seqno = SeqNo(6);
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(5)), state.clone())
                .await,
            Ok(CaSResult::Committed),
        );
        state.seqno = SeqNo(129 + 5);
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(6)), state.clone())
                .await,
            Ok(CaSResult::Committed),
        );

        assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));

        println!("--- SCANNING ---");

        for data in consensus.scan(&key, SeqNo(129), 10).await? {
            println!(
                "scan data: seqno: {:?}, {} bytes",
                data.seqno,
                data.data.len()
            );
        }

        drop_and_recreate(&consensus).await?;

        assert_eq!(consensus.head(&key).await, Ok(None));

        mz_foundationdb::shutdown_network();
        Ok(())
    }
}