persistcli/maelstrom/
api.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types used in the Maelstrom API
11//!
12//! These are documented in Maelstrom's [protocol], [workloads], and [services]
13//! docs.
14//!
15//! [protocol]: https://github.com/jepsen-io/maelstrom/blob/v0.2.1/doc/protocol.md
16//! [workloads]: https://github.com/jepsen-io/maelstrom/blob/v0.2.1/doc/workloads.md
17//! [services]: https://github.com/jepsen-io/maelstrom/blob/v0.2.1/doc/services.md
18
19use std::str::FromStr;
20
21use num_enum::{IntoPrimitive, TryFromPrimitive};
22use serde::{Deserialize, Serialize};
23use serde_json::Value;
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
26pub struct MsgId(pub usize);
27
28impl MsgId {
29    pub fn next(&self) -> Self {
30        MsgId(self.0 + 1)
31    }
32}
33
34#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
35pub struct NodeId(pub String);
36
37// A struct that exactly matches the structure of the Maelstrom json. Used as an
38// intermediary so we can map it into real rust enums (as opposed to the value
39// of the first field determining the structure of the other two).
40#[derive(Debug, Clone, Serialize, Deserialize)]
41struct TxnOpHelper(pub String, pub u64, pub Value);
42
43#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
44#[serde(try_from = "TxnOpHelper", into = "TxnOpHelper")]
45pub enum ReqTxnOp {
46    Append { key: u64, val: u64 },
47    Read { key: u64 },
48}
49
50#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
51#[serde(into = "TxnOpHelper")]
52pub enum ResTxnOp {
53    Append { key: u64, val: u64 },
54    Read { key: u64, val: Vec<u64> },
55}
56
57/// <https://github.com/jepsen-io/maelstrom/blob/v0.2.1/doc/protocol.md#message-bodies>
58#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
59#[serde(tag = "type")]
60pub enum Body {
61    #[serde(rename = "error")]
62    Error {
63        #[serde(skip_serializing_if = "Option::is_none")]
64        msg_id: Option<MsgId>,
65        in_reply_to: MsgId,
66        code: ErrorCode,
67        text: String,
68    },
69
70    #[serde(rename = "init")]
71    ReqInit {
72        msg_id: MsgId,
73        node_id: NodeId,
74        node_ids: Vec<NodeId>,
75    },
76    #[serde(rename = "init_ok")]
77    ResInit { msg_id: MsgId, in_reply_to: MsgId },
78
79    #[serde(rename = "txn")]
80    ReqTxn { msg_id: MsgId, txn: Vec<ReqTxnOp> },
81    #[serde(rename = "txn_ok")]
82    ResTxn {
83        msg_id: MsgId,
84        in_reply_to: MsgId,
85        txn: Vec<ResTxnOp>,
86    },
87
88    #[serde(rename = "read")]
89    ReqLinKvRead { msg_id: MsgId, key: Value },
90    #[serde(rename = "read_ok")]
91    ResLinKvRead {
92        #[serde(skip_serializing_if = "Option::is_none")]
93        msg_id: Option<MsgId>,
94        in_reply_to: MsgId,
95        value: Value,
96    },
97
98    #[serde(rename = "write")]
99    ReqLinKvWrite {
100        msg_id: MsgId,
101        key: Value,
102        value: Value,
103    },
104    #[serde(rename = "write_ok")]
105    ResLinKvWrite {
106        #[serde(skip_serializing_if = "Option::is_none")]
107        msg_id: Option<MsgId>,
108        in_reply_to: MsgId,
109    },
110
111    #[serde(rename = "cas")]
112    ReqLinKvCaS {
113        msg_id: MsgId,
114        key: Value,
115        from: Value,
116        to: Value,
117        #[serde(skip_serializing_if = "Option::is_none")]
118        create_if_not_exists: Option<bool>,
119    },
120    #[serde(rename = "cas_ok")]
121    ResLinKvCaS {
122        #[serde(skip_serializing_if = "Option::is_none")]
123        msg_id: Option<MsgId>,
124        in_reply_to: MsgId,
125    },
126}
127
128impl Body {
129    pub fn in_reply_to(&self) -> Option<MsgId> {
130        match self {
131            Body::Error { in_reply_to, .. } => Some(*in_reply_to),
132            Body::ResLinKvRead { in_reply_to, .. } => Some(*in_reply_to),
133            Body::ResLinKvWrite { in_reply_to, .. } => Some(*in_reply_to),
134            Body::ResLinKvCaS { in_reply_to, .. } => Some(*in_reply_to),
135            _ => None,
136        }
137    }
138}
139
140/// <https://github.com/jepsen-io/maelstrom/blob/v0.2.1/doc/protocol.md#messages>
141#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
142pub struct Msg {
143    pub src: NodeId,
144    pub dest: NodeId,
145    pub body: Body,
146}
147
148impl FromStr for Msg {
149    type Err = String;
150
151    fn from_str(s: &str) -> Result<Self, Self::Err> {
152        serde_json::from_str(s).map_err(|err| err.to_string())
153    }
154}
155
156impl std::fmt::Display for Msg {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.write_str(&serde_json::to_string(self).expect("msg wasn't json-able"))
159    }
160}
161
162#[derive(Debug, Clone, PartialEq, Eq)]
163pub struct MaelstromError {
164    pub code: ErrorCode,
165    pub text: String,
166}
167
168impl std::fmt::Display for MaelstromError {
169    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170        write!(f, "{:?}: {}", self.code, self.text)
171    }
172}
173
174impl std::error::Error for MaelstromError {}
175
176/// <https://github.com/jepsen-io/maelstrom/blob/v0.2.1/doc/protocol.md#errors>
177#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, IntoPrimitive, TryFromPrimitive)]
178#[serde(try_from = "usize", into = "usize")]
179#[repr(usize)]
180pub enum ErrorCode {
181    Timeout = 0,
182    NodeNotFound = 1,
183    NotSupported = 10,
184    TemporarilyUnavailable = 11,
185    MalformedRequest = 12,
186    Crash = 13,
187    Abort = 14,
188    KeyDoesNotExist = 20,
189    KeyAlreadyExists = 21,
190    PreconditionFailed = 22,
191    TxnConflict = 30,
192}
193
194mod from_impls {
195    use std::fmt::Debug;
196
197    use mz_persist::location::{ExternalError, Indeterminate};
198    use mz_persist_client::error::InvalidUsage;
199    use serde_json::Value;
200    use tracing::debug;
201
202    use crate::maelstrom::api::{ErrorCode, MaelstromError, ReqTxnOp, ResTxnOp, TxnOpHelper};
203
204    impl TryFrom<TxnOpHelper> for ReqTxnOp {
205        type Error = String;
206
207        fn try_from(value: TxnOpHelper) -> Result<Self, Self::Error> {
208            let TxnOpHelper(f, key, val) = value;
209            match f.as_str() {
210                "r" => match val {
211                    Value::Null => Ok(ReqTxnOp::Read { key }),
212                    x => Err(format!("unexpected read value: {}", x)),
213                },
214                "append" => match val {
215                    Value::Number(x) => match x.as_u64() {
216                        Some(val) => Ok(ReqTxnOp::Append { key, val }),
217                        None => Err(format!("unexpected append value: {}", x)),
218                    },
219                    x => Err(format!("unexpected append value: {}", x)),
220                },
221                x => Err(format!("format txn type: {}", x)),
222            }
223        }
224    }
225
226    impl From<ReqTxnOp> for TxnOpHelper {
227        fn from(value: ReqTxnOp) -> Self {
228            match value {
229                ReqTxnOp::Read { key } => TxnOpHelper("r".into(), key, Value::Null),
230                ReqTxnOp::Append { key, val } => {
231                    TxnOpHelper("append".into(), key, Value::from(val))
232                }
233            }
234        }
235    }
236
237    impl From<ResTxnOp> for TxnOpHelper {
238        fn from(value: ResTxnOp) -> Self {
239            match value {
240                ResTxnOp::Read { key, val } => TxnOpHelper("r".into(), key, Value::from(val)),
241                ResTxnOp::Append { key, val } => {
242                    TxnOpHelper("append".into(), key, Value::from(val))
243                }
244            }
245        }
246    }
247
248    impl From<Indeterminate> for MaelstromError {
249        fn from(x: Indeterminate) -> Self {
250            MaelstromError {
251                code: ErrorCode::Crash,
252                text: x.to_string(),
253            }
254        }
255    }
256
257    impl From<ExternalError> for MaelstromError {
258        fn from(x: ExternalError) -> Self {
259            if x.is_timeout() {
260                // Toss a debug log in here so that, with RUST_BACKTRACE=1, we
261                // can more easily see where timeouts are coming from. Perhaps
262                // better to to attach the backtrace to the MaelstromError.
263                debug!("creating timeout: {:?}", x);
264                MaelstromError {
265                    code: ErrorCode::Timeout,
266                    text: x.to_string(),
267                }
268            } else {
269                MaelstromError {
270                    code: ErrorCode::Crash,
271                    text: x.to_string(),
272                }
273            }
274        }
275    }
276
277    impl<T: Debug> From<InvalidUsage<T>> for MaelstromError {
278        fn from(x: InvalidUsage<T>) -> Self {
279            // ErrorCode::Abort means this definitely didn't happen. Maelstrom
280            // will call us out on it if this is a lie.
281            MaelstromError {
282                code: ErrorCode::Abort,
283                text: x.to_string(),
284            }
285        }
286    }
287}