mz_service/
local.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Process-local transport for the [client](crate::client) module.
11
12use std::fmt;
13use std::thread::Thread;
14
15use anyhow::anyhow;
16use async_trait::async_trait;
17use crossbeam_channel::Sender;
18use itertools::Itertools;
19use tokio::sync::mpsc::UnboundedReceiver;
20
21use crate::client::{GenericClient, Partitionable, Partitioned};
22
23/// A client to a thread in the same process.
24///
25/// The thread is unparked on every call to [`send`](LocalClient::send) and on
26/// `Drop`.
27#[derive(Debug)]
28pub struct LocalClient<C, R> {
29    rx: UnboundedReceiver<R>,
30    tx: Sender<C>,
31    thread: Thread,
32}
33
34#[async_trait]
35impl<C, R> GenericClient<C, R> for LocalClient<C, R>
36where
37    C: fmt::Debug + Send,
38    R: fmt::Debug + Send,
39{
40    async fn send(&mut self, cmd: C) -> Result<(), anyhow::Error> {
41        self.tx.send(cmd).map_err(|_| anyhow!("receiver dropped"))?;
42        self.thread.unpark();
43
44        Ok(())
45    }
46
47    /// # Cancel safety
48    ///
49    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
50    /// statement and some other branch completes first, it is guaranteed that no messages were
51    /// received by this client.
52    async fn recv(&mut self) -> Result<Option<R>, anyhow::Error> {
53        // `mpsc::UnboundedReceiver::recv` is documented as cancel safe.
54        Ok(self.rx.recv().await)
55    }
56}
57
58impl<C, R> LocalClient<C, R> {
59    /// Create a new instance of [`LocalClient`] from its parts.
60    pub fn new(rx: UnboundedReceiver<R>, tx: Sender<C>, thread: Thread) -> Self {
61        Self { rx, tx, thread }
62    }
63
64    /// Create a new partitioned local client from parts for each client.
65    pub fn new_partitioned(
66        rxs: Vec<UnboundedReceiver<R>>,
67        txs: Vec<Sender<C>>,
68        threads: Vec<Thread>,
69    ) -> Partitioned<Self, C, R>
70    where
71        (C, R): Partitionable<C, R>,
72    {
73        let clients = rxs
74            .into_iter()
75            .zip_eq(txs)
76            .zip_eq(threads)
77            .map(|((rx, tx), thread)| LocalClient::new(rx, tx, thread))
78            .collect();
79        Partitioned::new(clients)
80    }
81}
82
83impl<C, R> Drop for LocalClient<C, R> {
84    fn drop(&mut self) {
85        self.thread.unpark();
86    }
87}