1use std::io::Write;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use futures_util::future::FutureExt;
26use mz_foundationdb::FdbConfig;
27use mz_foundationdb::directory::{Directory, DirectoryError, DirectoryLayer, DirectoryOutput};
28use mz_foundationdb::tuple::{
29 PackError, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, pack, unpack,
30};
31use mz_foundationdb::{
32 Database, FdbBindingError, FdbError, TransactError, TransactOption, Transaction,
33};
34use mz_ore::metrics::MetricsRegistry;
35use mz_ore::url::SensitiveUrl;
36use mz_repr::Timestamp;
37use tracing::{debug, info};
38
39use crate::metrics::Metrics;
40use crate::{GenericNowFn, TimestampOracle, WriteTimestamp};
41
42pub struct FdbTimestampOracle<N> {
44 timeline: String,
45 next: N,
46 db: Arc<Database>,
47 metrics: Arc<Metrics>,
48 read_only: bool,
51 read_ts_key: Vec<u8>,
53 write_ts_key: Vec<u8>,
55}
56
57impl<N> std::fmt::Debug for FdbTimestampOracle<N>
58where
59 N: GenericNowFn<Timestamp> + std::fmt::Debug,
60{
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct("FdbTimestampOracle")
63 .field("timeline", &self.timeline)
64 .field("next", &self.next)
65 .field("read_only", &self.read_only)
66 .field("read_ts_key", &self.read_ts_key)
67 .field("write_ts_key", &self.write_ts_key)
68 .field("metrics", &self.metrics)
69 .finish_non_exhaustive()
70 }
71}
72
73#[derive(Clone, Debug)]
76pub struct FdbTimestampOracleConfig {
77 url: SensitiveUrl,
78 metrics: Arc<Metrics>,
79}
80
81impl FdbTimestampOracleConfig {
82 pub fn new(url: SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
84 let metrics = Arc::new(Metrics::new(metrics_registry));
85 Self { url, metrics }
86 }
87
88 pub fn new_for_test() -> Self {
95 Self {
96 url: FromStr::from_str("foundationdb:?prefix=test/tsoracle").unwrap(),
97 metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
98 }
99 }
100
101 pub(crate) fn metrics(&self) -> &Arc<Metrics> {
103 &self.metrics
104 }
105}
106
107enum FdbTransactError {
110 FdbError(FdbError),
111 ExternalError(anyhow::Error),
112}
113
114impl std::fmt::Debug for FdbTransactError {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 match self {
117 FdbTransactError::FdbError(e) => write!(f, "FdbError({})", e),
118 FdbTransactError::ExternalError(e) => write!(f, "ExternalError({:?})", e),
119 }
120 }
121}
122
123impl From<FdbError> for FdbTransactError {
124 fn from(value: FdbError) -> Self {
125 Self::FdbError(value)
126 }
127}
128
129impl From<anyhow::Error> for FdbTransactError {
130 fn from(value: anyhow::Error) -> Self {
131 Self::ExternalError(value)
132 }
133}
134
135impl From<PackError> for FdbTransactError {
136 fn from(value: PackError) -> Self {
137 Self::ExternalError(anyhow::Error::new(value))
138 }
139}
140
141impl From<FdbBindingError> for FdbTransactError {
142 fn from(value: FdbBindingError) -> Self {
143 Self::ExternalError(anyhow::Error::new(value))
144 }
145}
146
147impl From<FdbTransactError> for anyhow::Error {
148 fn from(value: FdbTransactError) -> Self {
149 match value {
150 FdbTransactError::FdbError(e) => anyhow::Error::new(e),
151 FdbTransactError::ExternalError(e) => e,
152 }
153 }
154}
155
156impl TransactError for FdbTransactError {
157 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
158 match self {
159 Self::FdbError(e) => Ok(e),
160 other => Err(other),
161 }
162 }
163}
164
165#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
167struct PackableTimestamp(Timestamp);
168
169impl TuplePack for PackableTimestamp {
170 fn pack<W: Write>(
171 &self,
172 w: &mut W,
173 tuple_depth: TupleDepth,
174 ) -> std::io::Result<VersionstampOffset> {
175 u64::from(self.0).pack(w, tuple_depth)
176 }
177}
178
179impl<'de> TupleUnpack<'de> for PackableTimestamp {
180 fn unpack(input: &'de [u8], tuple_depth: TupleDepth) -> PackResult<(&'de [u8], Self)> {
181 u64::unpack(input, tuple_depth).map(|(rem, v)| (rem, PackableTimestamp(Timestamp::from(v))))
182 }
183}
184
185impl<N: Sync> FdbTimestampOracle<N> {
186 async fn open_inner(
187 timeline: String,
188 next: N,
189 read_only: bool,
190 db: Arc<Database>,
191 metrics: Arc<Metrics>,
192 prefix: Vec<String>,
193 directory: DirectoryLayer,
194 ) -> Result<FdbTimestampOracle<N>, anyhow::Error> {
195 let timeline_path: Vec<_> = prefix
197 .into_iter()
198 .chain(std::iter::once(timeline.clone()))
199 .collect();
200
201 let timeline_subspace = db
202 .run(async |trx, _maybe_committed| {
203 Ok(directory
204 .create_or_open(&trx, &timeline_path, None, None)
205 .await)
206 })
207 .await?
208 .map_err(|e| anyhow!("directory error: {e:?}"))?;
209
210 let timeline_subspace = match timeline_subspace {
211 DirectoryOutput::DirectorySubspace(subspace) => subspace,
212 DirectoryOutput::DirectoryPartition(_partition) => {
213 return Err(anyhow!("timestamp oracle timeline cannot be a partition"));
214 }
215 };
216
217 let read_ts_key = timeline_subspace.pack(&"read_ts");
218 let write_ts_key = timeline_subspace.pack(&"write_ts");
219
220 Ok(Self {
221 timeline,
222 next,
223 read_ts_key,
224 write_ts_key,
225 db,
226 metrics,
227 read_only,
228 })
229 }
230
231 async fn max_ts(&self) -> Result<Option<PackableTimestamp>, anyhow::Error> {
232 let max_ts = self
233 .db
234 .transact_boxed(
235 &(),
236 |trx, ()| self.max_rs_trx(trx).boxed(),
237 TransactOption::default(),
238 )
239 .await?;
240 Ok(max_ts)
241 }
242
243 async fn max_rs_trx(
244 &self,
245 trx: &Transaction,
246 ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
247 let read_data = trx.get(&self.read_ts_key, false).await?;
248 let write_data = trx.get(&self.write_ts_key, false).await?;
249
250 let read_ts: Option<PackableTimestamp> =
251 read_data.map(|data| unpack(&data).expect("must unpack"));
252
253 let write_ts: Option<PackableTimestamp> =
254 write_data.map(|data| unpack(&data).expect("must unpack"));
255
256 let max_ts = std::cmp::max(read_ts, write_ts);
257
258 Ok::<_, FdbTransactError>(max_ts)
259 }
260}
261
262impl<N> FdbTimestampOracle<N>
263where
264 N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
265{
266 pub async fn open(
270 config: FdbTimestampOracleConfig,
271 timeline: String,
272 initially: Timestamp,
273 next: N,
274 read_only: bool,
275 ) -> Result<Self, anyhow::Error> {
276 info!(config = ?config, "opening FdbTimestampOracle");
277
278 let fdb_config = FdbConfig::parse(&config.url)?;
279
280 mz_foundationdb::init_network();
281
282 let db = Arc::new(Database::new(None)?);
283 let metrics = Arc::clone(&config.metrics);
284 let prefix = fdb_config.prefix;
285 let directory = DirectoryLayer::default();
286
287 let oracle =
288 Self::open_inner(timeline, next, read_only, db, metrics, prefix, directory).await?;
289
290 oracle.initialize(initially).await?;
292
293 Ok(oracle)
294 }
295
296 async fn initialize(&self, initially: Timestamp) -> Result<(), FdbBindingError> {
300 let initially_packed = pack(&PackableTimestamp(initially));
303
304 self.db
305 .run(async |trx, _maybe_committed| {
306 let existing_read = trx.get(&self.read_ts_key, false).await?;
308 if existing_read.is_none() {
309 trx.set(&self.read_ts_key, &initially_packed);
310 }
311
312 let existing_write = trx.get(&self.write_ts_key, false).await?;
314 if existing_write.is_none() {
315 trx.set(&self.write_ts_key, &initially_packed);
316 }
317
318 Ok(())
319 })
320 .await?;
321
322 if !self.read_only {
324 self.apply_write(initially).await;
325 }
326
327 Ok(())
328 }
329
330 pub async fn get_all_timelines(
336 config: FdbTimestampOracleConfig,
337 ) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
338 let fdb_config = FdbConfig::parse(&config.url)?;
339
340 mz_foundationdb::init_network();
341
342 let db = Arc::new(Database::new(None)?);
343 let metrics = Arc::clone(&config.metrics);
344 let prefix = fdb_config.prefix;
345 let directory = DirectoryLayer::default();
346
347 let timeline_names = match db
349 .run(async |trx, _maybe_committed| Ok(directory.list(&trx, &prefix).await))
350 .await?
351 {
352 Err(DirectoryError::PathDoesNotExists) => Vec::new(), Err(e) => return Err(anyhow!("directory error: {e:?}")),
354 Ok(timelines) => timelines,
355 };
356
357 let mut result = Vec::with_capacity(timeline_names.len());
358
359 for timeline_name in timeline_names {
361 let oracle = FdbTimestampOracle::<()>::open_inner(
362 timeline_name.clone(),
363 (),
364 true,
365 Arc::clone(&db),
366 Arc::clone(&metrics),
367 prefix.clone(),
368 directory.clone(),
369 )
370 .await?;
371
372 if let Some(ts) = oracle.max_ts().await? {
373 result.push((timeline_name, ts.0));
374 }
375 }
376
377 Ok(result)
378 }
379
380 async fn write_ts_trx(
381 &self,
382 trx: &Transaction,
383 proposed_next_ts: Timestamp,
384 ) -> Result<Timestamp, FdbTransactError> {
385 let current = trx.get(&self.write_ts_key, false).await?;
387 let current_ts: PackableTimestamp = match current {
388 Some(data) => unpack(&data)?,
389 None => {
390 return Err(FdbTransactError::ExternalError(anyhow!(
391 "timeline not initialized"
392 )));
393 }
394 };
395
396 let incremented = current_ts.0.step_forward();
398 let new_ts = std::cmp::max(incremented, proposed_next_ts);
399
400 let new_ts_packed = pack(&PackableTimestamp(new_ts));
402 trx.set(&self.write_ts_key, &new_ts_packed);
403
404 Ok(new_ts)
405 }
406
407 async fn peek_write_ts_trx(
408 &self,
409 trx: &Transaction,
410 ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
411 let data = trx.get(&self.write_ts_key, false).await?;
412 Ok(data.map(|data| unpack(&data)).transpose()?)
413 }
414
415 async fn read_ts_trx(
416 &self,
417 trx: &Transaction,
418 ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
419 let data = trx.get(&self.read_ts_key, false).await?;
420 Ok(data.map(|data| unpack(&data)).transpose()?)
421 }
422
423 async fn apply_write_trx(
424 &self,
425 trx: &Transaction,
426 write_ts: Timestamp,
427 ) -> Result<(), FdbTransactError> {
428 let current_read = trx.get(&self.read_ts_key, false).await?;
430 let current_read_ts: PackableTimestamp = match current_read {
431 Some(data) => unpack(&data)?,
432 None => {
433 return Err(FdbTransactError::ExternalError(anyhow!(
434 "timeline not initialized"
435 )));
436 }
437 };
438
439 if write_ts > current_read_ts.0 {
440 let new_ts_packed = pack(&PackableTimestamp(write_ts));
441 trx.set(&self.read_ts_key, &new_ts_packed);
442 }
443
444 let current_write = trx.get(&self.write_ts_key, false).await?;
446 let current_write_ts: PackableTimestamp = match current_write {
447 Some(data) => unpack(&data)?,
448 None => {
449 return Err(FdbTransactError::ExternalError(anyhow!(
450 "timeline not initialized"
451 )));
452 }
453 };
454
455 if write_ts > current_write_ts.0 {
456 let new_ts_packed = pack(&PackableTimestamp(write_ts));
457 trx.set(&self.write_ts_key, &new_ts_packed);
458 }
459
460 Ok::<_, FdbTransactError>(())
461 }
462}
463
464#[async_trait]
465impl<N> TimestampOracle<Timestamp> for FdbTimestampOracle<N>
466where
467 N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
468{
469 async fn write_ts(&self) -> WriteTimestamp<Timestamp> {
470 if self.read_only {
471 panic!("attempting write_ts in read-only mode");
472 }
473
474 let proposed_next_ts = self.next.now();
475
476 let write_ts: Timestamp = self
477 .metrics
478 .oracle
479 .write_ts
480 .run_op(|| async {
481 self.db
482 .transact_boxed(
483 &proposed_next_ts,
484 |trx, proposed_next_ts| self.write_ts_trx(trx, **proposed_next_ts).boxed(),
485 TransactOption::default(),
486 )
487 .await
488 .map_err(anyhow::Error::from)
489 })
490 .await
491 .expect("write_ts transaction failed");
492
493 debug!(
494 timeline = ?self.timeline,
495 write_ts = ?write_ts,
496 proposed_next_ts = ?proposed_next_ts,
497 "returning from write_ts()"
498 );
499
500 let advance_to = write_ts.step_forward();
501
502 WriteTimestamp {
503 timestamp: write_ts,
504 advance_to,
505 }
506 }
507
508 async fn peek_write_ts(&self) -> Timestamp {
509 let write_ts = self
510 .metrics
511 .oracle
512 .peek_write_ts
513 .run_op(|| async {
514 self.db
515 .transact_boxed(
516 &(),
517 |trx, ()| self.peek_write_ts_trx(trx).boxed(),
518 TransactOption::default(),
519 )
520 .await
521 .map_err(anyhow::Error::from)?
522 .ok_or_else(|| anyhow!("timeline not initialized"))
523 .map(|ts| ts.0)
524 })
525 .await
526 .expect("peek_write_ts transaction failed");
527
528 debug!(
529 timeline = ?self.timeline,
530 write_ts = ?write_ts,
531 "returning from peek_write_ts()"
532 );
533
534 write_ts
535 }
536
537 async fn read_ts(&self) -> Timestamp {
538 let read_ts = self
539 .metrics
540 .oracle
541 .read_ts
542 .run_op(|| async {
543 self.db
544 .transact_boxed(
545 &(),
546 |trx, ()| self.read_ts_trx(trx).boxed(),
547 TransactOption::default(),
548 )
549 .await
550 .map_err(anyhow::Error::from)?
551 .ok_or_else(|| anyhow!("timeline not initialized"))
552 .map(|ts| ts.0)
553 })
554 .await
555 .expect("read_ts transaction failed");
556
557 debug!(
558 timeline = ?self.timeline,
559 read_ts = ?read_ts,
560 "returning from read_ts()"
561 );
562
563 read_ts
564 }
565
566 async fn apply_write(&self, write_ts: Timestamp) {
567 if self.read_only {
568 panic!("attempting apply_write in read-only mode");
569 }
570
571 self.metrics
572 .oracle
573 .apply_write
574 .run_op(|| async {
575 self.db
576 .transact_boxed(
577 &write_ts,
578 |trx, write_ts| self.apply_write_trx(trx, **write_ts).boxed(),
579 TransactOption::default(),
580 )
581 .await
582 .map_err(anyhow::Error::from)
583 })
584 .await
585 .expect("apply_write transaction failed");
586
587 debug!(
588 timeline = ?self.timeline,
589 write_ts = ?write_ts,
590 "returning from apply_write()"
591 );
592 }
593}
594
#[cfg(test)]
mod tests {
    use super::*;

    use mz_ore::now::NowFn;

    use crate::TimestampOracle;

    /// Runs `crate::tests::timestamp_oracle_impl_test` against oracles opened
    /// on FoundationDB.
    #[mz_ore::test(tokio::test)]
    // NOTE(review): presumably skipped under Miri because the FDB client is
    // reached through foreign calls, and `#[ignore]`d because it needs a
    // reachable FoundationDB cluster; confirm.
    #[cfg_attr(miri, ignore)]
    #[ignore]
    async fn test_fdb_timestamp_oracle() -> Result<(), anyhow::Error> {
        let config = FdbTimestampOracleConfig::new_for_test();

        crate::tests::timestamp_oracle_impl_test(|timeline, now_fn: NowFn, initial_ts| {
            // Give each returned future its own copy of the config to move.
            let config = config.clone();
            async move {
                let oracle = FdbTimestampOracle::open(config, timeline, initial_ts, now_fn, false)
                    .await
                    .expect("failed to open FdbTimestampOracle");
                Arc::new(oracle) as Arc<dyn TimestampOracle<Timestamp> + Send + Sync>
            }
        })
        .await?;

        // Tear down the FDB client network once the shared suite has run.
        mz_foundationdb::shutdown_network();

        Ok(())
    }
}