Skip to main content

mz_persist/
foundationdb.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of [Consensus] backed by FoundationDB.
11//!
//! The consensus data is stored in two FoundationDB directories, `keys` and
//! `data`, both created under the configured prefix. Each consensus key uses:
14//! * `./keys/<key> -> ()` to track existing keys.
15//! * `./data/<key> -> <seqno>` mapping to the latest seqno for the key.
16//! * `./data/<key>/<seqno> -> <data>` mapping seqnos to data blobs.
17
18use std::io::Write;
19
20use anyhow::anyhow;
21use async_stream::try_stream;
22use async_trait::async_trait;
23use bytes::Bytes;
24use futures_util::future::FutureExt;
25use mz_foundationdb::FdbConfig;
26use mz_foundationdb::directory::{
27    Directory, DirectoryError, DirectoryLayer, DirectoryOutput, DirectorySubspace,
28};
29use mz_foundationdb::tuple::{
30    PackError, PackResult, Subspace, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, pack,
31    unpack,
32};
33use mz_foundationdb::{
34    Database, FdbBindingError, FdbError, KeySelector, RangeOption, TransactError, TransactOption,
35    Transaction,
36};
37use mz_ore::url::SensitiveUrl;
38
39use crate::error::Error;
40use crate::location::{
41    CaSResult, Consensus, Determinate, ExternalError, Indeterminate, ResultStream, SeqNo,
42    VersionedData,
43};
44
45impl From<FdbError> for ExternalError {
46    fn from(x: FdbError) -> Self {
47        if x.is_retryable() {
48            ExternalError::Indeterminate(Indeterminate::new(x.into()))
49        } else {
50            ExternalError::Determinate(Determinate::new(x.into()))
51        }
52    }
53}
54
55impl From<FdbBindingError> for ExternalError {
56    fn from(x: FdbBindingError) -> Self {
57        ExternalError::Determinate(Determinate::new(x.into()))
58    }
59}
60
61/// Configuration to connect to a FoundationDB backed implementation of [Consensus].
62#[derive(Clone, Debug)]
63pub struct FdbConsensusConfig {
64    url: SensitiveUrl,
65}
66
67impl FdbConsensusConfig {
68    /// Returns a new [FdbConsensusConfig] for use in production.
69    pub fn new(url: SensitiveUrl) -> Result<Self, Error> {
70        Ok(FdbConsensusConfig { url })
71    }
72}
73
74/// Implementation of [Consensus] over a Foundation database.
75pub struct FdbConsensus {
76    /// Subspace for data.
77    keys: DirectorySubspace,
78    /// Subspace for data.
79    data: DirectorySubspace,
80    /// The FoundationDB database handle.
81    db: Database,
82}
83
84/// An error that can occur during a FoundationDB transaction.
85/// This is either a FoundationDB error or an external error.
86enum FdbTransactError {
87    FdbError(FdbError),
88    ExternalError(ExternalError),
89}
90
91impl From<FdbError> for FdbTransactError {
92    fn from(value: FdbError) -> Self {
93        Self::FdbError(value)
94    }
95}
96
97impl From<ExternalError> for FdbTransactError {
98    fn from(value: ExternalError) -> Self {
99        Self::ExternalError(value)
100    }
101}
102
103impl From<PackError> for FdbTransactError {
104    fn from(value: PackError) -> Self {
105        ExternalError::Determinate(anyhow::Error::new(value).into()).into()
106    }
107}
108
109impl From<FdbTransactError> for ExternalError {
110    fn from(value: FdbTransactError) -> Self {
111        match value {
112            FdbTransactError::FdbError(e) => e.into(),
113            FdbTransactError::ExternalError(e) => e,
114        }
115    }
116}
117
118impl From<DirectoryError> for ExternalError {
119    fn from(e: DirectoryError) -> Self {
120        ExternalError::Determinate(anyhow!("directory error: {e:?}").into())
121    }
122}
123
124impl TransactError for FdbTransactError {
125    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
126        match self {
127            Self::FdbError(e) => Ok(e),
128            other => Err(other),
129        }
130    }
131}
132
133impl TuplePack for SeqNo {
134    fn pack<W: Write>(
135        &self,
136        w: &mut W,
137        tuple_depth: TupleDepth,
138    ) -> std::io::Result<VersionstampOffset> {
139        self.0.pack(w, tuple_depth)
140    }
141}
142
143impl<'de> TupleUnpack<'de> for SeqNo {
144    fn unpack(input: &'de [u8], tuple_depth: TupleDepth) -> PackResult<(&'de [u8], Self)> {
145        u64::unpack(input, tuple_depth).map(|(rem, v)| (rem, SeqNo(v)))
146    }
147}
148
149impl std::fmt::Debug for FdbConsensus {
150    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151        f.debug_struct("FdbConsensus")
152            .field("keys", &self.keys)
153            .field("data", &self.data)
154            .finish_non_exhaustive()
155    }
156}
157
158impl FdbConsensus {
159    /// Open a FoundationDB [Consensus] instance with `config`.
160    pub async fn open(config: FdbConsensusConfig) -> Result<Self, ExternalError> {
161        let fdb_config =
162            FdbConfig::parse(&config.url).map_err(|e| ExternalError::Determinate(e.into()))?;
163
164        mz_foundationdb::init_network();
165
166        let db = Database::new(None)?;
167        let directory = DirectoryLayer::default();
168        let keys_path: Vec<_> = fdb_config
169            .prefix
170            .iter()
171            .cloned()
172            .chain(std::iter::once("keys".to_owned()))
173            .collect();
174        let keys = Self::open_directory(&db, &directory, &keys_path).await?;
175        let data_path: Vec<_> = fdb_config
176            .prefix
177            .into_iter()
178            .chain(std::iter::once("data".to_owned()))
179            .collect();
180        let data = Self::open_directory(&db, &directory, &data_path).await?;
181        Ok(FdbConsensus { keys, data, db })
182    }
183
184    /// Opens (or creates) a directory at the specified path. Errors if the
185    /// directory is a partition, or cannot be opened for another reason..
186    async fn open_directory(
187        db: &Database,
188        directory: &DirectoryLayer,
189        path: &[String],
190    ) -> Result<DirectorySubspace, ExternalError> {
191        let directory = db
192            .run(async |trx, _maybe_commited| {
193                Ok(directory.create_or_open(&trx, path, None, None).await)
194            })
195            .await??;
196        match directory {
197            DirectoryOutput::DirectorySubspace(subspace) => Ok(subspace),
198            DirectoryOutput::DirectoryPartition(_partition) => Err(ExternalError::from(anyhow!(
199                "consensus data cannot be a partition"
200            ))),
201        }
202    }
203
204    async fn head_trx(
205        &self,
206        trx: &Transaction,
207        data_key: &Subspace,
208    ) -> Result<Option<VersionedData>, FdbTransactError> {
209        let mut range = RangeOption::from(data_key).rev();
210        range.limit = Some(1);
211        range.mode = mz_foundationdb::options::StreamingMode::Exact;
212        // Allow snapshot reads as we don't need the latest data for head, just some recent data.
213        let values = trx.get_range(&range, 1, true).await?;
214        if let Some(kv) = values.first() {
215            let seqno = data_key.unpack(kv.key())?;
216            Ok(Some(VersionedData {
217                seqno,
218                data: Bytes::from(kv.value().to_vec()),
219            }))
220        } else {
221            Ok(None)
222        }
223    }
224    async fn compare_and_set_trx(
225        &self,
226        trx: &Transaction,
227        data_key: &Subspace,
228        expected: &Option<SeqNo>,
229        new: &VersionedData,
230        key: &str,
231    ) -> Result<CaSResult, FdbTransactError> {
232        let seqno = trx
233            .get(data_key.bytes(), false)
234            .await?
235            .map(|data| unpack(&data))
236            .transpose()?;
237
238        if expected != &seqno {
239            return Ok(CaSResult::ExpectationMismatch);
240        }
241
242        trx.set(data_key.bytes(), &pack(&new.seqno));
243
244        if expected.is_none() {
245            // If expected is `None`, it's a new key which we need to register in the keys directory.
246            let key = self.keys.pack(&key);
247            trx.set(&key, &[]);
248        }
249
250        let data_seqno_key = data_key.pack(&new.seqno);
251        trx.set(&data_seqno_key, new.data.as_ref());
252        Ok(CaSResult::Committed)
253    }
254
255    async fn scan_trx(
256        &self,
257        trx: &Transaction,
258        data_key: &Subspace,
259        from: &SeqNo,
260        limit: &usize,
261        entries: &mut Vec<VersionedData>,
262    ) -> Result<(), FdbTransactError> {
263        let seqno_start = data_key.pack(&from);
264        let seqno_end = data_key.pack(&SeqNo::maximum());
265
266        let mut range = RangeOption::from(seqno_start..=seqno_end);
267        range.limit = Some(*limit);
268
269        entries.clear();
270
271        loop {
272            let output = trx.get_range(&range, 1, false).await?;
273            entries.reserve(output.len());
274            for key_value in &output {
275                let seqno = data_key.unpack(key_value.key())?;
276                entries.push(VersionedData {
277                    seqno,
278                    data: Bytes::from(key_value.value().to_vec()),
279                });
280            }
281
282            if let Some(next_range) = range.next_range(&output) {
283                range = next_range;
284            } else {
285                break;
286            }
287        }
288        Ok(())
289    }
290
291    async fn truncate_trx(
292        &self,
293        trx: &Transaction,
294        data_key: &Subspace,
295        until: &SeqNo,
296    ) -> Result<(), FdbTransactError> {
297        let seqno = trx.get(data_key.bytes(), false).await?;
298        if let Some(seqno) = &seqno {
299            let current_seqno: SeqNo = unpack(seqno)?;
300            if current_seqno < *until {
301                return Err(ExternalError::Determinate(
302                    anyhow!("upper bound too high for truncate: {until}").into(),
303                )
304                .into());
305            }
306        } else {
307            return Err(ExternalError::Determinate(anyhow!("no entries for key").into()).into());
308        }
309        let key_space_start = data_key.pack(&SeqNo::minimum());
310        let key_space_end = data_key.pack(&until);
311
312        trx.clear_range(&key_space_start, &key_space_end);
313        Ok(())
314    }
315}
316
317#[async_trait]
318impl Consensus for FdbConsensus {
319    fn list_keys(&self) -> ResultStream<'_, String> {
320        Box::pin(try_stream! {
321            let keys: Vec<String> = self
322                .db
323                .run(async |trx, _maybe_commited| {
324                    let mut range = RangeOption::from(self.keys.range());
325                    let mut keys = Vec::new();
326                    loop {
327                        let values = trx.get_range(&range, 1, false).await?;
328                        for value in &values {
329                            let key: String = self.keys.unpack(value.key()).map_err(FdbBindingError::PackError)?;
330                            keys.push(key);
331                        }
332                        if let Some(last) = values.last() {
333                            range.begin = KeySelector::first_greater_than(last.key().to_vec());
334                        } else {
335                            break;
336                        }
337                    }
338                    Ok(keys)
339                }).await?;
340
341            for shard in keys {
342                yield shard;
343            }
344        })
345    }
346
347    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
348        let data_key = self.data.subspace(&key);
349
350        let ok = self
351            .db
352            .transact_boxed(
353                &data_key,
354                |trx, data_key| self.head_trx(trx, data_key).boxed(),
355                TransactOption::default(),
356            )
357            .await?;
358        Ok(ok)
359    }
360
361    async fn compare_and_set(
362        &self,
363        key: &str,
364        expected: Option<SeqNo>,
365        new: VersionedData,
366    ) -> Result<CaSResult, ExternalError> {
367        if let Some(expected) = expected {
368            if new.seqno <= expected {
369                return Err(Error::from(
370                    format!("new seqno must be strictly greater than expected. Got new: {:?} expected: {:?}",
371                            new.seqno, expected)).into());
372            }
373        }
374        if new.seqno.0 > i64::MAX.try_into().expect("i64::MAX known to fit in u64") {
375            return Err(ExternalError::from(anyhow!(
376                "sequence numbers must fit within [0, i64::MAX], received: {:?}",
377                new.seqno
378            )));
379        }
380
381        let data_key = self.data.subspace(&key);
382
383        let ok = self
384            .db
385            .transact_boxed(
386                (expected, &new, &*key),
387                |trx, (expected, new, key)| {
388                    self.compare_and_set_trx(trx, &data_key, expected, new, key)
389                        .boxed()
390                },
391                TransactOption::default(),
392            )
393            .await?;
394        Ok(ok)
395    }
396
397    async fn scan(
398        &self,
399        key: &str,
400        from: SeqNo,
401        limit: usize,
402    ) -> Result<Vec<VersionedData>, ExternalError> {
403        let data_key = self.data.subspace(&key);
404        let mut entries = Vec::new();
405        self.db
406            .transact_boxed(
407                (&data_key, from, limit, &mut entries),
408                |trx, (data_key, from, limit, entries)| {
409                    self.scan_trx(trx, data_key, from, limit, entries).boxed()
410                },
411                TransactOption::default(),
412            )
413            .await?;
414
415        entries.sort_by_key(|e| e.seqno);
416        Ok(entries)
417    }
418
419    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
420        let data_key = self.data.subspace(&key);
421
422        self.db
423            .transact_boxed(
424                (&data_key, seqno),
425                |trx, (data_key, seqno)| self.truncate_trx(trx, data_key, seqno).boxed(),
426                TransactOption::idempotent(),
427            )
428            .await?;
429        Ok(None)
430    }
431}
432
#[cfg(test)]
mod tests {
    use super::*;

    use mz_foundationdb::directory::Directory;
    use uuid::Uuid;

    use crate::location::tests::consensus_impl_test;

    /// Removes the `keys` and `data` directories backing `consensus`.
    ///
    /// Despite the name, nothing is recreated here. The directories come back
    /// the next time [FdbConsensus::open] runs `create_or_open`.
    ///
    /// ONLY FOR TESTING
    async fn drop_and_recreate(consensus: &FdbConsensus) -> Result<(), ExternalError> {
        consensus
            .db
            .run(async |trx, _maybe_committed| {
                consensus.keys.remove(&trx, &[]).await?;
                consensus.data.remove(&trx, &[]).await?;
                Ok(())
            })
            .await?;
        Ok(())
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn fdb_consensus() -> Result<(), ExternalError> {
        let config = FdbConsensusConfig::new(
            std::str::FromStr::from_str("foundationdb:?prefix=test/consensus").unwrap(),
        )?;

        {
            let fdb = FdbConsensus::open(config.clone()).await?;
            drop_and_recreate(&fdb).await?;
        }

        consensus_impl_test(|| FdbConsensus::open(config.clone())).await?;

        // and now verify the implementation-specific `drop_and_recreate` works as intended
        let consensus = FdbConsensus::open(config.clone()).await?;
        let key = Uuid::new_v4().to_string();
        let mut state = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("abc"),
        };

        assert_eq!(
            consensus.compare_and_set(&key, None, state.clone()).await,
            Ok(CaSResult::Committed),
        );
        state.seqno = SeqNo(6);
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(5)), state.clone())
                .await,
            Ok(CaSResult::Committed),
        );
        state.seqno = SeqNo(129 + 5);
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(6)), state.clone())
                .await,
            Ok(CaSResult::Committed),
        );

        assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));

        // Of the seqnos written above (5, 6, 134), only the last is >= 129.
        assert_eq!(
            consensus.scan(&key, SeqNo(129), 10).await,
            Ok(vec![state.clone()]),
        );

        drop_and_recreate(&consensus).await?;

        assert_eq!(consensus.head(&key).await, Ok(None));

        mz_foundationdb::shutdown_network();
        Ok(())
    }
}