1use std::io::Write;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use futures_util::future::FutureExt;
26use mz_foundationdb::FdbConfig;
27use mz_foundationdb::directory::{Directory, DirectoryError, DirectoryLayer, DirectoryOutput};
28use mz_foundationdb::tuple::{
29 PackError, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, pack, unpack,
30};
31use mz_foundationdb::{
32 Database, FdbBindingError, FdbError, TransactError, TransactOption, Transaction,
33};
34use mz_ore::metrics::MetricsRegistry;
35use mz_ore::url::SensitiveUrl;
36use mz_repr::Timestamp;
37use tracing::{debug, info};
38
39use crate::metrics::Metrics;
40use crate::{GenericNowFn, TimestampOracle, WriteTimestamp};
41
42pub struct FdbTimestampOracle<N> {
44 timeline: String,
45 next: N,
46 db: Arc<Database>,
47 read_only: bool,
50 read_ts_key: Vec<u8>,
52 write_ts_key: Vec<u8>,
54}
55
56impl<N> std::fmt::Debug for FdbTimestampOracle<N>
57where
58 N: GenericNowFn<Timestamp> + std::fmt::Debug,
59{
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct("FdbTimestampOracle")
62 .field("timeline", &self.timeline)
63 .field("next", &self.next)
64 .field("read_only", &self.read_only)
65 .field("read_ts_key", &self.read_ts_key)
66 .field("write_ts_key", &self.write_ts_key)
67 .finish_non_exhaustive()
68 }
69}
70
71#[derive(Clone, Debug)]
74pub struct FdbTimestampOracleConfig {
75 url: SensitiveUrl,
76 metrics: Arc<Metrics>,
77}
78
79impl FdbTimestampOracleConfig {
80 pub fn new(url: SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
82 let metrics = Arc::new(Metrics::new(metrics_registry));
83 Self { url, metrics }
84 }
85
86 pub fn new_for_test() -> Self {
93 Self {
94 url: FromStr::from_str("foundationdb:?prefix=test/tsoracle").unwrap(),
95 metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
96 }
97 }
98
99 pub(crate) fn metrics(&self) -> &Arc<Metrics> {
101 &self.metrics
102 }
103}
104
105enum FdbTransactError {
108 FdbError(FdbError),
109 ExternalError(anyhow::Error),
110}
111
112impl std::fmt::Debug for FdbTransactError {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 match self {
115 FdbTransactError::FdbError(e) => write!(f, "FdbError({})", e),
116 FdbTransactError::ExternalError(e) => write!(f, "ExternalError({:?})", e),
117 }
118 }
119}
120
121impl From<FdbError> for FdbTransactError {
122 fn from(value: FdbError) -> Self {
123 Self::FdbError(value)
124 }
125}
126
127impl From<anyhow::Error> for FdbTransactError {
128 fn from(value: anyhow::Error) -> Self {
129 Self::ExternalError(value)
130 }
131}
132
133impl From<PackError> for FdbTransactError {
134 fn from(value: PackError) -> Self {
135 Self::ExternalError(anyhow::Error::new(value))
136 }
137}
138
139impl From<FdbBindingError> for FdbTransactError {
140 fn from(value: FdbBindingError) -> Self {
141 Self::ExternalError(anyhow::Error::new(value))
142 }
143}
144
145impl From<FdbTransactError> for anyhow::Error {
146 fn from(value: FdbTransactError) -> Self {
147 match value {
148 FdbTransactError::FdbError(e) => anyhow::Error::new(e),
149 FdbTransactError::ExternalError(e) => e,
150 }
151 }
152}
153
154impl TransactError for FdbTransactError {
155 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
156 match self {
157 Self::FdbError(e) => Ok(e),
158 other => Err(other),
159 }
160 }
161}
162
163#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
165struct PackableTimestamp(Timestamp);
166
167impl TuplePack for PackableTimestamp {
168 fn pack<W: Write>(
169 &self,
170 w: &mut W,
171 tuple_depth: TupleDepth,
172 ) -> std::io::Result<VersionstampOffset> {
173 u64::from(self.0).pack(w, tuple_depth)
174 }
175}
176
177impl<'de> TupleUnpack<'de> for PackableTimestamp {
178 fn unpack(input: &'de [u8], tuple_depth: TupleDepth) -> PackResult<(&'de [u8], Self)> {
179 u64::unpack(input, tuple_depth).map(|(rem, v)| (rem, PackableTimestamp(Timestamp::from(v))))
180 }
181}
182
183impl<N: Sync> FdbTimestampOracle<N> {
184 async fn open_inner(
185 timeline: String,
186 next: N,
187 read_only: bool,
188 db: Arc<Database>,
189 prefix: Vec<String>,
190 directory: DirectoryLayer,
191 ) -> Result<FdbTimestampOracle<N>, anyhow::Error> {
192 let timeline_path: Vec<_> = prefix
194 .into_iter()
195 .chain(std::iter::once(timeline.clone()))
196 .collect();
197
198 let timeline_subspace = db
199 .run(async |trx, _maybe_committed| {
200 Ok(directory
201 .create_or_open(&trx, &timeline_path, None, None)
202 .await)
203 })
204 .await?
205 .map_err(|e| anyhow!("directory error: {e:?}"))?;
206
207 let timeline_subspace = match timeline_subspace {
208 DirectoryOutput::DirectorySubspace(subspace) => subspace,
209 DirectoryOutput::DirectoryPartition(_partition) => {
210 return Err(anyhow!("timestamp oracle timeline cannot be a partition"));
211 }
212 };
213
214 let read_ts_key = timeline_subspace.pack(&"read_ts");
215 let write_ts_key = timeline_subspace.pack(&"write_ts");
216
217 Ok(Self {
218 timeline,
219 next,
220 read_ts_key,
221 write_ts_key,
222 db,
223 read_only,
224 })
225 }
226
227 async fn max_ts(&self) -> Result<Option<PackableTimestamp>, anyhow::Error> {
228 let max_ts = self
229 .db
230 .transact_boxed(
231 &(),
232 |trx, ()| self.max_rs_trx(trx).boxed(),
233 TransactOption::default(),
234 )
235 .await?;
236 Ok(max_ts)
237 }
238
239 async fn max_rs_trx(
240 &self,
241 trx: &Transaction,
242 ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
243 let read_data = trx.get(&self.read_ts_key, false).await?;
244 let write_data = trx.get(&self.write_ts_key, false).await?;
245
246 let read_ts: Option<PackableTimestamp> =
247 read_data.map(|data| unpack(&data).expect("must unpack"));
248
249 let write_ts: Option<PackableTimestamp> =
250 write_data.map(|data| unpack(&data).expect("must unpack"));
251
252 let max_ts = std::cmp::max(read_ts, write_ts);
253
254 Ok::<_, FdbTransactError>(max_ts)
255 }
256}
257
258impl<N> FdbTimestampOracle<N>
259where
260 N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
261{
262 pub async fn open(
266 config: FdbTimestampOracleConfig,
267 timeline: String,
268 initially: Timestamp,
269 next: N,
270 read_only: bool,
271 ) -> Result<Self, anyhow::Error> {
272 info!(config = ?config, "opening FdbTimestampOracle");
273
274 let fdb_config = FdbConfig::parse(&config.url)?;
275
276 mz_foundationdb::init_network();
277
278 let db = Arc::new(Database::new(None)?);
279 let prefix = fdb_config.prefix;
280 let directory = DirectoryLayer::default();
281
282 let oracle = Self::open_inner(timeline, next, read_only, db, prefix, directory).await?;
283
284 oracle.initialize(initially).await?;
286
287 Ok(oracle)
288 }
289
290 async fn initialize(&self, initially: Timestamp) -> Result<(), FdbBindingError> {
294 let initially_packed = pack(&PackableTimestamp(initially));
297
298 self.db
299 .run(async |trx, _maybe_committed| {
300 let existing_read = trx.get(&self.read_ts_key, false).await?;
302 if existing_read.is_none() {
303 trx.set(&self.read_ts_key, &initially_packed);
304 }
305
306 let existing_write = trx.get(&self.write_ts_key, false).await?;
308 if existing_write.is_none() {
309 trx.set(&self.write_ts_key, &initially_packed);
310 }
311
312 Ok(())
313 })
314 .await?;
315
316 if !self.read_only {
318 self.apply_write(initially).await;
319 }
320
321 Ok(())
322 }
323
324 pub async fn get_all_timelines(
330 config: FdbTimestampOracleConfig,
331 ) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
332 let fdb_config = FdbConfig::parse(&config.url)?;
333
334 mz_foundationdb::init_network();
335
336 let db = Arc::new(Database::new(None)?);
337 let prefix = fdb_config.prefix;
338 let directory = DirectoryLayer::default();
339
340 let timeline_names = match db
342 .run(async |trx, _maybe_committed| Ok(directory.list(&trx, &prefix).await))
343 .await?
344 {
345 Err(DirectoryError::PathDoesNotExists) => Vec::new(), Err(e) => return Err(anyhow!("directory error: {e:?}")),
347 Ok(timelines) => timelines,
348 };
349
350 let mut result = Vec::with_capacity(timeline_names.len());
351
352 for timeline_name in timeline_names {
354 let oracle = FdbTimestampOracle::<()>::open_inner(
355 timeline_name.clone(),
356 (),
357 true,
358 Arc::clone(&db),
359 prefix.clone(),
360 directory.clone(),
361 )
362 .await?;
363
364 if let Some(ts) = oracle.max_ts().await? {
365 result.push((timeline_name, ts.0));
366 }
367 }
368
369 Ok(result)
370 }
371
372 async fn write_ts_trx(
373 &self,
374 trx: &Transaction,
375 proposed_next_ts: Timestamp,
376 ) -> Result<Timestamp, FdbTransactError> {
377 let current = trx.get(&self.write_ts_key, false).await?;
379 let current_ts: PackableTimestamp = match current {
380 Some(data) => unpack(&data)?,
381 None => {
382 return Err(FdbTransactError::ExternalError(anyhow!(
383 "timeline not initialized"
384 )));
385 }
386 };
387
388 let incremented = current_ts.0.step_forward();
390 let new_ts = std::cmp::max(incremented, proposed_next_ts);
391
392 let new_ts_packed = pack(&PackableTimestamp(new_ts));
394 trx.set(&self.write_ts_key, &new_ts_packed);
395
396 Ok(new_ts)
397 }
398
399 async fn peek_write_ts_trx(
400 &self,
401 trx: &Transaction,
402 ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
403 let data = trx.get(&self.write_ts_key, false).await?;
404 Ok(data.map(|data| unpack(&data)).transpose()?)
405 }
406
407 async fn read_ts_trx(
408 &self,
409 trx: &Transaction,
410 ) -> Result<Option<PackableTimestamp>, FdbTransactError> {
411 let data = trx.get(&self.read_ts_key, false).await?;
412 Ok(data.map(|data| unpack(&data)).transpose()?)
413 }
414
415 async fn apply_write_trx(
416 &self,
417 trx: &Transaction,
418 write_ts: Timestamp,
419 ) -> Result<(), FdbTransactError> {
420 let current_read = trx.get(&self.read_ts_key, false).await?;
422 let current_read_ts: PackableTimestamp = match current_read {
423 Some(data) => unpack(&data)?,
424 None => {
425 return Err(FdbTransactError::ExternalError(anyhow!(
426 "timeline not initialized"
427 )));
428 }
429 };
430
431 if write_ts > current_read_ts.0 {
432 let new_ts_packed = pack(&PackableTimestamp(write_ts));
433 trx.set(&self.read_ts_key, &new_ts_packed);
434 }
435
436 let current_write = trx.get(&self.write_ts_key, false).await?;
438 let current_write_ts: PackableTimestamp = match current_write {
439 Some(data) => unpack(&data)?,
440 None => {
441 return Err(FdbTransactError::ExternalError(anyhow!(
442 "timeline not initialized"
443 )));
444 }
445 };
446
447 if write_ts > current_write_ts.0 {
448 let new_ts_packed = pack(&PackableTimestamp(write_ts));
449 trx.set(&self.write_ts_key, &new_ts_packed);
450 }
451
452 Ok::<_, FdbTransactError>(())
453 }
454}
455
456#[async_trait]
457impl<N> TimestampOracle<Timestamp> for FdbTimestampOracle<N>
458where
459 N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
460{
461 async fn write_ts(&self) -> WriteTimestamp<Timestamp> {
462 if self.read_only {
463 panic!("attempting write_ts in read-only mode");
464 }
465
466 let proposed_next_ts = self.next.now();
467
468 let write_ts: Timestamp = self
469 .db
470 .transact_boxed(
471 &proposed_next_ts,
472 |trx, proposed_next_ts| self.write_ts_trx(trx, **proposed_next_ts).boxed(),
473 TransactOption::default(),
474 )
475 .await
476 .expect("write_ts transaction failed");
477
478 debug!(
479 timeline = ?self.timeline,
480 write_ts = ?write_ts,
481 proposed_next_ts = ?proposed_next_ts,
482 "returning from write_ts()"
483 );
484
485 let advance_to = write_ts.step_forward();
486
487 WriteTimestamp {
488 timestamp: write_ts,
489 advance_to,
490 }
491 }
492
493 async fn peek_write_ts(&self) -> Timestamp {
494 let write_ts = self
495 .db
496 .transact_boxed(
497 &(),
498 |trx, ()| self.peek_write_ts_trx(trx).boxed(),
499 TransactOption::default(),
500 )
501 .await
502 .expect("peek_write_ts transaction failed")
503 .expect("timeline not initialized")
504 .0;
505
506 debug!(
507 timeline = ?self.timeline,
508 write_ts = ?write_ts,
509 "returning from peek_write_ts()"
510 );
511
512 write_ts
513 }
514
515 async fn read_ts(&self) -> Timestamp {
516 let read_ts = self
517 .db
518 .transact_boxed(
519 &(),
520 |trx, ()| self.read_ts_trx(trx).boxed(),
521 TransactOption::default(),
522 )
523 .await
524 .expect("read_ts transaction failed")
525 .expect("timeline not initialized")
526 .0;
527
528 debug!(
529 timeline = ?self.timeline,
530 read_ts = ?read_ts,
531 "returning from read_ts()"
532 );
533
534 read_ts
535 }
536
537 async fn apply_write(&self, write_ts: Timestamp) {
538 if self.read_only {
539 panic!("attempting apply_write in read-only mode");
540 }
541
542 self.db
543 .transact_boxed(
544 &write_ts,
545 |trx, write_ts| self.apply_write_trx(trx, **write_ts).boxed(),
546 TransactOption::default(),
547 )
548 .await
549 .expect("apply_write transaction failed");
550
551 debug!(
552 timeline = ?self.timeline,
553 write_ts = ?write_ts,
554 "returning from apply_write()"
555 );
556 }
557}
558
#[cfg(test)]
mod tests {
    use super::*;

    use mz_ore::now::NowFn;

    use crate::TimestampOracle;

    /// Runs the shared oracle test suite (`crate::tests::timestamp_oracle_impl_test`)
    /// against writable FDB-backed oracles.
    ///
    /// The network is shut down once the suite completes. NOTE(review): this
    /// presumably needs a reachable FoundationDB cluster — confirm CI setup.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_fdb_timestamp_oracle() -> Result<(), anyhow::Error> {
        let config = FdbTimestampOracleConfig::new_for_test();

        crate::tests::timestamp_oracle_impl_test(|timeline, now_fn: NowFn, initial_ts| {
            let config = config.clone();
            async move {
                let oracle: Arc<dyn TimestampOracle<Timestamp> + Send + Sync> = Arc::new(
                    FdbTimestampOracle::open(config, timeline, initial_ts, now_fn, false)
                        .await
                        .expect("failed to open FdbTimestampOracle"),
                );
                oracle
            }
        })
        .await?;

        mz_foundationdb::shutdown_network();

        Ok(())
    }
}