1use std::fmt;
13use std::thread::Thread;
14
15use anyhow::anyhow;
16use async_trait::async_trait;
17use crossbeam_channel::Sender;
18use itertools::Itertools;
19use tokio::sync::mpsc::UnboundedReceiver;
20
21use crate::client::{GenericClient, Partitionable, Partitioned};
22
23#[derive(Debug)]
28pub struct LocalClient<C, R> {
29 rx: UnboundedReceiver<R>,
30 tx: Sender<C>,
31 thread: Thread,
32}
33
34#[async_trait]
35impl<C, R> GenericClient<C, R> for LocalClient<C, R>
36where
37 C: fmt::Debug + Send,
38 R: fmt::Debug + Send,
39{
40 async fn send(&mut self, cmd: C) -> Result<(), anyhow::Error> {
41 self.tx.send(cmd).map_err(|_| anyhow!("receiver dropped"))?;
42 self.thread.unpark();
43
44 Ok(())
45 }
46
47 async fn recv(&mut self) -> Result<Option<R>, anyhow::Error> {
53 Ok(self.rx.recv().await)
55 }
56}
57
58impl<C, R> LocalClient<C, R> {
59 pub fn new(rx: UnboundedReceiver<R>, tx: Sender<C>, thread: Thread) -> Self {
61 Self { rx, tx, thread }
62 }
63
64 pub fn new_partitioned(
66 rxs: Vec<UnboundedReceiver<R>>,
67 txs: Vec<Sender<C>>,
68 threads: Vec<Thread>,
69 ) -> Partitioned<Self, C, R>
70 where
71 (C, R): Partitionable<C, R>,
72 {
73 let clients = rxs
74 .into_iter()
75 .zip_eq(txs)
76 .zip_eq(threads)
77 .map(|((rx, tx), thread)| LocalClient::new(rx, tx, thread))
78 .collect();
79 Partitioned::new(clients)
80 }
81}
82
83impl<C, R> Drop for LocalClient<C, R> {
84 fn drop(&mut self) {
85 self.thread.unpark();
86 }
87}