1#![allow(missing_docs)]
13
14use std::fmt::Debug;
41use std::str::FromStr;
42use std::time::Duration;
43
44use mz_ore::cast::CastFrom;
45use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
46
47use proptest_derive::Arbitrary;
48use serde::{Deserialize, Serialize};
49use uncased::UncasedStr;
50
51include!(concat!(env!("OUT_DIR"), "/mz_rocksdb_types.config.rs"));
52
/// A bundle of tunable RocksDB parameters.
///
/// Values can be built with validation via `from_parameters`, obtained from
/// `Default` (which draws on the constants in the `defaults` module), and
/// converted to/from `ProtoRocksDbTuningParameters` through the `RustType`
/// implementation in this file.
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug, Arbitrary)]
pub struct RocksDBTuningParameters {
    /// The compaction style, either `Level` or `Universal`.
    pub compaction_style: CompactionStyle,
    /// Memtable budget used when optimizing compaction.
    /// NOTE(review): presumably a byte count handed to RocksDB's
    /// compaction-optimization helpers; confirm against the consumer.
    pub optimize_compaction_memtable_budget: usize,

    /// NOTE(review): presumably forwarded to the RocksDB option of the same
    /// name; confirm against the consumer.
    pub level_compaction_dynamic_level_bytes: bool,

    /// Target ratio for universal compaction. `from_parameters` rejects
    /// values that are not strictly greater than 100.
    pub universal_compaction_target_ratio: i32,

    /// Optional parallelism setting. `from_parameters` rejects `Some(n)` with
    /// `n < 1`; `None` means the value was not specified.
    pub parallelism: Option<i32>,

    /// The compression type.
    pub compression_type: CompressionType,

    /// The compression type for the bottommost level (per the field name).
    pub bottommost_compression_type: CompressionType,

    /// Batch size. NOTE(review): unit (entries vs. bytes) is not visible
    /// here; confirm against the consumer.
    pub batch_size: usize,

    /// Maximum duration for retries (per the field name).
    pub retry_max_duration: Duration,

    /// Interval, in seconds, at which stats are logged (per the field name).
    pub stats_log_interval_seconds: u32,

    /// Interval, in seconds, at which stats are persisted (per the field name).
    pub stats_persist_interval_seconds: u32,

    /// Optional block cache size, in MB per the field name, for point
    /// lookups. `Default` sets this to `None`.
    pub point_lookup_block_cache_size_mb: Option<u32>,

    /// Ratio by which buffers are shrunk.
    /// NOTE(review): the exact semantics (and whether 0 disables shrinking)
    /// are not visible here; confirm against the consumer.
    pub shrink_buffers_by_ratio: usize,

    /// Optional write buffer manager memory size, in bytes per the field name.
    pub write_buffer_manager_memory_bytes: Option<usize>,
    /// Optional write buffer manager memory size expressed as a fraction.
    /// NOTE(review): the quantity this is a fraction of is not visible here.
    pub write_buffer_manager_memory_fraction: Option<f64>,
    /// Whether the write buffer manager is allowed to stall (per the field name).
    pub write_buffer_manager_allow_stall: bool,
}
156
157impl Default for RocksDBTuningParameters {
158 fn default() -> Self {
159 Self {
160 compaction_style: defaults::DEFAULT_COMPACTION_STYLE,
161 optimize_compaction_memtable_budget:
162 defaults::DEFAULT_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET,
163 level_compaction_dynamic_level_bytes:
164 defaults::DEFAULT_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES,
165 universal_compaction_target_ratio: defaults::DEFAULT_UNIVERSAL_COMPACTION_RATIO,
166 parallelism: defaults::DEFAULT_PARALLELISM,
167 compression_type: defaults::DEFAULT_COMPRESSION_TYPE,
168 bottommost_compression_type: defaults::DEFAULT_BOTTOMMOST_COMPRESSION_TYPE,
169 batch_size: defaults::DEFAULT_BATCH_SIZE,
170 retry_max_duration: defaults::DEFAULT_RETRY_DURATION,
171 stats_log_interval_seconds: defaults::DEFAULT_STATS_LOG_INTERVAL_S,
172 stats_persist_interval_seconds: defaults::DEFAULT_STATS_PERSIST_INTERVAL_S,
173 point_lookup_block_cache_size_mb: None,
174 shrink_buffers_by_ratio: defaults::DEFAULT_SHRINK_BUFFERS_BY_RATIO,
175 write_buffer_manager_memory_bytes: None,
176 write_buffer_manager_memory_fraction: None,
177 write_buffer_manager_allow_stall: false,
178 }
179 }
180}
181
182impl RocksDBTuningParameters {
183 pub fn from_parameters(
185 compaction_style: CompactionStyle,
186 optimize_compaction_memtable_budget: usize,
187 level_compaction_dynamic_level_bytes: bool,
188 universal_compaction_target_ratio: i32,
189 parallelism: Option<i32>,
190 compression_type: CompressionType,
191 bottommost_compression_type: CompressionType,
192 batch_size: usize,
193 retry_max_duration: Duration,
194 stats_log_interval_seconds: u32,
195 stats_persist_interval_seconds: u32,
196 point_lookup_block_cache_size_mb: Option<u32>,
197 shrink_buffers_by_ratio: usize,
198 write_buffer_manager_memory_bytes: Option<usize>,
199 write_buffer_manager_memory_fraction: Option<f64>,
200 write_buffer_manager_allow_stall: bool,
201 ) -> Result<Self, anyhow::Error> {
202 Ok(Self {
203 compaction_style,
204 optimize_compaction_memtable_budget,
205 level_compaction_dynamic_level_bytes,
206 universal_compaction_target_ratio: if universal_compaction_target_ratio > 100 {
207 universal_compaction_target_ratio
208 } else {
209 return Err(anyhow::anyhow!(
210 "universal_compaction_target_ratio ({}) must be > 100",
211 universal_compaction_target_ratio
212 ));
213 },
214 parallelism: match parallelism {
215 Some(parallelism) => {
216 if parallelism < 1 {
217 return Err(anyhow::anyhow!(
218 "parallelism({}) must be > 1, or not specified",
219 universal_compaction_target_ratio
220 ));
221 }
222 Some(parallelism)
223 }
224 None => None,
225 },
226 compression_type,
227 bottommost_compression_type,
228 batch_size,
229 retry_max_duration,
230 stats_log_interval_seconds,
231 stats_persist_interval_seconds,
232 point_lookup_block_cache_size_mb,
233 shrink_buffers_by_ratio,
234 write_buffer_manager_memory_bytes,
235 write_buffer_manager_memory_fraction,
236 write_buffer_manager_allow_stall,
237 })
238 }
239}
240
/// The compaction style selected in `RocksDBTuningParameters`.
///
/// Parsed case-insensitively from, and displayed as, `level` / `universal`
/// (see the `FromStr` and `Display` implementations).
#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Debug, Arbitrary)]
pub enum CompactionStyle {
    /// Level compaction; textual form `level`.
    Level,
    /// Universal compaction; textual form `universal`.
    Universal,
}
248
249impl FromStr for CompactionStyle {
250 type Err = anyhow::Error;
251
252 fn from_str(s: &str) -> Result<Self, Self::Err> {
253 let s = UncasedStr::new(s);
254 if s == "level" {
255 Ok(Self::Level)
256 } else if s == "universal" {
257 Ok(Self::Universal)
258 } else {
259 Err(anyhow::anyhow!("{} is not a supported compaction style", s))
260 }
261 }
262}
263
264impl std::fmt::Display for CompactionStyle {
265 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
266 match self {
267 CompactionStyle::Level => write!(f, "level"),
268 CompactionStyle::Universal => write!(f, "universal"),
269 }
270 }
271}
272
/// The compression type selected in `RocksDBTuningParameters`.
///
/// Parsed case-insensitively from, and displayed as, `zstd` / `snappy` /
/// `lz4` / `none` (see the `FromStr` and `Display` implementations).
#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Debug, Arbitrary)]
pub enum CompressionType {
    /// Textual form `zstd`.
    Zstd,
    /// Textual form `snappy`.
    Snappy,
    /// Textual form `lz4`.
    Lz4,
    /// Textual form `none`. Note that this is a variant of this enum, not
    /// `Option::None`.
    None,
}
282
283impl FromStr for CompressionType {
284 type Err = anyhow::Error;
285
286 fn from_str(s: &str) -> Result<Self, Self::Err> {
287 let s = UncasedStr::new(s);
288 if s == "zstd" {
289 Ok(Self::Zstd)
290 } else if s == "snappy" {
291 Ok(Self::Snappy)
292 } else if s == "lz4" {
293 Ok(Self::Lz4)
294 } else if s == "none" {
295 Ok(Self::None)
296 } else {
297 Err(anyhow::anyhow!("{} is not a supported compression type", s))
298 }
299 }
300}
301
302impl std::fmt::Display for CompressionType {
303 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
304 match self {
305 CompressionType::Zstd => write!(f, "zstd"),
306 CompressionType::Snappy => write!(f, "snappy"),
307 CompressionType::Lz4 => write!(f, "lz4"),
308 CompressionType::None => write!(f, "none"),
309 }
310 }
311}
312
313impl RustType<ProtoRocksDbTuningParameters> for RocksDBTuningParameters {
314 fn into_proto(&self) -> ProtoRocksDbTuningParameters {
315 use proto_rocks_db_tuning_parameters::{
316 ProtoCompactionStyle, ProtoCompressionType, proto_compaction_style,
317 proto_compression_type,
318 };
319
320 fn compression_into_proto(compression_type: &CompressionType) -> ProtoCompressionType {
321 ProtoCompressionType {
322 kind: Some(match compression_type {
323 CompressionType::Zstd => proto_compression_type::Kind::Zstd(()),
324 CompressionType::Snappy => proto_compression_type::Kind::Snappy(()),
325 CompressionType::Lz4 => proto_compression_type::Kind::Lz4(()),
326 CompressionType::None => proto_compression_type::Kind::None(()),
327 }),
328 }
329 }
330 ProtoRocksDbTuningParameters {
331 compaction_style: Some(ProtoCompactionStyle {
332 kind: Some(match self.compaction_style {
333 CompactionStyle::Level => proto_compaction_style::Kind::Level(()),
334 CompactionStyle::Universal => proto_compaction_style::Kind::Universal(()),
335 }),
336 }),
337 optimize_compaction_memtable_budget: u64::cast_from(
338 self.optimize_compaction_memtable_budget,
339 ),
340 level_compaction_dynamic_level_bytes: self.level_compaction_dynamic_level_bytes,
341 universal_compaction_target_ratio: self.universal_compaction_target_ratio,
342 parallelism: self.parallelism,
343 compression_type: Some(compression_into_proto(&self.compression_type)),
344 bottommost_compression_type: Some(compression_into_proto(
345 &self.bottommost_compression_type,
346 )),
347 batch_size: u64::cast_from(self.batch_size),
348 retry_max_duration: Some(self.retry_max_duration.into_proto()),
349 stats_log_interval_seconds: self.stats_log_interval_seconds,
350 stats_persist_interval_seconds: self.stats_persist_interval_seconds,
351 point_lookup_block_cache_size_mb: self.point_lookup_block_cache_size_mb,
352 shrink_buffers_by_ratio: u64::cast_from(self.shrink_buffers_by_ratio),
353 write_buffer_manager_memory_bytes: self
354 .write_buffer_manager_memory_bytes
355 .map(u64::cast_from),
356 write_buffer_manager_memory_fraction: self.write_buffer_manager_memory_fraction,
357 write_buffer_manager_allow_stall: self.write_buffer_manager_allow_stall,
358 }
359 }
360
361 fn from_proto(proto: ProtoRocksDbTuningParameters) -> Result<Self, TryFromProtoError> {
362 use proto_rocks_db_tuning_parameters::{
363 ProtoCompactionStyle, ProtoCompressionType, proto_compaction_style,
364 proto_compression_type,
365 };
366
367 fn compression_from_proto(
368 compression_type: Option<ProtoCompressionType>,
369 ) -> Result<CompressionType, TryFromProtoError> {
370 match compression_type {
371 Some(ProtoCompressionType {
372 kind: Some(proto_compression_type::Kind::Zstd(())),
373 }) => Ok(CompressionType::Zstd),
374 Some(ProtoCompressionType {
375 kind: Some(proto_compression_type::Kind::Snappy(())),
376 }) => Ok(CompressionType::Snappy),
377 Some(ProtoCompressionType {
378 kind: Some(proto_compression_type::Kind::Lz4(())),
379 }) => Ok(CompressionType::Lz4),
380 Some(ProtoCompressionType {
381 kind: Some(proto_compression_type::Kind::None(())),
382 }) => Ok(CompressionType::None),
383 Some(ProtoCompressionType { kind: None }) => Err(TryFromProtoError::MissingField(
384 "ProtoRocksDbTuningParameters::compression_type::kind".into(),
385 )),
386 None => Err(TryFromProtoError::MissingField(
387 "ProtoRocksDbTuningParameters::compression_type".into(),
388 )),
389 }
390 }
391 Ok(Self {
392 compaction_style: match proto.compaction_style {
393 Some(ProtoCompactionStyle {
394 kind: Some(proto_compaction_style::Kind::Level(())),
395 }) => CompactionStyle::Level,
396 Some(ProtoCompactionStyle {
397 kind: Some(proto_compaction_style::Kind::Universal(())),
398 }) => CompactionStyle::Universal,
399 Some(ProtoCompactionStyle { kind: None }) => {
400 return Err(TryFromProtoError::MissingField(
401 "ProtoRocksDbTuningParameters::compaction_style::kind".into(),
402 ));
403 }
404 None => {
405 return Err(TryFromProtoError::MissingField(
406 "ProtoRocksDbTuningParameters::compaction_style".into(),
407 ));
408 }
409 },
410 optimize_compaction_memtable_budget: usize::cast_from(
411 proto.optimize_compaction_memtable_budget,
412 ),
413 level_compaction_dynamic_level_bytes: proto.level_compaction_dynamic_level_bytes,
414 universal_compaction_target_ratio: proto.universal_compaction_target_ratio,
415 parallelism: proto.parallelism,
416 compression_type: compression_from_proto(proto.compression_type)?,
417 bottommost_compression_type: compression_from_proto(proto.bottommost_compression_type)?,
418 batch_size: usize::cast_from(proto.batch_size),
419 retry_max_duration: proto
420 .retry_max_duration
421 .into_rust_if_some("ProtoRocksDbTuningParameters::retry_max_duration")?,
422 stats_log_interval_seconds: proto.stats_log_interval_seconds,
423 stats_persist_interval_seconds: proto.stats_persist_interval_seconds,
424 point_lookup_block_cache_size_mb: proto.point_lookup_block_cache_size_mb,
425 shrink_buffers_by_ratio: usize::cast_from(proto.shrink_buffers_by_ratio),
426 write_buffer_manager_memory_bytes: proto
427 .write_buffer_manager_memory_bytes
428 .map(usize::cast_from),
429 write_buffer_manager_memory_fraction: proto.write_buffer_manager_memory_fraction,
430 write_buffer_manager_allow_stall: proto.write_buffer_manager_allow_stall,
431 })
432 }
433}
434
/// Write-buffer-manager settings, bundled on their own.
///
/// The three `write_buffer_manager_*` fields have the same names and types as
/// the corresponding fields of `RocksDBTuningParameters`; this struct adds a
/// cluster memory limit alongside them.
#[derive(Clone, Debug)]
pub struct RocksDbWriteBufferManagerConfig {
    /// Optional write buffer manager memory size, in bytes per the field name.
    pub write_buffer_manager_memory_bytes: Option<usize>,
    /// Optional write buffer manager memory size expressed as a fraction.
    /// NOTE(review): presumably a fraction of `cluster_memory_limit`; confirm
    /// against the consumer.
    pub write_buffer_manager_memory_fraction: Option<f64>,
    /// Whether the write buffer manager is allowed to stall (per the field name).
    pub write_buffer_manager_allow_stall: bool,
    /// Optional memory limit of the cluster.
    /// NOTE(review): presumably in bytes; confirm against the consumer.
    pub cluster_memory_limit: Option<usize>,
}
448
/// Default values for the fields of `RocksDBTuningParameters`.
pub mod defaults {
    use std::time::Duration;

    use super::*;

    /// Default compaction style: `Level`.
    pub const DEFAULT_COMPACTION_STYLE: CompactionStyle = CompactionStyle::Level;

    /// Default memtable budget: one third of `512 * 1024 * 1024`, using
    /// integer division.
    pub const DEFAULT_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET: usize = 512 * 1024 * 1024 / 3;

    /// Default for `level_compaction_dynamic_level_bytes`: enabled.
    pub const DEFAULT_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES: bool = true;

    /// Default universal compaction target ratio. Satisfies the `> 100`
    /// requirement enforced by `RocksDBTuningParameters::from_parameters`.
    pub const DEFAULT_UNIVERSAL_COMPACTION_RATIO: i32 = 200;

    /// Default parallelism: not specified.
    pub const DEFAULT_PARALLELISM: Option<i32> = None;

    /// Default compression type: `Lz4`.
    pub const DEFAULT_COMPRESSION_TYPE: CompressionType = CompressionType::Lz4;

    /// Default bottommost compression type: `Lz4`.
    pub const DEFAULT_BOTTOMMOST_COMPRESSION_TYPE: CompressionType = CompressionType::Lz4;

    /// Default batch size: `20 * 1024`.
    /// NOTE(review): unit (entries vs. bytes) is not visible here.
    pub const DEFAULT_BATCH_SIZE: usize = 20 * 1024;

    /// Default maximum retry duration: one second.
    pub const DEFAULT_RETRY_DURATION: Duration = Duration::from_secs(1);

    /// Default auto-spill memory threshold, derived from the default memtable
    /// budget as `budget / 4 * 2`. The expression is evaluated left to right
    /// with integer division, so it is not necessarily equal to `budget / 2`.
    pub const DEFAULT_AUTO_SPILL_MEMORY_THRESHOLD: usize =
        DEFAULT_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET / 4 * 2;

    /// Default stats logging interval, in seconds.
    pub const DEFAULT_STATS_LOG_INTERVAL_S: u32 = 600;

    /// Default stats persistence interval, in seconds.
    pub const DEFAULT_STATS_PERSIST_INTERVAL_S: u32 = 600;

    /// Default for `shrink_buffers_by_ratio`.
    /// NOTE(review): 0 presumably disables shrinking; confirm against the
    /// consumer.
    pub const DEFAULT_SHRINK_BUFFERS_BY_RATIO: usize = 0;

    /// Default for `write_buffer_manager_allow_stall`: disabled.
    pub const DEFAULT_WRITE_BUFFER_MANAGER_ALLOW_STALL: bool = false;
}
502
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;

    use super::*;

    /// Building parameters from the `defaults` constants through
    /// `from_parameters` must pass validation and equal `Default::default()`.
    #[mz_ore::test]
    fn defaults_equality() {
        // Arguments are positional; the order must match the signature of
        // `from_parameters`.
        let r = RocksDBTuningParameters::from_parameters(
            defaults::DEFAULT_COMPACTION_STYLE,
            defaults::DEFAULT_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET,
            defaults::DEFAULT_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES,
            defaults::DEFAULT_UNIVERSAL_COMPACTION_RATIO,
            defaults::DEFAULT_PARALLELISM,
            defaults::DEFAULT_COMPRESSION_TYPE,
            defaults::DEFAULT_BOTTOMMOST_COMPRESSION_TYPE,
            defaults::DEFAULT_BATCH_SIZE,
            defaults::DEFAULT_RETRY_DURATION,
            defaults::DEFAULT_STATS_LOG_INTERVAL_S,
            defaults::DEFAULT_STATS_PERSIST_INTERVAL_S,
            // point_lookup_block_cache_size_mb
            None,
            defaults::DEFAULT_SHRINK_BUFFERS_BY_RATIO,
            // write_buffer_manager_memory_bytes
            None,
            // write_buffer_manager_memory_fraction
            None,
            defaults::DEFAULT_WRITE_BUFFER_MANAGER_ALLOW_STALL,
        )
        .unwrap();

        assert_eq!(r, RocksDBTuningParameters::default());
    }

    /// Arbitrary parameters must survive a protobuf round trip unchanged.
    /// Ignored when running under Miri.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn rocksdb_tuning_roundtrip() {
        proptest!(|(expect in any::<RocksDBTuningParameters>())| {
            let actual = protobuf_roundtrip::<_, ProtoRocksDbTuningParameters>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);

        });
    }
}