mz_rocksdb/
config.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// This module is mostly boilerplate, with all relevant
11// documentation on `RocksDBTuningParameters`.
12#![allow(missing_docs)]
13
14//! This module handles converting `mz_rocksdb_types` into `rocksdb` types.
15
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::{Arc, Mutex, Weak};
18use std::time::Duration;
19
20use mz_ore::cast::CastLossy;
21
22use derivative::Derivative;
23use rocksdb::{DBCompactionStyle, DBCompressionType, WriteBufferManager};
24
25pub use mz_rocksdb_types::config::defaults;
26pub use mz_rocksdb_types::config::*;
27
/// Tuning values that can change while RocksDB instances are running.
///
/// All clones share the same underlying atomics, so a value stored through one
/// clone (for example by `RocksDBConfig::apply`) is observed by every other clone.
#[derive(Clone, Debug)]
pub struct RocksDBDynamicConfig {
    /// Current batch size, shared between clones via `Arc`.
    batch_size: Arc<AtomicUsize>,
}

impl RocksDBDynamicConfig {
    /// Returns the most recently stored batch size.
    pub fn batch_size(&self) -> usize {
        // SeqCst is stronger than strictly necessary, but it's the easiest
        // ordering to reason about.
        self.batch_size.load(Ordering::SeqCst)
    }
}
39
40/// Configurable options for a `RocksDBInstance`. Some can be updated
41/// dynamically, as cloned instances of this object will shared dynamic values.
42#[derive(Clone, Derivative)]
43#[derivative(Debug)]
44pub struct RocksDBConfig {
45    pub compaction_style: CompactionStyle,
46    pub optimize_compaction_memtable_budget: usize,
47    pub level_compaction_dynamic_level_bytes: bool,
48    pub universal_compaction_target_ratio: i32,
49    pub parallelism: Option<i32>,
50    pub compression_type: CompressionType,
51    pub bottommost_compression_type: CompressionType,
52    pub retry_max_duration: Duration,
53    pub stats_log_interval_seconds: u32,
54    pub stats_persist_interval_seconds: u32,
55    pub point_lookup_block_cache_size_mb: Option<u32>,
56    pub shrink_buffers_by_ratio: usize,
57    pub dynamic: RocksDBDynamicConfig,
58
59    /// Write buffer manager configs
60    pub write_buffer_manager_config: RocksDbWriteBufferManagerConfig,
61    /// Shared write buffer manager instance,
62    /// can only be instantiated once via `get_or_init_handle`
63    #[derivative(Debug = "ignore")]
64    pub shared_write_buffer_manager: SharedWriteBufferManager,
65}
66
67impl RocksDBConfig {
68    pub fn new(
69        shared_write_buffer_manager: SharedWriteBufferManager,
70        cluster_memory_limit: Option<usize>,
71    ) -> Self {
72        Self::new_from_params(
73            RocksDBTuningParameters::default(),
74            shared_write_buffer_manager,
75            cluster_memory_limit,
76        )
77    }
78
79    fn new_from_params(
80        params: RocksDBTuningParameters,
81        shared_write_buffer_manager: SharedWriteBufferManager,
82        cluster_memory_limit: Option<usize>,
83    ) -> Self {
84        let RocksDBTuningParameters {
85            compaction_style,
86            optimize_compaction_memtable_budget,
87            level_compaction_dynamic_level_bytes,
88            universal_compaction_target_ratio,
89            parallelism,
90            compression_type,
91            bottommost_compression_type,
92            batch_size,
93            retry_max_duration,
94            stats_log_interval_seconds,
95            stats_persist_interval_seconds,
96            point_lookup_block_cache_size_mb,
97            shrink_buffers_by_ratio,
98            write_buffer_manager_memory_bytes,
99            write_buffer_manager_memory_fraction,
100            write_buffer_manager_allow_stall,
101        } = params;
102
103        Self {
104            compaction_style,
105            optimize_compaction_memtable_budget,
106            level_compaction_dynamic_level_bytes,
107            universal_compaction_target_ratio,
108            parallelism,
109            compression_type,
110            bottommost_compression_type,
111            retry_max_duration,
112            stats_log_interval_seconds,
113            stats_persist_interval_seconds,
114            point_lookup_block_cache_size_mb,
115            shrink_buffers_by_ratio,
116            dynamic: RocksDBDynamicConfig {
117                batch_size: Arc::new(AtomicUsize::new(batch_size)),
118            },
119            write_buffer_manager_config: RocksDbWriteBufferManagerConfig {
120                write_buffer_manager_memory_bytes,
121                write_buffer_manager_memory_fraction,
122                write_buffer_manager_allow_stall,
123                cluster_memory_limit,
124            },
125            shared_write_buffer_manager,
126        }
127    }
128
129    /// Apply the new parameters to the config. Dynamic parameters
130    /// are updated in place.
131    pub fn apply(&mut self, params: RocksDBTuningParameters) {
132        let RocksDBTuningParameters {
133            compaction_style,
134            optimize_compaction_memtable_budget,
135            level_compaction_dynamic_level_bytes,
136            universal_compaction_target_ratio,
137            parallelism,
138            compression_type,
139            bottommost_compression_type,
140            batch_size,
141            retry_max_duration,
142            stats_log_interval_seconds,
143            stats_persist_interval_seconds,
144            point_lookup_block_cache_size_mb,
145            shrink_buffers_by_ratio,
146            write_buffer_manager_memory_bytes,
147            write_buffer_manager_memory_fraction,
148            write_buffer_manager_allow_stall,
149        } = params;
150
151        self.compaction_style = compaction_style;
152        self.optimize_compaction_memtable_budget = optimize_compaction_memtable_budget;
153        self.level_compaction_dynamic_level_bytes = level_compaction_dynamic_level_bytes;
154        self.universal_compaction_target_ratio = universal_compaction_target_ratio;
155        self.parallelism = parallelism;
156        self.compression_type = compression_type;
157        self.bottommost_compression_type = bottommost_compression_type;
158        self.retry_max_duration = retry_max_duration;
159        self.stats_log_interval_seconds = stats_log_interval_seconds;
160        self.stats_persist_interval_seconds = stats_persist_interval_seconds;
161        self.point_lookup_block_cache_size_mb = point_lookup_block_cache_size_mb;
162        self.shrink_buffers_by_ratio = shrink_buffers_by_ratio;
163
164        self.write_buffer_manager_config
165            .write_buffer_manager_memory_bytes = write_buffer_manager_memory_bytes;
166        self.write_buffer_manager_config
167            .write_buffer_manager_memory_fraction = write_buffer_manager_memory_fraction;
168        self.write_buffer_manager_config
169            .write_buffer_manager_allow_stall = write_buffer_manager_allow_stall;
170
171        // SeqCst is probably not required here, but its the easiest to reason about
172        self.dynamic.batch_size.store(batch_size, Ordering::SeqCst);
173    }
174}
175
176#[derive(Clone, Default)]
177pub struct SharedWriteBufferManager {
178    /// Keeping a Weak pointer to [WriteBufferManager] here behind an Arc and a Mutex.
179    /// The strong pointers will be owned by each `RocksDBInstance`.
180    /// When the rocksdb instances are cleaned up, the [WriteBufferManager] here will
181    /// be cleaned up as well.
182    /// Updates to config values via [RocksDbWriteBufferManagerConfig] will not update
183    /// the [WriteBufferManager] once it's initialized here and there's at least one RocksDBInstance
184    /// keeping a strong reference to it.
185    shared: Arc<Mutex<Weak<WriteBufferManager>>>,
186}
187
188#[derive(Derivative)]
189#[derivative(Debug)]
190/// A handle to the [WriteBufferManager] which will be dropped when the
191/// rocksdb thread is dropped.
192pub struct WriteBufferManagerHandle {
193    #[derivative(Debug(format_with = "fmt_write_buffer_manager"))]
194    inner: Arc<WriteBufferManager>,
195}
196
197fn fmt_write_buffer_manager(
198    buf: &Arc<WriteBufferManager>,
199    fmt: &mut std::fmt::Formatter,
200) -> Result<(), std::fmt::Error> {
201    fmt.debug_struct("WriteBufferManager")
202        .field("enabled", &buf.enabled())
203        .field("buffer_size", &buf.get_buffer_size())
204        .field("memory_usage", &buf.get_usage())
205        .finish()
206}
207
208impl SharedWriteBufferManager {
209    /// If a shared [WriteBufferManager] does not already exist, then it's initialized
210    /// with given `initializer`.
211    /// A strong reference is returned for the shared buffer manager.
212    pub(crate) fn get_or_init<F>(&self, initializer: F) -> WriteBufferManagerHandle
213    where
214        F: FnOnce() -> WriteBufferManager,
215    {
216        let mut lock = self.shared.lock().expect("lock poisoned");
217
218        let wbm = match lock.upgrade() {
219            Some(wbm) => wbm,
220            None => {
221                let new_wbm: Arc<WriteBufferManager> = Arc::new(initializer());
222                *lock = Arc::downgrade(&new_wbm);
223                new_wbm
224            }
225        };
226        WriteBufferManagerHandle { inner: wbm }
227    }
228
229    /// This will return a non-empty [WriteBufferManager] only after it has been
230    /// initialized by a RocksDBInstance.
231    /// This method is only used in tests.
232    pub fn get(&self) -> Option<Arc<WriteBufferManager>> {
233        self.shared.lock().expect("lock poisoned").upgrade()
234    }
235}
236
/// An `Into`-like conversion that this crate can implement between two
/// foreign types (`mz_rocksdb_types` → `rocksdb`), which `From`/`Into` cannot
/// express here.
trait IntoRocksDBType {
    /// The `rocksdb` type produced by the conversion.
    type Type;

    /// Consumes `self`, returning the equivalent `rocksdb` value.
    fn into_rocksdb(self) -> Self::Type;
}
242
243impl IntoRocksDBType for CompactionStyle {
244    type Type = DBCompactionStyle;
245    fn into_rocksdb(self) -> Self::Type {
246        use CompactionStyle::*;
247        match self {
248            Level => DBCompactionStyle::Level,
249            Universal => DBCompactionStyle::Universal,
250        }
251    }
252}
253
254impl IntoRocksDBType for CompressionType {
255    type Type = DBCompressionType;
256    fn into_rocksdb(self) -> Self::Type {
257        use CompressionType::*;
258        match self {
259            Zstd => DBCompressionType::Zstd,
260            Snappy => DBCompressionType::Snappy,
261            Lz4 => DBCompressionType::Lz4,
262            None => DBCompressionType::None,
263        }
264    }
265}
266/// Apply these tuning parameters to a `rocksdb::Options`. Some may
267/// be applied to a shared `Env` underlying the `Options`.
268/// If configured, then a write buffer manager will be initialized
269/// and a handle to it will be returned.
270pub fn apply_to_options(
271    config: &RocksDBConfig,
272    options: &mut rocksdb::Options,
273) -> Option<WriteBufferManagerHandle> {
274    let RocksDBConfig {
275        compaction_style,
276        optimize_compaction_memtable_budget,
277        level_compaction_dynamic_level_bytes,
278        universal_compaction_target_ratio,
279        parallelism,
280        compression_type,
281        bottommost_compression_type,
282        retry_max_duration: _,
283        stats_log_interval_seconds,
284        stats_persist_interval_seconds,
285        point_lookup_block_cache_size_mb,
286        shrink_buffers_by_ratio: _,
287        dynamic: _,
288        shared_write_buffer_manager,
289        write_buffer_manager_config,
290    } = config;
291
292    options.set_compression_type((*compression_type).into_rocksdb());
293
294    if *bottommost_compression_type != CompressionType::None {
295        options.set_bottommost_compression_type((*bottommost_compression_type).into_rocksdb())
296    }
297
298    options.set_compaction_style((*compaction_style).into_rocksdb());
299    match compaction_style {
300        CompactionStyle::Level => {
301            options.optimize_level_style_compaction(*optimize_compaction_memtable_budget);
302            options.set_level_compaction_dynamic_level_bytes(*level_compaction_dynamic_level_bytes);
303        }
304        CompactionStyle::Universal => {
305            options.optimize_universal_style_compaction(*optimize_compaction_memtable_budget);
306            options.set_level_compaction_dynamic_level_bytes(*level_compaction_dynamic_level_bytes);
307
308            let mut universal_options = rocksdb::UniversalCompactOptions::default();
309            universal_options
310                .set_max_size_amplification_percent(*universal_compaction_target_ratio);
311
312            options.set_universal_compaction_options(&universal_options);
313        }
314    }
315
316    let parallelism = if let Some(parallelism) = parallelism {
317        *parallelism
318    } else {
319        // TODO(guswynn): it's unclear if this should be `get_physical`. The
320        // RocksDB docs do not make it clear.
321        num_cpus::get()
322            .try_into()
323            .expect("More than 3 billion cores")
324    };
325    options.increase_parallelism(parallelism);
326
327    options.set_stats_dump_period_sec(*stats_log_interval_seconds);
328    options.set_stats_persist_period_sec(*stats_persist_interval_seconds);
329
330    if let Some(block_cache_size_mb) = point_lookup_block_cache_size_mb {
331        options.optimize_for_point_lookup((*block_cache_size_mb).into());
332    }
333
334    let write_buffer_manager = get_write_buffer_manager(write_buffer_manager_config);
335    let write_buffer_manager_handle = write_buffer_manager.map(|buf| {
336        let handle = shared_write_buffer_manager.get_or_init(|| buf);
337        options.set_write_buffer_manager(&handle.inner);
338        handle
339    });
340
341    write_buffer_manager_handle
342}
343
344/// Getting write buffer manager based on configured values
345pub(crate) fn get_write_buffer_manager(
346    write_buffer_manager_config: &RocksDbWriteBufferManagerConfig,
347) -> Option<WriteBufferManager> {
348    let RocksDbWriteBufferManagerConfig {
349        write_buffer_manager_memory_bytes,
350        write_buffer_manager_memory_fraction,
351        write_buffer_manager_allow_stall,
352        cluster_memory_limit,
353    } = write_buffer_manager_config;
354
355    if write_buffer_manager_memory_bytes.is_some() {
356        let current_cluster_max_buffer_limit =
357            cluster_memory_limit.as_ref().and_then(|cluster_memory| {
358                write_buffer_manager_memory_fraction
359                    .map(|fraction| usize::cast_lossy(f64::cast_lossy(*cluster_memory) * fraction))
360            });
361        let write_buffer_manager_bytes = current_cluster_max_buffer_limit
362            .or(*write_buffer_manager_memory_bytes)
363            .unwrap();
364        Some(WriteBufferManager::new_write_buffer_manager(
365            write_buffer_manager_bytes,
366            *write_buffer_manager_allow_stall,
367        ))
368    } else {
369        None
370    }
371}
372
#[cfg(test)]
mod test {
    use super::*;

    #[mz_ore::test]
    fn dynamic_defaults() {
        assert_eq!(
            RocksDBConfig::new(Default::default(), None)
                .dynamic
                .batch_size
                .load(Ordering::SeqCst),
            defaults::DEFAULT_BATCH_SIZE
        )
    }

    #[mz_ore::test]
    fn apply_updates_dynamic_config_of_clones() {
        let mut config = RocksDBConfig::new(Default::default(), None);
        let clone = config.clone();

        let new_batch_size = defaults::DEFAULT_BATCH_SIZE + 1;
        config.apply(RocksDBTuningParameters {
            batch_size: new_batch_size,
            ..Default::default()
        });

        // `dynamic` is shared between clones, so the clone observes the update.
        assert_eq!(config.dynamic.batch_size(), new_batch_size);
        assert_eq!(clone.dynamic.batch_size(), new_batch_size);
    }

    #[mz_ore::test]
    fn test_no_default() {
        let config = RocksDbWriteBufferManagerConfig {
            write_buffer_manager_memory_bytes: None,
            write_buffer_manager_memory_fraction: Some(0.5),
            write_buffer_manager_allow_stall: false,
            cluster_memory_limit: Some(1000),
        };

        let write_buffer_manager = get_write_buffer_manager(&config);
        assert!(write_buffer_manager.is_none());
    }

    #[mz_ore::test]
    fn test_default() {
        let config = RocksDbWriteBufferManagerConfig {
            write_buffer_manager_memory_bytes: Some(1000),
            write_buffer_manager_memory_fraction: Some(0.5),
            write_buffer_manager_allow_stall: false,
            cluster_memory_limit: None,
        };

        let write_buffer_manager =
            get_write_buffer_manager(&config).expect("write buffer manager should be enabled");
        assert!(write_buffer_manager.enabled());
        assert_eq!(
            write_buffer_manager.get_buffer_size(),
            config.write_buffer_manager_memory_bytes.unwrap()
        )
    }

    #[mz_ore::test]
    fn test_missing_fraction_uses_default() {
        let config = RocksDbWriteBufferManagerConfig {
            write_buffer_manager_memory_bytes: Some(1000),
            write_buffer_manager_memory_fraction: None,
            write_buffer_manager_allow_stall: false,
            cluster_memory_limit: Some(2000),
        };

        let write_buffer_manager =
            get_write_buffer_manager(&config).expect("write buffer manager should be enabled");
        assert!(write_buffer_manager.enabled());
        // Without a fraction, the cluster limit is ignored.
        assert_eq!(write_buffer_manager.get_buffer_size(), 1000)
    }

    #[mz_ore::test]
    fn test_calculated_cluster_limit() {
        let config = RocksDbWriteBufferManagerConfig {
            write_buffer_manager_memory_bytes: Some(30000),
            write_buffer_manager_memory_fraction: Some(0.5),
            write_buffer_manager_allow_stall: false,
            cluster_memory_limit: Some(2000),
        };

        let write_buffer_manager =
            get_write_buffer_manager(&config).expect("write buffer manager should be enabled");
        assert!(write_buffer_manager.enabled());
        // the limit should be 50% of 2000 i.e. 1000
        assert_eq!(write_buffer_manager.get_buffer_size(), 1000)
    }

    #[mz_ore::test]
    fn shared_write_buffer_manager_is_reused_and_released() {
        let shared = SharedWriteBufferManager::default();
        assert!(shared.get().is_none());

        let first =
            shared.get_or_init(|| WriteBufferManager::new_write_buffer_manager(1000, false));
        // A live manager exists, so the second initializer must not run.
        let second = shared.get_or_init(|| panic!("initializer should not be called"));
        assert!(Arc::ptr_eq(&first.inner, &second.inner));
        assert_eq!(shared.get().map(|wbm| wbm.get_buffer_size()), Some(1000));

        // Only a weak reference remains once every handle is dropped.
        drop(first);
        drop(second);
        assert!(shared.get().is_none());
    }
}