Skip to main content

mz_timestamp_oracle/
foundationdb_oracle.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! A timestamp oracle backed by FoundationDB for persistence/durability, where
//! all oracle operations are self-sufficiently linearized, without requiring
//! any external precautions/machinery.
//!
//! We store the timestamp data using the FoundationDB directory layer, under
//! the path given by the configured `prefix`. Each timeline maps to its own
//! subspace with the following structure:
//! * `<prefix>/<timeline>/read_ts -> <timestamp>`
//! * `<prefix>/<timeline>/write_ts -> <timestamp>`
18
19use std::io::Write;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use futures_util::future::FutureExt;
26use mz_foundationdb::FdbConfig;
27use mz_foundationdb::directory::{Directory, DirectoryError, DirectoryLayer, DirectoryOutput};
28use mz_foundationdb::tuple::{
29    PackError, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, pack, unpack,
30};
31use mz_foundationdb::{
32    Database, FdbBindingError, FdbError, TransactError, TransactOption, Transaction,
33};
34use mz_ore::metrics::MetricsRegistry;
35use mz_ore::url::SensitiveUrl;
36use mz_repr::Timestamp;
37use tracing::{debug, info};
38
39use crate::metrics::Metrics;
40use crate::{GenericNowFn, TimestampOracle, WriteTimestamp};
41
42/// A [`TimestampOracle`] backed by FoundationDB.
43pub struct FdbTimestampOracle<N> {
44    timeline: String,
45    next: N,
46    db: Arc<Database>,
47    /// A read-only timestamp oracle is NOT allowed to do operations that change
48    /// the backing FoundationDB state.
49    read_only: bool,
50    /// read_ts key for this timeline
51    read_ts_key: Vec<u8>,
52    /// write_ts key for this timeline
53    write_ts_key: Vec<u8>,
54}
55
56impl<N> std::fmt::Debug for FdbTimestampOracle<N>
57where
58    N: GenericNowFn<Timestamp> + std::fmt::Debug,
59{
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        f.debug_struct("FdbTimestampOracle")
62            .field("timeline", &self.timeline)
63            .field("next", &self.next)
64            .field("read_only", &self.read_only)
65            .field("read_ts_key", &self.read_ts_key)
66            .field("write_ts_key", &self.write_ts_key)
67            .finish_non_exhaustive()
68    }
69}
70
71/// Configuration to connect to a FoundationDB-backed implementation of
72/// [`TimestampOracle`].
73#[derive(Clone, Debug)]
74pub struct FdbTimestampOracleConfig {
75    url: SensitiveUrl,
76    metrics: Arc<Metrics>,
77}
78
79impl FdbTimestampOracleConfig {
80    /// Returns a new instance of [`FdbTimestampOracleConfig`].
81    pub fn new(url: SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
82        let metrics = Arc::new(Metrics::new(metrics_registry));
83        Self { url, metrics }
84    }
85
86    /// Returns a new [`FdbTimestampOracleConfig`] for use in unit tests.
87    ///
88    /// By default, fdb oracle tests are no-ops so that `cargo test` works
89    /// on new environments without any configuration. To activate the tests for
90    /// [`FdbTimestampOracle`] set the `FDB_TIMESTAMP_ORACLE_URL` environment variable
91    /// with a valid connection URL.
92    pub fn new_for_test() -> Self {
93        Self {
94            url: FromStr::from_str("foundationdb:?prefix=test/tsoracle").unwrap(),
95            metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
96        }
97    }
98
99    /// Returns the metrics associated with this config.
100    pub(crate) fn metrics(&self) -> &Arc<Metrics> {
101        &self.metrics
102    }
103}
104
105/// An error that can occur during a FoundationDB transaction.
106/// This is either a FoundationDB error or an external error.
107enum FdbTransactError {
108    FdbError(FdbError),
109    ExternalError(anyhow::Error),
110}
111
112impl std::fmt::Debug for FdbTransactError {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        match self {
115            FdbTransactError::FdbError(e) => write!(f, "FdbError({})", e),
116            FdbTransactError::ExternalError(e) => write!(f, "ExternalError({:?})", e),
117        }
118    }
119}
120
121impl From<FdbError> for FdbTransactError {
122    fn from(value: FdbError) -> Self {
123        Self::FdbError(value)
124    }
125}
126
127impl From<anyhow::Error> for FdbTransactError {
128    fn from(value: anyhow::Error) -> Self {
129        Self::ExternalError(value)
130    }
131}
132
133impl From<PackError> for FdbTransactError {
134    fn from(value: PackError) -> Self {
135        Self::ExternalError(anyhow::Error::new(value))
136    }
137}
138
139impl From<FdbBindingError> for FdbTransactError {
140    fn from(value: FdbBindingError) -> Self {
141        Self::ExternalError(anyhow::Error::new(value))
142    }
143}
144
145impl From<FdbTransactError> for anyhow::Error {
146    fn from(value: FdbTransactError) -> Self {
147        match value {
148            FdbTransactError::FdbError(e) => anyhow::Error::new(e),
149            FdbTransactError::ExternalError(e) => e,
150        }
151    }
152}
153
154impl TransactError for FdbTransactError {
155    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
156        match self {
157            Self::FdbError(e) => Ok(e),
158            other => Err(other),
159        }
160    }
161}
162
163/// Wrapper to implement TuplePack/TupleUnpack for Timestamp.
164#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
165struct PackableTimestamp(Timestamp);
166
167impl TuplePack for PackableTimestamp {
168    fn pack<W: Write>(
169        &self,
170        w: &mut W,
171        tuple_depth: TupleDepth,
172    ) -> std::io::Result<VersionstampOffset> {
173        u64::from(self.0).pack(w, tuple_depth)
174    }
175}
176
177impl<'de> TupleUnpack<'de> for PackableTimestamp {
178    fn unpack(input: &'de [u8], tuple_depth: TupleDepth) -> PackResult<(&'de [u8], Self)> {
179        u64::unpack(input, tuple_depth).map(|(rem, v)| (rem, PackableTimestamp(Timestamp::from(v))))
180    }
181}
182
183impl<N: Sync> FdbTimestampOracle<N> {
184    async fn open_inner(
185        timeline: String,
186        next: N,
187        read_only: bool,
188        db: Arc<Database>,
189        prefix: Vec<String>,
190        directory: DirectoryLayer,
191    ) -> Result<FdbTimestampOracle<N>, anyhow::Error> {
192        // Create a subspace for this timeline at <prefix>/<timeline>
193        let timeline_path: Vec<_> = prefix
194            .into_iter()
195            .chain(std::iter::once(timeline.clone()))
196            .collect();
197
198        let timeline_subspace = db
199            .run(async |trx, _maybe_committed| {
200                Ok(directory
201                    .create_or_open(&trx, &timeline_path, None, None)
202                    .await)
203            })
204            .await?
205            .map_err(|e| anyhow!("directory error: {e:?}"))?;
206
207        let timeline_subspace = match timeline_subspace {
208            DirectoryOutput::DirectorySubspace(subspace) => subspace,
209            DirectoryOutput::DirectoryPartition(_partition) => {
210                return Err(anyhow!("timestamp oracle timeline cannot be a partition"));
211            }
212        };
213
214        let read_ts_key = timeline_subspace.pack(&"read_ts");
215        let write_ts_key = timeline_subspace.pack(&"write_ts");
216
217        Ok(Self {
218            timeline,
219            next,
220            read_ts_key,
221            write_ts_key,
222            db,
223            read_only,
224        })
225    }
226
227    async fn max_ts(&self) -> Result<Option<PackableTimestamp>, anyhow::Error> {
228        let max_ts = self
229            .db
230            .transact_boxed(
231                &(),
232                |trx, ()| self.max_rs_trx(trx).boxed(),
233                TransactOption::default(),
234            )
235            .await?;
236        Ok(max_ts)
237    }
238
239    async fn max_rs_trx(
240        &self,
241        trx: &Transaction,
242    ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
243        let read_data = trx.get(&self.read_ts_key, false).await?;
244        let write_data = trx.get(&self.write_ts_key, false).await?;
245
246        let read_ts: Option<PackableTimestamp> =
247            read_data.map(|data| unpack(&data).expect("must unpack"));
248
249        let write_ts: Option<PackableTimestamp> =
250            write_data.map(|data| unpack(&data).expect("must unpack"));
251
252        let max_ts = std::cmp::max(read_ts, write_ts);
253
254        Ok::<_, FdbTransactError>(max_ts)
255    }
256}
257
258impl<N> FdbTimestampOracle<N>
259where
260    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
261{
262    /// Open a FoundationDB [`TimestampOracle`] instance with `config`, for the
263    /// timeline named `timeline`. `next` generates new timestamps when invoked.
264    /// Timestamps that are returned are made durable and will never retract.
265    pub async fn open(
266        config: FdbTimestampOracleConfig,
267        timeline: String,
268        initially: Timestamp,
269        next: N,
270        read_only: bool,
271    ) -> Result<Self, anyhow::Error> {
272        info!(config = ?config, "opening FdbTimestampOracle");
273
274        let fdb_config = FdbConfig::parse(&config.url)?;
275
276        mz_foundationdb::init_network();
277
278        let db = Arc::new(Database::new(None)?);
279        let prefix = fdb_config.prefix;
280        let directory = DirectoryLayer::default();
281
282        let oracle = Self::open_inner(timeline, next, read_only, db, prefix, directory).await?;
283
284        // Initialize the timestamps for this timeline if they don't exist.
285        oracle.initialize(initially).await?;
286
287        Ok(oracle)
288    }
289
290    /// Initialize the timestamps for this timeline if they don't exist.
291    ///
292    /// Both `read_ts` and `write_ts` are set to `initially` if they do not already exist.
293    async fn initialize(&self, initially: Timestamp) -> Result<(), FdbBindingError> {
294        // Initialize the timestamps for this timeline if they don't exist.
295        // Keys are stored as <timeline_subspace>/read_ts and <timeline_subspace>/write_ts
296        let initially_packed = pack(&PackableTimestamp(initially));
297
298        self.db
299            .run(async |trx, _maybe_committed| {
300                // Check if read_ts exists, if not initialize it
301                let existing_read = trx.get(&self.read_ts_key, false).await?;
302                if existing_read.is_none() {
303                    trx.set(&self.read_ts_key, &initially_packed);
304                }
305
306                // Check if write_ts exists, if not initialize it
307                let existing_write = trx.get(&self.write_ts_key, false).await?;
308                if existing_write.is_none() {
309                    trx.set(&self.write_ts_key, &initially_packed);
310                }
311
312                Ok(())
313            })
314            .await?;
315
316        // Forward timestamps to what we're given from outside.
317        if !self.read_only {
318            self.apply_write(initially).await;
319        }
320
321        Ok(())
322    }
323
324    /// Returns a `Vec` of all known timelines along with their current greatest
325    /// timestamp (max of read_ts and write_ts).
326    ///
327    /// For use when initializing another [`TimestampOracle`] implementation
328    /// from another oracle's state.
329    pub async fn get_all_timelines(
330        config: FdbTimestampOracleConfig,
331    ) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
332        let fdb_config = FdbConfig::parse(&config.url)?;
333
334        mz_foundationdb::init_network();
335
336        let db = Arc::new(Database::new(None)?);
337        let prefix = fdb_config.prefix;
338        let directory = DirectoryLayer::default();
339
340        // List all timeline directories under the prefix
341        let timeline_names = match db
342            .run(async |trx, _maybe_committed| Ok(directory.list(&trx, &prefix).await))
343            .await?
344        {
345            Err(DirectoryError::PathDoesNotExists) => Vec::new(), // No timelines exist yet
346            Err(e) => return Err(anyhow!("directory error: {e:?}")),
347            Ok(timelines) => timelines,
348        };
349
350        let mut result = Vec::with_capacity(timeline_names.len());
351
352        // For each timeline, read the max of read_ts and write_ts
353        for timeline_name in timeline_names {
354            let oracle = FdbTimestampOracle::<()>::open_inner(
355                timeline_name.clone(),
356                (),
357                true,
358                Arc::clone(&db),
359                prefix.clone(),
360                directory.clone(),
361            )
362            .await?;
363
364            if let Some(ts) = oracle.max_ts().await? {
365                result.push((timeline_name, ts.0));
366            }
367        }
368
369        Ok(result)
370    }
371
372    async fn write_ts_trx(
373        &self,
374        trx: &Transaction,
375        proposed_next_ts: Timestamp,
376    ) -> Result<Timestamp, FdbTransactError> {
377        // Get current write_ts
378        let current = trx.get(&self.write_ts_key, false).await?;
379        let current_ts: PackableTimestamp = match current {
380            Some(data) => unpack(&data)?,
381            None => {
382                return Err(FdbTransactError::ExternalError(anyhow!(
383                    "timeline not initialized"
384                )));
385            }
386        };
387
388        // Calculate new timestamp: GREATEST(write_ts+1, proposed_next_ts)
389        let incremented = current_ts.0.step_forward();
390        let new_ts = std::cmp::max(incremented, proposed_next_ts);
391
392        // Update write_ts
393        let new_ts_packed = pack(&PackableTimestamp(new_ts));
394        trx.set(&self.write_ts_key, &new_ts_packed);
395
396        Ok(new_ts)
397    }
398
399    async fn peek_write_ts_trx(
400        &self,
401        trx: &Transaction,
402    ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
403        let data = trx.get(&self.write_ts_key, false).await?;
404        Ok(data.map(|data| unpack(&data)).transpose()?)
405    }
406
407    async fn read_ts_trx(
408        &self,
409        trx: &Transaction,
410    ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
411        let data = trx.get(&self.read_ts_key, false).await?;
412        Ok(data.map(|data| unpack(&data)).transpose()?)
413    }
414
415    async fn apply_write_trx(
416        &self,
417        trx: &Transaction,
418        write_ts: Timestamp,
419    ) -> Result<(), FdbTransactError> {
420        // Update read_ts = GREATEST(read_ts, write_ts)
421        let current_read = trx.get(&self.read_ts_key, false).await?;
422        let current_read_ts: PackableTimestamp = match current_read {
423            Some(data) => unpack(&data)?,
424            None => {
425                return Err(FdbTransactError::ExternalError(anyhow!(
426                    "timeline not initialized"
427                )));
428            }
429        };
430
431        if write_ts > current_read_ts.0 {
432            let new_ts_packed = pack(&PackableTimestamp(write_ts));
433            trx.set(&self.read_ts_key, &new_ts_packed);
434        }
435
436        // Update write_ts = GREATEST(write_ts, write_ts_param)
437        let current_write = trx.get(&self.write_ts_key, false).await?;
438        let current_write_ts: PackableTimestamp = match current_write {
439            Some(data) => unpack(&data)?,
440            None => {
441                return Err(FdbTransactError::ExternalError(anyhow!(
442                    "timeline not initialized"
443                )));
444            }
445        };
446
447        if write_ts > current_write_ts.0 {
448            let new_ts_packed = pack(&PackableTimestamp(write_ts));
449            trx.set(&self.write_ts_key, &new_ts_packed);
450        }
451
452        Ok::<_, FdbTransactError>(())
453    }
454}
455
456#[async_trait]
457impl<N> TimestampOracle<Timestamp> for FdbTimestampOracle<N>
458where
459    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
460{
461    async fn write_ts(&self) -> WriteTimestamp<Timestamp> {
462        if self.read_only {
463            panic!("attempting write_ts in read-only mode");
464        }
465
466        let proposed_next_ts = self.next.now();
467
468        let write_ts: Timestamp = self
469            .db
470            .transact_boxed(
471                &proposed_next_ts,
472                |trx, proposed_next_ts| self.write_ts_trx(trx, **proposed_next_ts).boxed(),
473                TransactOption::default(),
474            )
475            .await
476            .expect("write_ts transaction failed");
477
478        debug!(
479            timeline = ?self.timeline,
480            write_ts = ?write_ts,
481            proposed_next_ts = ?proposed_next_ts,
482            "returning from write_ts()"
483        );
484
485        let advance_to = write_ts.step_forward();
486
487        WriteTimestamp {
488            timestamp: write_ts,
489            advance_to,
490        }
491    }
492
493    async fn peek_write_ts(&self) -> Timestamp {
494        let write_ts = self
495            .db
496            .transact_boxed(
497                &(),
498                |trx, ()| self.peek_write_ts_trx(trx).boxed(),
499                TransactOption::default(),
500            )
501            .await
502            .expect("peek_write_ts transaction failed")
503            .expect("timeline not initialized")
504            .0;
505
506        debug!(
507            timeline = ?self.timeline,
508            write_ts = ?write_ts,
509            "returning from peek_write_ts()"
510        );
511
512        write_ts
513    }
514
515    async fn read_ts(&self) -> Timestamp {
516        let read_ts = self
517            .db
518            .transact_boxed(
519                &(),
520                |trx, ()| self.read_ts_trx(trx).boxed(),
521                TransactOption::default(),
522            )
523            .await
524            .expect("read_ts transaction failed")
525            .expect("timeline not initialized")
526            .0;
527
528        debug!(
529            timeline = ?self.timeline,
530            read_ts = ?read_ts,
531            "returning from read_ts()"
532        );
533
534        read_ts
535    }
536
537    async fn apply_write(&self, write_ts: Timestamp) {
538        if self.read_only {
539            panic!("attempting apply_write in read-only mode");
540        }
541
542        self.db
543            .transact_boxed(
544                &write_ts,
545                |trx, write_ts| self.apply_write_trx(trx, **write_ts).boxed(),
546                TransactOption::default(),
547            )
548            .await
549            .expect("apply_write transaction failed");
550
551        debug!(
552            timeline = ?self.timeline,
553            write_ts = ?write_ts,
554            "returning from apply_write()"
555        );
556    }
557}
558
#[cfg(test)]
mod tests {
    use super::*;

    use mz_ore::now::NowFn;

    use crate::TimestampOracle;

    /// Runs the shared oracle conformance suite against the FoundationDB
    /// implementation.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function
    async fn test_fdb_timestamp_oracle() -> Result<(), anyhow::Error> {
        let config = FdbTimestampOracleConfig::new_for_test();

        crate::tests::timestamp_oracle_impl_test(|timeline, now_fn: NowFn, initial_ts| {
            // Each oracle instance gets its own copy of the shared config.
            let config = config.clone();
            async move {
                let oracle = FdbTimestampOracle::open(config, timeline, initial_ts, now_fn, false)
                    .await
                    .expect("failed to open FdbTimestampOracle");
                Arc::new(oracle) as Arc<dyn TimestampOracle<Timestamp> + Send + Sync>
            }
        })
        .await?;

        // Tear down the FoundationDB client network that `open` started via
        // `init_network`, once the whole suite has finished.
        mz_foundationdb::shutdown_network();

        Ok(())
    }
}