1use std::sync::Mutex;
46use std::time::{Duration, Instant};
47
48use anyhow::anyhow;
49use async_trait::async_trait;
50use bytes::Bytes;
51use futures_util::{StreamExt, TryStreamExt};
52use mz_ore::bytes::SegmentedBytes;
53use mz_ore::cast::CastInto;
54use mz_ore::future::{TimeoutError, timeout};
55use mz_ore::url::SensitiveUrl;
56use serde::{Deserialize, Serialize};
57use tokio::io::{AsyncReadExt, AsyncWriteExt};
58use turmoil::net::{TcpListener, TcpStream};
59
60use crate::location::{
61 Blob, BlobMetadata, CaSResult, Consensus, Determinate, ExternalError, Indeterminate,
62 ResultStream, SeqNo, VersionedData,
63};
64use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
65
/// Deadline applied to every RPC issued through the clients' `call` methods.
///
/// The deadline covers the whole exchange: obtaining a connection, sending the command, and
/// reading the response.
const RPC_TIMEOUT: Duration = Duration::from_millis(1_000);
68
69async fn rpc_write<T>(conn: &mut TcpStream, msg: T) -> Result<(), ExternalError>
71where
72 T: Serialize,
73{
74 let bytes = serde_json::to_vec(&msg).unwrap();
75 conn.write_u64(bytes.len().cast_into()).await?;
76 conn.write_all(&bytes).await?;
77 Ok(())
78}
79
80async fn rpc_read<T>(conn: &mut TcpStream) -> Result<T, ExternalError>
82where
83 T: for<'a> Deserialize<'a>,
84{
85 let len = conn.read_u64().await?;
86 let mut bytes = vec![0; len.cast_into()];
87 conn.read_exact(&mut bytes).await?;
88 let resp = serde_json::from_slice(&bytes).unwrap();
89 Ok(resp)
90}
91
92#[derive(Serialize, Deserialize)]
94enum RpcError {
95 Determinate(String),
96 Indeterminate(String),
97}
98
99impl From<ExternalError> for RpcError {
100 fn from(error: ExternalError) -> Self {
101 match error {
102 ExternalError::Determinate(e) => Self::Determinate(e.to_string()),
103 ExternalError::Indeterminate(e) => Self::Indeterminate(e.to_string()),
104 }
105 }
106}
107
108impl From<RpcError> for ExternalError {
109 fn from(error: RpcError) -> Self {
110 match error {
111 RpcError::Determinate(s) => Self::Determinate(Determinate::new(anyhow!(s))),
112 RpcError::Indeterminate(s) => Self::Indeterminate(Indeterminate::new(anyhow!(s))),
113 }
114 }
115}
116
/// Configuration for a [`TurmoilConsensus`] client.
#[derive(Debug, Clone)]
pub struct ConsensusConfig {
    /// Address of the consensus server, in `host:port` form.
    addr: String,
}
122
123impl ConsensusConfig {
124 pub fn new(url: &SensitiveUrl) -> Self {
126 let addr = format!(
127 "{}:{}",
128 url.host().expect("must have a host"),
129 url.port().expect("must have a port")
130 );
131
132 Self { addr }
133 }
134}
135
136#[derive(Debug)]
138pub struct TurmoilConsensus {
139 addr: String,
140 connections: Mutex<Vec<TcpStream>>,
145}
146
147impl TurmoilConsensus {
148 pub fn open(config: ConsensusConfig) -> Self {
150 Self {
151 addr: config.addr,
152 connections: Mutex::new(Vec::new()),
153 }
154 }
155
156 async fn call<R>(&self, cmd: ConsensusCommand) -> Result<R, ExternalError>
158 where
159 R: for<'a> Deserialize<'a>,
160 {
161 let res = timeout::<_, _, ExternalError>(RPC_TIMEOUT, async {
162 let conn = self.connections.lock().expect("poisoned").pop();
163 let mut conn = match conn {
164 Some(c) => c,
165 None => TcpStream::connect(&self.addr).await?,
166 };
167
168 rpc_write(&mut conn, cmd).await?;
169 let resp: Result<R, RpcError> = rpc_read(&mut conn).await?;
170
171 self.connections.lock().expect("poisoned").push(conn);
172
173 Ok(resp?)
174 })
175 .await;
176
177 match res {
178 Ok(resp) => Ok(resp),
179 Err(TimeoutError::DeadlineElapsed) => Err(ExternalError::new_timeout(Instant::now())),
180 Err(TimeoutError::Inner(e)) => Err(e),
181 }
182 }
183}
184
185#[async_trait]
186impl Consensus for TurmoilConsensus {
187 fn list_keys(&self) -> ResultStream<'_, String> {
188 async_stream::try_stream! {
189 let keys = self.call::<Vec<String>>(ConsensusCommand::ListKeys).await?;
190 for key in keys {
191 yield key;
192 }
193 }
194 .boxed()
195 }
196
197 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
198 self.call(ConsensusCommand::Head { key: key.into() }).await
199 }
200
201 async fn compare_and_set(
202 &self,
203 key: &str,
204 new: VersionedData,
205 ) -> Result<CaSResult, ExternalError> {
206 self.call(ConsensusCommand::CompareAndSet {
207 key: key.into(),
208 new,
209 })
210 .await
211 }
212
213 async fn scan(
214 &self,
215 key: &str,
216 from: SeqNo,
217 limit: usize,
218 ) -> Result<Vec<VersionedData>, ExternalError> {
219 self.call(ConsensusCommand::Scan {
220 key: key.into(),
221 from,
222 limit,
223 })
224 .await
225 }
226
227 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
228 self.call(ConsensusCommand::Truncate {
229 key: key.into(),
230 seqno,
231 })
232 .await
233 }
234}
235
236#[derive(Debug, Serialize, Deserialize)]
237enum ConsensusCommand {
238 ListKeys,
239 Head {
240 key: String,
241 },
242 CompareAndSet {
243 key: String,
244 new: VersionedData,
245 },
246 Scan {
247 key: String,
248 from: SeqNo,
249 limit: usize,
250 },
251 Truncate {
252 key: String,
253 seqno: SeqNo,
254 },
255}
256
257#[derive(Clone, Debug)]
259pub struct ConsensusState(MemConsensus);
260
261impl ConsensusState {
262 pub fn new() -> Self {
264 Self(MemConsensus::default())
265 }
266}
267
268pub async fn serve_consensus(port: u16, state: ConsensusState) -> turmoil::Result {
272 let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await?;
273
274 loop {
275 let (conn, peer) = listener.accept().await?;
276 mz_ore::task::spawn(
277 || format!("turmoil consensus connection: {peer}"),
278 serve_consensus_connection(conn, state.clone()),
279 );
280 }
281}
282
283async fn serve_consensus_connection(
284 mut conn: TcpStream,
285 state: ConsensusState,
286) -> Result<(), ExternalError> {
287 loop {
288 let cmd = rpc_read(&mut conn).await?;
289 match cmd {
290 ConsensusCommand::ListKeys => {
291 let keys = state.0.list_keys().try_collect::<Vec<_>>().await;
292 let keys = keys.map_err(RpcError::from);
293 rpc_write(&mut conn, keys).await?;
294 }
295 ConsensusCommand::Head { key } => {
296 let data = state.0.head(&key).await;
297 let data = data.map_err(RpcError::from);
298 rpc_write(&mut conn, data).await?;
299 }
300 ConsensusCommand::CompareAndSet { key, new } => {
301 let result = state.0.compare_and_set(&key, new).await;
302 let result = result.map_err(RpcError::from);
303 rpc_write(&mut conn, result).await?;
304 }
305 ConsensusCommand::Scan { key, from, limit } => {
306 let data = state.0.scan(&key, from, limit).await;
307 let data = data.map_err(RpcError::from);
308 rpc_write(&mut conn, data).await?;
309 }
310 ConsensusCommand::Truncate { key, seqno } => {
311 let count = state.0.truncate(&key, seqno).await;
312 let count = count.map_err(RpcError::from);
313 rpc_write(&mut conn, count).await?;
314 }
315 }
316 }
317}
318
/// Configuration for a [`TurmoilBlob`] client.
#[derive(Debug, Clone)]
pub struct BlobConfig {
    /// Address of the blob server, in `host:port` form.
    addr: String,
}
324
325impl BlobConfig {
326 pub fn new(url: &SensitiveUrl) -> Self {
328 let addr = format!(
329 "{}:{}",
330 url.host().expect("must have a host"),
331 url.port().expect("must have a port")
332 );
333
334 Self { addr }
335 }
336}
337
338#[derive(Debug)]
340pub struct TurmoilBlob {
341 addr: String,
342 connections: Mutex<Vec<TcpStream>>,
347}
348
349impl TurmoilBlob {
350 pub fn open(config: BlobConfig) -> Self {
352 Self {
353 addr: config.addr,
354 connections: Mutex::new(Vec::new()),
355 }
356 }
357
358 async fn call<R>(&self, cmd: BlobCommand) -> Result<R, ExternalError>
360 where
361 R: for<'a> Deserialize<'a>,
362 {
363 let res = timeout::<_, _, ExternalError>(RPC_TIMEOUT, async {
364 let conn = self.connections.lock().expect("poisoned").pop();
365 let mut conn = match conn {
366 Some(c) => c,
367 None => TcpStream::connect(&self.addr).await?,
368 };
369
370 rpc_write(&mut conn, cmd).await?;
371 let resp: Result<R, RpcError> = rpc_read(&mut conn).await?;
372
373 self.connections.lock().expect("poisoned").push(conn);
374
375 Ok(resp?)
376 })
377 .await;
378
379 match res {
380 Ok(resp) => Ok(resp),
381 Err(TimeoutError::DeadlineElapsed) => Err(anyhow!("timeout").into()),
382 Err(TimeoutError::Inner(e)) => Err(e),
383 }
384 }
385}
386
387#[async_trait]
388impl Blob for TurmoilBlob {
389 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
390 self.call(BlobCommand::Get { key: key.into() }).await
391 }
392
393 async fn list_keys_and_metadata(
394 &self,
395 key_prefix: &str,
396 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
397 ) -> Result<(), ExternalError> {
398 let items = self
399 .call::<Vec<(String, u64)>>(BlobCommand::ListKeysAndMetadata {
400 key_prefix: key_prefix.into(),
401 })
402 .await?;
403
404 for (key, size_in_bytes) in items {
405 f(BlobMetadata {
406 key: &key,
407 size_in_bytes,
408 })
409 }
410
411 Ok(())
412 }
413
414 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
415 self.call(BlobCommand::Set {
416 key: key.into(),
417 value,
418 })
419 .await
420 }
421
422 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
423 self.call(BlobCommand::Delete { key: key.into() }).await
424 }
425
426 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
427 self.call(BlobCommand::Restore { key: key.into() }).await
428 }
429}
430
431#[derive(Debug, Serialize, Deserialize)]
432enum BlobCommand {
433 Get { key: String },
434 ListKeysAndMetadata { key_prefix: String },
435 Set { key: String, value: Bytes },
436 Delete { key: String },
437 Restore { key: String },
438}
439
440#[derive(Clone, Debug)]
442pub struct BlobState(MemBlob);
443
444impl BlobState {
445 pub fn new() -> Self {
447 Self(MemBlob::open(MemBlobConfig::new(true)))
448 }
449}
450
451pub async fn serve_blob(port: u16, state: BlobState) -> turmoil::Result {
455 let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await?;
456
457 loop {
458 let (conn, peer) = listener.accept().await?;
459 mz_ore::task::spawn(
460 || format!("turmoil blob connection: {peer}"),
461 serve_blob_connection(conn, state.clone()),
462 );
463 }
464}
465
466async fn serve_blob_connection(mut conn: TcpStream, state: BlobState) -> Result<(), ExternalError> {
467 loop {
468 let cmd = rpc_read(&mut conn).await?;
469 match cmd {
470 BlobCommand::Get { key } => {
471 let value = state.0.get(&key).await;
472 let value = value.map_err(RpcError::from);
473 rpc_write(&mut conn, value).await?;
474 }
475 BlobCommand::ListKeysAndMetadata { key_prefix } => {
476 let mut items = Vec::new();
477 let mut f = |m: BlobMetadata| {
478 items.push((m.key.to_string(), m.size_in_bytes));
479 };
480 let result = state.0.list_keys_and_metadata(&key_prefix, &mut f).await;
481 let result = result.map(|()| items).map_err(RpcError::from);
482 rpc_write(&mut conn, result).await?;
483 }
484 BlobCommand::Set { key, value } => {
485 let result = state.0.set(&key, value).await;
486 let result = result.map_err(RpcError::from);
487 rpc_write(&mut conn, result).await?;
488 }
489 BlobCommand::Delete { key } => {
490 let size = state.0.delete(&key).await;
491 let size = size.map_err(RpcError::from);
492 rpc_write(&mut conn, size).await?;
493 }
494 BlobCommand::Restore { key } => {
495 let result = state.0.restore(&key).await;
496 let result = result.map_err(RpcError::from);
497 rpc_write(&mut conn, result).await?;
498 }
499 }
500 }
501}