1use std::sync::Arc;
14
15use mz_ore::metrics::DeleteOnDropGauge;
16use prometheus::core::AtomicU64;
17use serde::{Serialize, de::DeserializeOwned};
18
19use super::UpsertKey;
20use super::memory::InMemoryHashMap;
21use super::rocksdb::RocksDB;
22use super::types::{
23 GetStats, MergeStats, MergeValue, PutStats, PutValue, StateValue, UpsertStateBackend,
24 UpsertValueAndSize,
25};
26
27pub enum BackendType<T, O> {
28 InMemory(InMemoryHashMap<T, O>),
29 RocksDb(RocksDB<T, O>),
30}
31
32pub struct AutoSpillBackend<T, O, F> {
33 backend_type: BackendType<T, O>,
34 auto_spill_threshold_bytes: usize,
35 rocksdb_autospill_in_use: Arc<DeleteOnDropGauge<AtomicU64, Vec<String>>>,
36 rocksdb_init_fn: Option<F>,
37}
38
39impl<T, O, F, Fut> AutoSpillBackend<T, O, F>
40where
41 O: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
42 F: FnOnce() -> Fut + 'static,
43 Fut: std::future::Future<Output = RocksDB<T, O>>,
44{
45 pub(crate) fn new(
46 rocksdb_init_fn: F,
47 auto_spill_threshold_bytes: usize,
48 rocksdb_autospill_in_use: Arc<DeleteOnDropGauge<AtomicU64, Vec<String>>>,
49 ) -> Self {
50 rocksdb_autospill_in_use.set(0);
52 Self {
53 backend_type: BackendType::InMemory(InMemoryHashMap::default()),
54 rocksdb_init_fn: Some(rocksdb_init_fn),
55 auto_spill_threshold_bytes,
56 rocksdb_autospill_in_use,
57 }
58 }
59}
60
61#[async_trait::async_trait(?Send)]
62impl<T, O, F, Fut> UpsertStateBackend<T, O> for AutoSpillBackend<T, O, F>
63where
64 O: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
65 T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
66 F: FnOnce() -> Fut + 'static,
67 Fut: std::future::Future<Output = RocksDB<T, O>>,
68{
69 fn supports_merge(&self) -> bool {
70 match &self.backend_type {
73 BackendType::InMemory(_) => false,
74 BackendType::RocksDb(backend) => backend.supports_merge(),
75 }
76 }
77
78 async fn multi_put<P>(&mut self, puts: P) -> Result<PutStats, anyhow::Error>
79 where
80 P: IntoIterator<Item = (UpsertKey, PutValue<StateValue<T, O>>)>,
81 {
82 match &mut self.backend_type {
85 BackendType::InMemory(map) => {
86 let mut put_stats = map.multi_put(puts).await?;
87 let in_memory_size: usize = map
88 .current_size()
89 .try_into()
90 .expect("unexpected error while casting");
91 if in_memory_size > self.auto_spill_threshold_bytes {
92 tracing::info!(%in_memory_size, %self.auto_spill_threshold_bytes, "spilling to disk for upsert");
93 let mut rocksdb_backend =
94 self.rocksdb_init_fn
95 .take()
96 .expect("Can only initialize once")()
97 .await;
98
99 let (last_known_size, new_puts) = map.drain();
100 let new_puts = new_puts.map(|(k, v)| {
101 (
102 k,
103 PutValue {
104 value: Some(v),
105 previous_value_metadata: None,
106 },
107 )
108 });
109
110 let rocksdb_stats = rocksdb_backend.multi_put(new_puts).await?;
111 put_stats.size_diff += rocksdb_stats.size_diff;
113 put_stats.size_diff -= last_known_size;
114 self.backend_type = BackendType::RocksDb(rocksdb_backend);
116 self.rocksdb_autospill_in_use.set(1);
118 }
119 Ok(put_stats)
120 }
121 BackendType::RocksDb(rocks_db) => rocks_db.multi_put(puts).await,
122 }
123 }
124
125 async fn multi_merge<M>(&mut self, merges: M) -> Result<MergeStats, anyhow::Error>
126 where
127 M: IntoIterator<Item = (UpsertKey, MergeValue<StateValue<T, O>>)>,
128 {
129 match &mut self.backend_type {
130 BackendType::InMemory(_) => {
131 anyhow::bail!("InMemoryHashMap does not support merging");
132 }
133 BackendType::RocksDb(rocks_db) => rocks_db.multi_merge(merges).await,
134 }
135 }
136
137 async fn multi_get<'r, G, R>(
138 &mut self,
139 gets: G,
140 results_out: R,
141 ) -> Result<GetStats, anyhow::Error>
142 where
143 G: IntoIterator<Item = UpsertKey>,
144 R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>,
145 O: 'r,
146 {
147 match &mut self.backend_type {
148 BackendType::InMemory(in_memory_hash_map) => {
149 in_memory_hash_map.multi_get(gets, results_out).await
150 }
151 BackendType::RocksDb(rocks_db) => rocks_db.multi_get(gets, results_out).await,
152 }
153 }
154}