1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Process-local transport for the [client](crate::client) module.
use std::fmt;
use std::thread::Thread;
use async_trait::async_trait;
use crossbeam_channel::Sender;
use itertools::Itertools;
use timely::scheduling::SyncActivator;
use tokio::sync::mpsc::UnboundedReceiver;
use crate::client::{GenericClient, Partitionable, Partitioned};
/// A handle that can be used to wake up whoever is on the other end of a
/// [`LocalClient`].
///
/// [`LocalClient`] calls [`activate`](Activatable::activate) after sending a
/// command and again when it is dropped.
pub trait Activatable {
/// Triggers an activation of the target.
fn activate(&self);
}
impl Activatable for SyncActivator {
fn activate(&self) {
self.activate().unwrap()
}
}
impl Activatable for Thread {
    /// Activates the thread by unparking it.
    fn activate(&self) {
        Thread::unpark(self);
    }
}
/// A client to a thread in the same process.
///
/// Commands of type `C` are pushed into a channel and responses of type `R`
/// are read back from another one.
///
/// The thread is activated (unparked, when the activator is a [`Thread`]) on
/// every call to [`send`](LocalClient::send) and on `Drop`.
#[derive(Debug)]
pub struct LocalClient<C, R, A: Activatable> {
/// Channel end from which responses are received.
rx: UnboundedReceiver<R>,
/// Channel end into which commands are sent.
tx: Sender<C>,
/// Activated after each sent command and when the client is dropped.
tx_activator: A,
}
#[async_trait]
impl<C, R, A> GenericClient<C, R> for LocalClient<C, R, A>
where
C: fmt::Debug + Send,
R: fmt::Debug + Send,
A: fmt::Debug + Activatable + Send,
{
async fn send(&mut self, cmd: C) -> Result<(), anyhow::Error> {
self.tx
.send(cmd)
.expect("worker command receiver should not drop first");
self.tx_activator.activate();
Ok(())
}
/// # Cancel safety
///
/// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
/// statement and some other branch completes first, it is guaranteed that no messages were
/// received by this client.
async fn recv(&mut self) -> Result<Option<R>, anyhow::Error> {
// `mpsc::UnboundedReceiver::recv` is documented as cancel safe.
Ok(self.rx.recv().await)
}
}
impl<C, R, A: Activatable> LocalClient<C, R, A> {
/// Create a new instance of [`LocalClient`] from its parts.
pub fn new(rx: UnboundedReceiver<R>, tx: Sender<C>, tx_activator: A) -> Self {
Self {
rx,
tx,
tx_activator,
}
}
/// Create a new partitioned local client from parts for each client.
pub fn new_partitioned(
rxs: Vec<UnboundedReceiver<R>>,
txs: Vec<Sender<C>>,
tx_activators: Vec<A>,
) -> Partitioned<Self, C, R>
where
(C, R): Partitionable<C, R>,
{
let clients = rxs
.into_iter()
.zip_eq(txs)
.zip_eq(tx_activators)
.map(|((rx, tx), tx_activator)| LocalClient::new(rx, tx, tx_activator))
.collect();
Partitioned::new(clients)
}
}
// We implement `Drop` so that we can wake each of the threads and have them
// notice the drop.
impl<C, R, A: Activatable> Drop for LocalClient<C, R, A> {
fn drop(&mut self) {
// Drop the thread handle.
let (tx, _rx) = crossbeam_channel::unbounded();
self.tx = tx;
// Unpark the thread once the handle is dropped, so that it can observe
// the emptiness.
self.tx_activator.activate();
}
}