mz_storage/upsert/
memory.rs
1#![allow(clippy::disallowed_types)]
16
17use std::collections::HashMap;
18use std::collections::hash_map::Drain;
19
20use itertools::Itertools;
21
22use super::UpsertKey;
23use super::types::{
24 GetStats, MergeStats, MergeValue, PutStats, PutValue, StateValue, UpsertStateBackend,
25 UpsertValueAndSize, ValueMetadata,
26};
27
28pub struct InMemoryHashMap<T, O> {
30 state: HashMap<UpsertKey, StateValue<T, O>>,
31 total_size: i64,
32}
33
34impl<T, O> InMemoryHashMap<T, O> {
35 pub fn drain(&mut self) -> (i64, Drain<'_, UpsertKey, StateValue<T, O>>) {
37 let last_size = self.total_size;
38 self.total_size = 0;
39
40 (last_size, self.state.drain())
41 }
42
43 pub fn current_size(&self) -> i64 {
45 self.total_size
46 }
47}
48
49impl<T, O> Default for InMemoryHashMap<T, O> {
50 fn default() -> Self {
51 Self {
52 state: HashMap::new(),
53 total_size: 0,
54 }
55 }
56}
57
58#[async_trait::async_trait(?Send)]
59impl<T, O> UpsertStateBackend<T, O> for InMemoryHashMap<T, O>
60where
61 O: Clone + 'static,
62 T: Clone + 'static,
63{
64 fn supports_merge(&self) -> bool {
65 false
66 }
67
68 async fn multi_put<P>(&mut self, puts: P) -> Result<PutStats, anyhow::Error>
69 where
70 P: IntoIterator<Item = (UpsertKey, PutValue<StateValue<T, O>>)>,
71 {
72 let mut stats = PutStats::default();
73 for (key, p_value) in puts {
74 stats.processed_puts += 1;
75 match p_value.value {
76 Some(value) => {
77 let size: i64 = value.memory_size().try_into().expect("less than i64 size");
78 stats.adjust(Some(&value), Some(size), &p_value.previous_value_metadata);
79 self.state.insert(key, value);
80 }
81 None => {
82 stats.adjust::<T, O>(None, None, &p_value.previous_value_metadata);
83 self.state.remove(&key);
84 }
85 }
86 }
87 self.total_size += stats.size_diff;
88 Ok(stats)
89 }
90
91 async fn multi_merge<M>(&mut self, _merges: M) -> Result<MergeStats, anyhow::Error>
92 where
93 M: IntoIterator<Item = (UpsertKey, MergeValue<StateValue<T, O>>)>,
94 {
95 anyhow::bail!("InMemoryHashMap does not support merging");
96 }
97
98 async fn multi_get<'r, G, R>(
99 &mut self,
100 gets: G,
101 results_out: R,
102 ) -> Result<GetStats, anyhow::Error>
103 where
104 G: IntoIterator<Item = UpsertKey>,
105 R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>,
106 {
107 let mut stats = GetStats::default();
108 for (key, result_out) in gets.into_iter().zip_eq(results_out) {
109 stats.processed_gets += 1;
110 let value = self.state.get(&key).cloned();
111 let metadata = value.as_ref().map(|v| ValueMetadata {
112 size: v.memory_size(),
113 is_tombstone: v.is_tombstone(),
114 });
115 stats.processed_gets_size += metadata.map_or(0, |m| m.size);
116 stats.returned_gets += metadata.map_or(0, |_| 1);
117 *result_out = UpsertValueAndSize { value, metadata };
118 }
119 Ok(stats)
120 }
121}