1use std::io::Write;
22
23use anyhow::anyhow;
24use async_stream::try_stream;
25use async_trait::async_trait;
26use bytes::Bytes;
27use futures_util::future::FutureExt;
28use mz_foundationdb::FdbConfig;
29use mz_foundationdb::directory::{
30 Directory, DirectoryError, DirectoryLayer, DirectoryOutput, DirectorySubspace,
31};
32use mz_foundationdb::tuple::{
33 PackError, PackResult, Subspace, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset,
34};
35use mz_foundationdb::{
36 Database, FdbBindingError, FdbError, KeySelector, RangeOption, TransactError, TransactOption,
37 Transaction,
38};
39use mz_ore::url::SensitiveUrl;
40
41use crate::error::Error;
42use crate::location::{
43 CaSResult, Consensus, Determinate, ExternalError, Indeterminate, ResultStream, SeqNo,
44 VersionedData,
45};
46
47impl From<FdbError> for ExternalError {
48 fn from(x: FdbError) -> Self {
49 if x.is_retryable() {
50 ExternalError::Indeterminate(Indeterminate::new(x.into()))
51 } else {
52 ExternalError::Determinate(Determinate::new(x.into()))
53 }
54 }
55}
56
57impl From<FdbBindingError> for ExternalError {
58 fn from(x: FdbBindingError) -> Self {
59 ExternalError::Determinate(Determinate::new(x.into()))
60 }
61}
62
63#[derive(Clone, Debug)]
65pub struct FdbConsensusConfig {
66 url: SensitiveUrl,
67}
68
69impl FdbConsensusConfig {
70 pub fn new(url: SensitiveUrl) -> Result<Self, Error> {
72 Ok(FdbConsensusConfig { url })
73 }
74}
75
76pub struct FdbConsensus {
78 keys: DirectorySubspace,
80 data: DirectorySubspace,
82 db: Database,
84}
85
86enum FdbTransactError {
89 FdbError(FdbError),
90 ExternalError(ExternalError),
91}
92
93impl From<FdbError> for FdbTransactError {
94 fn from(value: FdbError) -> Self {
95 Self::FdbError(value)
96 }
97}
98
99impl From<ExternalError> for FdbTransactError {
100 fn from(value: ExternalError) -> Self {
101 Self::ExternalError(value)
102 }
103}
104
105impl From<PackError> for FdbTransactError {
106 fn from(value: PackError) -> Self {
107 ExternalError::Determinate(anyhow::Error::new(value).into()).into()
108 }
109}
110
111impl From<FdbTransactError> for ExternalError {
112 fn from(value: FdbTransactError) -> Self {
113 match value {
114 FdbTransactError::FdbError(e) => e.into(),
115 FdbTransactError::ExternalError(e) => e,
116 }
117 }
118}
119
120impl From<DirectoryError> for ExternalError {
121 fn from(e: DirectoryError) -> Self {
122 ExternalError::Determinate(anyhow!("directory error: {e:?}").into())
123 }
124}
125
126impl TransactError for FdbTransactError {
127 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
128 match self {
129 Self::FdbError(e) => Ok(e),
130 other => Err(other),
131 }
132 }
133}
134
135impl TuplePack for SeqNo {
136 fn pack<W: Write>(
137 &self,
138 w: &mut W,
139 tuple_depth: TupleDepth,
140 ) -> std::io::Result<VersionstampOffset> {
141 self.0.pack(w, tuple_depth)
142 }
143}
144
145impl<'de> TupleUnpack<'de> for SeqNo {
146 fn unpack(input: &'de [u8], tuple_depth: TupleDepth) -> PackResult<(&'de [u8], Self)> {
147 u64::unpack(input, tuple_depth).map(|(rem, v)| (rem, SeqNo(v)))
148 }
149}
150
151impl std::fmt::Debug for FdbConsensus {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 f.debug_struct("FdbConsensus")
154 .field("keys", &self.keys)
155 .field("data", &self.data)
156 .finish_non_exhaustive()
157 }
158}
159
160impl FdbConsensus {
161 pub async fn open(config: FdbConsensusConfig) -> Result<Self, ExternalError> {
163 let fdb_config =
164 FdbConfig::parse(&config.url).map_err(|e| ExternalError::Determinate(e.into()))?;
165
166 mz_foundationdb::init_network();
167
168 let db = Database::new(None)?;
169 let directory = DirectoryLayer::default();
170 let keys_path: Vec<_> = fdb_config
171 .prefix
172 .iter()
173 .cloned()
174 .chain(std::iter::once("keys".to_owned()))
175 .collect();
176 let keys = Self::open_directory(&db, &directory, &keys_path).await?;
177 let data_path: Vec<_> = fdb_config
178 .prefix
179 .into_iter()
180 .chain(std::iter::once("data".to_owned()))
181 .collect();
182 let data = Self::open_directory(&db, &directory, &data_path).await?;
183 Ok(FdbConsensus { keys, data, db })
184 }
185
186 async fn open_directory(
189 db: &Database,
190 directory: &DirectoryLayer,
191 path: &[String],
192 ) -> Result<DirectorySubspace, ExternalError> {
193 let directory = db
194 .run(async |trx, _maybe_commited| {
195 Ok(directory.create_or_open(&trx, path, None, None).await)
196 })
197 .await??;
198 match directory {
199 DirectoryOutput::DirectorySubspace(subspace) => Ok(subspace),
200 DirectoryOutput::DirectoryPartition(_partition) => Err(ExternalError::from(anyhow!(
201 "consensus data cannot be a partition"
202 ))),
203 }
204 }
205
206 async fn head_trx(
212 &self,
213 trx: &Transaction,
214 data_key: &Subspace,
215 snapshot: bool,
216 ) -> Result<Option<VersionedData>, FdbTransactError> {
217 let mut range = RangeOption::from(data_key).rev();
218 range.limit = Some(1);
219 range.mode = mz_foundationdb::options::StreamingMode::Exact;
220 let values = trx.get_range(&range, 1, snapshot).await?;
221 if let Some(kv) = values.first() {
222 let seqno = data_key.unpack(kv.key())?;
223 Ok(Some(VersionedData {
224 seqno,
225 data: Bytes::from(kv.value().to_vec()),
226 }))
227 } else {
228 Ok(None)
229 }
230 }
231 async fn compare_and_set_trx(
232 &self,
233 trx: &Transaction,
234 data_key: &Subspace,
235 expected: &Option<SeqNo>,
236 new: &VersionedData,
237 key: &str,
238 ) -> Result<CaSResult, FdbTransactError> {
239 let current = self.head_trx(trx, data_key, false).await?;
241 let current_seqno = current.map(|v| v.seqno);
242
243 if expected != ¤t_seqno {
244 return Ok(CaSResult::ExpectationMismatch);
245 }
246
247 if expected.is_none() {
248 let key = self.keys.pack(&key);
250 trx.set(&key, &[]);
251 }
252
253 let data_seqno_key = data_key.pack(&new.seqno);
254 trx.set(&data_seqno_key, new.data.as_ref());
255 Ok(CaSResult::Committed)
256 }
257
258 async fn scan_trx(
259 &self,
260 trx: &Transaction,
261 data_key: &Subspace,
262 from: &SeqNo,
263 limit: &usize,
264 entries: &mut Vec<VersionedData>,
265 ) -> Result<(), FdbTransactError> {
266 let seqno_start = data_key.pack(&from);
267 let seqno_end = data_key.pack(&SeqNo::maximum());
268
269 let mut range = RangeOption::from(seqno_start..=seqno_end);
270 range.limit = Some(*limit);
271
272 entries.clear();
273
274 loop {
275 let output = trx.get_range(&range, 1, false).await?;
276 entries.reserve(output.len());
277 for key_value in &output {
278 let seqno = data_key.unpack(key_value.key())?;
279 entries.push(VersionedData {
280 seqno,
281 data: Bytes::from(key_value.value().to_vec()),
282 });
283 }
284
285 if let Some(next_range) = range.next_range(&output) {
286 range = next_range;
287 } else {
288 break;
289 }
290 }
291 Ok(())
292 }
293
294 async fn truncate_trx(
295 &self,
296 trx: &Transaction,
297 data_key: &Subspace,
298 until: &SeqNo,
299 ) -> Result<(), FdbTransactError> {
300 let current = self.head_trx(trx, data_key, true).await?;
303 if let Some(current) = current {
304 if current.seqno < *until {
305 return Err(ExternalError::Determinate(
306 anyhow!("upper bound too high for truncate: {until}").into(),
307 )
308 .into());
309 }
310 } else {
311 return Err(ExternalError::Determinate(anyhow!("no entries for key").into()).into());
312 }
313 let key_space_start = data_key.pack(&SeqNo::minimum());
314 let key_space_end = data_key.pack(&until);
315
316 trx.clear_range(&key_space_start, &key_space_end);
317 Ok(())
318 }
319}
320
321#[async_trait]
322impl Consensus for FdbConsensus {
323 fn list_keys(&self) -> ResultStream<'_, String> {
324 Box::pin(try_stream! {
325 let keys: Vec<String> = self
326 .db
327 .run(async |trx, _maybe_commited| {
328 let mut range = RangeOption::from(self.keys.range());
329 let mut keys = Vec::new();
330 loop {
331 let values = trx.get_range(&range, 1, false).await?;
332 for value in &values {
333 let key: String = self.keys.unpack(value.key())
334 .map_err(FdbBindingError::PackError)?;
335 keys.push(key);
336 }
337 if let Some(last) = values.last() {
338 range.begin = KeySelector::first_greater_than(last.key().to_vec());
339 } else {
340 break;
341 }
342 }
343 Ok(keys)
344 }).await?;
345
346 for shard in keys {
347 yield shard;
348 }
349 })
350 }
351
352 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
353 let data_key = self.data.subspace(&key);
354
355 let ok = self
356 .db
357 .transact_boxed(
358 &data_key,
359 |trx, data_key| self.head_trx(trx, data_key, true).boxed(),
361 TransactOption::default(),
362 )
363 .await?;
364 Ok(ok)
365 }
366
367 async fn compare_and_set(
368 &self,
369 key: &str,
370 expected: Option<SeqNo>,
371 new: VersionedData,
372 ) -> Result<CaSResult, ExternalError> {
373 if let Some(expected) = expected {
374 if new.seqno <= expected {
375 return Err(Error::from(
376 format!("new seqno must be strictly greater than expected. Got new: {:?} expected: {:?}",
377 new.seqno, expected)).into());
378 }
379 }
380 if new.seqno.0 > i64::MAX.try_into().expect("i64::MAX known to fit in u64") {
381 return Err(ExternalError::from(anyhow!(
382 "sequence numbers must fit within [0, i64::MAX], received: {:?}",
383 new.seqno
384 )));
385 }
386
387 let data_key = self.data.subspace(&key);
388
389 let ok = self
390 .db
391 .transact_boxed(
392 (expected, &new, &*key),
393 |trx, (expected, new, key)| {
394 self.compare_and_set_trx(trx, &data_key, expected, new, key)
395 .boxed()
396 },
397 TransactOption::default(),
398 )
399 .await?;
400 Ok(ok)
401 }
402
403 async fn scan(
404 &self,
405 key: &str,
406 from: SeqNo,
407 limit: usize,
408 ) -> Result<Vec<VersionedData>, ExternalError> {
409 let data_key = self.data.subspace(&key);
410 let mut entries = Vec::new();
411 self.db
412 .transact_boxed(
413 (&data_key, from, limit, &mut entries),
414 |trx, (data_key, from, limit, entries)| {
415 self.scan_trx(trx, data_key, from, limit, entries).boxed()
416 },
417 TransactOption::default(),
418 )
419 .await?;
420
421 entries.sort_by_key(|e| e.seqno);
422 Ok(entries)
423 }
424
425 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
426 let data_key = self.data.subspace(&key);
427
428 self.db
429 .transact_boxed(
430 (&data_key, seqno),
431 |trx, (data_key, seqno)| self.truncate_trx(trx, data_key, seqno).boxed(),
432 TransactOption::idempotent(),
433 )
434 .await?;
435 Ok(None)
436 }
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    use mz_foundationdb::directory::Directory;
    use uuid::Uuid;

    use crate::location::tests::consensus_impl_test;

    /// Removes the `keys` and `data` directories of `consensus`, wiping all of
    /// its entries.
    ///
    /// The directories are created again by the next `FdbConsensus::open`
    /// (which uses `create_or_open`).
    async fn drop_and_recreate(consensus: &FdbConsensus) -> Result<(), ExternalError> {
        let FdbConsensus { keys, data, db } = consensus;
        db.run(async |trx, _maybe_committed| {
            keys.remove(&trx, &[]).await?;
            data.remove(&trx, &[]).await?;
            Ok(())
        })
        .await?;
        Ok(())
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    #[ignore]
    async fn fdb_consensus() -> Result<(), ExternalError> {
        let url: SensitiveUrl = "foundationdb:?prefix=test/consensus".parse().unwrap();
        let config = FdbConsensusConfig::new(url)?;

        // Start from an empty slate in case an earlier run left data behind.
        drop_and_recreate(&FdbConsensus::open(config.clone()).await?).await?;

        consensus_impl_test(|| FdbConsensus::open(config.clone())).await?;

        let consensus = FdbConsensus::open(config.clone()).await?;
        let key = Uuid::new_v4().to_string();
        let mut state = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("abc"),
        };

        // Each step is (expected head, new seqno); every one must commit.
        let steps = [(None, 5), (Some(SeqNo(5)), 6), (Some(SeqNo(6)), 129 + 5)];
        for (expected, next) in steps {
            state.seqno = SeqNo(next);
            assert_eq!(
                consensus
                    .compare_and_set(&key, expected, state.clone())
                    .await,
                Ok(CaSResult::Committed),
            );
        }

        assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));

        println!("--- SCANNING ---");

        let scanned = consensus.scan(&key, SeqNo(129), 10).await?;
        for entry in scanned.iter() {
            println!(
                "scan data: seqno: {:?}, {} bytes",
                entry.seqno,
                entry.data.len()
            );
        }

        drop_and_recreate(&consensus).await?;

        assert_eq!(consensus.head(&key).await, Ok(None));

        mz_foundationdb::shutdown_network();
        Ok(())
    }
}