1use std::future::Future;
13use std::sync::{Arc, Mutex};
14use std::time::{Instant, UNIX_EPOCH};
15
16use anyhow::anyhow;
17use async_trait::async_trait;
18use bytes::Bytes;
19use mz_ore::bytes::SegmentedBytes;
20use rand::prelude::SmallRng;
21use rand::{Rng, SeedableRng};
22use tracing::trace;
23
24use crate::location::{
25 Blob, BlobMetadata, CaSResult, Consensus, Determinate, ExternalError, ResultStream, SeqNo,
26 VersionedData,
27};
28
29#[derive(Debug)]
30struct UnreliableCore {
31 rng: SmallRng,
32 should_happen: f64,
33 should_timeout: f64,
34 }
36
37#[derive(Clone, Debug)]
39pub struct UnreliableHandle {
40 core: Arc<Mutex<UnreliableCore>>,
41}
42
43impl Default for UnreliableHandle {
44 fn default() -> Self {
45 let seed = UNIX_EPOCH
46 .elapsed()
47 .map_or(0, |x| u64::from(x.subsec_nanos()));
48 Self::new(seed, 0.95, 0.05)
49 }
50}
51
52impl UnreliableHandle {
53 pub fn new(seed: u64, should_happen: f64, should_timeout: f64) -> Self {
55 assert!(should_happen >= 0.0);
56 assert!(should_happen <= 1.0);
57 assert!(should_timeout >= 0.0);
58 assert!(should_timeout <= 1.0);
59 let core = UnreliableCore {
60 rng: SmallRng::seed_from_u64(seed),
61 should_happen,
62 should_timeout,
63 };
64 UnreliableHandle {
65 core: Arc::new(Mutex::new(core)),
66 }
67 }
68
69 pub fn partially_available(&self, should_happen: f64, should_timeout: f64) {
71 assert!(should_happen >= 0.0);
72 assert!(should_happen <= 1.0);
73 assert!(should_timeout >= 0.0);
74 assert!(should_timeout <= 1.0);
75 let mut core = self.core.lock().expect("mutex poisoned");
76 core.should_happen = should_happen;
77 core.should_timeout = should_timeout;
78 }
79
80 pub fn totally_unavailable(&self) {
82 self.partially_available(0.0, 1.0);
83 }
84
85 pub fn totally_available(&self) {
87 self.partially_available(1.0, 0.0);
88 }
89
90 fn should_happen(&self) -> bool {
91 let mut core = self.core.lock().expect("mutex poisoned");
92 let should_happen = core.should_happen;
93 core.rng.gen_bool(should_happen)
94 }
95
96 fn should_timeout(&self) -> bool {
97 let mut core = self.core.lock().expect("mutex poisoned");
98 let should_timeout = core.should_timeout;
99 core.rng.gen_bool(should_timeout)
100 }
101
102 async fn run_op<R, F, WorkFn>(&self, name: &str, work_fn: WorkFn) -> Result<R, ExternalError>
103 where
104 F: Future<Output = Result<R, ExternalError>>,
105 WorkFn: FnOnce() -> F,
106 {
107 let (should_happen, should_timeout) = (self.should_happen(), self.should_timeout());
108 trace!(
109 "unreliable {} should_happen={} should_timeout={}",
110 name, should_happen, should_timeout,
111 );
112 match (should_happen, should_timeout) {
113 (true, true) => {
114 let _res = work_fn().await;
115 Err(ExternalError::new_timeout(Instant::now()))
116 }
117 (true, false) => work_fn().await,
118 (false, true) => Err(ExternalError::new_timeout(Instant::now())),
119 (false, false) => Err(ExternalError::Determinate(Determinate::new(anyhow!(
120 "unreliable"
121 )))),
122 }
123 }
124}
125
126#[derive(Debug)]
128pub struct UnreliableBlob {
129 handle: UnreliableHandle,
130 blob: Arc<dyn Blob>,
131}
132
133impl UnreliableBlob {
134 pub fn new(blob: Arc<dyn Blob>, handle: UnreliableHandle) -> Self {
136 UnreliableBlob { handle, blob }
137 }
138}
139
140#[async_trait]
141impl Blob for UnreliableBlob {
142 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
143 self.handle.run_op("get", || self.blob.get(key)).await
144 }
145
146 async fn list_keys_and_metadata(
147 &self,
148 key_prefix: &str,
149 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
150 ) -> Result<(), ExternalError> {
151 self.handle
152 .run_op("list_keys", || {
153 self.blob.list_keys_and_metadata(key_prefix, f)
154 })
155 .await
156 }
157
158 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
159 self.handle
160 .run_op("set", || self.blob.set(key, value))
161 .await
162 }
163
164 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
165 self.handle.run_op("delete", || self.blob.delete(key)).await
166 }
167
168 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
169 self.handle
170 .run_op("restore", || self.blob.restore(key))
171 .await
172 }
173}
174
175#[derive(Debug)]
177pub struct UnreliableConsensus {
178 handle: UnreliableHandle,
179 consensus: Arc<dyn Consensus>,
180}
181
182impl UnreliableConsensus {
183 pub fn new(consensus: Arc<dyn Consensus>, handle: UnreliableHandle) -> Self {
185 UnreliableConsensus { consensus, handle }
186 }
187}
188
189#[async_trait]
190impl Consensus for UnreliableConsensus {
191 fn list_keys(&self) -> ResultStream<String> {
192 self.consensus.list_keys()
194 }
195
196 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
197 self.handle
198 .run_op("head", || self.consensus.head(key))
199 .await
200 }
201
202 async fn compare_and_set(
203 &self,
204 key: &str,
205 expected: Option<SeqNo>,
206 new: VersionedData,
207 ) -> Result<CaSResult, ExternalError> {
208 self.handle
209 .run_op("compare_and_set", || {
210 self.consensus.compare_and_set(key, expected, new)
211 })
212 .await
213 }
214
215 async fn scan(
216 &self,
217 key: &str,
218 from: SeqNo,
219 limit: usize,
220 ) -> Result<Vec<VersionedData>, ExternalError> {
221 self.handle
222 .run_op("scan", || self.consensus.scan(key, from, limit))
223 .await
224 }
225
226 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
227 self.handle
228 .run_op("truncate", || self.consensus.truncate(key, seqno))
229 .await
230 }
231}
232
#[cfg(test)]
mod tests {
    use mz_ore::{assert_err, assert_ok};

    use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};

    use super::*;

    /// Swaps in a fixed-seed RNG so the success count below is identical on every
    /// run, instead of depending on the clock-derived seed chosen by `Default`.
    fn use_fixed_seed(handle: &UnreliableHandle) {
        handle.core.lock().expect("mutex poisoned").rng = SmallRng::seed_from_u64(0);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn unreliable_blob() {
        let inner = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let handle = UnreliableHandle::default();
        let blob = UnreliableBlob::new(inner, handle.clone());
        use_fixed_seed(&handle);

        // With the default probabilities, most but not all calls should succeed.
        let mut succeeded = 0;
        for _ in 0..100 {
            succeeded += usize::from(blob.get("a").await.is_ok());
        }
        assert!(succeeded > 50 && succeeded < 99, "succeeded={}", succeeded);

        // Fully available: the call goes through.
        handle.totally_available();
        assert_ok!(blob.get("a").await);

        // Fully unavailable: the call fails.
        handle.totally_unavailable();
        assert_err!(blob.get("a").await);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn unreliable_consensus() {
        let inner = Arc::new(MemConsensus::default());
        let handle = UnreliableHandle::default();
        let consensus = UnreliableConsensus::new(inner, handle.clone());
        use_fixed_seed(&handle);

        // With the default probabilities, most but not all calls should succeed.
        let mut succeeded = 0;
        for _ in 0..100 {
            succeeded += usize::from(consensus.head("key").await.is_ok());
        }
        assert!(succeeded > 50 && succeeded < 99, "succeeded={}", succeeded);

        // Fully available: the call goes through.
        handle.totally_available();
        assert_ok!(consensus.head("key").await);

        // Fully unavailable: the call fails.
        handle.totally_unavailable();
        assert_err!(consensus.head("key").await);
    }
}