1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! Process-local transport for the [client](crate::client) module.
1112use std::fmt;
13use std::thread::Thread;
1415use async_trait::async_trait;
16use crossbeam_channel::Sender;
17use itertools::Itertools;
18use tokio::sync::mpsc::UnboundedReceiver;
1920use crate::client::{GenericClient, Partitionable, Partitioned};
2122/// A client to a thread in the same process.
23///
24/// The thread is unparked on every call to [`send`](LocalClient::send) and on
25/// `Drop`.
26#[derive(Debug)]
27pub struct LocalClient<C, R> {
28 rx: UnboundedReceiver<R>,
29 tx: Sender<C>,
30 thread: Thread,
31}
3233#[async_trait]
34impl<C, R> GenericClient<C, R> for LocalClient<C, R>
35where
36C: fmt::Debug + Send,
37 R: fmt::Debug + Send,
38{
39async fn send(&mut self, cmd: C) -> Result<(), anyhow::Error> {
40self.tx
41 .send(cmd)
42 .expect("worker command receiver should not drop first");
4344self.thread.unpark();
4546Ok(())
47 }
4849/// # Cancel safety
50 ///
51 /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
52 /// statement and some other branch completes first, it is guaranteed that no messages were
53 /// received by this client.
54async fn recv(&mut self) -> Result<Option<R>, anyhow::Error> {
55// `mpsc::UnboundedReceiver::recv` is documented as cancel safe.
56Ok(self.rx.recv().await)
57 }
58}
5960impl<C, R> LocalClient<C, R> {
61/// Create a new instance of [`LocalClient`] from its parts.
62pub fn new(rx: UnboundedReceiver<R>, tx: Sender<C>, thread: Thread) -> Self {
63Self { rx, tx, thread }
64 }
6566/// Create a new partitioned local client from parts for each client.
67pub fn new_partitioned(
68 rxs: Vec<UnboundedReceiver<R>>,
69 txs: Vec<Sender<C>>,
70 threads: Vec<Thread>,
71 ) -> Partitioned<Self, C, R>
72where
73(C, R): Partitionable<C, R>,
74 {
75let clients = rxs
76 .into_iter()
77 .zip_eq(txs)
78 .zip_eq(threads)
79 .map(|((rx, tx), thread)| LocalClient::new(rx, tx, thread))
80 .collect();
81 Partitioned::new(clients)
82 }
83}
8485impl<C, R> Drop for LocalClient<C, R> {
86fn drop(&mut self) {
87self.thread.unpark();
88 }
89}