mz_storage/upsert/
rocksdb.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An `UpsertStateBackend` that stores values in RocksDB.
11
12use mz_rocksdb::{KeyUpdate, RocksDBInstance};
13use serde::{Serialize, de::DeserializeOwned};
14
15use super::UpsertKey;
16use super::types::{
17    GetStats, MergeStats, MergeValue, PutStats, PutValue, StateValue, UpsertStateBackend,
18    UpsertValueAndSize, ValueMetadata,
19};
20
21/// A `UpsertStateBackend` implementation backed by RocksDB.
22/// This is currently untested, and simply compiles.
23pub struct RocksDB<T, O> {
24    rocksdb: RocksDBInstance<UpsertKey, StateValue<T, O>>,
25}
26
27impl<T, O> RocksDB<T, O> {
28    pub fn new(rocksdb: RocksDBInstance<UpsertKey, StateValue<T, O>>) -> Self {
29        Self { rocksdb }
30    }
31}
32
33#[async_trait::async_trait(?Send)]
34impl<T, O> UpsertStateBackend<T, O> for RocksDB<T, O>
35where
36    O: Send + Sync + Serialize + DeserializeOwned + 'static,
37    T: Send + Sync + Serialize + DeserializeOwned + 'static,
38{
39    fn supports_merge(&self) -> bool {
40        self.rocksdb.supports_merges
41    }
42
43    async fn multi_put<P>(&mut self, puts: P) -> Result<PutStats, anyhow::Error>
44    where
45        P: IntoIterator<Item = (UpsertKey, PutValue<StateValue<T, O>>)>,
46    {
47        let mut p_stats = PutStats::default();
48        let stats = self
49            .rocksdb
50            .multi_update(puts.into_iter().map(
51                |(
52                    k,
53                    PutValue {
54                        value,
55                        previous_value_metadata,
56                    },
57                )| {
58                    p_stats.adjust(value.as_ref(), None, &previous_value_metadata);
59                    let value = match value {
60                        Some(v) => KeyUpdate::Put(v),
61                        None => KeyUpdate::Delete,
62                    };
63                    (k, value, None)
64                },
65            ))
66            .await?;
67        p_stats.processed_puts += stats.processed_updates;
68        let size: i64 = stats.size_written.try_into().expect("less than i64 size");
69        p_stats.size_diff += size;
70
71        Ok(p_stats)
72    }
73
74    async fn multi_merge<M>(&mut self, merges: M) -> Result<MergeStats, anyhow::Error>
75    where
76        M: IntoIterator<Item = (UpsertKey, MergeValue<StateValue<T, O>>)>,
77    {
78        let mut m_stats = MergeStats::default();
79        let stats =
80            self.rocksdb
81                .multi_update(merges.into_iter().map(|(k, MergeValue { value, diff })| {
82                    (k, KeyUpdate::Merge(value), Some(diff))
83                }))
84                .await?;
85        m_stats.written_merge_operands += stats.processed_updates;
86        m_stats.size_written += stats.size_written;
87        if let Some(diff) = stats.size_diff {
88            m_stats.size_diff += diff.into_inner();
89        }
90        Ok(m_stats)
91    }
92
93    async fn multi_get<'r, G, R>(
94        &mut self,
95        gets: G,
96        results_out: R,
97    ) -> Result<GetStats, anyhow::Error>
98    where
99        G: IntoIterator<Item = UpsertKey>,
100        R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>,
101    {
102        let mut g_stats = GetStats::default();
103        let stats = self
104            .rocksdb
105            .multi_get(gets, results_out, |value| {
106                value.map_or(
107                    UpsertValueAndSize {
108                        value: None,
109                        metadata: None,
110                    },
111                    |v| {
112                        let is_tombstone = v.value.is_tombstone();
113                        UpsertValueAndSize {
114                            value: Some(v.value),
115                            metadata: Some(ValueMetadata {
116                                size: v.size,
117                                is_tombstone,
118                            }),
119                        }
120                    },
121                )
122            })
123            .await?;
124
125        g_stats.processed_gets += stats.processed_gets;
126        g_stats.processed_gets_size += stats.processed_gets_size;
127        g_stats.returned_gets += stats.returned_gets;
128        Ok(g_stats)
129    }
130}