1use std::str::FromStr;
20
21use num_enum::{IntoPrimitive, TryFromPrimitive};
22use serde::{Deserialize, Serialize};
23use serde_json::Value;
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
26pub struct MsgId(pub usize);
27
28impl MsgId {
29 pub fn next(&self) -> Self {
30 MsgId(self.0 + 1)
31 }
32}
33
34#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
35pub struct NodeId(pub String);
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
41struct TxnOpHelper(pub String, pub u64, pub Value);
42
43#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
44#[serde(try_from = "TxnOpHelper", into = "TxnOpHelper")]
45pub enum ReqTxnOp {
46 Append { key: u64, val: u64 },
47 Read { key: u64 },
48}
49
50#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
51#[serde(into = "TxnOpHelper")]
52pub enum ResTxnOp {
53 Append { key: u64, val: u64 },
54 Read { key: u64, val: Vec<u64> },
55}
56
57#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
59#[serde(tag = "type")]
60pub enum Body {
61 #[serde(rename = "error")]
62 Error {
63 #[serde(skip_serializing_if = "Option::is_none")]
64 msg_id: Option<MsgId>,
65 in_reply_to: MsgId,
66 code: ErrorCode,
67 text: String,
68 },
69
70 #[serde(rename = "init")]
71 ReqInit {
72 msg_id: MsgId,
73 node_id: NodeId,
74 node_ids: Vec<NodeId>,
75 },
76 #[serde(rename = "init_ok")]
77 ResInit { msg_id: MsgId, in_reply_to: MsgId },
78
79 #[serde(rename = "txn")]
80 ReqTxn { msg_id: MsgId, txn: Vec<ReqTxnOp> },
81 #[serde(rename = "txn_ok")]
82 ResTxn {
83 msg_id: MsgId,
84 in_reply_to: MsgId,
85 txn: Vec<ResTxnOp>,
86 },
87
88 #[serde(rename = "read")]
89 ReqLinKvRead { msg_id: MsgId, key: Value },
90 #[serde(rename = "read_ok")]
91 ResLinKvRead {
92 #[serde(skip_serializing_if = "Option::is_none")]
93 msg_id: Option<MsgId>,
94 in_reply_to: MsgId,
95 value: Value,
96 },
97
98 #[serde(rename = "write")]
99 ReqLinKvWrite {
100 msg_id: MsgId,
101 key: Value,
102 value: Value,
103 },
104 #[serde(rename = "write_ok")]
105 ResLinKvWrite {
106 #[serde(skip_serializing_if = "Option::is_none")]
107 msg_id: Option<MsgId>,
108 in_reply_to: MsgId,
109 },
110
111 #[serde(rename = "cas")]
112 ReqLinKvCaS {
113 msg_id: MsgId,
114 key: Value,
115 from: Value,
116 to: Value,
117 #[serde(skip_serializing_if = "Option::is_none")]
118 create_if_not_exists: Option<bool>,
119 },
120 #[serde(rename = "cas_ok")]
121 ResLinKvCaS {
122 #[serde(skip_serializing_if = "Option::is_none")]
123 msg_id: Option<MsgId>,
124 in_reply_to: MsgId,
125 },
126}
127
128impl Body {
129 pub fn in_reply_to(&self) -> Option<MsgId> {
130 match self {
131 Body::Error { in_reply_to, .. } => Some(*in_reply_to),
132 Body::ResLinKvRead { in_reply_to, .. } => Some(*in_reply_to),
133 Body::ResLinKvWrite { in_reply_to, .. } => Some(*in_reply_to),
134 Body::ResLinKvCaS { in_reply_to, .. } => Some(*in_reply_to),
135 _ => None,
136 }
137 }
138}
139
140#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
142pub struct Msg {
143 pub src: NodeId,
144 pub dest: NodeId,
145 pub body: Body,
146}
147
148impl FromStr for Msg {
149 type Err = String;
150
151 fn from_str(s: &str) -> Result<Self, Self::Err> {
152 serde_json::from_str(s).map_err(|err| err.to_string())
153 }
154}
155
156impl std::fmt::Display for Msg {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.write_str(&serde_json::to_string(self).expect("msg wasn't json-able"))
159 }
160}
161
162#[derive(Debug, Clone, PartialEq, Eq)]
163pub struct MaelstromError {
164 pub code: ErrorCode,
165 pub text: String,
166}
167
168impl std::fmt::Display for MaelstromError {
169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170 write!(f, "{:?}: {}", self.code, self.text)
171 }
172}
173
174impl std::error::Error for MaelstromError {}
175
176#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, IntoPrimitive, TryFromPrimitive)]
178#[serde(try_from = "usize", into = "usize")]
179#[repr(usize)]
180pub enum ErrorCode {
181 Timeout = 0,
182 NodeNotFound = 1,
183 NotSupported = 10,
184 TemporarilyUnavailable = 11,
185 MalformedRequest = 12,
186 Crash = 13,
187 Abort = 14,
188 KeyDoesNotExist = 20,
189 KeyAlreadyExists = 21,
190 PreconditionFailed = 22,
191 TxnConflict = 30,
192}
193
194mod from_impls {
195 use std::fmt::Debug;
196
197 use mz_persist::location::{ExternalError, Indeterminate};
198 use mz_persist_client::error::InvalidUsage;
199 use serde_json::Value;
200 use tracing::debug;
201
202 use crate::maelstrom::api::{ErrorCode, MaelstromError, ReqTxnOp, ResTxnOp, TxnOpHelper};
203
204 impl TryFrom<TxnOpHelper> for ReqTxnOp {
205 type Error = String;
206
207 fn try_from(value: TxnOpHelper) -> Result<Self, Self::Error> {
208 let TxnOpHelper(f, key, val) = value;
209 match f.as_str() {
210 "r" => match val {
211 Value::Null => Ok(ReqTxnOp::Read { key }),
212 x => Err(format!("unexpected read value: {}", x)),
213 },
214 "append" => match val {
215 Value::Number(x) => match x.as_u64() {
216 Some(val) => Ok(ReqTxnOp::Append { key, val }),
217 None => Err(format!("unexpected append value: {}", x)),
218 },
219 x => Err(format!("unexpected append value: {}", x)),
220 },
221 x => Err(format!("format txn type: {}", x)),
222 }
223 }
224 }
225
226 impl From<ReqTxnOp> for TxnOpHelper {
227 fn from(value: ReqTxnOp) -> Self {
228 match value {
229 ReqTxnOp::Read { key } => TxnOpHelper("r".into(), key, Value::Null),
230 ReqTxnOp::Append { key, val } => {
231 TxnOpHelper("append".into(), key, Value::from(val))
232 }
233 }
234 }
235 }
236
237 impl From<ResTxnOp> for TxnOpHelper {
238 fn from(value: ResTxnOp) -> Self {
239 match value {
240 ResTxnOp::Read { key, val } => TxnOpHelper("r".into(), key, Value::from(val)),
241 ResTxnOp::Append { key, val } => {
242 TxnOpHelper("append".into(), key, Value::from(val))
243 }
244 }
245 }
246 }
247
248 impl From<Indeterminate> for MaelstromError {
249 fn from(x: Indeterminate) -> Self {
250 MaelstromError {
251 code: ErrorCode::Crash,
252 text: x.to_string(),
253 }
254 }
255 }
256
257 impl From<ExternalError> for MaelstromError {
258 fn from(x: ExternalError) -> Self {
259 if x.is_timeout() {
260 debug!("creating timeout: {:?}", x);
264 MaelstromError {
265 code: ErrorCode::Timeout,
266 text: x.to_string(),
267 }
268 } else {
269 MaelstromError {
270 code: ErrorCode::Crash,
271 text: x.to_string(),
272 }
273 }
274 }
275 }
276
277 impl<T: Debug> From<InvalidUsage<T>> for MaelstromError {
278 fn from(x: InvalidUsage<T>) -> Self {
279 MaelstromError {
282 code: ErrorCode::Abort,
283 text: x.to_string(),
284 }
285 }
286 }
287}