mz_storage/upsert/
rocksdb.rs
1use mz_rocksdb::{KeyUpdate, RocksDBInstance};
13use serde::{Serialize, de::DeserializeOwned};
14
15use super::UpsertKey;
16use super::types::{
17 GetStats, MergeStats, MergeValue, PutStats, PutValue, StateValue, UpsertStateBackend,
18 UpsertValueAndSize, ValueMetadata,
19};
20
21pub struct RocksDB<T, O> {
24 rocksdb: RocksDBInstance<UpsertKey, StateValue<T, O>>,
25}
26
27impl<T, O> RocksDB<T, O> {
28 pub fn new(rocksdb: RocksDBInstance<UpsertKey, StateValue<T, O>>) -> Self {
29 Self { rocksdb }
30 }
31}
32
33#[async_trait::async_trait(?Send)]
34impl<T, O> UpsertStateBackend<T, O> for RocksDB<T, O>
35where
36 O: Send + Sync + Serialize + DeserializeOwned + 'static,
37 T: Send + Sync + Serialize + DeserializeOwned + 'static,
38{
39 fn supports_merge(&self) -> bool {
40 self.rocksdb.supports_merges
41 }
42
43 async fn multi_put<P>(&mut self, puts: P) -> Result<PutStats, anyhow::Error>
44 where
45 P: IntoIterator<Item = (UpsertKey, PutValue<StateValue<T, O>>)>,
46 {
47 let mut p_stats = PutStats::default();
48 let stats = self
49 .rocksdb
50 .multi_update(puts.into_iter().map(
51 |(
52 k,
53 PutValue {
54 value,
55 previous_value_metadata,
56 },
57 )| {
58 p_stats.adjust(value.as_ref(), None, &previous_value_metadata);
59 let value = match value {
60 Some(v) => KeyUpdate::Put(v),
61 None => KeyUpdate::Delete,
62 };
63 (k, value, None)
64 },
65 ))
66 .await?;
67 p_stats.processed_puts += stats.processed_updates;
68 let size: i64 = stats.size_written.try_into().expect("less than i64 size");
69 p_stats.size_diff += size;
70
71 Ok(p_stats)
72 }
73
74 async fn multi_merge<M>(&mut self, merges: M) -> Result<MergeStats, anyhow::Error>
75 where
76 M: IntoIterator<Item = (UpsertKey, MergeValue<StateValue<T, O>>)>,
77 {
78 let mut m_stats = MergeStats::default();
79 let stats =
80 self.rocksdb
81 .multi_update(merges.into_iter().map(|(k, MergeValue { value, diff })| {
82 (k, KeyUpdate::Merge(value), Some(diff))
83 }))
84 .await?;
85 m_stats.written_merge_operands += stats.processed_updates;
86 m_stats.size_written += stats.size_written;
87 if let Some(diff) = stats.size_diff {
88 m_stats.size_diff += diff.into_inner();
89 }
90 Ok(m_stats)
91 }
92
93 async fn multi_get<'r, G, R>(
94 &mut self,
95 gets: G,
96 results_out: R,
97 ) -> Result<GetStats, anyhow::Error>
98 where
99 G: IntoIterator<Item = UpsertKey>,
100 R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>,
101 {
102 let mut g_stats = GetStats::default();
103 let stats = self
104 .rocksdb
105 .multi_get(gets, results_out, |value| {
106 value.map_or(
107 UpsertValueAndSize {
108 value: None,
109 metadata: None,
110 },
111 |v| {
112 let is_tombstone = v.value.is_tombstone();
113 UpsertValueAndSize {
114 value: Some(v.value),
115 metadata: Some(ValueMetadata {
116 size: v.size,
117 is_tombstone,
118 }),
119 }
120 },
121 )
122 })
123 .await?;
124
125 g_stats.processed_gets += stats.processed_gets;
126 g_stats.processed_gets_size += stats.processed_gets_size;
127 g_stats.returned_gets += stats.returned_gets;
128 Ok(g_stats)
129 }
130}