//! Helpers for configuring RocksDB instances from [`mz_rocksdb_types`]
//! tuning parameters.

#![allow(missing_docs)]

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;

use mz_ore::cast::CastLossy;

use derivative::Derivative;
use rocksdb::{DBCompactionStyle, DBCompressionType, WriteBufferManager};

pub use mz_rocksdb_types::config::defaults;
pub use mz_rocksdb_types::config::*;

/// RocksDB configuration values that may change while an instance is running.
///
/// Cloning this struct shares the underlying atomics, so updates made through
/// one clone are observed by all others.
#[derive(Debug, Clone)]
pub struct RocksDBDynamicConfig {
    batch_size: Arc<AtomicUsize>,
}

impl RocksDBDynamicConfig {
    /// Returns the current batch size.
    pub fn batch_size(&self) -> usize {
        // `SeqCst` is stronger than strictly required here, but it is the
        // easiest ordering to reason about.
        self.batch_size.load(Ordering::SeqCst)
    }
}

/// Configurable options for a RocksDB instance.
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct RocksDBConfig {
    pub compaction_style: CompactionStyle,
    pub optimize_compaction_memtable_budget: usize,
    pub level_compaction_dynamic_level_bytes: bool,
    pub universal_compaction_target_ratio: i32,
    pub parallelism: Option<i32>,
    pub compression_type: CompressionType,
    pub bottommost_compression_type: CompressionType,
    pub retry_max_duration: Duration,
    pub stats_log_interval_seconds: u32,
    pub stats_persist_interval_seconds: u32,
    pub point_lookup_block_cache_size_mb: Option<u32>,
    pub shrink_buffers_by_ratio: usize,
    /// Values that can be updated while the instance is running.
    pub dynamic: RocksDBDynamicConfig,
    /// Configuration for the shared write buffer manager.
    pub write_buffer_manager_config: RocksDbWriteBufferManagerConfig,
    /// Skipped in `Debug` output because [`SharedWriteBufferManager`] does
    /// not implement `Debug`.
    #[derivative(Debug = "ignore")]
    pub shared_write_buffer_manager: SharedWriteBufferManager,
}

impl RocksDBConfig {
    /// Creates a configuration from the default tuning parameters.
    pub fn new(
        shared_write_buffer_manager: SharedWriteBufferManager,
        cluster_memory_limit: Option<usize>,
    ) -> Self {
        Self::new_from_params(
            RocksDBTuningParameters::default(),
            shared_write_buffer_manager,
            cluster_memory_limit,
        )
    }

    fn new_from_params(
        params: RocksDBTuningParameters,
        shared_write_buffer_manager: SharedWriteBufferManager,
        cluster_memory_limit: Option<usize>,
    ) -> Self {
        let RocksDBTuningParameters {
            compaction_style,
            optimize_compaction_memtable_budget,
            level_compaction_dynamic_level_bytes,
            universal_compaction_target_ratio,
            parallelism,
            compression_type,
            bottommost_compression_type,
            batch_size,
            retry_max_duration,
            stats_log_interval_seconds,
            stats_persist_interval_seconds,
            point_lookup_block_cache_size_mb,
            shrink_buffers_by_ratio,
            write_buffer_manager_memory_bytes,
            write_buffer_manager_memory_fraction,
            write_buffer_manager_allow_stall,
        } = params;

        Self {
            compaction_style,
            optimize_compaction_memtable_budget,
            level_compaction_dynamic_level_bytes,
            universal_compaction_target_ratio,
            parallelism,
            compression_type,
            bottommost_compression_type,
            retry_max_duration,
            stats_log_interval_seconds,
            stats_persist_interval_seconds,
            point_lookup_block_cache_size_mb,
            shrink_buffers_by_ratio,
            dynamic: RocksDBDynamicConfig {
                batch_size: Arc::new(AtomicUsize::new(batch_size)),
            },
            write_buffer_manager_config: RocksDbWriteBufferManagerConfig {
                write_buffer_manager_memory_bytes,
                write_buffer_manager_memory_fraction,
                write_buffer_manager_allow_stall,
                cluster_memory_limit,
            },
            shared_write_buffer_manager,
        }
    }

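    /// Applies `params` to this configuration.
    ///
    /// Plain fields are overwritten and take effect the next time the
    /// configuration is applied to a RocksDB instance; `batch_size` is stored
    /// atomically, so running instances can read the new value immediately.
    ///
    /// A minimal sketch of the dynamic-update behavior (illustrative, not a
    /// doctest; the batch size value is arbitrary):
    ///
    /// ```ignore
    /// let mut config = RocksDBConfig::new(SharedWriteBufferManager::default(), None);
    /// let clone = config.clone();
    ///
    /// let mut params = RocksDBTuningParameters::default();
    /// params.batch_size = 2048;
    /// config.apply(params);
    ///
    /// // Clones share the same `Arc<AtomicUsize>`, so they observe the update.
    /// assert_eq!(clone.dynamic.batch_size(), 2048);
    /// ```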
    pub fn apply(&mut self, params: RocksDBTuningParameters) {
        let RocksDBTuningParameters {
            compaction_style,
            optimize_compaction_memtable_budget,
            level_compaction_dynamic_level_bytes,
            universal_compaction_target_ratio,
            parallelism,
            compression_type,
            bottommost_compression_type,
            batch_size,
            retry_max_duration,
            stats_log_interval_seconds,
            stats_persist_interval_seconds,
            point_lookup_block_cache_size_mb,
            shrink_buffers_by_ratio,
            write_buffer_manager_memory_bytes,
            write_buffer_manager_memory_fraction,
            write_buffer_manager_allow_stall,
        } = params;

        self.compaction_style = compaction_style;
        self.optimize_compaction_memtable_budget = optimize_compaction_memtable_budget;
        self.level_compaction_dynamic_level_bytes = level_compaction_dynamic_level_bytes;
        self.universal_compaction_target_ratio = universal_compaction_target_ratio;
        self.parallelism = parallelism;
        self.compression_type = compression_type;
        self.bottommost_compression_type = bottommost_compression_type;
        self.retry_max_duration = retry_max_duration;
        self.stats_log_interval_seconds = stats_log_interval_seconds;
        self.stats_persist_interval_seconds = stats_persist_interval_seconds;
        self.point_lookup_block_cache_size_mb = point_lookup_block_cache_size_mb;
        self.shrink_buffers_by_ratio = shrink_buffers_by_ratio;

        self.write_buffer_manager_config
            .write_buffer_manager_memory_bytes = write_buffer_manager_memory_bytes;
        self.write_buffer_manager_config
            .write_buffer_manager_memory_fraction = write_buffer_manager_memory_fraction;
        self.write_buffer_manager_config
            .write_buffer_manager_allow_stall = write_buffer_manager_allow_stall;

        self.dynamic.batch_size.store(batch_size, Ordering::SeqCst);
    }
}

/// A [`WriteBufferManager`] that can be shared between RocksDB instances.
///
/// Only a [`Weak`] reference is held here, so the manager is deallocated once
/// the last [`WriteBufferManagerHandle`] keeping it alive is dropped.
#[derive(Clone, Default)]
pub struct SharedWriteBufferManager {
    shared: Arc<Mutex<Weak<WriteBufferManager>>>,
}

/// A strong handle to the shared [`WriteBufferManager`].
///
/// Callers should keep this alive for as long as the corresponding RocksDB
/// instance uses the manager.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct WriteBufferManagerHandle {
    /// The shared manager. `Debug` output is produced by
    /// [`fmt_write_buffer_manager`], which reports the manager's live stats.
    #[derivative(Debug(format_with = "fmt_write_buffer_manager"))]
    inner: Arc<WriteBufferManager>,
}

/// Formats a [`WriteBufferManager`] by reporting its live stats.
fn fmt_write_buffer_manager(
    buf: &Arc<WriteBufferManager>,
    fmt: &mut std::fmt::Formatter,
) -> Result<(), std::fmt::Error> {
    fmt.debug_struct("WriteBufferManager")
        .field("enabled", &buf.enabled())
        .field("buffer_size", &buf.get_buffer_size())
        .field("memory_usage", &buf.get_usage())
        .finish()
}

impl SharedWriteBufferManager {
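    /// Returns a strong handle to the shared [`WriteBufferManager`], running
    /// `initializer` to create the manager only if no other handle currently
    /// keeps it alive.
    ///
    /// A minimal sketch of the sharing semantics (illustrative, not a
    /// doctest; the buffer size and stall flag are arbitrary):
    ///
    /// ```ignore
    /// let shared = SharedWriteBufferManager::default();
    /// let h1 = shared.get_or_init(|| WriteBufferManager::new_write_buffer_manager(1 << 20, false));
    /// // The manager is still alive, so this initializer is never run.
    /// let h2 = shared.get_or_init(|| unreachable!());
    /// assert!(shared.get().is_some());
    ///
    /// drop(h1);
    /// drop(h2);
    /// // Only the `Weak` reference remains; the manager has been dropped.
    /// assert!(shared.get().is_none());
    /// ```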
    pub(crate) fn get_or_init<F>(&self, initializer: F) -> WriteBufferManagerHandle
    where
        F: FnOnce() -> WriteBufferManager,
    {
        let mut lock = self.shared.lock().expect("lock poisoned");

        let wbm = match lock.upgrade() {
            Some(wbm) => wbm,
            None => {
                let new_wbm: Arc<WriteBufferManager> = Arc::new(initializer());
                // Stash a weak reference for subsequent callers.
                *lock = Arc::downgrade(&new_wbm);
                new_wbm
            }
        };
        WriteBufferManagerHandle { inner: wbm }
    }

    /// Returns the shared [`WriteBufferManager`] if some handle is currently
    /// keeping it alive.
    pub fn get(&self) -> Option<Arc<WriteBufferManager>> {
        self.shared.lock().expect("lock poisoned").upgrade()
    }
}

/// Conversion of [`mz_rocksdb_types`] enums into their [`rocksdb`]
/// counterparts.
trait IntoRocksDBType {
    type Type;
    fn into_rocksdb(self) -> Self::Type;
}

impl IntoRocksDBType for CompactionStyle {
    type Type = DBCompactionStyle;
    fn into_rocksdb(self) -> Self::Type {
        use CompactionStyle::*;
        match self {
            Level => DBCompactionStyle::Level,
            Universal => DBCompactionStyle::Universal,
        }
    }
}

impl IntoRocksDBType for CompressionType {
    type Type = DBCompressionType;
    fn into_rocksdb(self) -> Self::Type {
        use CompressionType::*;
        match self {
            Zstd => DBCompressionType::Zstd,
            Snappy => DBCompressionType::Snappy,
            Lz4 => DBCompressionType::Lz4,
            None => DBCompressionType::None,
        }
    }
}
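
/// Applies `config` to `options`, returning a handle to the shared
/// [`WriteBufferManager`] if one is configured. The caller must keep the
/// returned handle alive for as long as the RocksDB instance uses the
/// manager.
///
/// A minimal usage sketch (illustrative, not a doctest; the path is a
/// placeholder):
///
/// ```ignore
/// let config = RocksDBConfig::new(SharedWriteBufferManager::default(), None);
/// let mut options = rocksdb::Options::default();
/// options.create_if_missing(true);
///
/// // Hold the handle alongside the DB so the manager outlives it.
/// let _wbm_handle = apply_to_options(&config, &mut options);
/// let db = rocksdb::DB::open(&options, "/path/to/db")?;
/// ```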
pub fn apply_to_options(
    config: &RocksDBConfig,
    options: &mut rocksdb::Options,
) -> Option<WriteBufferManagerHandle> {
    let RocksDBConfig {
        compaction_style,
        optimize_compaction_memtable_budget,
        level_compaction_dynamic_level_bytes,
        universal_compaction_target_ratio,
        parallelism,
        compression_type,
        bottommost_compression_type,
        retry_max_duration: _,
        stats_log_interval_seconds,
        stats_persist_interval_seconds,
        point_lookup_block_cache_size_mb,
        shrink_buffers_by_ratio: _,
        dynamic: _,
        write_buffer_manager_config,
        shared_write_buffer_manager,
    } = config;

    options.set_compression_type((*compression_type).into_rocksdb());

    if *bottommost_compression_type != CompressionType::None {
        options.set_bottommost_compression_type((*bottommost_compression_type).into_rocksdb());
    }

    options.set_compaction_style((*compaction_style).into_rocksdb());
    match compaction_style {
        CompactionStyle::Level => {
            options.optimize_level_style_compaction(*optimize_compaction_memtable_budget);
            options.set_level_compaction_dynamic_level_bytes(*level_compaction_dynamic_level_bytes);
        }
        CompactionStyle::Universal => {
            options.optimize_universal_style_compaction(*optimize_compaction_memtable_budget);
            options.set_level_compaction_dynamic_level_bytes(*level_compaction_dynamic_level_bytes);

            let mut universal_options = rocksdb::UniversalCompactOptions::default();
            universal_options
                .set_max_size_amplification_percent(*universal_compaction_target_ratio);

            options.set_universal_compaction_options(&universal_options);
        }
    }

    let parallelism = if let Some(parallelism) = parallelism {
        *parallelism
    } else {
        // Default to the number of available cores.
        num_cpus::get()
            .try_into()
            .expect("number of cores should fit in an i32")
    };
    options.increase_parallelism(parallelism);

    options.set_stats_dump_period_sec(*stats_log_interval_seconds);
    options.set_stats_persist_period_sec(*stats_persist_interval_seconds);

    if let Some(block_cache_size_mb) = point_lookup_block_cache_size_mb {
        options.optimize_for_point_lookup((*block_cache_size_mb).into());
    }

    // If a write buffer manager is configured, register it with `options` and
    // return a strong handle that keeps it alive.
    let write_buffer_manager = get_write_buffer_manager(write_buffer_manager_config);
    write_buffer_manager.map(|buf| {
        let handle = shared_write_buffer_manager.get_or_init(|| buf);
        options.set_write_buffer_manager(&handle.inner);
        handle
    })
}

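/// Builds a [`WriteBufferManager`] from `write_buffer_manager_config`, if
/// `write_buffer_manager_memory_bytes` is set. A limit derived from
/// `cluster_memory_limit * write_buffer_manager_memory_fraction` takes
/// precedence over the explicitly configured byte count.
///
/// A worked example of the precedence (illustrative, not a doctest; the
/// values mirror the tests below):
///
/// ```ignore
/// let config = RocksDbWriteBufferManagerConfig {
///     write_buffer_manager_memory_bytes: Some(30000),
///     write_buffer_manager_memory_fraction: Some(0.5),
///     write_buffer_manager_allow_stall: false,
///     cluster_memory_limit: Some(2000),
/// };
/// // 2000 * 0.5 = 1000 wins over the explicit 30000 bytes.
/// let wbm = get_write_buffer_manager(&config).unwrap();
/// assert_eq!(wbm.get_buffer_size(), 1000);
/// ```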
pub(crate) fn get_write_buffer_manager(
    write_buffer_manager_config: &RocksDbWriteBufferManagerConfig,
) -> Option<WriteBufferManager> {
    let RocksDbWriteBufferManagerConfig {
        write_buffer_manager_memory_bytes,
        write_buffer_manager_memory_fraction,
        write_buffer_manager_allow_stall,
        cluster_memory_limit,
    } = write_buffer_manager_config;

    if write_buffer_manager_memory_bytes.is_some() {
        // The limit derived from the cluster memory, if both the limit and
        // the fraction are known.
        let current_cluster_max_buffer_limit =
            cluster_memory_limit.as_ref().and_then(|cluster_memory| {
                write_buffer_manager_memory_fraction
                    .map(|fraction| usize::cast_lossy(f64::cast_lossy(*cluster_memory) * fraction))
            });
        let write_buffer_manager_bytes = current_cluster_max_buffer_limit
            .or(*write_buffer_manager_memory_bytes)
            .expect("write_buffer_manager_memory_bytes is known to be `Some`");
        Some(WriteBufferManager::new_write_buffer_manager(
            write_buffer_manager_bytes,
            *write_buffer_manager_allow_stall,
        ))
    } else {
        None
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[mz_ore::test]
    fn dynamic_defaults() {
        assert_eq!(
            RocksDBConfig::new(Default::default(), None)
                .dynamic
                .batch_size
                .load(Ordering::SeqCst),
            defaults::DEFAULT_BATCH_SIZE
        )
    }

    #[mz_ore::test]
    fn test_no_default() {
        // Without `write_buffer_manager_memory_bytes`, no manager is built,
        // even if a cluster limit and fraction are available.
        let config = RocksDbWriteBufferManagerConfig {
            write_buffer_manager_memory_bytes: None,
            write_buffer_manager_memory_fraction: Some(0.5),
            write_buffer_manager_allow_stall: false,
            cluster_memory_limit: Some(1000),
        };

        let write_buffer_manager = get_write_buffer_manager(&config);
        assert!(write_buffer_manager.is_none());
    }

    #[mz_ore::test]
    fn test_default() {
        // Without a cluster memory limit, the explicitly configured byte
        // count is used.
        let config = RocksDbWriteBufferManagerConfig {
            write_buffer_manager_memory_bytes: Some(1000),
            write_buffer_manager_memory_fraction: Some(0.5),
            write_buffer_manager_allow_stall: false,
            cluster_memory_limit: None,
        };

        let write_buffer_manager = get_write_buffer_manager(&config);

        assert!(write_buffer_manager.is_some());
        let write_buffer_manager = write_buffer_manager.unwrap();
        assert!(write_buffer_manager.enabled());
        assert_eq!(
            write_buffer_manager.get_buffer_size(),
            config.write_buffer_manager_memory_bytes.unwrap()
        )
    }

    #[mz_ore::test]
    fn test_calculated_cluster_limit() {
        // The cluster-derived limit (2000 * 0.5 = 1000) takes precedence over
        // the explicit 30000 bytes.
        let config = RocksDbWriteBufferManagerConfig {
            write_buffer_manager_memory_bytes: Some(30000),
            write_buffer_manager_memory_fraction: Some(0.5),
            write_buffer_manager_allow_stall: false,
            cluster_memory_limit: Some(2000),
        };

        let write_buffer_manager = get_write_buffer_manager(&config);

        assert!(write_buffer_manager.is_some());
        let write_buffer_manager = write_buffer_manager.unwrap();
        assert!(write_buffer_manager.enabled());
        assert_eq!(write_buffer_manager.get_buffer_size(), 1000)
    }
}