mz_service/
local.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Process-local transport for the [client](crate::client) module.
11
12use std::fmt;
13use std::thread::Thread;
14
15use async_trait::async_trait;
16use crossbeam_channel::Sender;
17use itertools::Itertools;
18use tokio::sync::mpsc::UnboundedReceiver;
19
20use crate::client::{GenericClient, Partitionable, Partitioned};
21
22/// A client to a thread in the same process.
23///
24/// The thread is unparked on every call to [`send`](LocalClient::send) and on
25/// `Drop`.
26#[derive(Debug)]
27pub struct LocalClient<C, R> {
28    rx: UnboundedReceiver<R>,
29    tx: Sender<C>,
30    thread: Thread,
31}
32
33#[async_trait]
34impl<C, R> GenericClient<C, R> for LocalClient<C, R>
35where
36    C: fmt::Debug + Send,
37    R: fmt::Debug + Send,
38{
39    async fn send(&mut self, cmd: C) -> Result<(), anyhow::Error> {
40        self.tx
41            .send(cmd)
42            .expect("worker command receiver should not drop first");
43
44        self.thread.unpark();
45
46        Ok(())
47    }
48
49    /// # Cancel safety
50    ///
51    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
52    /// statement and some other branch completes first, it is guaranteed that no messages were
53    /// received by this client.
54    async fn recv(&mut self) -> Result<Option<R>, anyhow::Error> {
55        // `mpsc::UnboundedReceiver::recv` is documented as cancel safe.
56        Ok(self.rx.recv().await)
57    }
58}
59
60impl<C, R> LocalClient<C, R> {
61    /// Create a new instance of [`LocalClient`] from its parts.
62    pub fn new(rx: UnboundedReceiver<R>, tx: Sender<C>, thread: Thread) -> Self {
63        Self { rx, tx, thread }
64    }
65
66    /// Create a new partitioned local client from parts for each client.
67    pub fn new_partitioned(
68        rxs: Vec<UnboundedReceiver<R>>,
69        txs: Vec<Sender<C>>,
70        threads: Vec<Thread>,
71    ) -> Partitioned<Self, C, R>
72    where
73        (C, R): Partitionable<C, R>,
74    {
75        let clients = rxs
76            .into_iter()
77            .zip_eq(txs)
78            .zip_eq(threads)
79            .map(|((rx, tx), thread)| LocalClient::new(rx, tx, thread))
80            .collect();
81        Partitioned::new(clients)
82    }
83}
84
85impl<C, R> Drop for LocalClient<C, R> {
86    fn drop(&mut self) {
87        self.thread.unpark();
88    }
89}