mz_persist/
turmoil.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! [Consensus] and [Blob] implementations suitable for use with [turmoil].
11//!
12//! Both implementations function by forwarding calls to a server backend via
13//! [turmoil::net::TcpStream]s. The server backend keeps a simple in-memory state (implemented
14//! using [MemConsensus] and [MemBlob], respectively), which it uses to respond to RPCs.
15//!
16//! In a turmoil test, both the consensus and the blob server should be run as separate hosts. This
17//! gives turmoil the ability to partition and crash them independently, for the maximum amount of
18//! interesting interleaving.
19//!
20//! ```
21//! use mz_persist::turmoil::{BlobState, ConsensusState, serve_blob, serve_consensus};
22//!
23//! let mut sim = turmoil::Builder::new().build();
24//!
25//! sim.host("consensus", {
26//!     let state = ConsensusState::new();
27//!     move || serve_consensus(7000, state.clone())
28//! });
29//!
30//! sim.host("blob", {
31//!     let state = BlobState::new();
32//!     move || serve_blob(7000, state.clone())
33//! });
34//! ```
35//!
36//! To connect to these servers, use the following `PersistLocation`:
37//!
38//! ```ignore
39//! PersistLocation {
40//!     blob_uri: "turmoil://blob:7000".parse().unwrap(),
41//!     consensus_uri: "turmoil://consensus:7000".parse().unwrap(),
42//! }
43//! ```
44
45use std::sync::Mutex;
46use std::time::{Duration, Instant};
47
48use anyhow::anyhow;
49use async_trait::async_trait;
50use bytes::Bytes;
51use futures_util::{StreamExt, TryStreamExt};
52use mz_ore::bytes::SegmentedBytes;
53use mz_ore::cast::CastInto;
54use mz_ore::future::{TimeoutError, timeout};
55use mz_ore::url::SensitiveUrl;
56use serde::{Deserialize, Serialize};
57use tokio::io::{AsyncReadExt, AsyncWriteExt};
58use turmoil::net::{TcpListener, TcpStream};
59
60use crate::location::{
61    Blob, BlobMetadata, CaSResult, Consensus, Determinate, ExternalError, Indeterminate,
62    ResultStream, SeqNo, VersionedData,
63};
64use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
65
66/// Timeout for RPC calls.
67const RPC_TIMEOUT: Duration = Duration::from_secs(1);
68
69/// Write an RPC message to the given connection.
70async fn rpc_write<T>(conn: &mut TcpStream, msg: T) -> Result<(), ExternalError>
71where
72    T: Serialize,
73{
74    let bytes = serde_json::to_vec(&msg).unwrap();
75    conn.write_u64(bytes.len().cast_into()).await?;
76    conn.write_all(&bytes).await?;
77    Ok(())
78}
79
80/// Read an RPC message from the given connection.
81async fn rpc_read<T>(conn: &mut TcpStream) -> Result<T, ExternalError>
82where
83    T: for<'a> Deserialize<'a>,
84{
85    let len = conn.read_u64().await?;
86    let mut bytes = vec![0; len.cast_into()];
87    conn.read_exact(&mut bytes).await?;
88    let resp = serde_json::from_slice(&bytes).unwrap();
89    Ok(resp)
90}
91
92/// Serializable representation of `ExternalError`.
93#[derive(Serialize, Deserialize)]
94enum RpcError {
95    Determinate(String),
96    Indeterminate(String),
97}
98
99impl From<ExternalError> for RpcError {
100    fn from(error: ExternalError) -> Self {
101        match error {
102            ExternalError::Determinate(e) => Self::Determinate(e.to_string()),
103            ExternalError::Indeterminate(e) => Self::Indeterminate(e.to_string()),
104        }
105    }
106}
107
108impl From<RpcError> for ExternalError {
109    fn from(error: RpcError) -> Self {
110        match error {
111            RpcError::Determinate(s) => Self::Determinate(Determinate::new(anyhow!(s))),
112            RpcError::Indeterminate(s) => Self::Indeterminate(Indeterminate::new(anyhow!(s))),
113        }
114    }
115}
116
117/// Configuration for a [TurmoilConsensus].
118#[derive(Clone, Debug)]
119pub struct ConsensusConfig {
120    addr: String,
121}
122
123impl ConsensusConfig {
124    /// Construct a [ConsensusConfig] from a URL.
125    pub fn new(url: &SensitiveUrl) -> Self {
126        let addr = format!(
127            "{}:{}",
128            url.host().expect("must have a host"),
129            url.port().expect("must have a port")
130        );
131
132        Self { addr }
133    }
134}
135
136/// A [Consensus] implementation for use in turmoil tests.
137#[derive(Debug)]
138pub struct TurmoilConsensus {
139    addr: String,
140    /// Healthy connections to the consensus server.
141    ///
142    /// When making a call, we take one of these, perform the RPC, and then put it back if it's
143    /// still healthy afterwards. This way we avoid having to hold a mutex across await points.
144    connections: Mutex<Vec<TcpStream>>,
145}
146
147impl TurmoilConsensus {
148    /// Open a [TurmoilConsensus] instance.
149    pub fn open(config: ConsensusConfig) -> Self {
150        Self {
151            addr: config.addr,
152            connections: Mutex::new(Vec::new()),
153        }
154    }
155
156    /// Call the remote consensus server.
157    async fn call<R>(&self, cmd: ConsensusCommand) -> Result<R, ExternalError>
158    where
159        R: for<'a> Deserialize<'a>,
160    {
161        let res = timeout::<_, _, ExternalError>(RPC_TIMEOUT, async {
162            let conn = self.connections.lock().expect("poisoned").pop();
163            let mut conn = match conn {
164                Some(c) => c,
165                None => TcpStream::connect(&self.addr).await?,
166            };
167
168            rpc_write(&mut conn, cmd).await?;
169            let resp: Result<R, RpcError> = rpc_read(&mut conn).await?;
170
171            self.connections.lock().expect("poisoned").push(conn);
172
173            Ok(resp?)
174        })
175        .await;
176
177        match res {
178            Ok(resp) => Ok(resp),
179            Err(TimeoutError::DeadlineElapsed) => Err(ExternalError::new_timeout(Instant::now())),
180            Err(TimeoutError::Inner(e)) => Err(e),
181        }
182    }
183}
184
185#[async_trait]
186impl Consensus for TurmoilConsensus {
187    fn list_keys(&self) -> ResultStream<'_, String> {
188        async_stream::try_stream! {
189            let keys = self.call::<Vec<String>>(ConsensusCommand::ListKeys).await?;
190            for key in keys {
191                yield key;
192            }
193        }
194        .boxed()
195    }
196
197    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
198        self.call(ConsensusCommand::Head { key: key.into() }).await
199    }
200
201    async fn compare_and_set(
202        &self,
203        key: &str,
204        expected: Option<SeqNo>,
205        new: VersionedData,
206    ) -> Result<CaSResult, ExternalError> {
207        self.call(ConsensusCommand::CompareAndSet {
208            key: key.into(),
209            expected,
210            new,
211        })
212        .await
213    }
214
215    async fn scan(
216        &self,
217        key: &str,
218        from: SeqNo,
219        limit: usize,
220    ) -> Result<Vec<VersionedData>, ExternalError> {
221        self.call(ConsensusCommand::Scan {
222            key: key.into(),
223            from,
224            limit,
225        })
226        .await
227    }
228
229    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
230        self.call(ConsensusCommand::Truncate {
231            key: key.into(),
232            seqno,
233        })
234        .await
235    }
236}
237
238#[derive(Debug, Serialize, Deserialize)]
239enum ConsensusCommand {
240    ListKeys,
241    Head {
242        key: String,
243    },
244    CompareAndSet {
245        key: String,
246        expected: Option<SeqNo>,
247        new: VersionedData,
248    },
249    Scan {
250        key: String,
251        from: SeqNo,
252        limit: usize,
253    },
254    Truncate {
255        key: String,
256        seqno: SeqNo,
257    },
258}
259
260/// State of a turmoil consensus server.
261#[derive(Clone, Debug)]
262pub struct ConsensusState(MemConsensus);
263
264impl ConsensusState {
265    /// Create a new [ConsensusState].
266    pub fn new() -> Self {
267        Self(MemConsensus::default())
268    }
269}
270
271/// Run a turmoil consensus server.
272///
273/// Intended to be used as the host logic passed to [turmoil::Sim::host].
274pub async fn serve_consensus(port: u16, state: ConsensusState) -> turmoil::Result {
275    let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await?;
276
277    loop {
278        let (conn, peer) = listener.accept().await?;
279        mz_ore::task::spawn(
280            || format!("turmoil consensus connection: {peer}"),
281            serve_consensus_connection(conn, state.clone()),
282        );
283    }
284}
285
286async fn serve_consensus_connection(
287    mut conn: TcpStream,
288    state: ConsensusState,
289) -> Result<(), ExternalError> {
290    loop {
291        let cmd = rpc_read(&mut conn).await?;
292        match cmd {
293            ConsensusCommand::ListKeys => {
294                let keys = state.0.list_keys().try_collect::<Vec<_>>().await;
295                let keys = keys.map_err(RpcError::from);
296                rpc_write(&mut conn, keys).await?;
297            }
298            ConsensusCommand::Head { key } => {
299                let data = state.0.head(&key).await;
300                let data = data.map_err(RpcError::from);
301                rpc_write(&mut conn, data).await?;
302            }
303            ConsensusCommand::CompareAndSet { key, expected, new } => {
304                let result = state.0.compare_and_set(&key, expected, new).await;
305                let result = result.map_err(RpcError::from);
306                rpc_write(&mut conn, result).await?;
307            }
308            ConsensusCommand::Scan { key, from, limit } => {
309                let data = state.0.scan(&key, from, limit).await;
310                let data = data.map_err(RpcError::from);
311                rpc_write(&mut conn, data).await?;
312            }
313            ConsensusCommand::Truncate { key, seqno } => {
314                let count = state.0.truncate(&key, seqno).await;
315                let count = count.map_err(RpcError::from);
316                rpc_write(&mut conn, count).await?;
317            }
318        }
319    }
320}
321
322#[derive(Clone, Debug)]
323/// Configuration for a [TurmoilBlob].
324pub struct BlobConfig {
325    addr: String,
326}
327
328impl BlobConfig {
329    /// Construct a [BlobConfig] from a URL.
330    pub fn new(url: &SensitiveUrl) -> Self {
331        let addr = format!(
332            "{}:{}",
333            url.host().expect("must have a host"),
334            url.port().expect("must have a port")
335        );
336
337        Self { addr }
338    }
339}
340
341/// A [Blob] implementation for use in turmoil tests.
342#[derive(Debug)]
343pub struct TurmoilBlob {
344    addr: String,
345    /// Healthy connections to the blob server.
346    ///
347    /// When making a call, we take one of these, perform the RPC, and then put it back if it's
348    /// still healthy afterwards. This way we avoid having to hold a mutex across await points.
349    connections: Mutex<Vec<TcpStream>>,
350}
351
352impl TurmoilBlob {
353    /// Open a [TurmoilBlob] instance.
354    pub fn open(config: BlobConfig) -> Self {
355        Self {
356            addr: config.addr,
357            connections: Mutex::new(Vec::new()),
358        }
359    }
360
361    /// Call the remote blob server.
362    async fn call<R>(&self, cmd: BlobCommand) -> Result<R, ExternalError>
363    where
364        R: for<'a> Deserialize<'a>,
365    {
366        let res = timeout::<_, _, ExternalError>(RPC_TIMEOUT, async {
367            let conn = self.connections.lock().expect("poisoned").pop();
368            let mut conn = match conn {
369                Some(c) => c,
370                None => TcpStream::connect(&self.addr).await?,
371            };
372
373            rpc_write(&mut conn, cmd).await?;
374            let resp: Result<R, RpcError> = rpc_read(&mut conn).await?;
375
376            self.connections.lock().expect("poisoned").push(conn);
377
378            Ok(resp?)
379        })
380        .await;
381
382        match res {
383            Ok(resp) => Ok(resp),
384            Err(TimeoutError::DeadlineElapsed) => Err(anyhow!("timeout").into()),
385            Err(TimeoutError::Inner(e)) => Err(e),
386        }
387    }
388}
389
390#[async_trait]
391impl Blob for TurmoilBlob {
392    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
393        self.call(BlobCommand::Get { key: key.into() }).await
394    }
395
396    async fn list_keys_and_metadata(
397        &self,
398        key_prefix: &str,
399        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
400    ) -> Result<(), ExternalError> {
401        let items = self
402            .call::<Vec<(String, u64)>>(BlobCommand::ListKeysAndMetadata {
403                key_prefix: key_prefix.into(),
404            })
405            .await?;
406
407        for (key, size_in_bytes) in items {
408            f(BlobMetadata {
409                key: &key,
410                size_in_bytes,
411            })
412        }
413
414        Ok(())
415    }
416
417    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
418        self.call(BlobCommand::Set {
419            key: key.into(),
420            value,
421        })
422        .await
423    }
424
425    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
426        self.call(BlobCommand::Delete { key: key.into() }).await
427    }
428
429    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
430        self.call(BlobCommand::Restore { key: key.into() }).await
431    }
432}
433
434#[derive(Debug, Serialize, Deserialize)]
435enum BlobCommand {
436    Get { key: String },
437    ListKeysAndMetadata { key_prefix: String },
438    Set { key: String, value: Bytes },
439    Delete { key: String },
440    Restore { key: String },
441}
442
443/// State of a turmoil blob server.
444#[derive(Clone, Debug)]
445pub struct BlobState(MemBlob);
446
447impl BlobState {
448    /// Create a new [BlobState].
449    pub fn new() -> Self {
450        Self(MemBlob::open(MemBlobConfig::new(true)))
451    }
452}
453
454/// Run a turmoil blob server.
455///
456/// Intended to be used as the host logic passed to [turmoil::Sim::host].
457pub async fn serve_blob(port: u16, state: BlobState) -> turmoil::Result {
458    let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await?;
459
460    loop {
461        let (conn, peer) = listener.accept().await?;
462        mz_ore::task::spawn(
463            || format!("turmoil blob connection: {peer}"),
464            serve_blob_connection(conn, state.clone()),
465        );
466    }
467}
468
469async fn serve_blob_connection(mut conn: TcpStream, state: BlobState) -> Result<(), ExternalError> {
470    loop {
471        let cmd = rpc_read(&mut conn).await?;
472        match cmd {
473            BlobCommand::Get { key } => {
474                let value = state.0.get(&key).await;
475                let value = value.map_err(RpcError::from);
476                rpc_write(&mut conn, value).await?;
477            }
478            BlobCommand::ListKeysAndMetadata { key_prefix } => {
479                let mut items = Vec::new();
480                let mut f = |m: BlobMetadata| {
481                    items.push((m.key.to_string(), m.size_in_bytes));
482                };
483                let result = state.0.list_keys_and_metadata(&key_prefix, &mut f).await;
484                let result = result.map(|()| items).map_err(RpcError::from);
485                rpc_write(&mut conn, result).await?;
486            }
487            BlobCommand::Set { key, value } => {
488                let result = state.0.set(&key, value).await;
489                let result = result.map_err(RpcError::from);
490                rpc_write(&mut conn, result).await?;
491            }
492            BlobCommand::Delete { key } => {
493                let size = state.0.delete(&key).await;
494                let size = size.map_err(RpcError::from);
495                rpc_write(&mut conn, size).await?;
496            }
497            BlobCommand::Restore { key } => {
498                let result = state.0.restore(&key).await;
499                let result = result.map_err(RpcError::from);
500                rpc_write(&mut conn, result).await?;
501            }
502        }
503    }
504}