mz_storage/upsert/autospill.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An `UpsertStateBackend` that starts in memory and spills to RocksDB
//! when the total size passes some threshold.
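//!
//! A minimal usage sketch (hypothetical: the initialization closure, threshold, and
//! `metrics` handle below are illustrative placeholders, not the call sites used
//! elsewhere in this crate):
//!
//! ```ignore
//! let backend = AutoSpillBackend::new(
//!     // Lazily builds the RocksDB backend; only invoked if we actually spill.
//!     || async { todo!("construct a RocksDB<T, O> backend here") },
//!     // Spill once the in-memory map reports more than 100 MiB of state.
//!     100 * 1024 * 1024,
//!     metrics.rocksdb_autospill_in_use.clone(),
//! );
//! ```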

use std::sync::Arc;

use mz_ore::metrics::DeleteOnDropGauge;
use prometheus::core::AtomicU64;
use serde::{Serialize, de::DeserializeOwned};

use super::UpsertKey;
use super::memory::InMemoryHashMap;
use super::rocksdb::RocksDB;
use super::types::{
    GetStats, MergeStats, MergeValue, PutStats, PutValue, StateValue, UpsertStateBackend,
    UpsertValueAndSize,
};

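/// The state backend currently in use: the initial in-memory hash map, or RocksDB
/// once the state has been spilled.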
pub enum BackendType<T, O> {
    InMemory(InMemoryHashMap<T, O>),
    RocksDb(RocksDB<T, O>),
}

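/// An `UpsertStateBackend` wrapper that starts with `BackendType::InMemory` and
/// switches to `BackendType::RocksDb` once the in-memory size exceeds
/// `auto_spill_threshold_bytes`. `rocksdb_init_fn` is consumed when spilling, and
/// `rocksdb_autospill_in_use` reports which backend is active (0 = in-memory,
/// 1 = RocksDB).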
pub struct AutoSpillBackend<T, O, F> {
    backend_type: BackendType<T, O>,
    auto_spill_threshold_bytes: usize,
    rocksdb_autospill_in_use: Arc<DeleteOnDropGauge<AtomicU64, Vec<String>>>,
    rocksdb_init_fn: Option<F>,
}

impl<T, O, F, Fut> AutoSpillBackend<T, O, F>
where
    O: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = RocksDB<T, O>>,
{
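    /// Creates a backend that starts with an empty in-memory map and spills to the
    /// RocksDB backend produced by `rocksdb_init_fn` once the in-memory size exceeds
    /// `auto_spill_threshold_bytes`.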
    pub(crate) fn new(
        rocksdb_init_fn: F,
        auto_spill_threshold_bytes: usize,
        rocksdb_autospill_in_use: Arc<DeleteOnDropGauge<AtomicU64, Vec<String>>>,
    ) -> Self {
        // Initialize the metric to 0 to reflect that the in-memory hash map is in use.
        rocksdb_autospill_in_use.set(0);
        Self {
            backend_type: BackendType::InMemory(InMemoryHashMap::default()),
            rocksdb_init_fn: Some(rocksdb_init_fn),
            auto_spill_threshold_bytes,
            rocksdb_autospill_in_use,
        }
    }
}

#[async_trait::async_trait(?Send)]
impl<T, O, F, Fut> UpsertStateBackend<T, O> for AutoSpillBackend<T, O, F>
where
    O: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
    T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = RocksDB<T, O>>,
{
    fn supports_merge(&self) -> bool {
        // We only support merge if the underlying backend does: the in-memory backend
        // does not, and the RocksDB backend does if configured to do so.
        match &self.backend_type {
            BackendType::InMemory(_) => false,
            BackendType::RocksDb(backend) => backend.supports_merge(),
        }
    }

    async fn multi_put<P>(&mut self, puts: P) -> Result<PutStats, anyhow::Error>
    where
        P: IntoIterator<Item = (UpsertKey, PutValue<StateValue<T, O>>)>,
    {
        // Note that we never revert to the in-memory backend, even if the size later
        // shrinks below the threshold. That case is considered rare and not worth the
        // complexity.
        match &mut self.backend_type {
            BackendType::InMemory(map) => {
                let mut put_stats = map.multi_put(puts).await?;
                let in_memory_size: usize = map
                    .current_size()
                    .try_into()
                    .expect("unexpected error while casting");
                if in_memory_size > self.auto_spill_threshold_bytes {
                    tracing::info!(%in_memory_size, %self.auto_spill_threshold_bytes, "spilling to disk for upsert");
                    let mut rocksdb_backend =
                        self.rocksdb_init_fn
                            .take()
                            .expect("Can only initialize once")()
                        .await;

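                    // Move all entries currently held in memory into RocksDB as
                    // unconditional puts; the new backend has no previous-value
                    // metadata for these keys, so it is left as `None`.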
                    let (last_known_size, new_puts) = map.drain();
                    let new_puts = new_puts.map(|(k, v)| {
                        (
                            k,
                            PutValue {
                                value: Some(v),
                                previous_value_metadata: None,
                            },
                        )
                    });

                    let rocksdb_stats = rocksdb_backend.multi_put(new_puts).await?;
                    // Adjust the reported size diff, as value sizes in RocksDB can
                    // differ from the in-memory sizes.
                    put_stats.size_diff += rocksdb_stats.size_diff;
                    put_stats.size_diff -= last_known_size;
                    // Switch the backend to RocksDB.
                    self.backend_type = BackendType::RocksDb(rocksdb_backend);
                    // Set the metric to 1 to indicate that RocksDB is now in use.
                    self.rocksdb_autospill_in_use.set(1);
                }
                Ok(put_stats)
            }
            BackendType::RocksDb(rocks_db) => rocks_db.multi_put(puts).await,
        }
    }

    async fn multi_merge<M>(&mut self, merges: M) -> Result<MergeStats, anyhow::Error>
    where
        M: IntoIterator<Item = (UpsertKey, MergeValue<StateValue<T, O>>)>,
    {
        match &mut self.backend_type {
            BackendType::InMemory(_) => {
                anyhow::bail!("InMemoryHashMap does not support merging");
            }
            BackendType::RocksDb(rocks_db) => rocks_db.multi_merge(merges).await,
        }
    }

    async fn multi_get<'r, G, R>(
        &mut self,
        gets: G,
        results_out: R,
    ) -> Result<GetStats, anyhow::Error>
    where
        G: IntoIterator<Item = UpsertKey>,
        R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>,
        O: 'r,
    {
        match &mut self.backend_type {
            BackendType::InMemory(in_memory_hash_map) => {
                in_memory_hash_map.multi_get(gets, results_out).await
            }
            BackendType::RocksDb(rocks_db) => rocks_db.multi_get(gets, results_out).await,
        }
    }
}