mz_persist/
unreliable.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Test utilities for injecting errors and timeouts (delay injection is still a TODO).
11
12use std::future::Future;
13use std::sync::{Arc, Mutex};
14use std::time::{Instant, UNIX_EPOCH};
15
16use anyhow::anyhow;
17use async_trait::async_trait;
18use bytes::Bytes;
19use mz_ore::bytes::SegmentedBytes;
20use rand::prelude::SmallRng;
21use rand::{Rng, SeedableRng};
22use tracing::trace;
23
24use crate::location::{
25    Blob, BlobMetadata, CaSResult, Consensus, Determinate, ExternalError, ResultStream, SeqNo,
26    VersionedData,
27};
28
/// Mutable state behind an [UnreliableHandle]: the random number generator and
/// the probabilities used to decide the outcome of each operation.
#[derive(Debug)]
struct UnreliableCore {
    /// Source of randomness for the per-operation decisions.
    rng: SmallRng,
    /// Probability, in `[0.0, 1.0]`, that an operation is actually executed
    /// against the wrapped implementation.
    should_happen: f64,
    /// Probability, in `[0.0, 1.0]`, that an operation reports a timeout
    /// error, independently of whether it was executed.
    should_timeout: f64,
    // TODO: Delays, what else?
}
36
/// A handle for controlling the behavior of an unreliable delegate.
///
/// Cloning the handle clones the inner [Arc], so every clone observes and
/// controls the same underlying state.
#[derive(Clone, Debug)]
pub struct UnreliableHandle {
    /// RNG and failure probabilities, guarded by a mutex and shared between
    /// all clones of this handle.
    core: Arc<Mutex<UnreliableCore>>,
}
42
43impl Default for UnreliableHandle {
44    fn default() -> Self {
45        let seed = UNIX_EPOCH
46            .elapsed()
47            .map_or(0, |x| u64::from(x.subsec_nanos()));
48        Self::new(seed, 0.95, 0.05)
49    }
50}
51
52impl UnreliableHandle {
53    /// Returns a new [UnreliableHandle].
54    pub fn new(seed: u64, should_happen: f64, should_timeout: f64) -> Self {
55        assert!(should_happen >= 0.0);
56        assert!(should_happen <= 1.0);
57        assert!(should_timeout >= 0.0);
58        assert!(should_timeout <= 1.0);
59        let core = UnreliableCore {
60            rng: SmallRng::seed_from_u64(seed),
61            should_happen,
62            should_timeout,
63        };
64        UnreliableHandle {
65            core: Arc::new(Mutex::new(core)),
66        }
67    }
68
69    /// Cause all later calls to sometimes return an error.
70    pub fn partially_available(&self, should_happen: f64, should_timeout: f64) {
71        assert!(should_happen >= 0.0);
72        assert!(should_happen <= 1.0);
73        assert!(should_timeout >= 0.0);
74        assert!(should_timeout <= 1.0);
75        let mut core = self.core.lock().expect("mutex poisoned");
76        core.should_happen = should_happen;
77        core.should_timeout = should_timeout;
78    }
79
80    /// Cause all later calls to return an error.
81    pub fn totally_unavailable(&self) {
82        self.partially_available(0.0, 1.0);
83    }
84
85    /// Cause all later calls to succeed.
86    pub fn totally_available(&self) {
87        self.partially_available(1.0, 0.0);
88    }
89
90    fn should_happen(&self) -> bool {
91        let mut core = self.core.lock().expect("mutex poisoned");
92        let should_happen = core.should_happen;
93        core.rng.gen_bool(should_happen)
94    }
95
96    fn should_timeout(&self) -> bool {
97        let mut core = self.core.lock().expect("mutex poisoned");
98        let should_timeout = core.should_timeout;
99        core.rng.gen_bool(should_timeout)
100    }
101
102    async fn run_op<R, F, WorkFn>(&self, name: &str, work_fn: WorkFn) -> Result<R, ExternalError>
103    where
104        F: Future<Output = Result<R, ExternalError>>,
105        WorkFn: FnOnce() -> F,
106    {
107        let (should_happen, should_timeout) = (self.should_happen(), self.should_timeout());
108        trace!(
109            "unreliable {} should_happen={} should_timeout={}",
110            name, should_happen, should_timeout,
111        );
112        match (should_happen, should_timeout) {
113            (true, true) => {
114                let _res = work_fn().await;
115                Err(ExternalError::new_timeout(Instant::now()))
116            }
117            (true, false) => work_fn().await,
118            (false, true) => Err(ExternalError::new_timeout(Instant::now())),
119            (false, false) => Err(ExternalError::Determinate(Determinate::new(anyhow!(
120                "unreliable"
121            )))),
122        }
123    }
124}
125
/// An unreliable delegate to [Blob].
///
/// Every [Blob] method is routed through the handle, which decides per call
/// whether the wrapped blob is invoked and whether an error is returned
/// instead of its result.
#[derive(Debug)]
pub struct UnreliableBlob {
    /// Decides, per call, whether the operation runs and/or times out.
    handle: UnreliableHandle,
    /// The wrapped implementation that operations are forwarded to.
    blob: Arc<dyn Blob>,
}
132
133impl UnreliableBlob {
134    /// Returns a new [UnreliableBlob].
135    pub fn new(blob: Arc<dyn Blob>, handle: UnreliableHandle) -> Self {
136        UnreliableBlob { handle, blob }
137    }
138}
139
140#[async_trait]
141impl Blob for UnreliableBlob {
142    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
143        self.handle.run_op("get", || self.blob.get(key)).await
144    }
145
146    async fn list_keys_and_metadata(
147        &self,
148        key_prefix: &str,
149        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
150    ) -> Result<(), ExternalError> {
151        self.handle
152            .run_op("list_keys", || {
153                self.blob.list_keys_and_metadata(key_prefix, f)
154            })
155            .await
156    }
157
158    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
159        self.handle
160            .run_op("set", || self.blob.set(key, value))
161            .await
162    }
163
164    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
165        self.handle.run_op("delete", || self.blob.delete(key)).await
166    }
167
168    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
169        self.handle
170            .run_op("restore", || self.blob.restore(key))
171            .await
172    }
173}
174
/// An unreliable delegate to [Consensus].
///
/// The async [Consensus] methods are routed through the handle, which decides
/// per call whether the wrapped implementation is invoked and whether an
/// error is returned instead of its result.
#[derive(Debug)]
pub struct UnreliableConsensus {
    /// Decides, per call, whether the operation runs and/or times out.
    handle: UnreliableHandle,
    /// The wrapped implementation that operations are forwarded to.
    consensus: Arc<dyn Consensus>,
}
181
182impl UnreliableConsensus {
183    /// Returns a new [UnreliableConsensus].
184    pub fn new(consensus: Arc<dyn Consensus>, handle: UnreliableHandle) -> Self {
185        UnreliableConsensus { consensus, handle }
186    }
187}
188
189#[async_trait]
190impl Consensus for UnreliableConsensus {
191    fn list_keys(&self) -> ResultStream<String> {
192        // TODO: run_op for streams
193        self.consensus.list_keys()
194    }
195
196    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
197        self.handle
198            .run_op("head", || self.consensus.head(key))
199            .await
200    }
201
202    async fn compare_and_set(
203        &self,
204        key: &str,
205        expected: Option<SeqNo>,
206        new: VersionedData,
207    ) -> Result<CaSResult, ExternalError> {
208        self.handle
209            .run_op("compare_and_set", || {
210                self.consensus.compare_and_set(key, expected, new)
211            })
212            .await
213    }
214
215    async fn scan(
216        &self,
217        key: &str,
218        from: SeqNo,
219        limit: usize,
220    ) -> Result<Vec<VersionedData>, ExternalError> {
221        self.handle
222            .run_op("scan", || self.consensus.scan(key, from, limit))
223            .await
224    }
225
226    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
227        self.handle
228            .run_op("truncate", || self.consensus.truncate(key, seqno))
229            .await
230    }
231}
232
#[cfg(test)]
mod tests {
    use mz_ore::{assert_err, assert_ok};

    use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};

    use super::*;

    /// Swaps the handle's RNG for one with a fixed seed so the success counts
    /// asserted below don't flake.
    fn use_fixed_seed(handle: &UnreliableHandle) {
        let mut core = handle.core.lock().expect("mutex poisoned");
        core.rng = SmallRng::seed_from_u64(0);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn unreliable_blob() {
        let handle = UnreliableHandle::default();
        let blob = UnreliableBlob::new(
            Arc::new(MemBlob::open(MemBlobConfig::default())),
            handle.clone(),
        );
        use_fixed_seed(&handle);

        // With the default settings some, but not all, calls go through.
        let mut succeeded = 0;
        for _ in 0..100 {
            match blob.get("a").await {
                Ok(_) => succeeded += 1,
                Err(_) => {}
            }
        }
        // The bounds are deliberately loose so that a change in the rng impl
        // doesn't turn this assertion into a maintenance burden.
        assert!((51..99).contains(&succeeded), "succeeded={}", succeeded);

        // Fully available: the call goes through.
        handle.totally_available();
        assert_ok!(blob.get("a").await);

        // Fully unavailable: the call fails.
        handle.totally_unavailable();
        assert_err!(blob.get("a").await);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn unreliable_consensus() {
        let handle = UnreliableHandle::default();
        let consensus =
            UnreliableConsensus::new(Arc::new(MemConsensus::default()), handle.clone());
        use_fixed_seed(&handle);

        // With the default settings some, but not all, calls go through.
        let mut succeeded = 0;
        for _ in 0..100 {
            match consensus.head("key").await {
                Ok(_) => succeeded += 1,
                Err(_) => {}
            }
        }
        // The bounds are deliberately loose so that a change in the rng impl
        // doesn't turn this assertion into a maintenance burden.
        assert!((51..99).contains(&succeeded), "succeeded={}", succeeded);

        // Fully available: the call goes through.
        handle.totally_available();
        assert_ok!(consensus.head("key").await);

        // Fully unavailable: the call fails.
        handle.totally_unavailable();
        assert_err!(consensus.head("key").await);
    }
}