#![warn(missing_docs)]

//! An async wrapper around RocksDB, where all I/O is performed on a dedicated
//! background thread. Commands are sent to that thread over a channel, and
//! each command's result is returned over a `oneshot` channel.

use std::convert::AsRef;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::time::Instant;

use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::metrics::{DeleteOnDropCounter, DeleteOnDropHistogram};
use mz_ore::retry::{Retry, RetryResult};
use prometheus::core::AtomicU64;
use rocksdb::merge_operator::MergeOperandsIter;
use rocksdb::{DB, Env, Error as RocksDBError, ErrorKind, Options as RocksDBOptions, WriteOptions};
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::sync::{mpsc, oneshot};

pub mod config;
pub use config::{RocksDBConfig, RocksDBTuningParameters, defaults};

use crate::config::WriteBufferManagerHandle;

/// The diff type used when reporting the net size change of updates.
type Diff = mz_ore::Overflowing<i64>;
/// An error using this RocksDB wrapper.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An error from the underlying RocksDB library.
    #[error(transparent)]
    RocksDB(#[from] RocksDBError),

    /// The background worker thread is no longer running, so commands cannot
    /// be processed.
    #[error("RocksDB thread has been shut down or errored")]
    RocksDBThreadGoneAway,

    /// An error decoding a value with `bincode`.
    #[error("failed to decode value")]
    DecodeError(#[from] bincode::Error),

    /// A tokio task servicing this instance panicked.
    #[error("tokio thread panicked")]
    TokioPanic(#[from] tokio::task::JoinError),

    /// Cleanup of the RocksDB directory did not finish in time.
    #[error("failed to cleanup in time")]
    CleanupTimeout(#[from] tokio::time::error::Elapsed),

    /// A miscellaneous error with a value.
    #[error("error with value: {0}")]
    ValueError(String),
}

/// An iterator over deserialized values during a merge operation. Yields the
/// existing value (if any) followed by each merge operand, decoded with the
/// configured bincode options.
pub struct ValueIterator<'a, O, V>
where
    O: bincode::Options + Copy + Send + Sync + 'static,
    V: DeserializeOwned + Serialize + Send + Sync + 'static,
{
    iter: std::iter::Chain<std::option::IntoIter<&'a [u8]>, MergeOperandsIter<'a>>,
    bincode: &'a O,
    v: std::marker::PhantomData<V>,
}

impl<O, V> Iterator for ValueIterator<'_, O, V>
where
    O: bincode::Options + Copy + Send + Sync + 'static,
    V: DeserializeOwned + Serialize + Send + Sync + 'static,
{
    type Item = V;

    fn next(&mut self) -> Option<Self::Item> {
        // The operands were serialized by this same wrapper, so a decode
        // failure here is a bug; fail loudly rather than dropping data.
        self.iter
            .next()
            .map(|v| self.bincode.deserialize(v).unwrap())
    }
}

/// A stub merge-operator type, usable for the `F` type parameter when
/// constructing an [`InstanceOptions`] without a merge operator.
pub type StubMergeOperator<V> =
    fn(key: &[u8], operands: ValueIterator<bincode::DefaultOptions, V>) -> V;

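// A minimal sketch of a merge function with the `StubMergeOperator` shape
// (hypothetical; assumes values are `u64` counters and the key is ignored):
//
//     fn sum_merge(
//         _key: &[u8],
//         operands: ValueIterator<bincode::DefaultOptions, u64>,
//     ) -> u64 {
//         // Each operand arrives already deserialized; fold them together.
//         operands.sum()
//     }
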
/// Fixed options to configure a [`RocksDBInstance`].
pub struct InstanceOptions<O, V, F> {
    /// Whether or not to clear state at the instance path before starting.
    pub cleanup_on_new: bool,

    /// How many times to try the cleanup before giving up.
    pub cleanup_tries: usize,

    /// Whether or not writes go to the WAL. This lives here rather than in
    /// the tuning parameters because it is applied per write batch, via
    /// `WriteOptions`.
    pub use_wal: bool,

    /// A possibly shared RocksDB `Env`.
    pub env: Env,

    /// The bincode options to use for serializing and deserializing values.
    pub bincode: O,

    /// An optional merge operator, given as an operator name and a merge
    /// function.
    pub merge_operator: Option<(String, F)>,

    v: std::marker::PhantomData<V>,
}

impl<O, V, F> InstanceOptions<O, V, F>
where
    O: bincode::Options + Copy + Send + Sync + 'static,
    V: DeserializeOwned + Serialize + Send + Sync + 'static,
    F: for<'a> Fn(&'a [u8], ValueIterator<'a, O, V>) -> V + Copy + Send + Sync + 'static,
{
    /// Creates a new `InstanceOptions`, with cleanup-on-creation enabled and
    /// the WAL disabled.
    pub fn new(
        env: rocksdb::Env,
        cleanup_tries: usize,
        merge_operator: Option<(String, F)>,
        bincode: O,
    ) -> Self {
        InstanceOptions {
            cleanup_on_new: true,
            cleanup_tries,
            use_wal: false,
            env,
            merge_operator,
            bincode,
            v: std::marker::PhantomData,
        }
    }

    fn as_rocksdb_options(
        &self,
        tuning_config: &RocksDBConfig,
    ) -> (RocksDBOptions, Option<WriteBufferManagerHandle>) {
        let mut options = rocksdb::Options::default();
        options.create_if_missing(true);

        options.set_env(&self.env);

        if let Some((fn_name, merge_fn)) = &self.merge_operator {
            let bincode = self.bincode.clone();
            let merge_fn = merge_fn.clone();
            options.set_merge_operator_associative(fn_name, move |key, existing, operands| {
                // Chain the existing value (if any) with the new operands,
                // deserializing each lazily.
                let operands = ValueIterator {
                    iter: existing.into_iter().chain(operands.iter()),
                    bincode: &bincode,
                    v: std::marker::PhantomData::<V>,
                };
                let result = merge_fn(key, operands);
                // Reserialize the merged value; see the comment on
                // `ValueIterator::next` for why `unwrap` is acceptable here.
                Some(bincode.serialize(&result).unwrap())
            });
        }

        let write_buffer_handle = config::apply_to_options(tuning_config, &mut options);
        (options, write_buffer_handle)
    }

    fn as_rocksdb_write_options(&self) -> WriteOptions {
        let mut wo = rocksdb::WriteOptions::new();
        wo.disable_wal(!self.use_wal);
        wo
    }
}
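
// A minimal sketch of constructing options using the `sum_merge` function
// above (hypothetical; the operator name and `cleanup_tries` value are
// illustrative):
//
//     let env = rocksdb::Env::new()?;
//     let options = InstanceOptions::new(
//         env,
//         5,                                          // cleanup_tries
//         Some(("sum_merge".to_string(), sum_merge)), // merge operator
//         bincode::DefaultOptions::new(),
//     );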

/// Shared metrics about RocksDB usage.
pub struct RocksDBSharedMetrics {
    /// Latency of multi_get batches, in fractional seconds.
    pub multi_get_latency: DeleteOnDropHistogram<Vec<String>>,
    /// Latency of write batches, in fractional seconds.
    pub multi_put_latency: DeleteOnDropHistogram<Vec<String>>,
}

/// Metrics about a single RocksDB instance.
pub struct RocksDBInstanceMetrics {
    /// The number of keys fetched across multi_get batches.
    pub multi_get_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// The number of values returned by multi_get batches.
    pub multi_get_result_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// The total size of values returned by multi_get batches.
    pub multi_get_result_bytes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// The number of multi_get batches processed.
    pub multi_get_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// The number of write batches processed.
    pub multi_put_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// The number of updates written across write batches.
    pub multi_put_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}

/// The result type for `multi_get`.
#[derive(Default, Debug)]
pub struct MultiGetResult {
    /// The number of keys we fetched.
    pub processed_gets: u64,
    /// The total size of values fetched.
    pub processed_gets_size: u64,
    /// The number of keys that had a value in the DB.
    pub returned_gets: u64,
}

/// The result of a single get in a `multi_get`.
#[derive(Debug, Default, Clone)]
pub struct GetResult<V> {
    /// The deserialized value.
    pub value: V,
    /// The size of the value as encoded on disk.
    pub size: u64,
}

/// The result type for `multi_update`.
#[derive(Default, Debug)]
pub struct MultiUpdateResult {
    /// The number of puts, merges, and deletes we processed.
    pub processed_updates: u64,
    /// The total size of values written (deletes are not counted).
    pub size_written: u64,
    /// The net size change, accumulated from the per-update diffs, if any
    /// were provided.
    pub size_diff: Option<Diff>,
}

/// The kind of update to apply to a key in `multi_update`.
#[derive(Debug)]
pub enum KeyUpdate<V> {
    /// Put a value, overwriting any previous value for the key.
    Put(V),
    /// Merge a value with the existing value for the key, using the
    /// instance's configured merge operator.
    Merge(V),
    /// Delete the key.
    Delete,
}

#[derive(Debug)]
enum Command<K, V> {
    MultiGet {
        batch: Vec<K>,
        // Scratch space for results, reused across batches.
        results_scratch: Vec<Option<GetResult<V>>>,
        response_sender: oneshot::Sender<
            Result<
                (
                    MultiGetResult,
                    // The batch and scratch vectors are sent back so their
                    // allocations can be reused.
                    Vec<K>,
                    Vec<Option<GetResult<V>>>,
                ),
                Error,
            >,
        >,
    },
    MultiUpdate {
        batch: Vec<(K, KeyUpdate<V>, Option<Diff>)>,
        response_sender: oneshot::Sender<
            Result<(MultiUpdateResult, Vec<(K, KeyUpdate<V>, Option<Diff>)>), Error>,
        >,
    },
    Shutdown {
        done_sender: oneshot::Sender<()>,
    },
    ManualCompaction {
        done_sender: oneshot::Sender<()>,
    },
}

/// An async wrapper around a RocksDB database that lives on a dedicated
/// thread.
pub struct RocksDBInstance<K, V> {
    tx: mpsc::Sender<Command<K, V>>,

    // Scratch vector for keys sent to the RocksDB thread
    // during `MultiGet`.
    multi_get_scratch: Vec<K>,

    // Scratch vector for results returned from the RocksDB thread
    // during `MultiGet`.
    multi_get_results_scratch: Vec<Option<GetResult<V>>>,

    // Scratch vector for updates sent to the RocksDB thread
    // during `MultiUpdate`.
    multi_update_scratch: Vec<(K, KeyUpdate<V>, Option<Diff>)>,

    // Configuration that may change at runtime, such as the batch size.
    dynamic_config: config::RocksDBDynamicConfig,

    /// Whether this instance supports merge operations (i.e. whether it was
    /// created with a merge operator).
    pub supports_merges: bool,

    // The join handle of the core loop thread, until taken by
    // `take_core_loop_handle`.
    handle: Option<std::thread::JoinHandle<()>>,
}

impl<K, V> RocksDBInstance<K, V>
where
    K: AsRef<[u8]> + Send + Sync + 'static,
    V: Serialize + DeserializeOwned + Send + Sync + 'static,
{
    /// Start a new RocksDB instance at the path, using the given options and
    /// tuning configuration. This spawns a dedicated thread that owns the DB
    /// and services commands until `close` is called or the command channel
    /// is dropped.
    pub fn new<M, O, IM, F>(
        instance_path: &Path,
        options: InstanceOptions<O, V, F>,
        tuning_config: RocksDBConfig,
        shared_metrics: M,
        instance_metrics: IM,
    ) -> Result<Self, Error>
    where
        O: bincode::Options + Copy + Send + Sync + 'static,
        M: Deref<Target = RocksDBSharedMetrics> + Send + 'static,
        IM: Deref<Target = RocksDBInstanceMetrics> + Send + 'static,
        F: for<'a> Fn(&'a [u8], ValueIterator<'a, O, V>) -> V + Copy + Send + Sync + 'static,
    {
        let dynamic_config = tuning_config.dynamic.clone();
        let supports_merges = options.merge_operator.is_some();

        // A small channel is sufficient: each command carries a full batch.
        let (tx, rx): (mpsc::Sender<Command<K, V>>, _) = mpsc::channel(10);

        let instance_path = instance_path.to_owned();
        let handle = std::thread::spawn(move || {
            rocksdb_core_loop(
                options,
                tuning_config,
                instance_path,
                rx,
                shared_metrics,
                instance_metrics,
            )
        });

        Ok(Self {
            tx,
            multi_get_scratch: Vec::new(),
            multi_get_results_scratch: Vec::new(),
            multi_update_scratch: Vec::new(),
            dynamic_config,
            supports_merges,
            handle: Some(handle),
        })
    }

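    // A minimal sketch of starting an instance (hypothetical names
    // throughout; assumes the `options` from above plus a `RocksDBConfig`
    // and metrics structs already registered, held behind `Arc`s):
    //
    //     let mut instance: RocksDBInstance<Vec<u8>, u64> = RocksDBInstance::new(
    //         Path::new("/tmp/rocksdb-scratch"),
    //         options,
    //         tuning_config,
    //         Arc::clone(&shared_metrics),   // `Arc<RocksDBSharedMetrics>`
    //         Arc::clone(&instance_metrics), // `Arc<RocksDBInstanceMetrics>`
    //     )?;
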
    /// Take the join handle of the core loop thread, if it has not already
    /// been taken. Callers can use it to observe unexpected thread exits.
    pub fn take_core_loop_handle(&mut self) -> Option<std::thread::JoinHandle<()>> {
        self.handle.take()
    }

    /// For each key from `gets`, fetch the value (if any), run it through
    /// `placement`, and write the result into the corresponding slot of
    /// `results_out`. Gets are processed in chunks of the dynamically
    /// configured batch size.
    ///
    /// Panics if `gets` and `results_out` do not yield the same number of
    /// items.
    pub async fn multi_get<'r, G, R, Ret, Placement>(
        &mut self,
        gets: G,
        results_out: R,
        placement: Placement,
    ) -> Result<MultiGetResult, Error>
    where
        G: IntoIterator<Item = K>,
        R: IntoIterator<Item = &'r mut Ret>,
        Ret: 'r,
        Placement: Fn(Option<GetResult<V>>) -> Ret,
    {
        let batch_size = self.dynamic_config.batch_size();
        let mut stats = MultiGetResult::default();

        let mut gets = gets.into_iter().peekable();
        if gets.peek().is_some() {
            let gets = gets.chunks(batch_size);
            let results_out = results_out.into_iter().chunks(batch_size);

            for (gets, results_out) in gets.into_iter().zip_eq(results_out.into_iter()) {
                let ret = self.multi_get_inner(gets, results_out, &placement).await?;
                stats.processed_gets += ret.processed_gets;
                stats.processed_gets_size += ret.processed_gets_size;
                stats.returned_gets += ret.returned_gets;
            }
        }

        Ok(stats)
    }

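    // A minimal sketch of a `multi_get` call (hypothetical; assumes the
    // `instance` from above):
    //
    //     let keys: Vec<Vec<u8>> = vec![b"a".to_vec(), b"b".to_vec()];
    //     let mut results: Vec<Option<u64>> = vec![None; keys.len()];
    //     let stats = instance
    //         .multi_get(keys, results.iter_mut(), |get| get.map(|g| g.value))
    //         .await?;
    //     assert_eq!(stats.processed_gets, 2);
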
    async fn multi_get_inner<'r, G, R, Ret, Placement>(
        &mut self,
        gets: G,
        results_out: R,
        placement: &Placement,
    ) -> Result<MultiGetResult, Error>
    where
        G: IntoIterator<Item = K>,
        R: IntoIterator<Item = &'r mut Ret>,
        Ret: 'r,
        Placement: Fn(Option<GetResult<V>>) -> Ret,
    {
        let mut multi_get_vec = std::mem::take(&mut self.multi_get_scratch);
        let mut results_vec = std::mem::take(&mut self.multi_get_results_scratch);
        multi_get_vec.clear();
        results_vec.clear();

        multi_get_vec.extend(gets);
        if multi_get_vec.is_empty() {
            self.multi_get_scratch = multi_get_vec;
            self.multi_get_results_scratch = results_vec;
            return Ok(MultiGetResult {
                processed_gets: 0,
                processed_gets_size: 0,
                returned_gets: 0,
            });
        }

        let (tx, rx) = oneshot::channel();
        self.tx
            .send(Command::MultiGet {
                batch: multi_get_vec,
                results_scratch: results_vec,
                response_sender: tx,
            })
            .await
            .map_err(|_| Error::RocksDBThreadGoneAway)?;

        match rx.await.map_err(|_| Error::RocksDBThreadGoneAway)? {
            Ok((ret, get_scratch, mut results_scratch)) => {
                for (place, get) in results_out.into_iter().zip_eq(results_scratch.drain(..)) {
                    *place = placement(get);
                }
                // Reclaim the scratch allocations for the next call.
                self.multi_get_scratch = get_scratch;
                self.multi_get_results_scratch = results_scratch;
                Ok(ret)
            }
            Err(e) => {
                // The scratch vectors are lost here; the core loop thread may
                // be gone, so they cannot be reclaimed.
                Err(e)
            }
        }
    }

    /// Apply a batch of [`KeyUpdate`]s, chunked by the dynamically configured
    /// batch size. The optional `Diff` on each update is multiplied by the
    /// encoded size of the value and accumulated into
    /// [`MultiUpdateResult::size_diff`]. `Merge` updates require the instance
    /// to have been created with a merge operator.
    pub async fn multi_update<P>(&mut self, puts: P) -> Result<MultiUpdateResult, Error>
    where
        P: IntoIterator<Item = (K, KeyUpdate<V>, Option<Diff>)>,
    {
        let batch_size = self.dynamic_config.batch_size();
        let mut stats = MultiUpdateResult::default();

        let mut puts = puts.into_iter().peekable();
        if puts.peek().is_some() {
            let puts = puts.chunks(batch_size);

            for puts in puts.into_iter() {
                let ret = self.multi_update_inner(puts).await?;
                stats.processed_updates += ret.processed_updates;
                stats.size_written += ret.size_written;
                if let Some(diff) = ret.size_diff {
                    stats.size_diff = Some(stats.size_diff.unwrap_or(Diff::ZERO) + diff);
                }
            }
        }

        Ok(stats)
    }

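    // A minimal sketch of a `multi_update` call followed by a clean shutdown
    // (hypothetical; assumes the `instance` from above, created with a merge
    // operator; passing `Some(diff)` instead of `None` would feed
    // `size_diff`):
    //
    //     let updates = vec![
    //         (b"a".to_vec(), KeyUpdate::Put(1u64), None),
    //         (b"b".to_vec(), KeyUpdate::Merge(2u64), None),
    //         (b"c".to_vec(), KeyUpdate::Delete, None),
    //     ];
    //     let stats = instance.multi_update(updates).await?;
    //     assert_eq!(stats.processed_updates, 3);
    //     instance.close().await?;
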
    async fn multi_update_inner<P>(&mut self, updates: P) -> Result<MultiUpdateResult, Error>
    where
        P: IntoIterator<Item = (K, KeyUpdate<V>, Option<Diff>)>,
    {
        let mut multi_put_vec = std::mem::take(&mut self.multi_update_scratch);
        multi_put_vec.clear();

        multi_put_vec.extend(updates);
        if multi_put_vec.is_empty() {
            self.multi_update_scratch = multi_put_vec;
            return Ok(MultiUpdateResult {
                processed_updates: 0,
                size_written: 0,
                size_diff: None,
            });
        }

        let (tx, rx) = oneshot::channel();
        self.tx
            .send(Command::MultiUpdate {
                batch: multi_put_vec,
                response_sender: tx,
            })
            .await
            .map_err(|_| Error::RocksDBThreadGoneAway)?;

        match rx.await.map_err(|_| Error::RocksDBThreadGoneAway)? {
            Ok((ret, scratch)) => {
                self.multi_update_scratch = scratch;
                Ok(ret)
            }
            Err(e) => {
                // As in `multi_get_inner`, the scratch vector is lost here.
                Err(e)
            }
        }
    }

    /// Trigger a manual compaction of the full key range and wait for it to
    /// complete.
    pub async fn manual_compaction(&self) -> Result<(), Error> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(Command::ManualCompaction { done_sender: tx })
            .await
            .map_err(|_| Error::RocksDBThreadGoneAway)?;

        rx.await.map_err(|_| Error::RocksDBThreadGoneAway)
    }

    /// Gracefully shut down the instance, cleaning up its on-disk state.
    /// Waits until the core loop thread has finished.
    pub async fn close(self) -> Result<(), Error> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(Command::Shutdown { done_sender: tx })
            .await
            .map_err(|_| Error::RocksDBThreadGoneAway)?;

        let _ = rx.await;

        Ok(())
    }
}

/// The blocking core loop that owns the RocksDB instance and services
/// commands until shutdown.
fn rocksdb_core_loop<K, V, M, O, IM, F>(
    options: InstanceOptions<O, V, F>,
    tuning_config: RocksDBConfig,
    instance_path: PathBuf,
    mut cmd_rx: mpsc::Receiver<Command<K, V>>,
    shared_metrics: M,
    instance_metrics: IM,
) where
    K: AsRef<[u8]> + Send + Sync + 'static,
    V: Serialize + DeserializeOwned + Send + Sync + 'static,
    M: Deref<Target = RocksDBSharedMetrics> + Send + 'static,
    O: bincode::Options + Copy + Send + Sync + 'static,
    F: for<'a> Fn(&'a [u8], ValueIterator<'a, O, V>) -> V + Send + Sync + Copy + 'static,
    IM: Deref<Target = RocksDBInstanceMetrics> + Send + 'static,
{
    if options.cleanup_on_new && instance_path.exists() {
        // Clear any old state, retrying a few times in case RocksDB is slow
        // to release the directory.
        let retry = mz_ore::retry::Retry::default()
            .max_tries(options.cleanup_tries)
            .initial_backoff(std::time::Duration::from_secs(1));

        let destroy_result = retry.retry(|_rs| {
            if let Err(e) = DB::destroy(&RocksDBOptions::default(), &*instance_path) {
                tracing::warn!(
                    "failed to cleanup rocksdb dir on creation {}: {}",
                    instance_path.display(),
                    e.display_with_causes(),
                );
                RetryResult::RetryableErr(Error::from(e))
            } else {
                RetryResult::Ok(())
            }
        });
        if let Err(e) = destroy_result {
            tracing::error!(
                "retries exhausted trying to cleanup rocksdb dir on creation {}: {}",
                instance_path.display(),
                e.display_with_causes(),
            );
            return;
        }
    }

    let retry_max_duration = tuning_config.retry_max_duration;

    // Hold onto the write buffer manager handle, if any, for as long as the
    // DB is open; it is dropped during shutdown.
    let (rocksdb_options, write_buffer_handle) = options.as_rocksdb_options(&tuning_config);
    tracing::info!(
        "Starting rocksdb at {:?} with write_buffer_manager: {:?}",
        instance_path,
        write_buffer_handle
    );

    // Open the DB, retrying transient `TryAgain` errors up to the configured
    // maximum duration; any other error is fatal.
    let retry_result = Retry::default()
        .max_duration(retry_max_duration)
        .retry(|_| match DB::open(&rocksdb_options, &instance_path) {
            Ok(db) => RetryResult::Ok(db),
            Err(e) => match e.kind() {
                ErrorKind::TryAgain => RetryResult::RetryableErr(Error::RocksDB(e)),
                _ => RetryResult::FatalErr(Error::RocksDB(e)),
            },
        });

    let db: DB = match retry_result {
        Ok(db) => db,
        Err(e) => {
            tracing::error!(
                "failed to create rocksdb at {}: {}",
                instance_path.display(),
                e.display_with_causes(),
            );
            return;
        }
    };

    // Buffers reused across `MultiUpdate` batches, to avoid reallocating the
    // encode buffers for every batch.
    let mut encoded_batch_buffers: Vec<Option<Vec<u8>>> = Vec::new();
    let mut encoded_batch: Vec<(K, KeyUpdate<Vec<u8>>)> = Vec::new();

    let wo = options.as_rocksdb_write_options();
    while let Some(cmd) = cmd_rx.blocking_recv() {
        match cmd {
            Command::Shutdown { done_sender } => {
                shutdown_and_cleanup(db, &instance_path);
                drop(write_buffer_handle);
                let _ = done_sender.send(());
                return;
            }
            Command::ManualCompaction { done_sender } => {
                // Compact the entire key range.
                db.compact_range::<&[u8], &[u8]>(None, None);
                let _ = done_sender.send(());
            }
            Command::MultiGet {
                mut batch,
                mut results_scratch,
                response_sender,
            } => {
                let batch_size = batch.len();

                // Perform the multi_get and time it, retrying transient
                // errors.
                let now = Instant::now();
                let retry_result = Retry::default()
                    .max_duration(retry_max_duration)
                    .retry(|_| {
                        let gets = db.multi_get(batch.iter());
                        let latency = now.elapsed();

                        let gets: Result<Vec<_>, _> = gets.into_iter().collect();
                        match gets {
                            Ok(gets) => {
                                shared_metrics
                                    .multi_get_latency
                                    .observe(latency.as_secs_f64());
                                instance_metrics
                                    .multi_get_size
                                    .inc_by(batch_size.try_into().unwrap());
                                instance_metrics.multi_get_count.inc();

                                RetryResult::Ok(gets)
                            }
                            Err(e) => match e.kind() {
                                ErrorKind::TryAgain => RetryResult::RetryableErr(Error::RocksDB(e)),
                                _ => RetryResult::FatalErr(Error::RocksDB(e)),
                            },
                        }
                    });

                let _ = match retry_result {
                    Ok(gets) => {
                        let processed_gets: u64 = gets.len().try_into().unwrap();
                        let mut processed_gets_size = 0;
                        let mut returned_gets: u64 = 0;
                        for previous_value in gets {
                            let get_result = match previous_value {
                                Some(previous_value) => {
                                    match options.bincode.deserialize(&previous_value) {
                                        Ok(value) => {
                                            let size = u64::cast_from(previous_value.len());
                                            processed_gets_size += size;
                                            returned_gets += 1;
                                            Some(GetResult { value, size })
                                        }
                                        Err(e) => {
                                            let _ =
                                                response_sender.send(Err(Error::DecodeError(e)));
                                            return;
                                        }
                                    }
                                }
                                None => None,
                            };
                            results_scratch.push(get_result);
                        }

                        instance_metrics
                            .multi_get_result_count
                            .inc_by(returned_gets);
                        instance_metrics
                            .multi_get_result_bytes
                            .inc_by(processed_gets_size);
                        batch.clear();
                        response_sender.send(Ok((
                            MultiGetResult {
                                processed_gets,
                                processed_gets_size,
                                returned_gets,
                            },
                            batch,
                            results_scratch,
                        )))
                    }
                    Err(e) => response_sender.send(Err(e)),
                };
            }
            Command::MultiUpdate {
                mut batch,
                response_sender,
            } => {
                let batch_size = batch.len();

                let mut ret = MultiUpdateResult {
                    processed_updates: 0,
                    size_written: 0,
                    size_diff: None,
                };

                // Grow the reusable buffer pool to fit this batch.
                let buf_size = encoded_batch_buffers.len();
                for _ in buf_size..batch_size {
                    encoded_batch_buffers.push(Some(Vec::new()));
                }
                // If configured, shrink the reused allocations back down by
                // the given ratio, as long as they still fit the batch.
                if tuning_config.shrink_buffers_by_ratio > 0 {
                    let reduced_capacity =
                        encoded_batch_buffers.capacity() / tuning_config.shrink_buffers_by_ratio;
                    if reduced_capacity > batch_size {
                        encoded_batch_buffers.truncate(reduced_capacity);
                        encoded_batch_buffers.shrink_to(reduced_capacity);

                        encoded_batch.truncate(reduced_capacity);
                        encoded_batch.shrink_to(reduced_capacity);
                    }
                }

                let Some(encode_bufs) = encoded_batch_buffers.get_mut(0..batch_size) else {
                    panic!(
                        "Encoded buffers over-truncated. expected >= {batch_size} actual: {}",
                        encoded_batch_buffers.len()
                    );
                };

                // Encode the values, accumulating sizes and diffs as we go.
                for ((key, value, diff), encode_buf) in batch.drain(..).zip_eq(encode_bufs) {
                    ret.processed_updates += 1;

                    match &value {
                        update_type @ (KeyUpdate::Put(update) | KeyUpdate::Merge(update)) => {
                            let mut encode_buf =
                                encode_buf.take().expect("encode_buf should not be empty");
                            encode_buf.clear();
                            match options
                                .bincode
                                .serialize_into::<&mut Vec<u8>, _>(&mut encode_buf, update)
                            {
                                Ok(()) => {
                                    ret.size_written += u64::cast_from(encode_buf.len());
                                    // Weight the encoded size by the diff, if
                                    // one was provided.
                                    if let Some(diff) = diff {
                                        let encoded_len = Diff::try_from(encode_buf.len())
                                            .expect("less than i64 size");
                                        ret.size_diff = Some(
                                            ret.size_diff.unwrap_or(Diff::ZERO)
                                                + (diff * encoded_len),
                                        );
                                    }
                                }
                                Err(e) => {
                                    let _ = response_sender.send(Err(Error::DecodeError(e)));
                                    return;
                                }
                            };
                            if matches!(update_type, KeyUpdate::Put(_)) {
                                encoded_batch.push((key, KeyUpdate::Put(encode_buf)));
                            } else {
                                encoded_batch.push((key, KeyUpdate::Merge(encode_buf)));
                            }
                        }
                        KeyUpdate::Delete => encoded_batch.push((key, KeyUpdate::Delete)),
                    }
                }
843 let now = Instant::now();
845 let retry_result = Retry::default()
846 .max_duration(retry_max_duration)
847 .retry(|_| {
848 let mut writes = rocksdb::WriteBatch::default();
849
850 for (key, value) in encoded_batch.iter() {
851 match value {
852 KeyUpdate::Put(update) => writes.put(key, update),
853 KeyUpdate::Merge(update) => writes.merge(key, update),
854 KeyUpdate::Delete => writes.delete(key),
855 }
856 }
857
858 match db.write_opt(writes, &wo) {
859 Ok(()) => {
860 let latency = now.elapsed();
861 shared_metrics
862 .multi_put_latency
863 .observe(latency.as_secs_f64());
864 instance_metrics
865 .multi_put_size
866 .inc_by(batch_size.try_into().unwrap());
867 instance_metrics.multi_put_count.inc();
868 RetryResult::Ok(())
869 }
870 Err(e) => match e.kind() {
871 ErrorKind::TryAgain => RetryResult::RetryableErr(Error::RocksDB(e)),
872 _ => RetryResult::FatalErr(Error::RocksDB(e)),
873 },
874 }
875 });
876
877 for (i, (_, encoded_buffer)) in encoded_batch.drain(..).enumerate() {
879 if let KeyUpdate::Put(encoded_buffer) | KeyUpdate::Merge(encoded_buffer) =
880 encoded_buffer
881 {
882 encoded_batch_buffers[i] = Some(encoded_buffer);
883 }
884 }
885
886 match retry_result {
887 Ok(()) => {
888 batch.clear();
889 let _ = response_sender.send(Ok((ret, batch)));
890 }
891 Err(e) => {
892 let db_err = match e {
893 Error::RocksDB(ref inner) => Some(inner.clone()),
894 _ => None,
895 };
896 let _ = response_sender.send(Err(e));
897 if let Some(db_err) = db_err {
898 if !matches!(db_err.kind(), ErrorKind::TryAgain) {
899 tracing::warn!(
900 "exiting on fatal rocksdb error at {}: {}",
901 instance_path.display(),
902 db_err.display_with_causes(),
903 );
904 break;
905 }
906 }
907 }
908 };
909 }
910 }
911 }
912 shutdown_and_cleanup(db, &instance_path);
913}

/// Shut down the given DB and delete its on-disk state.
fn shutdown_and_cleanup(db: DB, instance_path: &Path) {
    // Ensure background work is stopped before dropping the DB.
    db.cancel_all_background_work(true);
    drop(db);
    tracing::info!("dropped rocksdb at {}", instance_path.display());

    // Failure to clean up the directory is logged but not fatal.
    if let Err(e) = DB::destroy(&RocksDBOptions::default(), instance_path) {
        tracing::warn!(
            "failed to cleanup rocksdb dir at {}: {}",
            instance_path.display(),
            e.display_with_causes(),
        );
    }
}