1use std::io::Write;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use futures_util::future::FutureExt;
26use mz_foundationdb::FdbConfig;
27use mz_foundationdb::directory::{Directory, DirectoryError, DirectoryLayer, DirectoryOutput};
28use mz_foundationdb::tuple::{
29 PackError, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, pack, unpack,
30};
31use mz_foundationdb::{
32 Database, FdbBindingError, FdbError, TransactError, TransactOption, Transaction,
33};
34use mz_ore::metrics::MetricsRegistry;
35use mz_ore::url::SensitiveUrl;
36use mz_repr::Timestamp;
37use tracing::{debug, info};
38
39use crate::metrics::Metrics;
40use crate::{GenericNowFn, TimestampOracle, WriteTimestamp};
41
42pub struct FdbTimestampOracle<N> {
44 timeline: String,
45 next: N,
46 db: Arc<Database>,
47 metrics: Arc<Metrics>,
48 read_only: bool,
51 read_ts_key: Vec<u8>,
53 write_ts_key: Vec<u8>,
55}
56
57impl<N> std::fmt::Debug for FdbTimestampOracle<N>
58where
59 N: GenericNowFn<Timestamp> + std::fmt::Debug,
60{
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct("FdbTimestampOracle")
63 .field("timeline", &self.timeline)
64 .field("next", &self.next)
65 .field("read_only", &self.read_only)
66 .field("read_ts_key", &self.read_ts_key)
67 .field("write_ts_key", &self.write_ts_key)
68 .field("metrics", &self.metrics)
69 .finish_non_exhaustive()
70 }
71}
72
73#[derive(Clone, Debug)]
76pub struct FdbTimestampOracleConfig {
77 url: SensitiveUrl,
78 metrics: Arc<Metrics>,
79}
80
81impl FdbTimestampOracleConfig {
82 pub fn new(url: SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
84 let metrics = Arc::new(Metrics::new(metrics_registry));
85 Self { url, metrics }
86 }
87
88 pub fn new_for_test() -> Self {
95 Self {
96 url: FromStr::from_str("foundationdb:?prefix=test/tsoracle").unwrap(),
97 metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
98 }
99 }
100
101 pub(crate) fn metrics(&self) -> &Arc<Metrics> {
103 &self.metrics
104 }
105}
106
107enum FdbTransactError {
110 FdbError(FdbError),
111 ExternalError(anyhow::Error),
112}
113
114impl std::fmt::Debug for FdbTransactError {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 match self {
117 FdbTransactError::FdbError(e) => write!(f, "FdbError({})", e),
118 FdbTransactError::ExternalError(e) => write!(f, "ExternalError({:?})", e),
119 }
120 }
121}
122
123impl From<FdbError> for FdbTransactError {
124 fn from(value: FdbError) -> Self {
125 Self::FdbError(value)
126 }
127}
128
129impl From<anyhow::Error> for FdbTransactError {
130 fn from(value: anyhow::Error) -> Self {
131 Self::ExternalError(value)
132 }
133}
134
135impl From<PackError> for FdbTransactError {
136 fn from(value: PackError) -> Self {
137 Self::ExternalError(anyhow::Error::new(value))
138 }
139}
140
141impl From<FdbBindingError> for FdbTransactError {
142 fn from(value: FdbBindingError) -> Self {
143 Self::ExternalError(anyhow::Error::new(value))
144 }
145}
146
147impl From<FdbTransactError> for anyhow::Error {
148 fn from(value: FdbTransactError) -> Self {
149 match value {
150 FdbTransactError::FdbError(e) => anyhow::Error::new(e),
151 FdbTransactError::ExternalError(e) => e,
152 }
153 }
154}
155
156impl TransactError for FdbTransactError {
157 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
158 match self {
159 Self::FdbError(e) => Ok(e),
160 other => Err(other),
161 }
162 }
163}
164
165#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
167struct PackableTimestamp(Timestamp);
168
169impl TuplePack for PackableTimestamp {
170 fn pack<W: Write>(
171 &self,
172 w: &mut W,
173 tuple_depth: TupleDepth,
174 ) -> std::io::Result<VersionstampOffset> {
175 u64::from(self.0).pack(w, tuple_depth)
176 }
177}
178
179impl<'de> TupleUnpack<'de> for PackableTimestamp {
180 fn unpack(input: &'de [u8], tuple_depth: TupleDepth) -> PackResult<(&'de [u8], Self)> {
181 u64::unpack(input, tuple_depth).map(|(rem, v)| (rem, PackableTimestamp(Timestamp::from(v))))
182 }
183}
184
185impl<N: Sync> FdbTimestampOracle<N> {
186 async fn open_inner(
187 timeline: String,
188 next: N,
189 read_only: bool,
190 db: Arc<Database>,
191 metrics: Arc<Metrics>,
192 prefix: Vec<String>,
193 directory: DirectoryLayer,
194 ) -> Result<FdbTimestampOracle<N>, anyhow::Error> {
195 let timeline_path: Vec<_> = prefix
197 .into_iter()
198 .chain(std::iter::once(timeline.clone()))
199 .collect();
200
201 let timeline_subspace = db
202 .run(async |trx, _maybe_committed| {
203 Ok(directory
204 .create_or_open(&trx, &timeline_path, None, None)
205 .await)
206 })
207 .await?
208 .map_err(|e| anyhow!("directory error: {e:?}"))?;
209
210 let timeline_subspace = match timeline_subspace {
211 DirectoryOutput::DirectorySubspace(subspace) => subspace,
212 DirectoryOutput::DirectoryPartition(_partition) => {
213 return Err(anyhow!("timestamp oracle timeline cannot be a partition"));
214 }
215 };
216
217 let read_ts_key = timeline_subspace.pack(&"read_ts");
218 let write_ts_key = timeline_subspace.pack(&"write_ts");
219
220 Ok(Self {
221 timeline,
222 next,
223 read_ts_key,
224 write_ts_key,
225 db,
226 metrics,
227 read_only,
228 })
229 }
230
231 async fn max_ts(&self) -> Result<Option<PackableTimestamp>, anyhow::Error> {
232 let max_ts = self
233 .db
234 .transact_boxed(
235 &(),
236 |trx, ()| self.max_rs_trx(trx).boxed(),
237 TransactOption::idempotent(),
241 )
242 .await?;
243 Ok(max_ts)
244 }
245
246 async fn max_rs_trx(
247 &self,
248 trx: &Transaction,
249 ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
250 let read_data = trx.get(&self.read_ts_key, false).await?;
251 let write_data = trx.get(&self.write_ts_key, false).await?;
252
253 let read_ts: Option<PackableTimestamp> =
254 read_data.map(|data| unpack(&data).expect("must unpack"));
255
256 let write_ts: Option<PackableTimestamp> =
257 write_data.map(|data| unpack(&data).expect("must unpack"));
258
259 let max_ts = std::cmp::max(read_ts, write_ts);
260
261 Ok::<_, FdbTransactError>(max_ts)
262 }
263}
264
265impl<N> FdbTimestampOracle<N>
266where
267 N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
268{
269 pub async fn open(
273 config: FdbTimestampOracleConfig,
274 timeline: String,
275 initially: Timestamp,
276 next: N,
277 read_only: bool,
278 ) -> Result<Self, anyhow::Error> {
279 info!(config = ?config, "opening FdbTimestampOracle");
280
281 let fdb_config = FdbConfig::parse(&config.url)?;
282
283 mz_foundationdb::init_network();
284
285 let db = Arc::new(Database::new(None)?);
286 #[cfg(test)]
289 mz_foundationdb::set_test_transaction_timeout(&db);
290 let metrics = Arc::clone(&config.metrics);
291 let prefix = fdb_config.prefix;
292 let directory = DirectoryLayer::default();
293
294 let oracle =
295 Self::open_inner(timeline, next, read_only, db, metrics, prefix, directory).await?;
296
297 oracle.initialize(initially).await?;
299
300 Ok(oracle)
301 }
302
303 async fn initialize(&self, initially: Timestamp) -> Result<(), FdbBindingError> {
307 let initially_packed = pack(&PackableTimestamp(initially));
310
311 self.db
312 .run(async |trx, _maybe_committed| {
313 let existing_read = trx.get(&self.read_ts_key, false).await?;
315 if existing_read.is_none() {
316 trx.set(&self.read_ts_key, &initially_packed);
317 }
318
319 let existing_write = trx.get(&self.write_ts_key, false).await?;
321 if existing_write.is_none() {
322 trx.set(&self.write_ts_key, &initially_packed);
323 }
324
325 Ok(())
326 })
327 .await?;
328
329 if !self.read_only {
331 self.apply_write(initially).await;
332 }
333
334 Ok(())
335 }
336
337 pub async fn get_all_timelines(
343 config: FdbTimestampOracleConfig,
344 ) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
345 let fdb_config = FdbConfig::parse(&config.url)?;
346
347 mz_foundationdb::init_network();
348
349 let db = Arc::new(Database::new(None)?);
350 #[cfg(test)]
353 mz_foundationdb::set_test_transaction_timeout(&db);
354 let metrics = Arc::clone(&config.metrics);
355 let prefix = fdb_config.prefix;
356 let directory = DirectoryLayer::default();
357
358 let timeline_names = match db
360 .run(async |trx, _maybe_committed| Ok(directory.list(&trx, &prefix).await))
361 .await?
362 {
363 Err(DirectoryError::PathDoesNotExists) => Vec::new(), Err(e) => return Err(anyhow!("directory error: {e:?}")),
365 Ok(timelines) => timelines,
366 };
367
368 let mut result = Vec::with_capacity(timeline_names.len());
369
370 for timeline_name in timeline_names {
372 let oracle = FdbTimestampOracle::<()>::open_inner(
373 timeline_name.clone(),
374 (),
375 true,
376 Arc::clone(&db),
377 Arc::clone(&metrics),
378 prefix.clone(),
379 directory.clone(),
380 )
381 .await?;
382
383 if let Some(ts) = oracle.max_ts().await? {
384 result.push((timeline_name, ts.0));
385 }
386 }
387
388 Ok(result)
389 }
390
391 async fn write_ts_trx(
392 &self,
393 trx: &Transaction,
394 proposed_next_ts: Timestamp,
395 ) -> Result<Timestamp, FdbTransactError> {
396 let current = trx.get(&self.write_ts_key, false).await?;
398 let current_ts: PackableTimestamp = match current {
399 Some(data) => unpack(&data)?,
400 None => {
401 return Err(FdbTransactError::ExternalError(anyhow!(
402 "timeline not initialized"
403 )));
404 }
405 };
406
407 let incremented = current_ts.0.step_forward();
409 let new_ts = std::cmp::max(incremented, proposed_next_ts);
410
411 let new_ts_packed = pack(&PackableTimestamp(new_ts));
413 trx.set(&self.write_ts_key, &new_ts_packed);
414
415 Ok(new_ts)
416 }
417
418 async fn peek_write_ts_trx(
419 &self,
420 trx: &Transaction,
421 ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
422 let data = trx.get(&self.write_ts_key, false).await?;
423 Ok(data.map(|data| unpack(&data)).transpose()?)
424 }
425
426 async fn read_ts_trx(
427 &self,
428 trx: &Transaction,
429 ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
430 let data = trx.get(&self.read_ts_key, false).await?;
431 Ok(data.map(|data| unpack(&data)).transpose()?)
432 }
433
434 async fn apply_write_trx(
435 &self,
436 trx: &Transaction,
437 write_ts: Timestamp,
438 ) -> Result<(), FdbTransactError> {
439 let current_read = trx.get(&self.read_ts_key, false).await?;
441 let current_read_ts: PackableTimestamp = match current_read {
442 Some(data) => unpack(&data)?,
443 None => {
444 return Err(FdbTransactError::ExternalError(anyhow!(
445 "timeline not initialized"
446 )));
447 }
448 };
449
450 if write_ts > current_read_ts.0 {
451 let new_ts_packed = pack(&PackableTimestamp(write_ts));
452 trx.set(&self.read_ts_key, &new_ts_packed);
453 }
454
455 let current_write = trx.get(&self.write_ts_key, false).await?;
457 let current_write_ts: PackableTimestamp = match current_write {
458 Some(data) => unpack(&data)?,
459 None => {
460 return Err(FdbTransactError::ExternalError(anyhow!(
461 "timeline not initialized"
462 )));
463 }
464 };
465
466 if write_ts > current_write_ts.0 {
467 let new_ts_packed = pack(&PackableTimestamp(write_ts));
468 trx.set(&self.write_ts_key, &new_ts_packed);
469 }
470
471 Ok::<_, FdbTransactError>(())
472 }
473}
474
475#[async_trait]
476impl<N> TimestampOracle<Timestamp> for FdbTimestampOracle<N>
477where
478 N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
479{
480 async fn write_ts(&self) -> WriteTimestamp<Timestamp> {
481 if self.read_only {
482 panic!("attempting write_ts in read-only mode");
483 }
484
485 let proposed_next_ts = self.next.now();
486
487 let write_ts: Timestamp = self
488 .metrics
489 .oracle
490 .write_ts
491 .run_op(|| async {
492 self.db
493 .transact_boxed(
494 &proposed_next_ts,
495 |trx, proposed_next_ts| self.write_ts_trx(trx, **proposed_next_ts).boxed(),
496 TransactOption::idempotent(),
502 )
503 .await
504 .map_err(anyhow::Error::from)
505 })
506 .await
507 .expect("write_ts transaction failed");
508
509 debug!(
510 timeline = ?self.timeline,
511 write_ts = ?write_ts,
512 proposed_next_ts = ?proposed_next_ts,
513 "returning from write_ts()"
514 );
515
516 let advance_to = write_ts.step_forward();
517
518 WriteTimestamp {
519 timestamp: write_ts,
520 advance_to,
521 }
522 }
523
524 async fn peek_write_ts(&self) -> Timestamp {
525 let write_ts = self
526 .metrics
527 .oracle
528 .peek_write_ts
529 .run_op(|| async {
530 self.db
531 .transact_boxed(
532 &(),
533 |trx, ()| self.peek_write_ts_trx(trx).boxed(),
534 TransactOption::idempotent(),
536 )
537 .await
538 .map_err(anyhow::Error::from)?
539 .ok_or_else(|| anyhow!("timeline not initialized"))
540 .map(|ts| ts.0)
541 })
542 .await
543 .expect("peek_write_ts transaction failed");
544
545 debug!(
546 timeline = ?self.timeline,
547 write_ts = ?write_ts,
548 "returning from peek_write_ts()"
549 );
550
551 write_ts
552 }
553
554 async fn read_ts(&self) -> Timestamp {
555 let read_ts = self
556 .metrics
557 .oracle
558 .read_ts
559 .run_op(|| async {
560 self.db
561 .transact_boxed(
562 &(),
563 |trx, ()| self.read_ts_trx(trx).boxed(),
564 TransactOption::idempotent(),
566 )
567 .await
568 .map_err(anyhow::Error::from)?
569 .ok_or_else(|| anyhow!("timeline not initialized"))
570 .map(|ts| ts.0)
571 })
572 .await
573 .expect("read_ts transaction failed");
574
575 debug!(
576 timeline = ?self.timeline,
577 read_ts = ?read_ts,
578 "returning from read_ts()"
579 );
580
581 read_ts
582 }
583
584 async fn apply_write(&self, write_ts: Timestamp) {
585 if self.read_only {
586 panic!("attempting apply_write in read-only mode");
587 }
588
589 self.metrics
590 .oracle
591 .apply_write
592 .run_op(|| async {
593 self.db
594 .transact_boxed(
595 &write_ts,
596 |trx, write_ts| self.apply_write_trx(trx, **write_ts).boxed(),
597 TransactOption::idempotent(),
603 )
604 .await
605 .map_err(anyhow::Error::from)
606 })
607 .await
608 .expect("apply_write transaction failed");
609
610 debug!(
611 timeline = ?self.timeline,
612 write_ts = ?write_ts,
613 "returning from apply_write()"
614 );
615 }
616}
617
#[cfg(test)]
mod tests {
    use super::*;

    use mz_ore::now::NowFn;

    use crate::TimestampOracle;

    /// Runs the crate's shared oracle test-suite against the
    /// FoundationDB-backed implementation.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_fdb_timestamp_oracle() -> Result<(), anyhow::Error> {
        let config = FdbTimestampOracleConfig::new_for_test();

        crate::tests::timestamp_oracle_impl_test(|timeline, now_fn: NowFn, initial_ts| {
            // Each invocation needs its own copy of the configuration because
            // the future takes ownership of it.
            let config = config.clone();
            async move {
                let opened = FdbTimestampOracle::open(config, timeline, initial_ts, now_fn, false)
                    .await
                    .expect("failed to open FdbTimestampOracle");

                let shared: Arc<dyn TimestampOracle<Timestamp> + Send + Sync> = Arc::new(opened);
                shared
            }
        })
        .await?;

        // Shut the FoundationDB network down once the suite has finished.
        mz_foundationdb::shutdown_network();

        Ok(())
    }
}