Skip to main content

mz_timestamp_oracle/
foundationdb_oracle.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! A timestamp oracle backed by FoundationDB for persistence/durability, where
//! all oracle operations are linearized by FoundationDB transactions alone,
//! without requiring any external precautions/machinery.
//!
//! We store the timestamp data in a directory subspace under the configured
//! prefix. Each timeline maps to its own subspace, `<prefix>/<timeline>`, with
//! the following keys:
//! * `./<timeline>/read_ts -> <timestamp>`
//! * `./<timeline>/write_ts -> <timestamp>`
18
19use std::io::Write;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use futures_util::future::FutureExt;
26use mz_foundationdb::FdbConfig;
27use mz_foundationdb::directory::{Directory, DirectoryError, DirectoryLayer, DirectoryOutput};
28use mz_foundationdb::tuple::{
29    PackError, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, pack, unpack,
30};
31use mz_foundationdb::{
32    Database, FdbBindingError, FdbError, TransactError, TransactOption, Transaction,
33};
34use mz_ore::metrics::MetricsRegistry;
35use mz_ore::url::SensitiveUrl;
36use mz_repr::Timestamp;
37use tracing::{debug, info};
38
39use crate::metrics::Metrics;
40use crate::{GenericNowFn, TimestampOracle, WriteTimestamp};
41
42/// A [`TimestampOracle`] backed by FoundationDB.
43pub struct FdbTimestampOracle<N> {
44    timeline: String,
45    next: N,
46    db: Arc<Database>,
47    metrics: Arc<Metrics>,
48    /// A read-only timestamp oracle is NOT allowed to do operations that change
49    /// the backing FoundationDB state.
50    read_only: bool,
51    /// read_ts key for this timeline
52    read_ts_key: Vec<u8>,
53    /// write_ts key for this timeline
54    write_ts_key: Vec<u8>,
55}
56
57impl<N> std::fmt::Debug for FdbTimestampOracle<N>
58where
59    N: GenericNowFn<Timestamp> + std::fmt::Debug,
60{
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct("FdbTimestampOracle")
63            .field("timeline", &self.timeline)
64            .field("next", &self.next)
65            .field("read_only", &self.read_only)
66            .field("read_ts_key", &self.read_ts_key)
67            .field("write_ts_key", &self.write_ts_key)
68            .field("metrics", &self.metrics)
69            .finish_non_exhaustive()
70    }
71}
72
73/// Configuration to connect to a FoundationDB-backed implementation of
74/// [`TimestampOracle`].
75#[derive(Clone, Debug)]
76pub struct FdbTimestampOracleConfig {
77    url: SensitiveUrl,
78    metrics: Arc<Metrics>,
79}
80
81impl FdbTimestampOracleConfig {
82    /// Returns a new instance of [`FdbTimestampOracleConfig`].
83    pub fn new(url: SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
84        let metrics = Arc::new(Metrics::new(metrics_registry));
85        Self { url, metrics }
86    }
87
88    /// Returns a new [`FdbTimestampOracleConfig`] for use in unit tests.
89    ///
90    /// By default, fdb oracle tests are no-ops so that `cargo test` works
91    /// on new environments without any configuration. To activate the tests for
92    /// [`FdbTimestampOracle`] set the `FDB_TIMESTAMP_ORACLE_URL` environment variable
93    /// with a valid connection URL.
94    pub fn new_for_test() -> Self {
95        Self {
96            url: FromStr::from_str("foundationdb:?prefix=test/tsoracle").unwrap(),
97            metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
98        }
99    }
100
101    /// Returns the metrics associated with this config.
102    pub(crate) fn metrics(&self) -> &Arc<Metrics> {
103        &self.metrics
104    }
105}
106
107/// An error that can occur during a FoundationDB transaction.
108/// This is either a FoundationDB error or an external error.
109enum FdbTransactError {
110    FdbError(FdbError),
111    ExternalError(anyhow::Error),
112}
113
114impl std::fmt::Debug for FdbTransactError {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        match self {
117            FdbTransactError::FdbError(e) => write!(f, "FdbError({})", e),
118            FdbTransactError::ExternalError(e) => write!(f, "ExternalError({:?})", e),
119        }
120    }
121}
122
123impl From<FdbError> for FdbTransactError {
124    fn from(value: FdbError) -> Self {
125        Self::FdbError(value)
126    }
127}
128
129impl From<anyhow::Error> for FdbTransactError {
130    fn from(value: anyhow::Error) -> Self {
131        Self::ExternalError(value)
132    }
133}
134
135impl From<PackError> for FdbTransactError {
136    fn from(value: PackError) -> Self {
137        Self::ExternalError(anyhow::Error::new(value))
138    }
139}
140
141impl From<FdbBindingError> for FdbTransactError {
142    fn from(value: FdbBindingError) -> Self {
143        Self::ExternalError(anyhow::Error::new(value))
144    }
145}
146
147impl From<FdbTransactError> for anyhow::Error {
148    fn from(value: FdbTransactError) -> Self {
149        match value {
150            FdbTransactError::FdbError(e) => anyhow::Error::new(e),
151            FdbTransactError::ExternalError(e) => e,
152        }
153    }
154}
155
156impl TransactError for FdbTransactError {
157    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
158        match self {
159            Self::FdbError(e) => Ok(e),
160            other => Err(other),
161        }
162    }
163}
164
165/// Wrapper to implement TuplePack/TupleUnpack for Timestamp.
166#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
167struct PackableTimestamp(Timestamp);
168
169impl TuplePack for PackableTimestamp {
170    fn pack<W: Write>(
171        &self,
172        w: &mut W,
173        tuple_depth: TupleDepth,
174    ) -> std::io::Result<VersionstampOffset> {
175        u64::from(self.0).pack(w, tuple_depth)
176    }
177}
178
179impl<'de> TupleUnpack<'de> for PackableTimestamp {
180    fn unpack(input: &'de [u8], tuple_depth: TupleDepth) -> PackResult<(&'de [u8], Self)> {
181        u64::unpack(input, tuple_depth).map(|(rem, v)| (rem, PackableTimestamp(Timestamp::from(v))))
182    }
183}
184
185impl<N: Sync> FdbTimestampOracle<N> {
186    async fn open_inner(
187        timeline: String,
188        next: N,
189        read_only: bool,
190        db: Arc<Database>,
191        metrics: Arc<Metrics>,
192        prefix: Vec<String>,
193        directory: DirectoryLayer,
194    ) -> Result<FdbTimestampOracle<N>, anyhow::Error> {
195        // Create a subspace for this timeline at <prefix>/<timeline>
196        let timeline_path: Vec<_> = prefix
197            .into_iter()
198            .chain(std::iter::once(timeline.clone()))
199            .collect();
200
201        let timeline_subspace = db
202            .run(async |trx, _maybe_committed| {
203                Ok(directory
204                    .create_or_open(&trx, &timeline_path, None, None)
205                    .await)
206            })
207            .await?
208            .map_err(|e| anyhow!("directory error: {e:?}"))?;
209
210        let timeline_subspace = match timeline_subspace {
211            DirectoryOutput::DirectorySubspace(subspace) => subspace,
212            DirectoryOutput::DirectoryPartition(_partition) => {
213                return Err(anyhow!("timestamp oracle timeline cannot be a partition"));
214            }
215        };
216
217        let read_ts_key = timeline_subspace.pack(&"read_ts");
218        let write_ts_key = timeline_subspace.pack(&"write_ts");
219
220        Ok(Self {
221            timeline,
222            next,
223            read_ts_key,
224            write_ts_key,
225            db,
226            metrics,
227            read_only,
228        })
229    }
230
231    async fn max_ts(&self) -> Result<Option<PackableTimestamp>, anyhow::Error> {
232        let max_ts = self
233            .db
234            .transact_boxed(
235                &(),
236                |trx, ()| self.max_rs_trx(trx).boxed(),
237                TransactOption::default(),
238            )
239            .await?;
240        Ok(max_ts)
241    }
242
243    async fn max_rs_trx(
244        &self,
245        trx: &Transaction,
246    ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
247        let read_data = trx.get(&self.read_ts_key, false).await?;
248        let write_data = trx.get(&self.write_ts_key, false).await?;
249
250        let read_ts: Option<PackableTimestamp> =
251            read_data.map(|data| unpack(&data).expect("must unpack"));
252
253        let write_ts: Option<PackableTimestamp> =
254            write_data.map(|data| unpack(&data).expect("must unpack"));
255
256        let max_ts = std::cmp::max(read_ts, write_ts);
257
258        Ok::<_, FdbTransactError>(max_ts)
259    }
260}
261
262impl<N> FdbTimestampOracle<N>
263where
264    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
265{
266    /// Open a FoundationDB [`TimestampOracle`] instance with `config`, for the
267    /// timeline named `timeline`. `next` generates new timestamps when invoked.
268    /// Timestamps that are returned are made durable and will never retract.
269    pub async fn open(
270        config: FdbTimestampOracleConfig,
271        timeline: String,
272        initially: Timestamp,
273        next: N,
274        read_only: bool,
275    ) -> Result<Self, anyhow::Error> {
276        info!(config = ?config, "opening FdbTimestampOracle");
277
278        let fdb_config = FdbConfig::parse(&config.url)?;
279
280        mz_foundationdb::init_network();
281
282        let db = Arc::new(Database::new(None)?);
283        let metrics = Arc::clone(&config.metrics);
284        let prefix = fdb_config.prefix;
285        let directory = DirectoryLayer::default();
286
287        let oracle =
288            Self::open_inner(timeline, next, read_only, db, metrics, prefix, directory).await?;
289
290        // Initialize the timestamps for this timeline if they don't exist.
291        oracle.initialize(initially).await?;
292
293        Ok(oracle)
294    }
295
296    /// Initialize the timestamps for this timeline if they don't exist.
297    ///
298    /// Both `read_ts` and `write_ts` are set to `initially` if they do not already exist.
299    async fn initialize(&self, initially: Timestamp) -> Result<(), FdbBindingError> {
300        // Initialize the timestamps for this timeline if they don't exist.
301        // Keys are stored as <timeline_subspace>/read_ts and <timeline_subspace>/write_ts
302        let initially_packed = pack(&PackableTimestamp(initially));
303
304        self.db
305            .run(async |trx, _maybe_committed| {
306                // Check if read_ts exists, if not initialize it
307                let existing_read = trx.get(&self.read_ts_key, false).await?;
308                if existing_read.is_none() {
309                    trx.set(&self.read_ts_key, &initially_packed);
310                }
311
312                // Check if write_ts exists, if not initialize it
313                let existing_write = trx.get(&self.write_ts_key, false).await?;
314                if existing_write.is_none() {
315                    trx.set(&self.write_ts_key, &initially_packed);
316                }
317
318                Ok(())
319            })
320            .await?;
321
322        // Forward timestamps to what we're given from outside.
323        if !self.read_only {
324            self.apply_write(initially).await;
325        }
326
327        Ok(())
328    }
329
330    /// Returns a `Vec` of all known timelines along with their current greatest
331    /// timestamp (max of read_ts and write_ts).
332    ///
333    /// For use when initializing another [`TimestampOracle`] implementation
334    /// from another oracle's state.
335    pub async fn get_all_timelines(
336        config: FdbTimestampOracleConfig,
337    ) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
338        let fdb_config = FdbConfig::parse(&config.url)?;
339
340        mz_foundationdb::init_network();
341
342        let db = Arc::new(Database::new(None)?);
343        let metrics = Arc::clone(&config.metrics);
344        let prefix = fdb_config.prefix;
345        let directory = DirectoryLayer::default();
346
347        // List all timeline directories under the prefix
348        let timeline_names = match db
349            .run(async |trx, _maybe_committed| Ok(directory.list(&trx, &prefix).await))
350            .await?
351        {
352            Err(DirectoryError::PathDoesNotExists) => Vec::new(), // No timelines exist yet
353            Err(e) => return Err(anyhow!("directory error: {e:?}")),
354            Ok(timelines) => timelines,
355        };
356
357        let mut result = Vec::with_capacity(timeline_names.len());
358
359        // For each timeline, read the max of read_ts and write_ts
360        for timeline_name in timeline_names {
361            let oracle = FdbTimestampOracle::<()>::open_inner(
362                timeline_name.clone(),
363                (),
364                true,
365                Arc::clone(&db),
366                Arc::clone(&metrics),
367                prefix.clone(),
368                directory.clone(),
369            )
370            .await?;
371
372            if let Some(ts) = oracle.max_ts().await? {
373                result.push((timeline_name, ts.0));
374            }
375        }
376
377        Ok(result)
378    }
379
380    async fn write_ts_trx(
381        &self,
382        trx: &Transaction,
383        proposed_next_ts: Timestamp,
384    ) -> Result<Timestamp, FdbTransactError> {
385        // Get current write_ts
386        let current = trx.get(&self.write_ts_key, false).await?;
387        let current_ts: PackableTimestamp = match current {
388            Some(data) => unpack(&data)?,
389            None => {
390                return Err(FdbTransactError::ExternalError(anyhow!(
391                    "timeline not initialized"
392                )));
393            }
394        };
395
396        // Calculate new timestamp: GREATEST(write_ts+1, proposed_next_ts)
397        let incremented = current_ts.0.step_forward();
398        let new_ts = std::cmp::max(incremented, proposed_next_ts);
399
400        // Update write_ts
401        let new_ts_packed = pack(&PackableTimestamp(new_ts));
402        trx.set(&self.write_ts_key, &new_ts_packed);
403
404        Ok(new_ts)
405    }
406
407    async fn peek_write_ts_trx(
408        &self,
409        trx: &Transaction,
410    ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
411        let data = trx.get(&self.write_ts_key, false).await?;
412        Ok(data.map(|data| unpack(&data)).transpose()?)
413    }
414
415    async fn read_ts_trx(
416        &self,
417        trx: &Transaction,
418    ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
419        let data = trx.get(&self.read_ts_key, false).await?;
420        Ok(data.map(|data| unpack(&data)).transpose()?)
421    }
422
423    async fn apply_write_trx(
424        &self,
425        trx: &Transaction,
426        write_ts: Timestamp,
427    ) -> Result<(), FdbTransactError> {
428        // Update read_ts = GREATEST(read_ts, write_ts)
429        let current_read = trx.get(&self.read_ts_key, false).await?;
430        let current_read_ts: PackableTimestamp = match current_read {
431            Some(data) => unpack(&data)?,
432            None => {
433                return Err(FdbTransactError::ExternalError(anyhow!(
434                    "timeline not initialized"
435                )));
436            }
437        };
438
439        if write_ts > current_read_ts.0 {
440            let new_ts_packed = pack(&PackableTimestamp(write_ts));
441            trx.set(&self.read_ts_key, &new_ts_packed);
442        }
443
444        // Update write_ts = GREATEST(write_ts, write_ts_param)
445        let current_write = trx.get(&self.write_ts_key, false).await?;
446        let current_write_ts: PackableTimestamp = match current_write {
447            Some(data) => unpack(&data)?,
448            None => {
449                return Err(FdbTransactError::ExternalError(anyhow!(
450                    "timeline not initialized"
451                )));
452            }
453        };
454
455        if write_ts > current_write_ts.0 {
456            let new_ts_packed = pack(&PackableTimestamp(write_ts));
457            trx.set(&self.write_ts_key, &new_ts_packed);
458        }
459
460        Ok::<_, FdbTransactError>(())
461    }
462}
463
464#[async_trait]
465impl<N> TimestampOracle<Timestamp> for FdbTimestampOracle<N>
466where
467    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
468{
469    async fn write_ts(&self) -> WriteTimestamp<Timestamp> {
470        if self.read_only {
471            panic!("attempting write_ts in read-only mode");
472        }
473
474        let proposed_next_ts = self.next.now();
475
476        let write_ts: Timestamp = self
477            .metrics
478            .oracle
479            .write_ts
480            .run_op(|| async {
481                self.db
482                    .transact_boxed(
483                        &proposed_next_ts,
484                        |trx, proposed_next_ts| self.write_ts_trx(trx, **proposed_next_ts).boxed(),
485                        TransactOption::default(),
486                    )
487                    .await
488                    .map_err(anyhow::Error::from)
489            })
490            .await
491            .expect("write_ts transaction failed");
492
493        debug!(
494            timeline = ?self.timeline,
495            write_ts = ?write_ts,
496            proposed_next_ts = ?proposed_next_ts,
497            "returning from write_ts()"
498        );
499
500        let advance_to = write_ts.step_forward();
501
502        WriteTimestamp {
503            timestamp: write_ts,
504            advance_to,
505        }
506    }
507
508    async fn peek_write_ts(&self) -> Timestamp {
509        let write_ts = self
510            .metrics
511            .oracle
512            .peek_write_ts
513            .run_op(|| async {
514                self.db
515                    .transact_boxed(
516                        &(),
517                        |trx, ()| self.peek_write_ts_trx(trx).boxed(),
518                        TransactOption::default(),
519                    )
520                    .await
521                    .map_err(anyhow::Error::from)?
522                    .ok_or_else(|| anyhow!("timeline not initialized"))
523                    .map(|ts| ts.0)
524            })
525            .await
526            .expect("peek_write_ts transaction failed");
527
528        debug!(
529            timeline = ?self.timeline,
530            write_ts = ?write_ts,
531            "returning from peek_write_ts()"
532        );
533
534        write_ts
535    }
536
537    async fn read_ts(&self) -> Timestamp {
538        let read_ts = self
539            .metrics
540            .oracle
541            .read_ts
542            .run_op(|| async {
543                self.db
544                    .transact_boxed(
545                        &(),
546                        |trx, ()| self.read_ts_trx(trx).boxed(),
547                        TransactOption::default(),
548                    )
549                    .await
550                    .map_err(anyhow::Error::from)?
551                    .ok_or_else(|| anyhow!("timeline not initialized"))
552                    .map(|ts| ts.0)
553            })
554            .await
555            .expect("read_ts transaction failed");
556
557        debug!(
558            timeline = ?self.timeline,
559            read_ts = ?read_ts,
560            "returning from read_ts()"
561        );
562
563        read_ts
564    }
565
566    async fn apply_write(&self, write_ts: Timestamp) {
567        if self.read_only {
568            panic!("attempting apply_write in read-only mode");
569        }
570
571        self.metrics
572            .oracle
573            .apply_write
574            .run_op(|| async {
575                self.db
576                    .transact_boxed(
577                        &write_ts,
578                        |trx, write_ts| self.apply_write_trx(trx, **write_ts).boxed(),
579                        TransactOption::default(),
580                    )
581                    .await
582                    .map_err(anyhow::Error::from)
583            })
584            .await
585            .expect("apply_write transaction failed");
586
587        debug!(
588            timeline = ?self.timeline,
589            write_ts = ?write_ts,
590            "returning from apply_write()"
591        );
592    }
593}
594
#[cfg(test)]
mod tests {
    use super::*;

    use mz_ore::now::NowFn;

    use crate::TimestampOracle;

    /// Runs the shared timestamp-oracle test suite against
    /// [`FdbTimestampOracle`].
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function
    #[ignore] // TODO: Reenable when https://github.com/MaterializeInc/database-issues/issues/10076 is fixed
    async fn test_fdb_timestamp_oracle() -> Result<(), anyhow::Error> {
        let base_config = FdbTimestampOracleConfig::new_for_test();

        crate::tests::timestamp_oracle_impl_test(|timeline, now_fn: NowFn, initial_ts| {
            let config = base_config.clone();
            async move {
                let opened = FdbTimestampOracle::open(config, timeline, initial_ts, now_fn, false)
                    .await
                    .expect("failed to open FdbTimestampOracle");

                let shared: Arc<dyn TimestampOracle<Timestamp> + Send + Sync> = Arc::new(opened);
                shared
            }
        })
        .await?;

        // Tear down the FoundationDB client network started by `open`.
        mz_foundationdb::shutdown_network();

        Ok(())
    }
}