Skip to main content

mz_rocksdb/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An async wrapper around RocksDB, that does IO on a separate thread.
11//!
12//! This crate offers a limited API to communicate with RocksDB, to get
13//! the best performance possible (most importantly, by batching operations).
14//! Currently this API is only `upsert`, which replaces (or deletes) values for
15//! a set of keys, and returns the previous values.
16
17#![warn(missing_docs)]
18
19use std::convert::AsRef;
20use std::ops::Deref;
21use std::path::{Path, PathBuf};
22use std::time::Instant;
23
24use itertools::Itertools;
25use mz_ore::cast::CastFrom;
26use mz_ore::error::ErrorExt;
27use mz_ore::metrics::{DeleteOnDropCounter, DeleteOnDropHistogram};
28use mz_ore::retry::{Retry, RetryResult};
29use prometheus::core::AtomicU64;
30use rocksdb::merge_operator::MergeOperandsIter;
31use rocksdb::{DB, Env, Error as RocksDBError, ErrorKind, Options as RocksDBOptions, WriteOptions};
32use serde::Serialize;
33use serde::de::DeserializeOwned;
34use tokio::sync::{mpsc, oneshot};
35
36pub mod config;
37pub use config::{RocksDBConfig, RocksDBTuningParameters, defaults};
38
39use crate::config::WriteBufferManagerHandle;
40
/// Signed multiplier applied to encoded value sizes when accumulating
/// [`MultiUpdateResult::size_diff`]; an overflow-aware `i64` from `mz_ore`.
type Diff = mz_ore::Overflowing<i64>;
42
/// An error using this RocksDB wrapper.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An error from the underlying RocksDB library.
    #[error(transparent)]
    RocksDB(#[from] RocksDBError),

    /// Error when using the instance after RocksDB has errored
    /// or been shutdown.
    #[error("RocksDB thread has been shut down or errored")]
    RocksDBThreadGoneAway,

    /// Error decoding a value previously written.
    #[error("failed to decode value")]
    DecodeError(#[from] bincode::Error),

    /// A tokio thread used by the implementation panicked.
    #[error("tokio thread panicked")]
    TokioPanic(#[from] tokio::task::JoinError),

    /// Cleanup of the instance's on-disk state did not finish
    /// within the allotted time.
    #[error("failed to cleanup in time")]
    CleanupTimeout(#[from] tokio::time::error::Elapsed),

    /// An error occurred with a provided value.
    #[error("error with value: {0}")]
    ValueError(String),
}
71
/// An iterator over operand values to merge for a key in RocksDB.
/// By convention the first value will be the existing value
/// if it was present.
pub struct ValueIterator<'a, O, V>
where
    O: bincode::Options + Copy + Send + Sync + 'static,
    V: DeserializeOwned + Serialize + Send + Sync + 'static,
{
    // The (optional) existing value chained with the merge operands
    // handed to us by RocksDB; each item is still bincode-encoded.
    iter: std::iter::Chain<std::option::IntoIter<&'a [u8]>, MergeOperandsIter<'a>>,
    // The bincode options used to decode each encoded operand.
    bincode: &'a O,
    // Marker tying this iterator to the decoded value type `V`.
    v: std::marker::PhantomData<V>,
}
84
85impl<O, V> Iterator for ValueIterator<'_, O, V>
86where
87    O: bincode::Options + Copy + Send + Sync + 'static,
88    V: DeserializeOwned + Serialize + Send + Sync + 'static,
89{
90    type Item = V;
91
92    fn next(&mut self) -> Option<Self::Item> {
93        self.iter
94            .next()
95            .map(|v| self.bincode.deserialize(v).unwrap())
96    }
97}
98
/// Helper type stub to satisfy generic bounds when initializing a `InstanceOptions` without a
/// defined merge operator.
///
/// Acts purely as a concrete type for the `F` parameter when
/// `merge_operator` is `None`; it is never invoked.
pub type StubMergeOperator<V> =
    fn(key: &[u8], operands: ValueIterator<bincode::DefaultOptions, V>) -> V;
103
/// Fixed options to configure a [`RocksDBInstance`]. These are not tuning parameters,
/// see the `config` modules for tuning. These are generally fixed within the binary.
pub struct InstanceOptions<O, V, F> {
    /// Whether or not to clear state at the instance
    /// path before starting.
    pub cleanup_on_new: bool,

    /// If `cleanup_on_new`, how many times to try.
    pub cleanup_tries: usize,

    /// Whether or not writes are recorded in the WAL.
    /// This is not in `RocksDBTuningParameters` because it
    /// applies to `WriteOptions` when creating `WriteBatch`es.
    pub use_wal: bool,

    /// A possibly shared RocksDB `Env`.
    pub env: Env,

    /// The bincode options to use for serializing and deserializing values.
    pub bincode: O,

    /// A merge operator to use for associative merges, if any. The first
    /// item is the name of the operator to store in RocksDB for
    /// compatibility checks, and the second is the merge function.
    pub merge_operator: Option<(String, F)>,

    // Marker tying these options to the value type `V`.
    v: std::marker::PhantomData<V>,
}
132
133impl<O, V, F> InstanceOptions<O, V, F>
134where
135    O: bincode::Options + Copy + Send + Sync + 'static,
136    V: DeserializeOwned + Serialize + Send + Sync + 'static,
137    F: for<'a> Fn(&'a [u8], ValueIterator<'a, O, V>) -> V + Copy + Send + Sync + 'static,
138{
139    /// A new `Options` object with reasonable defaults.
140    pub fn new(
141        env: rocksdb::Env,
142        cleanup_tries: usize,
143        merge_operator: Option<(String, F)>,
144        bincode: O,
145    ) -> Self {
146        InstanceOptions {
147            cleanup_on_new: true,
148            cleanup_tries,
149            use_wal: false,
150            env,
151            merge_operator,
152            bincode,
153            v: std::marker::PhantomData,
154        }
155    }
156
157    fn as_rocksdb_options(
158        &self,
159        tuning_config: &RocksDBConfig,
160    ) -> (RocksDBOptions, Option<WriteBufferManagerHandle>) {
161        // Defaults + `create_if_missing`
162        let mut options = rocksdb::Options::default();
163        options.create_if_missing(true);
164
165        // Set the env first so tuning applies to the shared `Env`.
166        options.set_env(&self.env);
167
168        if let Some((fn_name, merge_fn)) = &self.merge_operator {
169            let bincode = self.bincode.clone();
170            let merge_fn = merge_fn.clone();
171            // We use an associative merge operator which is used for both full/partial merges
172            // since the value type is always the same for puts and merges.
173            // See `https://github.com/facebook/rocksdb/wiki/Merge-Operator#associativity-vs-non-associativity`
174            // for more info.
175            options.set_merge_operator_associative(fn_name, move |key, existing, operands| {
176                let operands = ValueIterator {
177                    iter: existing.into_iter().chain(operands.iter()),
178                    bincode: &bincode,
179                    v: std::marker::PhantomData::<V>,
180                };
181                let result = merge_fn(key, operands);
182                // NOTE: While the API specifies the return type as Option<Vec<u8>>, returning a None
183                // will cause rocksdb to throw a corruption error and SIGABRT the process.
184                Some(bincode.serialize(&result).unwrap())
185            });
186        }
187
188        let write_buffer_handle = config::apply_to_options(tuning_config, &mut options);
189        // Returns the rocksdb options and an optional `WriteBufferManagerHandle`
190        // if write buffer manager was enabled in the configs.
191        (options, write_buffer_handle)
192    }
193
194    fn as_rocksdb_write_options(&self) -> WriteOptions {
195        let mut wo = rocksdb::WriteOptions::new();
196        wo.disable_wal(!self.use_wal);
197        wo
198    }
199}
200
/// Shared metrics about an instance's usage of RocksDB. User-provided
/// so the user can choose the labels.
pub struct RocksDBSharedMetrics {
    /// Latency of multi_gets, in fractional seconds.
    pub multi_get_latency: DeleteOnDropHistogram<Vec<String>>,
    /// Latency of write batch writes, in fractional seconds.
    pub multi_put_latency: DeleteOnDropHistogram<Vec<String>>,
}
209
/// Worker metrics about an instance's usage of RocksDB. User-provided
/// so the user can choose the labels.
pub struct RocksDBInstanceMetrics {
    /// Size of multi_get batches.
    pub multi_get_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Size of multi_get non-empty results.
    pub multi_get_result_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Total size of bytes returned in the result.
    pub multi_get_result_bytes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// The number of calls to rocksdb multi_get.
    pub multi_get_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// The number of calls to rocksdb multi_put.
    pub multi_put_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Size of write batches.
    pub multi_put_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}
226
/// The result type for `multi_get`.
#[derive(Default, Debug)]
pub struct MultiGetResult {
    /// The number of keys we fetched.
    pub processed_gets: u64,
    /// The total size of values fetched.
    pub processed_gets_size: u64,
    /// The number of records returned, i.e. gets that found a value.
    pub returned_gets: u64,
}
237
/// The result type for individual gets. Absence of a value is
/// represented by wrapping this type in an `Option` at use sites.
#[derive(Debug, Default, Clone)]
pub struct GetResult<V> {
    /// The decoded stored value.
    pub value: V,
    /// The size of `value` as persisted (encoded bytes).
    /// Useful for users keeping track of statistics.
    pub size: u64,
}
247
/// The result type for `multi_update`.
#[derive(Default, Debug)]
pub struct MultiUpdateResult {
    /// The number of puts, merges, and deletes.
    pub processed_updates: u64,
    /// The total size of values we wrote to the database.
    /// Does not contain any information about deletes.
    pub size_written: u64,
    /// The 'diff' size of the values we wrote to the database,
    /// returned when the `MultiUpdate` command included a multiplier 'diff'
    /// for at least one update value.
    pub size_diff: Option<Diff>,
}
261
/// The type of update to perform on a key.
#[derive(Debug)]
pub enum KeyUpdate<V> {
    /// Put a value into the database under the given key.
    Put(V),
    /// Merge the database value with the given value.
    /// Will error if the merge operator is not set.
    Merge(V),
    /// Delete the value from the database.
    Delete,
}
273
// Commands sent from the async `RocksDBInstance` handle to the worker
// thread running `rocksdb_core_loop`. Scratch vectors travel with each
// request and are sent back in the response so allocations are reused.
#[derive(Debug)]
enum Command<K, V> {
    // Fetch the stored values for a batch of keys.
    MultiGet {
        batch: Vec<K>,
        // Scratch vector to return results in.
        results_scratch: Vec<Option<GetResult<V>>>,
        response_sender: oneshot::Sender<
            Result<
                (
                    MultiGetResult,
                    // The batch scratch vector being given back.
                    Vec<K>,
                    Vec<Option<GetResult<V>>>,
                ),
                Error,
            >,
        >,
    },
    // Apply a batch of puts/merges/deletes.
    MultiUpdate {
        // The batch of updates to perform. The 3rd item in each tuple is an optional diff
        // multiplier that when present, will be multiplied by the size of the encoded
        // value written to the database and summed into the `MultiUpdateResult::size_diff` field.
        batch: Vec<(K, KeyUpdate<V>, Option<Diff>)>,
        // Scratch vector to return results in.
        response_sender: oneshot::Sender<
            Result<(MultiUpdateResult, Vec<(K, KeyUpdate<V>, Option<Diff>)>), Error>,
        >,
    },
    // Gracefully stop the worker; acknowledged via `done_sender`.
    Shutdown {
        done_sender: oneshot::Sender<()>,
    },
    // Compact the full key range; acknowledged via `done_sender`.
    ManualCompaction {
        done_sender: oneshot::Sender<()>,
    },
}
309
/// An async wrapper around RocksDB.
pub struct RocksDBInstance<K, V> {
    // Channel to the worker thread running `rocksdb_core_loop`.
    tx: mpsc::Sender<Command<K, V>>,

    // Scratch vector to send keys to the RocksDB thread
    // during `MultiGet`.
    multi_get_scratch: Vec<K>,

    // Scratch vector to return results from the RocksDB thread
    // during `MultiGet`.
    multi_get_results_scratch: Vec<Option<GetResult<V>>>,

    // Scratch vector to send updates to the RocksDB thread
    // during `MultiUpdate`.
    multi_update_scratch: Vec<(K, KeyUpdate<V>, Option<Diff>)>,

    // Configuration that can change dynamically.
    dynamic_config: config::RocksDBDynamicConfig,

    /// Whether this instance supports merge operations (whether a
    /// merge operator was set at creation time).
    pub supports_merges: bool,

    /// JoinHandle for the rocksdb core loop that processes requests.
    handle: Option<std::thread::JoinHandle<()>>,
}
336
impl<K, V> RocksDBInstance<K, V>
where
    K: AsRef<[u8]> + Send + Sync + 'static,
    V: Serialize + DeserializeOwned + Send + Sync + 'static,
{
    /// Start a new RocksDB instance at the path, using
    /// the `Options` and `RocksDBTuningParameters` to
    /// configure the instance.
    ///
    /// `metrics` is a set of metric types that this type will keep
    /// up to date. `enc_opts` is the `bincode` options used to
    /// serialize and deserialize the keys and values.
    pub fn new<M, O, IM, F>(
        instance_path: &Path,
        options: InstanceOptions<O, V, F>,
        tuning_config: RocksDBConfig,
        shared_metrics: M,
        instance_metrics: IM,
    ) -> Result<Self, Error>
    where
        O: bincode::Options + Copy + Send + Sync + 'static,
        M: Deref<Target = RocksDBSharedMetrics> + Send + 'static,
        IM: Deref<Target = RocksDBInstanceMetrics> + Send + 'static,
        F: for<'a> Fn(&'a [u8], ValueIterator<'a, O, V>) -> V + Copy + Send + Sync + 'static,
    {
        let dynamic_config = tuning_config.dynamic.clone();
        let supports_merges = options.merge_operator.is_some();

        // The buffer can be small here, as all interactions with it take `&mut self`.
        let (tx, rx): (mpsc::Sender<Command<K, V>>, _) = mpsc::channel(10);

        let instance_path = instance_path.to_owned();
        // RocksDB initialization and core loop are executed in a separate thread to avoid
        // blocking and surfacing initialization errors which may result in a panic.
        // If initialization fails, the thread exits.  The channel rx handle will be dropped,
        // and any command to RocksDBInstance will fail with Error::RocksDBThreadGoneAway,
        // resulting in suspend-and-restart.
        let handle = std::thread::spawn(move || {
            rocksdb_core_loop(
                options,
                tuning_config,
                instance_path,
                rx,
                shared_metrics,
                instance_metrics,
            )
        });

        Ok(Self {
            tx,
            multi_get_scratch: Vec::new(),
            multi_get_results_scratch: Vec::new(),
            multi_update_scratch: Vec::new(),
            dynamic_config,
            supports_merges,
            handle: Some(handle),
        })
    }

    /// Take ownership of the join handle for the core thread. Once taken, this will just return
    /// [`None`].
    pub fn take_core_loop_handle(&mut self) -> Option<std::thread::JoinHandle<()>> {
        self.handle.take()
    }

    /// For each _unique_ key in `gets`, place the stored value (if any) in `results_out`.
    ///
    /// Panics if `gets` and `results_out` are not the same length.
    pub async fn multi_get<'r, G, R, Ret, Placement>(
        &mut self,
        gets: G,
        results_out: R,
        placement: Placement,
    ) -> Result<MultiGetResult, Error>
    where
        G: IntoIterator<Item = K>,
        R: IntoIterator<Item = &'r mut Ret>,
        Ret: 'r,
        Placement: Fn(Option<GetResult<V>>) -> Ret,
    {
        let batch_size = self.dynamic_config.batch_size();
        let mut stats = MultiGetResult::default();

        let mut gets = gets.into_iter().peekable();
        if gets.peek().is_some() {
            // Issue the gets to the worker thread in `batch_size` chunks.
            // `zip_eq` below is what panics if the two iterators have
            // different lengths.
            let gets = gets.chunks(batch_size);
            let results_out = results_out.into_iter().chunks(batch_size);

            for (gets, results_out) in gets.into_iter().zip_eq(results_out.into_iter()) {
                let ret = self.multi_get_inner(gets, results_out, &placement).await?;
                stats.processed_gets += ret.processed_gets;
                stats.processed_gets_size += ret.processed_gets_size;
                stats.returned_gets += ret.returned_gets;
            }
        }

        Ok(stats)
    }

    // Perform a single batched `MultiGet` round trip with the worker
    // thread, reusing (and afterwards recovering) the scratch vectors.
    async fn multi_get_inner<'r, G, R, Ret, Placement>(
        &mut self,
        gets: G,
        results_out: R,
        placement: &Placement,
    ) -> Result<MultiGetResult, Error>
    where
        G: IntoIterator<Item = K>,
        R: IntoIterator<Item = &'r mut Ret>,
        Ret: 'r,
        Placement: Fn(Option<GetResult<V>>) -> Ret,
    {
        // Move the scratch buffers out of `self` so they can be sent to the
        // worker thread; they are returned to `self` in the response.
        let mut multi_get_vec = std::mem::take(&mut self.multi_get_scratch);
        let mut results_vec = std::mem::take(&mut self.multi_get_results_scratch);
        multi_get_vec.clear();
        results_vec.clear();

        multi_get_vec.extend(gets);
        if multi_get_vec.is_empty() {
            // Nothing to do; put the scratch buffers back and return empty stats.
            self.multi_get_scratch = multi_get_vec;
            self.multi_get_results_scratch = results_vec;
            return Ok(MultiGetResult {
                processed_gets: 0,
                processed_gets_size: 0,
                returned_gets: 0,
            });
        }

        let (tx, rx) = oneshot::channel();
        self.tx
            .send(Command::MultiGet {
                batch: multi_get_vec,
                results_scratch: results_vec,
                response_sender: tx,
            })
            .await
            .map_err(|_| Error::RocksDBThreadGoneAway)?;

        // We also unwrap all rocksdb errors here.
        match rx.await.map_err(|_| Error::RocksDBThreadGoneAway)? {
            Ok((ret, get_scratch, mut results_scratch)) => {
                // Map each raw get result into the caller's output slot;
                // `zip_eq` panics if `results_out` has a different length
                // than the batch we sent.
                for (place, get) in results_out.into_iter().zip_eq(results_scratch.drain(..)) {
                    *place = placement(get);
                }
                // Recover the scratch allocations for the next call.
                self.multi_get_scratch = get_scratch;
                self.multi_get_results_scratch = results_scratch;
                Ok(ret)
            }
            Err(e) => {
                // Note we don't attempt to preserve the scratch allocations here.
                Err(e)
            }
        }
    }

    /// For each key in puts, store the given value, or delete it if
    /// the value is `None`. If the same `key` appears multiple times,
    /// the last value for the key wins.
    /// The third item in each tuple is an optional diff multiplier that when present,
    /// will be multiplied by the size of the encoded value written to the database and
    /// summed into the `MultiUpdateResult::size_diff` field.
    pub async fn multi_update<P>(&mut self, puts: P) -> Result<MultiUpdateResult, Error>
    where
        P: IntoIterator<Item = (K, KeyUpdate<V>, Option<Diff>)>,
    {
        let batch_size = self.dynamic_config.batch_size();
        let mut stats = MultiUpdateResult::default();

        let mut puts = puts.into_iter().peekable();
        if puts.peek().is_some() {
            // Send updates to the worker thread in `batch_size` chunks.
            let puts = puts.chunks(batch_size);

            for puts in puts.into_iter() {
                let ret = self.multi_update_inner(puts).await?;
                stats.processed_updates += ret.processed_updates;
                stats.size_written += ret.size_written;
                // `size_diff` stays `None` unless at least one batch reported a diff.
                if let Some(diff) = ret.size_diff {
                    stats.size_diff = Some(stats.size_diff.unwrap_or(Diff::ZERO) + diff);
                }
            }
        }

        Ok(stats)
    }

    // Perform a single batched `MultiUpdate` round trip with the worker
    // thread, reusing (and afterwards recovering) the scratch vector.
    async fn multi_update_inner<P>(&mut self, updates: P) -> Result<MultiUpdateResult, Error>
    where
        P: IntoIterator<Item = (K, KeyUpdate<V>, Option<Diff>)>,
    {
        // Move the scratch buffer out of `self` so it can be sent to the
        // worker thread; it is returned to `self` in the response.
        let mut multi_put_vec = std::mem::take(&mut self.multi_update_scratch);
        multi_put_vec.clear();

        multi_put_vec.extend(updates);
        if multi_put_vec.is_empty() {
            // Nothing to do; put the scratch buffer back and return empty stats.
            self.multi_update_scratch = multi_put_vec;
            return Ok(MultiUpdateResult {
                processed_updates: 0,
                size_written: 0,
                size_diff: None,
            });
        }

        let (tx, rx) = oneshot::channel();
        self.tx
            .send(Command::MultiUpdate {
                batch: multi_put_vec,
                response_sender: tx,
            })
            .await
            .map_err(|_| Error::RocksDBThreadGoneAway)?;

        // We also unwrap all rocksdb errors here.
        match rx.await.map_err(|_| Error::RocksDBThreadGoneAway)? {
            Ok((ret, scratch)) => {
                // Recover the scratch allocation for the next call.
                self.multi_update_scratch = scratch;
                Ok(ret)
            }
            Err(e) => {
                // Note we don't attempt to preserve the allocation here.
                Err(e)
            }
        }
    }

    /// Trigger manual compaction of the RocksDB instance.
    pub async fn manual_compaction(&self) -> Result<(), Error> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(Command::ManualCompaction { done_sender: tx })
            .await
            .map_err(|_| Error::RocksDBThreadGoneAway)?;

        rx.await.map_err(|_| Error::RocksDBThreadGoneAway)
    }

    /// Gracefully shut down RocksDB. Can error if the instance
    /// is already shut down or errored.
    pub async fn close(self) -> Result<(), Error> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(Command::Shutdown { done_sender: tx })
            .await
            .map_err(|_| Error::RocksDBThreadGoneAway)?;

        // Best-effort wait for the worker's acknowledgment; if the worker
        // died before acking, we've already done all we can.
        let _ = rx.await;

        Ok(())
    }
}
585
586fn rocksdb_core_loop<K, V, M, O, IM, F>(
587    options: InstanceOptions<O, V, F>,
588    tuning_config: RocksDBConfig,
589    instance_path: PathBuf,
590    mut cmd_rx: mpsc::Receiver<Command<K, V>>,
591    shared_metrics: M,
592    instance_metrics: IM,
593) where
594    K: AsRef<[u8]> + Send + Sync + 'static,
595    V: Serialize + DeserializeOwned + Send + Sync + 'static,
596    M: Deref<Target = RocksDBSharedMetrics> + Send + 'static,
597    O: bincode::Options + Copy + Send + Sync + 'static,
598    F: for<'a> Fn(&'a [u8], ValueIterator<'a, O, V>) -> V + Send + Sync + Copy + 'static,
599    IM: Deref<Target = RocksDBInstanceMetrics> + Send + 'static,
600{
601    if options.cleanup_on_new && instance_path.exists() {
602        // We require that cleanup of the DB succeeds. Otherwise, we could open a DB with old,
603        // incorrect data. Because of races with dataflow shutdown, we retry a few times here.
604        // 1s with a 2x backoff is ~30s after 5 tries.
605        let retry = mz_ore::retry::Retry::default()
606            .max_tries(options.cleanup_tries)
607            // Large DB's could take multiple seconds to run.
608            .initial_backoff(std::time::Duration::from_secs(1));
609
610        let destroy_result = retry.retry(|_rs| {
611            if let Err(e) = DB::destroy(&RocksDBOptions::default(), &*instance_path) {
612                tracing::warn!(
613                    "failed to cleanup rocksdb dir on creation {}: {}",
614                    instance_path.display(),
615                    e.display_with_causes(),
616                );
617                RetryResult::RetryableErr(Error::from(e))
618            } else {
619                RetryResult::Ok(())
620            }
621        });
622        if let Err(e) = destroy_result {
623            tracing::error!(
624                "retries exhausted trying to cleanup rocksdb dir on creation {}: {}",
625                instance_path.display(),
626                e.display_with_causes(),
627            );
628            return;
629        }
630    }
631
632    let retry_max_duration = tuning_config.retry_max_duration;
633
634    // Handle to an optional reference of a write buffer manager which
635    // should be valid till the rocksdb thread is running.
636    // The shared write buffer manager will be cleaned up if all
637    // the handles are dropped across all the rocksdb instances.
638    let (rocksdb_options, write_buffer_handle) = options.as_rocksdb_options(&tuning_config);
639    tracing::info!(
640        "Starting rocksdb at {:?} with write_buffer_manager: {:?}",
641        instance_path,
642        write_buffer_handle
643    );
644
645    let retry_result = Retry::default()
646        .max_duration(retry_max_duration)
647        .retry(|_| match DB::open(&rocksdb_options, &instance_path) {
648            Ok(db) => RetryResult::Ok(db),
649            Err(e) => match e.kind() {
650                ErrorKind::TryAgain => RetryResult::RetryableErr(Error::RocksDB(e)),
651                _ => RetryResult::FatalErr(Error::RocksDB(e)),
652            },
653        });
654
655    let db: DB = match retry_result {
656        Ok(db) => db,
657        Err(e) => {
658            tracing::error!(
659                "failed to create rocksdb at {}: {}",
660                instance_path.display(),
661                e.display_with_causes(),
662            );
663            return;
664        }
665    };
666
667    let mut encoded_batch_buffers: Vec<Option<Vec<u8>>> = Vec::new();
668    let mut encoded_batch: Vec<(K, KeyUpdate<Vec<u8>>)> = Vec::new();
669
670    let wo = options.as_rocksdb_write_options();
671    while let Some(cmd) = cmd_rx.blocking_recv() {
672        match cmd {
673            Command::Shutdown { done_sender } => {
674                shutdown_and_cleanup(db, &instance_path);
675                drop(write_buffer_handle);
676                let _ = done_sender.send(());
677                return;
678            }
679            Command::ManualCompaction { done_sender } => {
680                // Compact the full key-range.
681                db.compact_range::<&[u8], &[u8]>(None, None);
682                let _ = done_sender.send(());
683            }
684            Command::MultiGet {
685                mut batch,
686                mut results_scratch,
687                response_sender,
688            } => {
689                let batch_size = batch.len();
690
691                // Perform the multi_get and record metrics, if there wasn't an error.
692                let now = Instant::now();
693                let retry_result = Retry::default()
694                    .max_duration(retry_max_duration)
695                    .retry(|_| {
696                        let gets = db.multi_get(batch.iter());
697                        let latency = now.elapsed();
698
699                        let gets: Result<Vec<_>, _> = gets.into_iter().collect();
700                        match gets {
701                            Ok(gets) => {
702                                shared_metrics
703                                    .multi_get_latency
704                                    .observe(latency.as_secs_f64());
705                                instance_metrics
706                                    .multi_get_size
707                                    .inc_by(batch_size.try_into().unwrap());
708                                instance_metrics.multi_get_count.inc();
709
710                                RetryResult::Ok(gets)
711                            }
712                            Err(e) => match e.kind() {
713                                ErrorKind::TryAgain => RetryResult::RetryableErr(Error::RocksDB(e)),
714                                _ => RetryResult::FatalErr(Error::RocksDB(e)),
715                            },
716                        }
717                    });
718
719                let _ = match retry_result {
720                    Ok(gets) => {
721                        let processed_gets: u64 = gets.len().try_into().unwrap();
722                        let mut processed_gets_size = 0;
723                        let mut returned_gets: u64 = 0;
724                        for previous_value in gets {
725                            let get_result = match previous_value {
726                                Some(previous_value) => {
727                                    match options.bincode.deserialize(&previous_value) {
728                                        Ok(value) => {
729                                            let size = u64::cast_from(previous_value.len());
730                                            processed_gets_size += size;
731                                            returned_gets += 1;
732                                            Some(GetResult { value, size })
733                                        }
734                                        Err(e) => {
735                                            let _ =
736                                                response_sender.send(Err(Error::DecodeError(e)));
737                                            return;
738                                        }
739                                    }
740                                }
741                                None => None,
742                            };
743                            results_scratch.push(get_result);
744                        }
745
746                        instance_metrics
747                            .multi_get_result_count
748                            .inc_by(returned_gets);
749                        instance_metrics
750                            .multi_get_result_bytes
751                            .inc_by(processed_gets_size);
752                        batch.clear();
753                        response_sender.send(Ok((
754                            MultiGetResult {
755                                processed_gets,
756                                processed_gets_size,
757                                returned_gets,
758                            },
759                            batch,
760                            results_scratch,
761                        )))
762                    }
763                    Err(e) => response_sender.send(Err(e)),
764                };
765            }
766            Command::MultiUpdate {
767                mut batch,
768                response_sender,
769            } => {
770                let batch_size = batch.len();
771
772                let mut ret = MultiUpdateResult {
773                    processed_updates: 0,
774                    size_written: 0,
775                    size_diff: None,
776                };
777
778                // initialize and push values into the buffer to match the batch size
779                let buf_size = encoded_batch_buffers.len();
780                for _ in buf_size..batch_size {
781                    encoded_batch_buffers.push(Some(Vec::new()));
782                }
783                // shrinking the buffers in case the scratch buffer's capacity is significantly
784                // more than the size of batch
785                if tuning_config.shrink_buffers_by_ratio > 0 {
786                    let reduced_capacity =
787                        encoded_batch_buffers.capacity() / tuning_config.shrink_buffers_by_ratio;
788                    if reduced_capacity > batch_size {
789                        encoded_batch_buffers.truncate(reduced_capacity);
790                        encoded_batch_buffers.shrink_to(reduced_capacity);
791
792                        encoded_batch.truncate(reduced_capacity);
793                        encoded_batch.shrink_to(reduced_capacity);
794                    }
795                }
796
797                let Some(encode_bufs) = encoded_batch_buffers.get_mut(0..batch_size) else {
798                    panic!(
799                        "Encoded buffers over-truncated. expected >= {batch_size} actual: {}",
800                        encoded_batch_buffers.len()
801                    );
802                };
803
804                // TODO(guswynn): sort by key before writing.
805                for ((key, value, diff), encode_buf) in batch.drain(..).zip_eq(encode_bufs) {
806                    ret.processed_updates += 1;
807
808                    match &value {
809                        update_type @ (KeyUpdate::Put(update) | KeyUpdate::Merge(update)) => {
810                            let mut encode_buf =
811                                encode_buf.take().expect("encode_buf should not be empty");
812                            encode_buf.clear();
813                            match options
814                                .bincode
815                                .serialize_into::<&mut Vec<u8>, _>(&mut encode_buf, update)
816                            {
817                                Ok(()) => {
818                                    ret.size_written += u64::cast_from(encode_buf.len());
819                                    // calculate the diff size if the diff multiplier is present
820                                    if let Some(diff) = diff {
821                                        let encoded_len = Diff::try_from(encode_buf.len())
822                                            .expect("less than i64 size");
823                                        ret.size_diff = Some(
824                                            ret.size_diff.unwrap_or(Diff::ZERO)
825                                                + (diff * encoded_len),
826                                        );
827                                    }
828                                }
829                                Err(e) => {
830                                    let _ = response_sender.send(Err(Error::DecodeError(e)));
831                                    return;
832                                }
833                            };
834                            if matches!(update_type, KeyUpdate::Put(_)) {
835                                encoded_batch.push((key, KeyUpdate::Put(encode_buf)));
836                            } else {
837                                encoded_batch.push((key, KeyUpdate::Merge(encode_buf)));
838                            }
839                        }
840                        KeyUpdate::Delete => encoded_batch.push((key, KeyUpdate::Delete)),
841                    }
842                }
843                // Perform the multi_update and record metrics, if there wasn't an error.
844                let now = Instant::now();
845                let retry_result = Retry::default()
846                    .max_duration(retry_max_duration)
847                    .retry(|_| {
848                        let mut writes = rocksdb::WriteBatch::default();
849
850                        for (key, value) in encoded_batch.iter() {
851                            match value {
852                                KeyUpdate::Put(update) => writes.put(key, update),
853                                KeyUpdate::Merge(update) => writes.merge(key, update),
854                                KeyUpdate::Delete => writes.delete(key),
855                            }
856                        }
857
858                        match db.write_opt(writes, &wo) {
859                            Ok(()) => {
860                                let latency = now.elapsed();
861                                shared_metrics
862                                    .multi_put_latency
863                                    .observe(latency.as_secs_f64());
864                                instance_metrics
865                                    .multi_put_size
866                                    .inc_by(batch_size.try_into().unwrap());
867                                instance_metrics.multi_put_count.inc();
868                                RetryResult::Ok(())
869                            }
870                            Err(e) => match e.kind() {
871                                ErrorKind::TryAgain => RetryResult::RetryableErr(Error::RocksDB(e)),
872                                _ => RetryResult::FatalErr(Error::RocksDB(e)),
873                            },
874                        }
875                    });
876
877                // put back the values in the buffer so we don't lose allocation
878                for (i, (_, encoded_buffer)) in encoded_batch.drain(..).enumerate() {
879                    if let KeyUpdate::Put(encoded_buffer) | KeyUpdate::Merge(encoded_buffer) =
880                        encoded_buffer
881                    {
882                        encoded_batch_buffers[i] = Some(encoded_buffer);
883                    }
884                }
885
886                match retry_result {
887                    Ok(()) => {
888                        batch.clear();
889                        let _ = response_sender.send(Ok((ret, batch)));
890                    }
891                    Err(e) => {
892                        let db_err = match e {
893                            Error::RocksDB(ref inner) => Some(inner.clone()),
894                            _ => None,
895                        };
896                        let _ = response_sender.send(Err(e));
897                        if let Some(db_err) = db_err {
898                            if !matches!(db_err.kind(), ErrorKind::TryAgain) {
899                                tracing::warn!(
900                                    "exiting on fatal rocksdb error at {}: {}",
901                                    instance_path.display(),
902                                    db_err.display_with_causes(),
903                                );
904                                break;
905                            }
906                        }
907                    }
908                };
909            }
910        }
911    }
912    shutdown_and_cleanup(db, &instance_path);
913}
914
915fn shutdown_and_cleanup(db: DB, instance_path: &PathBuf) {
916    // Gracefully cleanup if the `RocksDBInstance` has gone away.
917    db.cancel_all_background_work(true);
918    drop(db);
919    tracing::info!("dropped rocksdb at {}", instance_path.display());
920
921    // Note that we don't retry, as we already may race here with a source being restarted.
922    if let Err(e) = DB::destroy(&RocksDBOptions::default(), &*instance_path) {
923        tracing::warn!(
924            "failed to cleanup rocksdb dir at {}: {}",
925            instance_path.display(),
926            e.display_with_causes(),
927        );
928    }
929}