Skip to main content

mz_service/
local.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Process-local transport for the [client](crate::client) module.
11
12use std::fmt;
13use std::thread::Thread;
14
15use anyhow::anyhow;
16use async_trait::async_trait;
17use itertools::Itertools;
18use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
19
20use crate::client::{GenericClient, Partitionable, Partitioned};
21
/// A client to a thread in the same process.
///
/// Commands of type `C` are pushed into an unbounded channel and responses of
/// type `R` are pulled out of a second unbounded channel.
///
/// The thread is unparked after every successful call to
/// [`send`](LocalClient::send) and on `Drop`.
#[derive(Debug)]
pub struct LocalClient<C, R> {
    /// Receiving half of the channel that `recv` reads responses from.
    rx: UnboundedReceiver<R>,
    /// Sending half of the channel that `send` writes commands into.
    tx: UnboundedSender<C>,
    /// Handle of the thread that is unparked after a command has been queued
    /// and when this client is dropped.
    thread: Thread,
}
32
33#[async_trait]
34impl<C, R> GenericClient<C, R> for LocalClient<C, R>
35where
36    C: fmt::Debug + Send,
37    R: fmt::Debug + Send,
38{
39    async fn send(&mut self, cmd: C) -> Result<(), anyhow::Error> {
40        self.tx.send(cmd).map_err(|_| anyhow!("receiver dropped"))?;
41        self.thread.unpark();
42
43        Ok(())
44    }
45
46    /// # Cancel safety
47    ///
48    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
49    /// statement and some other branch completes first, it is guaranteed that no messages were
50    /// received by this client.
51    async fn recv(&mut self) -> Result<Option<R>, anyhow::Error> {
52        // `mpsc::UnboundedReceiver::recv` is documented as cancel safe.
53        Ok(self.rx.recv().await)
54    }
55}
56
57impl<C, R> LocalClient<C, R> {
58    /// Create a new instance of [`LocalClient`] from its parts.
59    pub fn new(rx: UnboundedReceiver<R>, tx: UnboundedSender<C>, thread: Thread) -> Self {
60        Self { rx, tx, thread }
61    }
62
63    /// Create a new partitioned local client from parts for each client.
64    pub fn new_partitioned(
65        rxs: Vec<UnboundedReceiver<R>>,
66        txs: Vec<UnboundedSender<C>>,
67        threads: Vec<Thread>,
68    ) -> Partitioned<Self, C, R>
69    where
70        (C, R): Partitionable<C, R>,
71    {
72        let clients = rxs
73            .into_iter()
74            .zip_eq(txs)
75            .zip_eq(threads)
76            .map(|((rx, tx), thread)| LocalClient::new(rx, tx, thread))
77            .collect();
78        Partitioned::new(clients)
79    }
80}
81
82impl<C, R> Drop for LocalClient<C, R> {
83    fn drop(&mut self) {
84        self.thread.unpark();
85    }
86}