1use std::fmt;
13use std::thread::Thread;
14
15use anyhow::anyhow;
16use async_trait::async_trait;
17use itertools::Itertools;
18use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
19
20use crate::client::{GenericClient, Partitionable, Partitioned};
21
22#[derive(Debug)]
27pub struct LocalClient<C, R> {
28 rx: UnboundedReceiver<R>,
29 tx: UnboundedSender<C>,
30 thread: Thread,
31}
32
33#[async_trait]
34impl<C, R> GenericClient<C, R> for LocalClient<C, R>
35where
36 C: fmt::Debug + Send,
37 R: fmt::Debug + Send,
38{
39 async fn send(&mut self, cmd: C) -> Result<(), anyhow::Error> {
40 self.tx.send(cmd).map_err(|_| anyhow!("receiver dropped"))?;
41 self.thread.unpark();
42
43 Ok(())
44 }
45
46 async fn recv(&mut self) -> Result<Option<R>, anyhow::Error> {
52 Ok(self.rx.recv().await)
54 }
55}
56
57impl<C, R> LocalClient<C, R> {
58 pub fn new(rx: UnboundedReceiver<R>, tx: UnboundedSender<C>, thread: Thread) -> Self {
60 Self { rx, tx, thread }
61 }
62
63 pub fn new_partitioned(
65 rxs: Vec<UnboundedReceiver<R>>,
66 txs: Vec<UnboundedSender<C>>,
67 threads: Vec<Thread>,
68 ) -> Partitioned<Self, C, R>
69 where
70 (C, R): Partitionable<C, R>,
71 {
72 let clients = rxs
73 .into_iter()
74 .zip_eq(txs)
75 .zip_eq(threads)
76 .map(|((rx, tx), thread)| LocalClient::new(rx, tx, thread))
77 .collect();
78 Partitioned::new(clients)
79 }
80}
81
82impl<C, R> Drop for LocalClient<C, R> {
83 fn drop(&mut self) {
84 self.thread.unpark();
85 }
86}