1use std::io::Write;
19
20use anyhow::anyhow;
21use async_stream::try_stream;
22use async_trait::async_trait;
23use bytes::Bytes;
24use futures_util::future::FutureExt;
25use mz_foundationdb::FdbConfig;
26use mz_foundationdb::directory::{
27 Directory, DirectoryError, DirectoryLayer, DirectoryOutput, DirectorySubspace,
28};
29use mz_foundationdb::tuple::{
30 PackError, PackResult, Subspace, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, pack,
31 unpack,
32};
33use mz_foundationdb::{
34 Database, FdbBindingError, FdbError, KeySelector, RangeOption, TransactError, TransactOption,
35 Transaction,
36};
37use mz_ore::url::SensitiveUrl;
38
39use crate::error::Error;
40use crate::location::{
41 CaSResult, Consensus, Determinate, ExternalError, Indeterminate, ResultStream, SeqNo,
42 VersionedData,
43};
44
45impl From<FdbError> for ExternalError {
46 fn from(x: FdbError) -> Self {
47 if x.is_retryable() {
48 ExternalError::Indeterminate(Indeterminate::new(x.into()))
49 } else {
50 ExternalError::Determinate(Determinate::new(x.into()))
51 }
52 }
53}
54
55impl From<FdbBindingError> for ExternalError {
56 fn from(x: FdbBindingError) -> Self {
57 ExternalError::Determinate(Determinate::new(x.into()))
58 }
59}
60
/// Configuration needed to open an [`FdbConsensus`].
#[derive(Clone, Debug)]
pub struct FdbConsensusConfig {
    /// URL that `FdbConsensus::open` passes to `FdbConfig::parse`.
    url: SensitiveUrl,
}
66
67impl FdbConsensusConfig {
68 pub fn new(url: SensitiveUrl) -> Result<Self, Error> {
70 Ok(FdbConsensusConfig { url })
71 }
72}
73
/// Consensus implementation that stores its state in FoundationDB.
pub struct FdbConsensus {
    /// Directory opened at `<prefix>/keys`. `compare_and_set` writes an
    /// (empty-valued) entry here the first time a key is written, and
    /// `list_keys` enumerates these entries.
    keys: DirectorySubspace,
    /// Directory opened at `<prefix>/data`. For each key, the key's subspace
    /// prefix stores the packed current `SeqNo`, and `<key>/<seqno>` stores
    /// the data written at that sequence number.
    data: DirectorySubspace,
    /// Handle to the FoundationDB database all transactions run against.
    db: Database,
}
83
/// Error type returned by the transaction bodies in this file.
///
/// It keeps raw FoundationDB errors separate from errors that were already
/// classified, so that the `TransactError` implementation can hand the raw
/// `FdbError` back to the transaction runner.
enum FdbTransactError {
    /// An error reported by FoundationDB itself.
    FdbError(FdbError),
    /// An error that has already been converted to an [`ExternalError`].
    ExternalError(ExternalError),
}
90
91impl From<FdbError> for FdbTransactError {
92 fn from(value: FdbError) -> Self {
93 Self::FdbError(value)
94 }
95}
96
97impl From<ExternalError> for FdbTransactError {
98 fn from(value: ExternalError) -> Self {
99 Self::ExternalError(value)
100 }
101}
102
103impl From<PackError> for FdbTransactError {
104 fn from(value: PackError) -> Self {
105 ExternalError::Determinate(anyhow::Error::new(value).into()).into()
106 }
107}
108
109impl From<FdbTransactError> for ExternalError {
110 fn from(value: FdbTransactError) -> Self {
111 match value {
112 FdbTransactError::FdbError(e) => e.into(),
113 FdbTransactError::ExternalError(e) => e,
114 }
115 }
116}
117
118impl From<DirectoryError> for ExternalError {
119 fn from(e: DirectoryError) -> Self {
120 ExternalError::Determinate(anyhow!("directory error: {e:?}").into())
121 }
122}
123
124impl TransactError for FdbTransactError {
125 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
126 match self {
127 Self::FdbError(e) => Ok(e),
128 other => Err(other),
129 }
130 }
131}
132
133impl TuplePack for SeqNo {
134 fn pack<W: Write>(
135 &self,
136 w: &mut W,
137 tuple_depth: TupleDepth,
138 ) -> std::io::Result<VersionstampOffset> {
139 self.0.pack(w, tuple_depth)
140 }
141}
142
143impl<'de> TupleUnpack<'de> for SeqNo {
144 fn unpack(input: &'de [u8], tuple_depth: TupleDepth) -> PackResult<(&'de [u8], Self)> {
145 u64::unpack(input, tuple_depth).map(|(rem, v)| (rem, SeqNo(v)))
146 }
147}
148
149impl std::fmt::Debug for FdbConsensus {
150 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 f.debug_struct("FdbConsensus")
152 .field("keys", &self.keys)
153 .field("data", &self.data)
154 .finish_non_exhaustive()
155 }
156}
157
158impl FdbConsensus {
159 pub async fn open(config: FdbConsensusConfig) -> Result<Self, ExternalError> {
161 let fdb_config =
162 FdbConfig::parse(&config.url).map_err(|e| ExternalError::Determinate(e.into()))?;
163
164 mz_foundationdb::init_network();
165
166 let db = Database::new(None)?;
167 let directory = DirectoryLayer::default();
168 let keys_path: Vec<_> = fdb_config
169 .prefix
170 .iter()
171 .cloned()
172 .chain(std::iter::once("keys".to_owned()))
173 .collect();
174 let keys = Self::open_directory(&db, &directory, &keys_path).await?;
175 let data_path: Vec<_> = fdb_config
176 .prefix
177 .into_iter()
178 .chain(std::iter::once("data".to_owned()))
179 .collect();
180 let data = Self::open_directory(&db, &directory, &data_path).await?;
181 Ok(FdbConsensus { keys, data, db })
182 }
183
184 async fn open_directory(
187 db: &Database,
188 directory: &DirectoryLayer,
189 path: &[String],
190 ) -> Result<DirectorySubspace, ExternalError> {
191 let directory = db
192 .run(async |trx, _maybe_commited| {
193 Ok(directory.create_or_open(&trx, path, None, None).await)
194 })
195 .await??;
196 match directory {
197 DirectoryOutput::DirectorySubspace(subspace) => Ok(subspace),
198 DirectoryOutput::DirectoryPartition(_partition) => Err(ExternalError::from(anyhow!(
199 "consensus data cannot be a partition"
200 ))),
201 }
202 }
203
204 async fn head_trx(
205 &self,
206 trx: &Transaction,
207 data_key: &Subspace,
208 ) -> Result<Option<VersionedData>, FdbTransactError> {
209 let mut range = RangeOption::from(data_key).rev();
210 range.limit = Some(1);
211 range.mode = mz_foundationdb::options::StreamingMode::Exact;
212 let values = trx.get_range(&range, 1, true).await?;
214 if let Some(kv) = values.first() {
215 let seqno = data_key.unpack(kv.key())?;
216 Ok(Some(VersionedData {
217 seqno,
218 data: Bytes::from(kv.value().to_vec()),
219 }))
220 } else {
221 Ok(None)
222 }
223 }
224 async fn compare_and_set_trx(
225 &self,
226 trx: &Transaction,
227 data_key: &Subspace,
228 expected: &Option<SeqNo>,
229 new: &VersionedData,
230 key: &str,
231 ) -> Result<CaSResult, FdbTransactError> {
232 let seqno = trx
233 .get(data_key.bytes(), false)
234 .await?
235 .map(|data| unpack(&data))
236 .transpose()?;
237
238 if expected != &seqno {
239 return Ok(CaSResult::ExpectationMismatch);
240 }
241
242 trx.set(data_key.bytes(), &pack(&new.seqno));
243
244 if expected.is_none() {
245 let key = self.keys.pack(&key);
247 trx.set(&key, &[]);
248 }
249
250 let data_seqno_key = data_key.pack(&new.seqno);
251 trx.set(&data_seqno_key, new.data.as_ref());
252 Ok(CaSResult::Committed)
253 }
254
255 async fn scan_trx(
256 &self,
257 trx: &Transaction,
258 data_key: &Subspace,
259 from: &SeqNo,
260 limit: &usize,
261 entries: &mut Vec<VersionedData>,
262 ) -> Result<(), FdbTransactError> {
263 let seqno_start = data_key.pack(&from);
264 let seqno_end = data_key.pack(&SeqNo::maximum());
265
266 let mut range = RangeOption::from(seqno_start..=seqno_end);
267 range.limit = Some(*limit);
268
269 entries.clear();
270
271 loop {
272 let output = trx.get_range(&range, 1, false).await?;
273 entries.reserve(output.len());
274 for key_value in &output {
275 let seqno = data_key.unpack(key_value.key())?;
276 entries.push(VersionedData {
277 seqno,
278 data: Bytes::from(key_value.value().to_vec()),
279 });
280 }
281
282 if let Some(next_range) = range.next_range(&output) {
283 range = next_range;
284 } else {
285 break;
286 }
287 }
288 Ok(())
289 }
290
291 async fn truncate_trx(
292 &self,
293 trx: &Transaction,
294 data_key: &Subspace,
295 until: &SeqNo,
296 ) -> Result<(), FdbTransactError> {
297 let seqno = trx.get(data_key.bytes(), false).await?;
298 if let Some(seqno) = &seqno {
299 let current_seqno: SeqNo = unpack(seqno)?;
300 if current_seqno < *until {
301 return Err(ExternalError::Determinate(
302 anyhow!("upper bound too high for truncate: {until}").into(),
303 )
304 .into());
305 }
306 } else {
307 return Err(ExternalError::Determinate(anyhow!("no entries for key").into()).into());
308 }
309 let key_space_start = data_key.pack(&SeqNo::minimum());
310 let key_space_end = data_key.pack(&until);
311
312 trx.clear_range(&key_space_start, &key_space_end);
313 Ok(())
314 }
315}
316
317#[async_trait]
318impl Consensus for FdbConsensus {
319 fn list_keys(&self) -> ResultStream<'_, String> {
320 Box::pin(try_stream! {
321 let keys: Vec<String> = self
322 .db
323 .run(async |trx, _maybe_commited| {
324 let mut range = RangeOption::from(self.keys.range());
325 let mut keys = Vec::new();
326 loop {
327 let values = trx.get_range(&range, 1, false).await?;
328 for value in &values {
329 let key: String = self.keys.unpack(value.key()).map_err(FdbBindingError::PackError)?;
330 keys.push(key);
331 }
332 if let Some(last) = values.last() {
333 range.begin = KeySelector::first_greater_than(last.key().to_vec());
334 } else {
335 break;
336 }
337 }
338 Ok(keys)
339 }).await?;
340
341 for shard in keys {
342 yield shard;
343 }
344 })
345 }
346
347 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
348 let data_key = self.data.subspace(&key);
349
350 let ok = self
351 .db
352 .transact_boxed(
353 &data_key,
354 |trx, data_key| self.head_trx(trx, data_key).boxed(),
355 TransactOption::default(),
356 )
357 .await?;
358 Ok(ok)
359 }
360
361 async fn compare_and_set(
362 &self,
363 key: &str,
364 expected: Option<SeqNo>,
365 new: VersionedData,
366 ) -> Result<CaSResult, ExternalError> {
367 if let Some(expected) = expected {
368 if new.seqno <= expected {
369 return Err(Error::from(
370 format!("new seqno must be strictly greater than expected. Got new: {:?} expected: {:?}",
371 new.seqno, expected)).into());
372 }
373 }
374 if new.seqno.0 > i64::MAX.try_into().expect("i64::MAX known to fit in u64") {
375 return Err(ExternalError::from(anyhow!(
376 "sequence numbers must fit within [0, i64::MAX], received: {:?}",
377 new.seqno
378 )));
379 }
380
381 let data_key = self.data.subspace(&key);
382
383 let ok = self
384 .db
385 .transact_boxed(
386 (expected, &new, &*key),
387 |trx, (expected, new, key)| {
388 self.compare_and_set_trx(trx, &data_key, expected, new, key)
389 .boxed()
390 },
391 TransactOption::default(),
392 )
393 .await?;
394 Ok(ok)
395 }
396
397 async fn scan(
398 &self,
399 key: &str,
400 from: SeqNo,
401 limit: usize,
402 ) -> Result<Vec<VersionedData>, ExternalError> {
403 let data_key = self.data.subspace(&key);
404 let mut entries = Vec::new();
405 self.db
406 .transact_boxed(
407 (&data_key, from, limit, &mut entries),
408 |trx, (data_key, from, limit, entries)| {
409 self.scan_trx(trx, data_key, from, limit, entries).boxed()
410 },
411 TransactOption::default(),
412 )
413 .await?;
414
415 entries.sort_by_key(|e| e.seqno);
416 Ok(entries)
417 }
418
419 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
420 let data_key = self.data.subspace(&key);
421
422 self.db
423 .transact_boxed(
424 (&data_key, seqno),
425 |trx, (data_key, seqno)| self.truncate_trx(trx, data_key, seqno).boxed(),
426 TransactOption::idempotent(),
427 )
428 .await?;
429 Ok(None)
430 }
431}
432
#[cfg(test)]
mod tests {
    use super::*;

    use mz_foundationdb::directory::Directory;
    use uuid::Uuid;

    use crate::location::tests::consensus_impl_test;

    /// Removes the `keys` and `data` directories of `consensus` in a single
    /// transaction, wiping everything stored through it.
    async fn drop_and_recreate(consensus: &FdbConsensus) -> Result<(), ExternalError> {
        let FdbConsensus { keys, data, db } = consensus;
        db.run(async |trx, _maybe_committed| {
            keys.remove(&trx, &[]).await?;
            data.remove(&trx, &[]).await?;
            Ok(())
        })
        .await?;
        Ok(())
    }

    /// Runs the shared consensus test-suite against FoundationDB, followed by
    /// a few direct compare-and-set / head / scan checks on a fresh key.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn fdb_consensus() -> Result<(), ExternalError> {
        let url = "foundationdb:?prefix=test/consensus".parse().unwrap();
        let config = FdbConsensusConfig::new(url)?;

        // Start from a clean slate; the handle is dropped at the end of the
        // block before the shared suite opens its own.
        {
            let fdb = FdbConsensus::open(config.clone()).await?;
            drop_and_recreate(&fdb).await?;
        }

        consensus_impl_test(|| FdbConsensus::open(config.clone())).await?;

        let consensus = FdbConsensus::open(config.clone()).await?;
        let key = Uuid::new_v4().to_string();
        let mut state = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("abc"),
        };

        // Each step writes `next` on top of the previously committed seqno.
        let steps = [
            (None, SeqNo(5)),
            (Some(SeqNo(5)), SeqNo(6)),
            (Some(SeqNo(6)), SeqNo(129 + 5)),
        ];
        for (expected, next) in steps {
            state.seqno = next;
            assert_eq!(
                consensus
                    .compare_and_set(&key, expected, state.clone())
                    .await,
                Ok(CaSResult::Committed),
            );
        }

        assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));

        println!("--- SCANNING ---");

        let scanned = consensus.scan(&key, SeqNo(129), 10).await?;
        for data in scanned {
            println!(
                "scan data: seqno: {:?}, {} bytes",
                data.seqno,
                data.data.len()
            );
        }

        drop_and_recreate(&consensus).await?;

        assert_eq!(consensus.head(&key).await, Ok(None));

        mz_foundationdb::shutdown_network();
        Ok(())
    }
}