Skip to main content

mz_timestamp_oracle/
foundationdb_oracle.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A timestamp oracle backed by FoundationDB for persistence/durability and where
11//! all oracle operations are self-sufficiently linearized, without requiring
12//! any external precautions/machinery.
13//!
14//! We store the timestamp data in a subspace at the configured path. Each timeline
15//! maps to a subspace with the following structure:
16//! * `./<timeline>/read_ts -> <timestamp>`
17//! * `./<timeline>/write_ts -> <timestamp>`
18
19use std::io::Write;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use futures_util::future::FutureExt;
26use mz_foundationdb::FdbConfig;
27use mz_foundationdb::directory::{Directory, DirectoryError, DirectoryLayer, DirectoryOutput};
28use mz_foundationdb::tuple::{
29    PackError, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, pack, unpack,
30};
31use mz_foundationdb::{
32    Database, FdbBindingError, FdbError, TransactError, TransactOption, Transaction,
33};
34use mz_ore::metrics::MetricsRegistry;
35use mz_ore::url::SensitiveUrl;
36use mz_repr::Timestamp;
37use tracing::{debug, info};
38
39use crate::metrics::Metrics;
40use crate::{GenericNowFn, TimestampOracle, WriteTimestamp};
41
/// A [`TimestampOracle`] backed by FoundationDB.
///
/// Each instance serves a single timeline and caches the packed keys of that
/// timeline's `read_ts` and `write_ts` entries.
pub struct FdbTimestampOracle<N> {
    /// Name of the timeline this oracle serves.
    timeline: String,
    /// Source of proposed timestamps; `write_ts` calls `next.now()`.
    next: N,
    /// Handle to the FoundationDB database that all transactions run against.
    db: Arc<Database>,
    /// Metrics used to instrument the oracle operations.
    metrics: Arc<Metrics>,
    /// A read-only timestamp oracle is NOT allowed to do operations that change
    /// the backing FoundationDB state.
    read_only: bool,
    /// read_ts key for this timeline
    read_ts_key: Vec<u8>,
    /// write_ts key for this timeline
    write_ts_key: Vec<u8>,
}
56
57impl<N> std::fmt::Debug for FdbTimestampOracle<N>
58where
59    N: GenericNowFn<Timestamp> + std::fmt::Debug,
60{
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct("FdbTimestampOracle")
63            .field("timeline", &self.timeline)
64            .field("next", &self.next)
65            .field("read_only", &self.read_only)
66            .field("read_ts_key", &self.read_ts_key)
67            .field("write_ts_key", &self.write_ts_key)
68            .field("metrics", &self.metrics)
69            .finish_non_exhaustive()
70    }
71}
72
/// Configuration to connect to a FoundationDB-backed implementation of
/// [`TimestampOracle`].
#[derive(Clone, Debug)]
pub struct FdbTimestampOracleConfig {
    /// Connection URL; parsed with `FdbConfig::parse` when an oracle is opened.
    url: SensitiveUrl,
    /// Metrics shared with the oracles opened from this config.
    metrics: Arc<Metrics>,
}
80
81impl FdbTimestampOracleConfig {
82    /// Returns a new instance of [`FdbTimestampOracleConfig`].
83    pub fn new(url: SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
84        let metrics = Arc::new(Metrics::new(metrics_registry));
85        Self { url, metrics }
86    }
87
88    /// Returns a new [`FdbTimestampOracleConfig`] for use in unit tests.
89    ///
90    /// By default, fdb oracle tests are no-ops so that `cargo test` works
91    /// on new environments without any configuration. To activate the tests for
92    /// [`FdbTimestampOracle`] set the `FDB_TIMESTAMP_ORACLE_URL` environment variable
93    /// with a valid connection URL.
94    pub fn new_for_test() -> Self {
95        Self {
96            url: FromStr::from_str("foundationdb:?prefix=test/tsoracle").unwrap(),
97            metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
98        }
99    }
100
101    /// Returns the metrics associated with this config.
102    pub(crate) fn metrics(&self) -> &Arc<Metrics> {
103        &self.metrics
104    }
105}
106
/// An error that can occur during a FoundationDB transaction.
/// This is either a FoundationDB error or an external error.
///
/// The [`TransactError`] impl hands only the `FdbError` variant back as an
/// [`FdbError`]; `ExternalError` values are returned unchanged.
enum FdbTransactError {
    /// An error reported by FoundationDB itself.
    FdbError(FdbError),
    /// Any other failure, e.g. a tuple pack error or a binding error (see the
    /// `From` impls for this type).
    ExternalError(anyhow::Error),
}
113
114impl std::fmt::Debug for FdbTransactError {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        match self {
117            FdbTransactError::FdbError(e) => write!(f, "FdbError({})", e),
118            FdbTransactError::ExternalError(e) => write!(f, "ExternalError({:?})", e),
119        }
120    }
121}
122
123impl From<FdbError> for FdbTransactError {
124    fn from(value: FdbError) -> Self {
125        Self::FdbError(value)
126    }
127}
128
129impl From<anyhow::Error> for FdbTransactError {
130    fn from(value: anyhow::Error) -> Self {
131        Self::ExternalError(value)
132    }
133}
134
135impl From<PackError> for FdbTransactError {
136    fn from(value: PackError) -> Self {
137        Self::ExternalError(anyhow::Error::new(value))
138    }
139}
140
141impl From<FdbBindingError> for FdbTransactError {
142    fn from(value: FdbBindingError) -> Self {
143        Self::ExternalError(anyhow::Error::new(value))
144    }
145}
146
147impl From<FdbTransactError> for anyhow::Error {
148    fn from(value: FdbTransactError) -> Self {
149        match value {
150            FdbTransactError::FdbError(e) => anyhow::Error::new(e),
151            FdbTransactError::ExternalError(e) => e,
152        }
153    }
154}
155
156impl TransactError for FdbTransactError {
157    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
158        match self {
159            Self::FdbError(e) => Ok(e),
160            other => Err(other),
161        }
162    }
163}
164
/// Wrapper to implement TuplePack/TupleUnpack for Timestamp.
///
/// The timestamp is packed and unpacked through its `u64` representation. The
/// derived ordering allows `Option<PackableTimestamp>` values to be compared
/// directly, e.g. with `std::cmp::max`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct PackableTimestamp(Timestamp);
168
169impl TuplePack for PackableTimestamp {
170    fn pack<W: Write>(
171        &self,
172        w: &mut W,
173        tuple_depth: TupleDepth,
174    ) -> std::io::Result<VersionstampOffset> {
175        u64::from(self.0).pack(w, tuple_depth)
176    }
177}
178
179impl<'de> TupleUnpack<'de> for PackableTimestamp {
180    fn unpack(input: &'de [u8], tuple_depth: TupleDepth) -> PackResult<(&'de [u8], Self)> {
181        u64::unpack(input, tuple_depth).map(|(rem, v)| (rem, PackableTimestamp(Timestamp::from(v))))
182    }
183}
184
185impl<N: Sync> FdbTimestampOracle<N> {
186    async fn open_inner(
187        timeline: String,
188        next: N,
189        read_only: bool,
190        db: Arc<Database>,
191        metrics: Arc<Metrics>,
192        prefix: Vec<String>,
193        directory: DirectoryLayer,
194    ) -> Result<FdbTimestampOracle<N>, anyhow::Error> {
195        // Create a subspace for this timeline at <prefix>/<timeline>
196        let timeline_path: Vec<_> = prefix
197            .into_iter()
198            .chain(std::iter::once(timeline.clone()))
199            .collect();
200
201        let timeline_subspace = db
202            .run(async |trx, _maybe_committed| {
203                Ok(directory
204                    .create_or_open(&trx, &timeline_path, None, None)
205                    .await)
206            })
207            .await?
208            .map_err(|e| anyhow!("directory error: {e:?}"))?;
209
210        let timeline_subspace = match timeline_subspace {
211            DirectoryOutput::DirectorySubspace(subspace) => subspace,
212            DirectoryOutput::DirectoryPartition(_partition) => {
213                return Err(anyhow!("timestamp oracle timeline cannot be a partition"));
214            }
215        };
216
217        let read_ts_key = timeline_subspace.pack(&"read_ts");
218        let write_ts_key = timeline_subspace.pack(&"write_ts");
219
220        Ok(Self {
221            timeline,
222            next,
223            read_ts_key,
224            write_ts_key,
225            db,
226            metrics,
227            read_only,
228        })
229    }
230
231    async fn max_ts(&self) -> Result<Option<PackableTimestamp>, anyhow::Error> {
232        let max_ts = self
233            .db
234            .transact_boxed(
235                &(),
236                |trx, ()| self.max_rs_trx(trx).boxed(),
237                // This transaction is a pure read, so it is safe to retry on a
238                // `commit_unknown_result` ("transaction may or may not have
239                // committed") error.
240                TransactOption::idempotent(),
241            )
242            .await?;
243        Ok(max_ts)
244    }
245
246    async fn max_rs_trx(
247        &self,
248        trx: &Transaction,
249    ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
250        let read_data = trx.get(&self.read_ts_key, false).await?;
251        let write_data = trx.get(&self.write_ts_key, false).await?;
252
253        let read_ts: Option<PackableTimestamp> =
254            read_data.map(|data| unpack(&data).expect("must unpack"));
255
256        let write_ts: Option<PackableTimestamp> =
257            write_data.map(|data| unpack(&data).expect("must unpack"));
258
259        let max_ts = std::cmp::max(read_ts, write_ts);
260
261        Ok::<_, FdbTransactError>(max_ts)
262    }
263}
264
265impl<N> FdbTimestampOracle<N>
266where
267    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
268{
269    /// Open a FoundationDB [`TimestampOracle`] instance with `config`, for the
270    /// timeline named `timeline`. `next` generates new timestamps when invoked.
271    /// Timestamps that are returned are made durable and will never retract.
272    pub async fn open(
273        config: FdbTimestampOracleConfig,
274        timeline: String,
275        initially: Timestamp,
276        next: N,
277        read_only: bool,
278    ) -> Result<Self, anyhow::Error> {
279        info!(config = ?config, "opening FdbTimestampOracle");
280
281        let fdb_config = FdbConfig::parse(&config.url)?;
282
283        mz_foundationdb::init_network();
284
285        let db = Arc::new(Database::new(None)?);
286        // In tests, bound transactions so an unresponsive server fails fast
287        // instead of hanging until the harness terminates the process.
288        #[cfg(test)]
289        mz_foundationdb::set_test_transaction_timeout(&db);
290        let metrics = Arc::clone(&config.metrics);
291        let prefix = fdb_config.prefix;
292        let directory = DirectoryLayer::default();
293
294        let oracle =
295            Self::open_inner(timeline, next, read_only, db, metrics, prefix, directory).await?;
296
297        // Initialize the timestamps for this timeline if they don't exist.
298        oracle.initialize(initially).await?;
299
300        Ok(oracle)
301    }
302
303    /// Initialize the timestamps for this timeline if they don't exist.
304    ///
305    /// Both `read_ts` and `write_ts` are set to `initially` if they do not already exist.
306    async fn initialize(&self, initially: Timestamp) -> Result<(), FdbBindingError> {
307        // Initialize the timestamps for this timeline if they don't exist.
308        // Keys are stored as <timeline_subspace>/read_ts and <timeline_subspace>/write_ts
309        let initially_packed = pack(&PackableTimestamp(initially));
310
311        self.db
312            .run(async |trx, _maybe_committed| {
313                // Check if read_ts exists, if not initialize it
314                let existing_read = trx.get(&self.read_ts_key, false).await?;
315                if existing_read.is_none() {
316                    trx.set(&self.read_ts_key, &initially_packed);
317                }
318
319                // Check if write_ts exists, if not initialize it
320                let existing_write = trx.get(&self.write_ts_key, false).await?;
321                if existing_write.is_none() {
322                    trx.set(&self.write_ts_key, &initially_packed);
323                }
324
325                Ok(())
326            })
327            .await?;
328
329        // Forward timestamps to what we're given from outside.
330        if !self.read_only {
331            self.apply_write(initially).await;
332        }
333
334        Ok(())
335    }
336
337    /// Returns a `Vec` of all known timelines along with their current greatest
338    /// timestamp (max of read_ts and write_ts).
339    ///
340    /// For use when initializing another [`TimestampOracle`] implementation
341    /// from another oracle's state.
342    pub async fn get_all_timelines(
343        config: FdbTimestampOracleConfig,
344    ) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
345        let fdb_config = FdbConfig::parse(&config.url)?;
346
347        mz_foundationdb::init_network();
348
349        let db = Arc::new(Database::new(None)?);
350        // In tests, bound transactions so an unresponsive server fails fast
351        // instead of hanging until the harness terminates the process.
352        #[cfg(test)]
353        mz_foundationdb::set_test_transaction_timeout(&db);
354        let metrics = Arc::clone(&config.metrics);
355        let prefix = fdb_config.prefix;
356        let directory = DirectoryLayer::default();
357
358        // List all timeline directories under the prefix
359        let timeline_names = match db
360            .run(async |trx, _maybe_committed| Ok(directory.list(&trx, &prefix).await))
361            .await?
362        {
363            Err(DirectoryError::PathDoesNotExists) => Vec::new(), // No timelines exist yet
364            Err(e) => return Err(anyhow!("directory error: {e:?}")),
365            Ok(timelines) => timelines,
366        };
367
368        let mut result = Vec::with_capacity(timeline_names.len());
369
370        // For each timeline, read the max of read_ts and write_ts
371        for timeline_name in timeline_names {
372            let oracle = FdbTimestampOracle::<()>::open_inner(
373                timeline_name.clone(),
374                (),
375                true,
376                Arc::clone(&db),
377                Arc::clone(&metrics),
378                prefix.clone(),
379                directory.clone(),
380            )
381            .await?;
382
383            if let Some(ts) = oracle.max_ts().await? {
384                result.push((timeline_name, ts.0));
385            }
386        }
387
388        Ok(result)
389    }
390
391    async fn write_ts_trx(
392        &self,
393        trx: &Transaction,
394        proposed_next_ts: Timestamp,
395    ) -> Result<Timestamp, FdbTransactError> {
396        // Get current write_ts
397        let current = trx.get(&self.write_ts_key, false).await?;
398        let current_ts: PackableTimestamp = match current {
399            Some(data) => unpack(&data)?,
400            None => {
401                return Err(FdbTransactError::ExternalError(anyhow!(
402                    "timeline not initialized"
403                )));
404            }
405        };
406
407        // Calculate new timestamp: GREATEST(write_ts+1, proposed_next_ts)
408        let incremented = current_ts.0.step_forward();
409        let new_ts = std::cmp::max(incremented, proposed_next_ts);
410
411        // Update write_ts
412        let new_ts_packed = pack(&PackableTimestamp(new_ts));
413        trx.set(&self.write_ts_key, &new_ts_packed);
414
415        Ok(new_ts)
416    }
417
418    async fn peek_write_ts_trx(
419        &self,
420        trx: &Transaction,
421    ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
422        let data = trx.get(&self.write_ts_key, false).await?;
423        Ok(data.map(|data| unpack(&data)).transpose()?)
424    }
425
426    async fn read_ts_trx(
427        &self,
428        trx: &Transaction,
429    ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
430        let data = trx.get(&self.read_ts_key, false).await?;
431        Ok(data.map(|data| unpack(&data)).transpose()?)
432    }
433
434    async fn apply_write_trx(
435        &self,
436        trx: &Transaction,
437        write_ts: Timestamp,
438    ) -> Result<(), FdbTransactError> {
439        // Update read_ts = GREATEST(read_ts, write_ts)
440        let current_read = trx.get(&self.read_ts_key, false).await?;
441        let current_read_ts: PackableTimestamp = match current_read {
442            Some(data) => unpack(&data)?,
443            None => {
444                return Err(FdbTransactError::ExternalError(anyhow!(
445                    "timeline not initialized"
446                )));
447            }
448        };
449
450        if write_ts > current_read_ts.0 {
451            let new_ts_packed = pack(&PackableTimestamp(write_ts));
452            trx.set(&self.read_ts_key, &new_ts_packed);
453        }
454
455        // Update write_ts = GREATEST(write_ts, write_ts_param)
456        let current_write = trx.get(&self.write_ts_key, false).await?;
457        let current_write_ts: PackableTimestamp = match current_write {
458            Some(data) => unpack(&data)?,
459            None => {
460                return Err(FdbTransactError::ExternalError(anyhow!(
461                    "timeline not initialized"
462                )));
463            }
464        };
465
466        if write_ts > current_write_ts.0 {
467            let new_ts_packed = pack(&PackableTimestamp(write_ts));
468            trx.set(&self.write_ts_key, &new_ts_packed);
469        }
470
471        Ok::<_, FdbTransactError>(())
472    }
473}
474
475#[async_trait]
476impl<N> TimestampOracle<Timestamp> for FdbTimestampOracle<N>
477where
478    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
479{
480    async fn write_ts(&self) -> WriteTimestamp<Timestamp> {
481        if self.read_only {
482            panic!("attempting write_ts in read-only mode");
483        }
484
485        let proposed_next_ts = self.next.now();
486
487        let write_ts: Timestamp = self
488            .metrics
489            .oracle
490            .write_ts
491            .run_op(|| async {
492                self.db
493                    .transact_boxed(
494                        &proposed_next_ts,
495                        |trx, proposed_next_ts| self.write_ts_trx(trx, **proposed_next_ts).boxed(),
496                        // Safe to retry on a `commit_unknown_result` error: the
497                        // transaction re-reads `write_ts` and hands out
498                        // `max(write_ts + 1, proposed_next_ts)`, so a retry only
499                        // ever advances the timestamp further, preserving
500                        // monotonicity (it may skip a timestamp, which is fine).
501                        TransactOption::idempotent(),
502                    )
503                    .await
504                    .map_err(anyhow::Error::from)
505            })
506            .await
507            .expect("write_ts transaction failed");
508
509        debug!(
510            timeline = ?self.timeline,
511            write_ts = ?write_ts,
512            proposed_next_ts = ?proposed_next_ts,
513            "returning from write_ts()"
514        );
515
516        let advance_to = write_ts.step_forward();
517
518        WriteTimestamp {
519            timestamp: write_ts,
520            advance_to,
521        }
522    }
523
524    async fn peek_write_ts(&self) -> Timestamp {
525        let write_ts = self
526            .metrics
527            .oracle
528            .peek_write_ts
529            .run_op(|| async {
530                self.db
531                    .transact_boxed(
532                        &(),
533                        |trx, ()| self.peek_write_ts_trx(trx).boxed(),
534                        // Pure read, safe to retry on `commit_unknown_result`.
535                        TransactOption::idempotent(),
536                    )
537                    .await
538                    .map_err(anyhow::Error::from)?
539                    .ok_or_else(|| anyhow!("timeline not initialized"))
540                    .map(|ts| ts.0)
541            })
542            .await
543            .expect("peek_write_ts transaction failed");
544
545        debug!(
546            timeline = ?self.timeline,
547            write_ts = ?write_ts,
548            "returning from peek_write_ts()"
549        );
550
551        write_ts
552    }
553
554    async fn read_ts(&self) -> Timestamp {
555        let read_ts = self
556            .metrics
557            .oracle
558            .read_ts
559            .run_op(|| async {
560                self.db
561                    .transact_boxed(
562                        &(),
563                        |trx, ()| self.read_ts_trx(trx).boxed(),
564                        // Pure read, safe to retry on `commit_unknown_result`.
565                        TransactOption::idempotent(),
566                    )
567                    .await
568                    .map_err(anyhow::Error::from)?
569                    .ok_or_else(|| anyhow!("timeline not initialized"))
570                    .map(|ts| ts.0)
571            })
572            .await
573            .expect("read_ts transaction failed");
574
575        debug!(
576            timeline = ?self.timeline,
577            read_ts = ?read_ts,
578            "returning from read_ts()"
579        );
580
581        read_ts
582    }
583
584    async fn apply_write(&self, write_ts: Timestamp) {
585        if self.read_only {
586            panic!("attempting apply_write in read-only mode");
587        }
588
589        self.metrics
590            .oracle
591            .apply_write
592            .run_op(|| async {
593                self.db
594                    .transact_boxed(
595                        &write_ts,
596                        |trx, write_ts| self.apply_write_trx(trx, **write_ts).boxed(),
597                        // `apply_write_trx` only bumps `read_ts`/`write_ts` to
598                        // `GREATEST(current, write_ts)`, so it is idempotent and
599                        // safe to retry on a `commit_unknown_result` ("transaction
600                        // may or may not have committed") error rather than
601                        // panicking. This is the failure observed in PER-25.
602                        TransactOption::idempotent(),
603                    )
604                    .await
605                    .map_err(anyhow::Error::from)
606            })
607            .await
608            .expect("apply_write transaction failed");
609
610        debug!(
611            timeline = ?self.timeline,
612            write_ts = ?write_ts,
613            "returning from apply_write()"
614        );
615    }
616}
617
#[cfg(test)]
mod tests {
    use super::*;

    use mz_ore::now::NowFn;

    use crate::TimestampOracle;

    /// Runs the shared oracle test suite against [`FdbTimestampOracle`],
    /// opening each oracle from the test config in read-write mode.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function
    async fn test_fdb_timestamp_oracle() -> Result<(), anyhow::Error> {
        let config = FdbTimestampOracleConfig::new_for_test();

        crate::tests::timestamp_oracle_impl_test(|timeline, now_fn: NowFn, initial_ts| {
            // Each oracle gets its own copy of the config.
            let config = config.clone();
            async move {
                let opened = FdbTimestampOracle::open(config, timeline, initial_ts, now_fn, false)
                    .await
                    .expect("failed to open FdbTimestampOracle");

                let shared: Arc<dyn TimestampOracle<Timestamp> + Send + Sync> = Arc::new(opened);
                shared
            }
        })
        .await?;

        // Stop the network before the process exits; otherwise the
        // FoundationDB client can segfault during teardown. By this point all
        // oracles (and their `Database` handles) created above have been
        // dropped, so joining the network thread does not block.
        mz_foundationdb::shutdown_network();

        Ok(())
    }
}