1use std::sync::Mutex;
46use std::time::{Duration, Instant};
47
48use anyhow::anyhow;
49use async_trait::async_trait;
50use bytes::Bytes;
51use futures_util::{StreamExt, TryStreamExt};
52use mz_ore::bytes::SegmentedBytes;
53use mz_ore::cast::CastInto;
54use mz_ore::future::{TimeoutError, timeout};
55use mz_ore::url::SensitiveUrl;
56use serde::{Deserialize, Serialize};
57use tokio::io::{AsyncReadExt, AsyncWriteExt};
58use turmoil::net::{TcpListener, TcpStream};
59
60use crate::location::{
61 Blob, BlobMetadata, CaSResult, Consensus, Determinate, ExternalError, Indeterminate,
62 ResultStream, SeqNo, VersionedData,
63};
64use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
65
/// Deadline applied to one complete client RPC by `TurmoilConsensus::call` and
/// `TurmoilBlob::call`: obtaining a connection, sending the request, and reading the reply.
const RPC_TIMEOUT: Duration = Duration::from_secs(1);
68
69async fn rpc_write<T>(conn: &mut TcpStream, msg: T) -> Result<(), ExternalError>
71where
72 T: Serialize,
73{
74 let bytes = serde_json::to_vec(&msg).unwrap();
75 conn.write_u64(bytes.len().cast_into()).await?;
76 conn.write_all(&bytes).await?;
77 Ok(())
78}
79
80async fn rpc_read<T>(conn: &mut TcpStream) -> Result<T, ExternalError>
82where
83 T: for<'a> Deserialize<'a>,
84{
85 let len = conn.read_u64().await?;
86 let mut bytes = vec![0; len.cast_into()];
87 conn.read_exact(&mut bytes).await?;
88 let resp = serde_json::from_slice(&bytes).unwrap();
89 Ok(resp)
90}
91
/// Serializable stand-in for an `ExternalError`, used to send failures over an RPC
/// connection.
///
/// Only the error's string rendering survives the round trip (see the `From` conversions
/// in this file); the determinate/indeterminate classification is preserved by the variant.
#[derive(Serialize, Deserialize)]
enum RpcError {
    /// String rendering of an `ExternalError::Determinate`.
    Determinate(String),
    /// String rendering of an `ExternalError::Indeterminate`.
    Indeterminate(String),
}
98
99impl From<ExternalError> for RpcError {
100 fn from(error: ExternalError) -> Self {
101 match error {
102 ExternalError::Determinate(e) => Self::Determinate(e.to_string()),
103 ExternalError::Indeterminate(e) => Self::Indeterminate(e.to_string()),
104 }
105 }
106}
107
108impl From<RpcError> for ExternalError {
109 fn from(error: RpcError) -> Self {
110 match error {
111 RpcError::Determinate(s) => Self::Determinate(Determinate::new(anyhow!(s))),
112 RpcError::Indeterminate(s) => Self::Indeterminate(Indeterminate::new(anyhow!(s))),
113 }
114 }
115}
116
117#[derive(Clone, Debug)]
119pub struct ConsensusConfig {
120 addr: String,
121}
122
123impl ConsensusConfig {
124 pub fn new(url: &SensitiveUrl) -> Self {
126 let addr = format!(
127 "{}:{}",
128 url.host().expect("must have a host"),
129 url.port().expect("must have a port")
130 );
131
132 Self { addr }
133 }
134}
135
136#[derive(Debug)]
138pub struct TurmoilConsensus {
139 addr: String,
140 connections: Mutex<Vec<TcpStream>>,
145}
146
147impl TurmoilConsensus {
148 pub fn open(config: ConsensusConfig) -> Self {
150 Self {
151 addr: config.addr,
152 connections: Mutex::new(Vec::new()),
153 }
154 }
155
156 async fn call<R>(&self, cmd: ConsensusCommand) -> Result<R, ExternalError>
158 where
159 R: for<'a> Deserialize<'a>,
160 {
161 let res = timeout::<_, _, ExternalError>(RPC_TIMEOUT, async {
162 let conn = self.connections.lock().expect("poisoned").pop();
163 let mut conn = match conn {
164 Some(c) => c,
165 None => TcpStream::connect(&self.addr).await?,
166 };
167
168 rpc_write(&mut conn, cmd).await?;
169 let resp: Result<R, RpcError> = rpc_read(&mut conn).await?;
170
171 self.connections.lock().expect("poisoned").push(conn);
172
173 Ok(resp?)
174 })
175 .await;
176
177 match res {
178 Ok(resp) => Ok(resp),
179 Err(TimeoutError::DeadlineElapsed) => Err(ExternalError::new_timeout(Instant::now())),
180 Err(TimeoutError::Inner(e)) => Err(e),
181 }
182 }
183}
184
185#[async_trait]
186impl Consensus for TurmoilConsensus {
187 fn list_keys(&self) -> ResultStream<'_, String> {
188 async_stream::try_stream! {
189 let keys = self.call::<Vec<String>>(ConsensusCommand::ListKeys).await?;
190 for key in keys {
191 yield key;
192 }
193 }
194 .boxed()
195 }
196
197 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
198 self.call(ConsensusCommand::Head { key: key.into() }).await
199 }
200
201 async fn compare_and_set(
202 &self,
203 key: &str,
204 expected: Option<SeqNo>,
205 new: VersionedData,
206 ) -> Result<CaSResult, ExternalError> {
207 self.call(ConsensusCommand::CompareAndSet {
208 key: key.into(),
209 expected,
210 new,
211 })
212 .await
213 }
214
215 async fn scan(
216 &self,
217 key: &str,
218 from: SeqNo,
219 limit: usize,
220 ) -> Result<Vec<VersionedData>, ExternalError> {
221 self.call(ConsensusCommand::Scan {
222 key: key.into(),
223 from,
224 limit,
225 })
226 .await
227 }
228
229 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
230 self.call(ConsensusCommand::Truncate {
231 key: key.into(),
232 seqno,
233 })
234 .await
235 }
236}
237
/// A request sent by `TurmoilConsensus` and handled by `serve_consensus_connection`.
///
/// Each variant carries the arguments of the `Consensus` method it stands for.
#[derive(Debug, Serialize, Deserialize)]
enum ConsensusCommand {
    /// Request for `Consensus::list_keys`.
    ListKeys,
    /// Request for `Consensus::head`.
    Head {
        key: String,
    },
    /// Request for `Consensus::compare_and_set`.
    CompareAndSet {
        key: String,
        expected: Option<SeqNo>,
        new: VersionedData,
    },
    /// Request for `Consensus::scan`.
    Scan {
        key: String,
        from: SeqNo,
        limit: usize,
    },
    /// Request for `Consensus::truncate`.
    Truncate {
        key: String,
        seqno: SeqNo,
    },
}
259
260#[derive(Clone, Debug)]
262pub struct ConsensusState(MemConsensus);
263
264impl ConsensusState {
265 pub fn new() -> Self {
267 Self(MemConsensus::default())
268 }
269}
270
271pub async fn serve_consensus(port: u16, state: ConsensusState) -> turmoil::Result {
275 let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await?;
276
277 loop {
278 let (conn, peer) = listener.accept().await?;
279 mz_ore::task::spawn(
280 || format!("turmoil consensus connection: {peer}"),
281 serve_consensus_connection(conn, state.clone()),
282 );
283 }
284}
285
286async fn serve_consensus_connection(
287 mut conn: TcpStream,
288 state: ConsensusState,
289) -> Result<(), ExternalError> {
290 loop {
291 let cmd = rpc_read(&mut conn).await?;
292 match cmd {
293 ConsensusCommand::ListKeys => {
294 let keys = state.0.list_keys().try_collect::<Vec<_>>().await;
295 let keys = keys.map_err(RpcError::from);
296 rpc_write(&mut conn, keys).await?;
297 }
298 ConsensusCommand::Head { key } => {
299 let data = state.0.head(&key).await;
300 let data = data.map_err(RpcError::from);
301 rpc_write(&mut conn, data).await?;
302 }
303 ConsensusCommand::CompareAndSet { key, expected, new } => {
304 let result = state.0.compare_and_set(&key, expected, new).await;
305 let result = result.map_err(RpcError::from);
306 rpc_write(&mut conn, result).await?;
307 }
308 ConsensusCommand::Scan { key, from, limit } => {
309 let data = state.0.scan(&key, from, limit).await;
310 let data = data.map_err(RpcError::from);
311 rpc_write(&mut conn, data).await?;
312 }
313 ConsensusCommand::Truncate { key, seqno } => {
314 let count = state.0.truncate(&key, seqno).await;
315 let count = count.map_err(RpcError::from);
316 rpc_write(&mut conn, count).await?;
317 }
318 }
319 }
320}
321
322#[derive(Clone, Debug)]
323pub struct BlobConfig {
325 addr: String,
326}
327
328impl BlobConfig {
329 pub fn new(url: &SensitiveUrl) -> Self {
331 let addr = format!(
332 "{}:{}",
333 url.host().expect("must have a host"),
334 url.port().expect("must have a port")
335 );
336
337 Self { addr }
338 }
339}
340
341#[derive(Debug)]
343pub struct TurmoilBlob {
344 addr: String,
345 connections: Mutex<Vec<TcpStream>>,
350}
351
352impl TurmoilBlob {
353 pub fn open(config: BlobConfig) -> Self {
355 Self {
356 addr: config.addr,
357 connections: Mutex::new(Vec::new()),
358 }
359 }
360
361 async fn call<R>(&self, cmd: BlobCommand) -> Result<R, ExternalError>
363 where
364 R: for<'a> Deserialize<'a>,
365 {
366 let res = timeout::<_, _, ExternalError>(RPC_TIMEOUT, async {
367 let conn = self.connections.lock().expect("poisoned").pop();
368 let mut conn = match conn {
369 Some(c) => c,
370 None => TcpStream::connect(&self.addr).await?,
371 };
372
373 rpc_write(&mut conn, cmd).await?;
374 let resp: Result<R, RpcError> = rpc_read(&mut conn).await?;
375
376 self.connections.lock().expect("poisoned").push(conn);
377
378 Ok(resp?)
379 })
380 .await;
381
382 match res {
383 Ok(resp) => Ok(resp),
384 Err(TimeoutError::DeadlineElapsed) => Err(anyhow!("timeout").into()),
385 Err(TimeoutError::Inner(e)) => Err(e),
386 }
387 }
388}
389
390#[async_trait]
391impl Blob for TurmoilBlob {
392 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
393 self.call(BlobCommand::Get { key: key.into() }).await
394 }
395
396 async fn list_keys_and_metadata(
397 &self,
398 key_prefix: &str,
399 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
400 ) -> Result<(), ExternalError> {
401 let items = self
402 .call::<Vec<(String, u64)>>(BlobCommand::ListKeysAndMetadata {
403 key_prefix: key_prefix.into(),
404 })
405 .await?;
406
407 for (key, size_in_bytes) in items {
408 f(BlobMetadata {
409 key: &key,
410 size_in_bytes,
411 })
412 }
413
414 Ok(())
415 }
416
417 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
418 self.call(BlobCommand::Set {
419 key: key.into(),
420 value,
421 })
422 .await
423 }
424
425 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
426 self.call(BlobCommand::Delete { key: key.into() }).await
427 }
428
429 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
430 self.call(BlobCommand::Restore { key: key.into() }).await
431 }
432}
433
/// A request sent by `TurmoilBlob` and handled by `serve_blob_connection`.
///
/// Each variant carries the arguments of the `Blob` method it stands for.
#[derive(Debug, Serialize, Deserialize)]
enum BlobCommand {
    /// Request for `Blob::get`.
    Get { key: String },
    /// Request for `Blob::list_keys_and_metadata`.
    ListKeysAndMetadata { key_prefix: String },
    /// Request for `Blob::set`.
    Set { key: String, value: Bytes },
    /// Request for `Blob::delete`.
    Delete { key: String },
    /// Request for `Blob::restore`.
    Restore { key: String },
}
442
443#[derive(Clone, Debug)]
445pub struct BlobState(MemBlob);
446
447impl BlobState {
448 pub fn new() -> Self {
450 Self(MemBlob::open(MemBlobConfig::new(true)))
451 }
452}
453
454pub async fn serve_blob(port: u16, state: BlobState) -> turmoil::Result {
458 let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await?;
459
460 loop {
461 let (conn, peer) = listener.accept().await?;
462 mz_ore::task::spawn(
463 || format!("turmoil blob connection: {peer}"),
464 serve_blob_connection(conn, state.clone()),
465 );
466 }
467}
468
469async fn serve_blob_connection(mut conn: TcpStream, state: BlobState) -> Result<(), ExternalError> {
470 loop {
471 let cmd = rpc_read(&mut conn).await?;
472 match cmd {
473 BlobCommand::Get { key } => {
474 let value = state.0.get(&key).await;
475 let value = value.map_err(RpcError::from);
476 rpc_write(&mut conn, value).await?;
477 }
478 BlobCommand::ListKeysAndMetadata { key_prefix } => {
479 let mut items = Vec::new();
480 let mut f = |m: BlobMetadata| {
481 items.push((m.key.to_string(), m.size_in_bytes));
482 };
483 let result = state.0.list_keys_and_metadata(&key_prefix, &mut f).await;
484 let result = result.map(|()| items).map_err(RpcError::from);
485 rpc_write(&mut conn, result).await?;
486 }
487 BlobCommand::Set { key, value } => {
488 let result = state.0.set(&key, value).await;
489 let result = result.map_err(RpcError::from);
490 rpc_write(&mut conn, result).await?;
491 }
492 BlobCommand::Delete { key } => {
493 let size = state.0.delete(&key).await;
494 let size = size.map_err(RpcError::from);
495 rpc_write(&mut conn, size).await?;
496 }
497 BlobCommand::Restore { key } => {
498 let result = state.0.restore(&key).await;
499 let result = result.map_err(RpcError::from);
500 rpc_write(&mut conn, result).await?;
501 }
502 }
503 }
504}