1use std::future::Future;
13use std::sync::{Arc, Mutex};
14use std::time::{Instant, UNIX_EPOCH};
15
16use anyhow::anyhow;
17use async_trait::async_trait;
18use bytes::Bytes;
19use mz_ore::bytes::SegmentedBytes;
20use rand::prelude::SmallRng;
21use rand::{Rng, SeedableRng};
22use tracing::trace;
23
24use crate::location::{
25 Blob, BlobMetadata, CaSResult, Consensus, Determinate, ExternalError, ResultStream, SeqNo,
26 VersionedData,
27};
28
29#[derive(Debug)]
30struct UnreliableCore {
31 rng: SmallRng,
32 should_happen: f64,
33 should_timeout: f64,
34 }
36
37#[derive(Clone, Debug)]
39pub struct UnreliableHandle {
40 core: Arc<Mutex<UnreliableCore>>,
41}
42
43impl Default for UnreliableHandle {
44 fn default() -> Self {
45 let seed = UNIX_EPOCH
46 .elapsed()
47 .map_or(0, |x| u64::from(x.subsec_nanos()));
48 Self::new(seed, 0.95, 0.05)
49 }
50}
51
52impl UnreliableHandle {
53 pub fn new(seed: u64, should_happen: f64, should_timeout: f64) -> Self {
55 assert!(should_happen >= 0.0);
56 assert!(should_happen <= 1.0);
57 assert!(should_timeout >= 0.0);
58 assert!(should_timeout <= 1.0);
59 let core = UnreliableCore {
60 rng: SmallRng::seed_from_u64(seed),
61 should_happen,
62 should_timeout,
63 };
64 UnreliableHandle {
65 core: Arc::new(Mutex::new(core)),
66 }
67 }
68
69 pub fn partially_available(&self, should_happen: f64, should_timeout: f64) {
71 assert!(should_happen >= 0.0);
72 assert!(should_happen <= 1.0);
73 assert!(should_timeout >= 0.0);
74 assert!(should_timeout <= 1.0);
75 let mut core = self.core.lock().expect("mutex poisoned");
76 core.should_happen = should_happen;
77 core.should_timeout = should_timeout;
78 }
79
80 pub fn totally_unavailable(&self) {
82 self.partially_available(0.0, 1.0);
83 }
84
85 pub fn totally_available(&self) {
87 self.partially_available(1.0, 0.0);
88 }
89
90 fn should_happen(&self) -> bool {
91 let mut core = self.core.lock().expect("mutex poisoned");
92 let should_happen = core.should_happen;
93 core.rng.random_bool(should_happen)
94 }
95
96 fn should_timeout(&self) -> bool {
97 let mut core = self.core.lock().expect("mutex poisoned");
98 let should_timeout = core.should_timeout;
99 core.rng.random_bool(should_timeout)
100 }
101
102 async fn run_op<R, F, WorkFn>(&self, name: &str, work_fn: WorkFn) -> Result<R, ExternalError>
103 where
104 F: Future<Output = Result<R, ExternalError>>,
105 WorkFn: FnOnce() -> F,
106 {
107 let (should_happen, should_timeout) = (self.should_happen(), self.should_timeout());
108 trace!(
109 "unreliable {} should_happen={} should_timeout={}",
110 name, should_happen, should_timeout,
111 );
112 match (should_happen, should_timeout) {
113 (true, true) => {
114 let _res = work_fn().await;
115 Err(ExternalError::new_timeout(Instant::now()))
116 }
117 (true, false) => work_fn().await,
118 (false, true) => Err(ExternalError::new_timeout(Instant::now())),
119 (false, false) => Err(ExternalError::Determinate(Determinate::new(anyhow!(
120 "unreliable"
121 )))),
122 }
123 }
124}
125
126#[derive(Debug)]
128pub struct UnreliableBlob {
129 handle: UnreliableHandle,
130 blob: Arc<dyn Blob>,
131}
132
133impl UnreliableBlob {
134 pub fn new(blob: Arc<dyn Blob>, handle: UnreliableHandle) -> Self {
136 UnreliableBlob { handle, blob }
137 }
138}
139
140#[async_trait]
141impl Blob for UnreliableBlob {
142 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
143 self.handle.run_op("get", || self.blob.get(key)).await
144 }
145
146 async fn list_keys_and_metadata(
147 &self,
148 key_prefix: &str,
149 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
150 ) -> Result<(), ExternalError> {
151 self.handle
152 .run_op("list_keys", || {
153 self.blob.list_keys_and_metadata(key_prefix, f)
154 })
155 .await
156 }
157
158 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
159 self.handle
160 .run_op("set", || self.blob.set(key, value))
161 .await
162 }
163
164 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
165 self.handle.run_op("delete", || self.blob.delete(key)).await
166 }
167
168 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
169 self.handle
170 .run_op("restore", || self.blob.restore(key))
171 .await
172 }
173}
174
175#[derive(Debug)]
177pub struct UnreliableConsensus {
178 handle: UnreliableHandle,
179 consensus: Arc<dyn Consensus>,
180}
181
182impl UnreliableConsensus {
183 pub fn new(consensus: Arc<dyn Consensus>, handle: UnreliableHandle) -> Self {
185 UnreliableConsensus { consensus, handle }
186 }
187}
188
189#[async_trait]
190impl Consensus for UnreliableConsensus {
191 fn list_keys(&self) -> ResultStream<'_, String> {
192 self.consensus.list_keys()
194 }
195
196 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
197 self.handle
198 .run_op("head", || self.consensus.head(key))
199 .await
200 }
201
202 async fn compare_and_set(
203 &self,
204 key: &str,
205 new: VersionedData,
206 ) -> Result<CaSResult, ExternalError> {
207 self.handle
208 .run_op("compare_and_set", || {
209 self.consensus.compare_and_set(key, new)
210 })
211 .await
212 }
213
214 async fn scan(
215 &self,
216 key: &str,
217 from: SeqNo,
218 limit: usize,
219 ) -> Result<Vec<VersionedData>, ExternalError> {
220 self.handle
221 .run_op("scan", || self.consensus.scan(key, from, limit))
222 .await
223 }
224
225 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
226 self.handle
227 .run_op("truncate", || self.consensus.truncate(key, seqno))
228 .await
229 }
230}
231
#[cfg(test)]
mod tests {
    use mz_ore::{assert_err, assert_ok};

    use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};

    use super::*;

    /// Exercises [UnreliableBlob] at the default, fully available and fully
    /// unavailable settings.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn unreliable_blob() {
        let mem_blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let handle = UnreliableHandle::default();
        let blob = UnreliableBlob::new(mem_blob, handle.clone());

        // The default handle is seeded from the clock; replace the RNG with a
        // fixed-seed one so the success count below is reproducible.
        handle.core.lock().expect("mutex poisoned").rng = SmallRng::seed_from_u64(0);

        // With the default probabilities some, but not all, calls succeed.
        let mut succeeded = 0usize;
        for _ in 0..100 {
            succeeded += usize::from(blob.get("a").await.is_ok());
        }
        // Deliberately loose bounds: strictly more than 50, strictly less
        // than 99.
        assert!((51..99).contains(&succeeded), "succeeded={}", succeeded);

        // Fully available: the call goes through.
        handle.totally_available();
        assert_ok!(blob.get("a").await);

        // Fully unavailable: the call fails.
        handle.totally_unavailable();
        assert_err!(blob.get("a").await);
    }

    /// Exercises [UnreliableConsensus] at the default, fully available and
    /// fully unavailable settings.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn unreliable_consensus() {
        let mem_consensus = Arc::new(MemConsensus::default());
        let handle = UnreliableHandle::default();
        let consensus = UnreliableConsensus::new(mem_consensus, handle.clone());

        // The default handle is seeded from the clock; replace the RNG with a
        // fixed-seed one so the success count below is reproducible.
        handle.core.lock().expect("mutex poisoned").rng = SmallRng::seed_from_u64(0);

        // With the default probabilities some, but not all, calls succeed.
        let mut succeeded = 0usize;
        for _ in 0..100 {
            succeeded += usize::from(consensus.head("key").await.is_ok());
        }
        // Deliberately loose bounds: strictly more than 50, strictly less
        // than 99.
        assert!((51..99).contains(&succeeded), "succeeded={}", succeeded);

        // Fully available: the call goes through.
        handle.totally_available();
        assert_ok!(consensus.head("key").await);

        // Fully unavailable: the call fails.
        handle.totally_unavailable();
        assert_err!(consensus.head("key").await);
    }
}