1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Traits for client–server communication independent of transport layer.
//!
//! These traits are designed for servers where commands must be sharded
//! among several worker threads or processes.
use std::fmt;
use std::pin::Pin;
use async_trait::async_trait;
use futures::stream::{Stream, StreamExt};
use tokio_stream::StreamMap;
use tracing::trace;
/// A generic client to a server that receives commands and asynchronously
/// produces responses.
///
/// `C` is the type of commands accepted by [`GenericClient::send`] and `R` is
/// the type of responses produced by [`GenericClient::recv`].
#[async_trait]
pub trait GenericClient<C, R>: fmt::Debug + Send {
    /// Sends a command to the dataflow server.
    ///
    /// The command can error for various reasons.
    async fn send(&mut self, cmd: C) -> Result<(), anyhow::Error>;

    /// Receives the next response from the dataflow server.
    ///
    /// This method blocks until the next response is available.
    ///
    /// A return value of `Ok(Some(_))` transmits a response.
    ///
    /// A return value of `Ok(None)` indicates graceful termination of the
    /// connection. The owner of the client should not call `recv` again.
    ///
    /// A return value of `Err(_)` indicates an unrecoverable error. After
    /// observing an error, the owner of the client must drop the client.
    ///
    /// Implementations of this method **must** be [cancellation safe]. That
    /// means that work must not be lost if the future returned by this method
    /// is dropped.
    ///
    /// [cancellation safe]: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
    async fn recv(&mut self) -> Result<Option<R>, anyhow::Error>;

    /// Returns an adapter that treats the client as a stream.
    ///
    /// The stream produces the responses that would be produced by repeated
    /// calls to `recv`. It ends when `recv` returns `Ok(None)`. It also ends
    /// right after yielding the first `Err(_)`: `recv` documents errors as
    /// unrecoverable, so the stream never calls `recv` again on a client that
    /// has already failed, even if the consumer keeps polling.
    ///
    /// # Cancel safety
    ///
    /// The returned stream is cancel safe. If `stream.next()` is used as the event in a
    /// [`tokio::select!`] statement and some other branch completes first, it is guaranteed that
    /// no messages were received by this client.
    fn as_stream<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Stream<Item = Result<R, anyhow::Error>> + Send + 'a>>
    where
        R: Send + 'a,
    {
        Box::pin(async_stream::stream!({
            loop {
                // `GenericClient::recv` is required to be cancel safe.
                match self.recv().await {
                    Ok(Some(response)) => yield Ok(response),
                    Err(error) => {
                        // Surface the error once, then stop: the `recv`
                        // contract forbids further use of a failed client.
                        yield Err(error);
                        return;
                    }
                    Ok(None) => {
                        // Graceful termination: `recv` must not be called again.
                        return;
                    }
                }
            }
        }))
    }
}
/// Forwards the [`GenericClient`] interface of a boxed trait object to the
/// client stored inside the box.
#[async_trait]
impl<C, R> GenericClient<C, R> for Box<dyn GenericClient<C, R>>
where
    C: Send,
{
    /// Hands `cmd` to the boxed client and returns whatever it returns.
    async fn send(&mut self, cmd: C) -> Result<(), anyhow::Error> {
        // Reborrow the trait object itself so the call dispatches to the
        // boxed client rather than back to this impl.
        let inner = &mut **self;
        inner.send(cmd).await
    }

    /// Waits for the next response of the boxed client.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
    /// statement and some other branch completes first, it is guaranteed that no messages were
    /// received by this client.
    async fn recv(&mut self) -> Result<Option<R>, anyhow::Error> {
        // Pure delegation: cancel safety follows from the requirement that
        // every `GenericClient::recv` implementation is cancel safe.
        let inner = &mut **self;
        inner.recv().await
    }
}
/// A client whose implementation is partitioned across a number of other
/// clients.
///
/// Such a client needs to broadcast (partitioned) commands to all of its
/// clients, and await responses from each of the client partitions before it
/// can respond.
///
/// `P` is the type of the per-partition clients, and `C` and `R` are the
/// command and response types. The pair `(C, R)` must implement
/// [`Partitionable`], which selects the type of the `state` field.
#[derive(Debug)]
pub struct Partitioned<P, C, R>
where
(C, R): Partitionable<C, R>,
{
/// The individual partitions representing per-worker clients.
///
/// A client's position in this vector is the shard index that is reported
/// to the partitioned state when a response from that client is absorbed.
pub parts: Vec<P>,
/// The partitioned state.
///
/// Splits outgoing commands and amalgamates incoming responses; it is
/// constructed from the number of partitions in `new`.
state: <(C, R) as Partitionable<C, R>>::PartitionedState,
}
impl<P, C, R> Partitioned<P, C, R>
where
    (C, R): Partitionable<C, R>,
{
    /// Builds a partitioned client on top of the given per-shard clients.
    ///
    /// The partitioned state is initialized via [`Partitionable::new`] with
    /// the number of clients in `parts`.
    pub fn new(parts: Vec<P>) -> Self {
        let shard_count = parts.len();
        let state = <(C, R) as Partitionable<C, R>>::new(shard_count);
        Self { parts, state }
    }
}
#[async_trait]
impl<P, C, R> GenericClient<C, R> for Partitioned<P, C, R>
where
    P: GenericClient<C, R>,
    (C, R): Partitionable<C, R>,
    C: fmt::Debug + Send,
    R: fmt::Debug + Send,
{
    /// Splits `cmd` with the partitioned state and forwards each resulting
    /// piece to the shard at the same position.
    ///
    /// Shards whose piece is `None` are skipped. Shards are contacted in
    /// order, and the first failing `send` aborts the loop and is returned.
    async fn send(&mut self, cmd: C) -> Result<(), anyhow::Error> {
        trace!(command = ?cmd, "splitting command");
        let pieces = self.state.split_command(cmd);
        // Pair every shard with its piece of the command, by position.
        let targets = self.parts.iter_mut().zip(pieces).enumerate();
        for (index, (shard, piece)) in targets {
            match piece {
                Some(piece) => {
                    trace!(shard = ?index, command = ?piece, "sending command");
                    shard.send(piece).await?;
                }
                // Nothing to deliver to this shard.
                None => continue,
            }
        }
        Ok(())
    }

    /// Waits until the partitioned state produces an amalgamated response.
    ///
    /// Responses from all shards are merged into a single stream and fed to
    /// the partitioned state one by one. The first shard error is returned
    /// immediately. `Ok(None)` is returned once every shard stream has ended
    /// without the state producing a response.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
    /// statement and some other branch completes first, it is guaranteed that no messages were
    /// received by this client.
    async fn recv(&mut self) -> Result<Option<R>, anyhow::Error> {
        // Key each shard's response stream by the shard's index.
        let mut responses: StreamMap<_, _> = self
            .parts
            .iter_mut()
            .enumerate()
            .map(|(index, shard)| (index, shard.as_stream()))
            .collect();

        // `responses` is a cancel safe stream: It only awaits streams created with
        // `GenericClient::as_stream`, which is documented to produce cancel safe streams.
        // Thus no messages are lost if `responses` is dropped while awaiting its next element.
        loop {
            let (index, response) = match responses.next().await {
                // A shard error is fatal and is handed to the caller as is.
                Some((index, result)) => (index, result?),
                // Indicate completion of the communication.
                None => return Ok(None),
            };
            trace!(shard = ?index, response = ?response, "received response");
            if let Some(response) = self.state.absorb_response(index, response) {
                trace!(response = ?response, "returning response");
                return response.map(Some);
            }
        }
    }
}
/// A trait for command–response pairs that can be partitioned across multiple
/// workers via [`Partitioned`].
///
/// [`Partitioned`] requires this trait on the tuple type `(C, R)`, where `C`
/// is the command type and `R` is the response type.
pub trait Partitionable<C, R> {
/// The type which functions as the state machine for the partitioning.
type PartitionedState: PartitionedState<C, R>;
/// Construct a [`PartitionedState`] for the command–response pair.
///
/// `parts` is the number of partitions; [`Partitioned::new`] passes the
/// number of per-partition clients it was given.
fn new(parts: usize) -> Self::PartitionedState;
}
/// A state machine for a partitioned client that partitions commands across and
/// amalgamates responses from multiple partitions.
pub trait PartitionedState<C, R>: fmt::Debug + Send {
/// Splits a command into multiple partitions.
///
/// [`Partitioned`] pairs the returned entries with its partitions by
/// position: a `Some(_)` entry is sent to the partition at the same index,
/// and a `None` entry means nothing is sent to that partition.
fn split_command(&mut self, command: C) -> Vec<Option<C>>;
/// Absorbs a response from a single partition.
///
/// If responses from all partitions have been absorbed, returns an
/// amalgamated response.
///
/// `shard_id` is the index of the partition that produced `response`.
/// When this returns `None`, [`Partitioned`] keeps waiting for further
/// partition responses; when it returns `Some(_)`, the contained result is
/// handed to the caller of `recv`.
fn absorb_response(&mut self, shard_id: usize, response: R)
-> Option<Result<R, anyhow::Error>>;
}