use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::net::SocketAddr;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use anyhow::{anyhow, Error};
use async_trait::async_trait;
use bytes::Bytes;
use futures::Stream;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::collections::{HashMap, HashSet};
use mz_ore::metrics::MetricsRegistry;
use mz_ore::retry::RetryResult;
use mz_ore::task::JoinHandle;
use mz_persist::location::VersionedData;
use mz_proto::{ProtoType, RustType};
use prost::Message;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::Sender;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::{BroadcastStream, ReceiverStream};
use tokio_stream::StreamExt;
use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, MetadataMap};
use tonic::transport::Endpoint;
use tonic::{Extensions, Request, Response, Status, Streaming};
use tracing::{debug, error, info, info_span, warn, Instrument};
use crate::cache::{DynState, StateCache};
use crate::cfg::PersistConfig;
use crate::internal::metrics::{PubSubClientCallMetrics, PubSubServerMetrics};
use crate::internal::service::proto_persist_pub_sub_client::ProtoPersistPubSubClient;
use crate::internal::service::proto_persist_pub_sub_server::ProtoPersistPubSubServer;
use crate::internal::service::{
proto_persist_pub_sub_server, proto_pub_sub_message, ProtoPubSubMessage, ProtoPushDiff,
ProtoSubscribe, ProtoUnsubscribe,
};
use crate::metrics::Metrics;
use crate::ShardId;
/// Whether the client half of Persist PubSub is enabled at all; polled in the
/// reconnect loop, so flipping it off stops reconnect attempts.
pub(crate) const PUBSUB_CLIENT_ENABLED: Config<bool> = Config::new(
    "persist_pubsub_client_enabled",
    true,
    "Whether to connect to the Persist PubSub service.",
);
/// Whether the server broadcasts received diffs to subscribers (checked per
/// incoming PushDiff message).
pub(crate) const PUBSUB_PUSH_DIFF_ENABLED: Config<bool> = Config::new(
    "persist_pubsub_push_diff_enabled",
    true,
    "Whether to push state diffs to Persist PubSub.",
);
/// Whether a same-process sender delegates subscribes to the real sender
/// (when false it hands out no-op subscription tokens).
pub(crate) const PUBSUB_SAME_PROCESS_DELEGATE_ENABLED: Config<bool> = Config::new(
    "persist_pubsub_same_process_delegate_enabled",
    true,
    "Whether to push state diffs to Persist PubSub on the same process.",
);
/// A client of the Persist PubSub service.
pub trait PersistPubSubClient {
    /// Creates a connection (sender + receiver pair) to the PubSub service
    /// described by `pubsub_config`, instrumented with `metrics`.
    fn connect(
        pubsub_config: PersistPubSubClientConfig,
        metrics: Arc<Metrics>,
    ) -> PubSubClientConnection;
}
/// The send and receive halves of a PubSub connection.
#[derive(Debug)]
pub struct PubSubClientConnection {
    /// Used to send messages (diffs, subscribes) to the service.
    pub sender: Arc<dyn PubSubSender>,
    /// Stream of messages pushed to us by the service.
    pub receiver: Box<dyn PubSubReceiver>,
}
impl PubSubClientConnection {
pub fn new(sender: Arc<dyn PubSubSender>, receiver: Box<dyn PubSubReceiver>) -> Self {
Self { sender, receiver }
}
pub fn noop() -> Self {
Self {
sender: Arc::new(NoopPubSubSender),
receiver: Box::new(futures::stream::empty()),
}
}
}
/// The public send half of a PubSub connection.
pub trait PubSubSender: std::fmt::Debug + Send + Sync {
    /// Pushes a state diff for the given shard to the service.
    fn push_diff(&self, shard_id: &ShardId, diff: &VersionedData);
    /// Subscribes to diffs for the given shard; the subscription lives as long
    /// as the returned token (see [ShardSubscriptionToken]'s `Drop`).
    fn subscribe(self: Arc<Self>, shard_id: &ShardId) -> Arc<ShardSubscriptionToken>;
}
/// Internal send half: like [PubSubSender], but with explicit, token-less
/// subscribe/unsubscribe calls. Tokens call `unsubscribe` from their `Drop`.
trait PubSubSenderInternal: Debug + Send + Sync {
    /// Pushes a state diff for the given shard.
    fn push_diff(&self, shard_id: &ShardId, diff: &VersionedData);
    /// Starts a subscription for the given shard.
    fn subscribe(&self, shard_id: &ShardId);
    /// Ends a subscription for the given shard.
    fn unsubscribe(&self, shard_id: &ShardId);
}
/// The receive half of a PubSub connection: a stream of [ProtoPubSubMessage].
pub trait PubSubReceiver:
    Stream<Item = ProtoPubSubMessage> + Send + Unpin + std::fmt::Debug
{
}
// Blanket impl: any qualifying stream is usable as a PubSubReceiver.
impl<T> PubSubReceiver for T where
    T: Stream<Item = ProtoPubSubMessage> + Send + Unpin + std::fmt::Debug
{
}
/// Keeps a shard subscription alive; unsubscribes on drop.
pub struct ShardSubscriptionToken {
    // The shard this token's subscription covers.
    pub(crate) shard_id: ShardId,
    // Sender notified with an `unsubscribe` when this token is dropped.
    sender: Arc<dyn PubSubSenderInternal>,
}
impl Debug for ShardSubscriptionToken {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructure so adding a field forces revisiting this impl;
        // the sender is intentionally omitted from the output.
        let ShardSubscriptionToken {
            shard_id,
            sender: _,
        } = self;
        write!(f, "ShardSubscriptionToken({})", shard_id)
    }
}
impl Drop for ShardSubscriptionToken {
    /// Tells the sender to unsubscribe from `shard_id` when the token dies.
    fn drop(&mut self) {
        self.sender.unsubscribe(&self.shard_id);
    }
}
/// gRPC metadata key carrying the caller's identity, for server-side logging.
pub const PERSIST_PUBSUB_CALLER_KEY: &str = "persist-pubsub-caller-id";
/// Configuration for connecting to the Persist PubSub service.
#[derive(Debug)]
pub struct PersistPubSubClientConfig {
    /// Connection address for the pubsub server, e.g. `http://localhost:6879`.
    pub url: String,
    /// An identifier for the caller, sent as gRPC metadata under
    /// [PERSIST_PUBSUB_CALLER_KEY].
    pub caller_id: String,
    /// Persist config, used for dyncfg flags and timeouts/backoffs.
    pub persist_cfg: PersistConfig,
}
/// A [PersistPubSubClient] implementation backed by gRPC.
#[derive(Debug)]
pub struct GrpcPubSubClient;
impl GrpcPubSubClient {
    /// Connects to the PubSub server at `config.url` and keeps the connection
    /// alive forever, reconnecting (with backoff) whenever the stream ends.
    ///
    /// Returns only on a fatal error (unparseable URL) or once the receiving
    /// side of `receiver_input` has been dropped.
    async fn reconnect_to_server_forever(
        send_requests: tokio::sync::broadcast::Sender<ProtoPubSubMessage>,
        receiver_input: &tokio::sync::mpsc::Sender<ProtoPubSubMessage>,
        sender: Arc<SubscriptionTrackingSender>,
        metadata: MetadataMap,
        config: PersistPubSubClientConfig,
        metrics: Arc<Metrics>,
    ) {
        // Wait for the first dyncfg sync so the flag reads below see real values.
        config.persist_cfg.configs_synced_once().await;
        let mut is_first_connection_attempt = true;
        loop {
            metrics.pubsub_client.grpc_connection.connected.set(0);
            // While the client is disabled, just poll the flag.
            if !PUBSUB_CLIENT_ENABLED.get(&config.persist_cfg) {
                tokio::time::sleep(Duration::from_secs(5)).await;
                continue;
            }
            // Back off between reconnect attempts, but connect eagerly the
            // first time through.
            if is_first_connection_attempt {
                is_first_connection_attempt = false;
            } else {
                tokio::time::sleep(config.persist_cfg.pubsub_reconnect_backoff).await;
            }
            info!("Connecting to Persist PubSub: {}", config.url);
            let client = mz_ore::retry::Retry::default()
                .clamp_backoff(config.persist_cfg.pubsub_connect_max_backoff)
                .retry_async(|_| async {
                    metrics
                        .pubsub_client
                        .grpc_connection
                        .connect_call_attempt_count
                        .inc();
                    // A malformed URL can never succeed: stop retrying.
                    let endpoint = match Endpoint::from_str(&config.url) {
                        Ok(endpoint) => endpoint,
                        Err(err) => return RetryResult::FatalErr(err),
                    };
                    ProtoPersistPubSubClient::connect(
                        endpoint
                            .connect_timeout(config.persist_cfg.pubsub_connect_attempt_timeout)
                            .timeout(config.persist_cfg.pubsub_request_timeout),
                    )
                    .await
                    .into()
                })
                .await;
            let mut client = match client {
                Ok(client) => client,
                Err(err) => {
                    error!("fatal error connecting to persist pubsub: {:?}", err);
                    return;
                }
            };
            metrics
                .pubsub_client
                .grpc_connection
                .connection_established_count
                .inc();
            metrics.pubsub_client.grpc_connection.connected.set(1);
            info!("Connected to Persist PubSub: {}", config.url);
            // Outgoing requests come from a broadcast channel so each new
            // connection gets its own subscription to the request stream.
            let mut broadcast = BroadcastStream::new(send_requests.subscribe());
            let broadcast_errors = metrics
                .pubsub_client
                .grpc_connection
                .broadcast_recv_lagged_count
                .clone();
            let pubsub_request = Request::from_parts(
                metadata.clone(),
                Extensions::default(),
                async_stream::stream! {
                    while let Some(message) = broadcast.next().await {
                        debug!("sending pubsub message: {:?}", message);
                        match message {
                            Ok(message) => yield message,
                            // Messages dropped due to a lagging receiver are
                            // counted but otherwise skipped.
                            Err(BroadcastStreamRecvError::Lagged(i)) => {
                                broadcast_errors.inc_by(i);
                            }
                        }
                    }
                },
            );
            let responses = match client.pub_sub(pubsub_request).await {
                Ok(response) => response.into_inner(),
                Err(err) => {
                    warn!("pub_sub rpc error: {:?}", err);
                    continue;
                }
            };
            // Replay still-live subscriptions onto the fresh connection.
            sender.reconnect();
            let stream_completed = GrpcPubSubClient::consume_grpc_stream(
                responses,
                receiver_input,
                &config,
                metrics.as_ref(),
            )
            .await;
            match stream_completed {
                // Stream ended benignly: reconnect.
                Ok(_) => continue,
                // Receiver dropped: the client is gone, stop entirely.
                Err(err) => {
                    warn!("shutting down connection loop to Persist PubSub: {}", err);
                    return;
                }
            }
        }
    }
    /// Forwards messages from the gRPC response stream into `receiver_input`.
    ///
    /// Returns `Ok(())` when the stream ends, errors at the gRPC layer, or the
    /// client is disabled via dyncfg (caller should reconnect/poll); returns
    /// `Err` only when `receiver_input`'s receiver has been dropped.
    async fn consume_grpc_stream(
        mut responses: Streaming<ProtoPubSubMessage>,
        receiver_input: &Sender<ProtoPubSubMessage>,
        config: &PersistPubSubClientConfig,
        metrics: &Metrics,
    ) -> Result<(), Error> {
        loop {
            // Re-check the flag each iteration so disabling takes effect
            // without waiting for the stream to end.
            if !PUBSUB_CLIENT_ENABLED.get(&config.persist_cfg) {
                return Ok(());
            }
            debug!("awaiting next pubsub response");
            match responses.next().await {
                Some(Ok(message)) => {
                    debug!("received pubsub message: {:?}", message);
                    match receiver_input.send(message).await {
                        Ok(_) => {}
                        Err(err) => {
                            return Err(anyhow!("closing pubsub grpc client connection: {}", err));
                        }
                    }
                }
                Some(Err(err)) => {
                    metrics.pubsub_client.grpc_connection.grpc_error_count.inc();
                    warn!("pubsub client error: {:?}", err);
                    return Ok(());
                }
                None => return Ok(()),
            }
        }
    }
}
impl PersistPubSubClient for GrpcPubSubClient {
    /// Creates the channels for a gRPC-backed connection and spawns a task
    /// that maintains the server connection for the life of the process.
    fn connect(config: PersistPubSubClientConfig, metrics: Arc<Metrics>) -> PubSubClientConnection {
        // Broadcast channel: requests sent while disconnected have no
        // receiver and are dropped; subscriptions are replayed on reconnect.
        let (send_requests, _) =
            tokio::sync::broadcast::channel(config.persist_cfg.pubsub_client_sender_channel_size);
        let (receiver_input, receiver_output) =
            tokio::sync::mpsc::channel(config.persist_cfg.pubsub_client_receiver_channel_size);
        let sender = Arc::new(SubscriptionTrackingSender::new(Arc::new(
            GrpcPubSubSender {
                metrics: Arc::clone(&metrics),
                requests: send_requests.clone(),
            },
        )));
        let pubsub_sender = Arc::clone(&sender);
        mz_ore::task::spawn(
            || "persist::rpc::client::connection".to_string(),
            async move {
                // Attach the caller id so the server can log who connected;
                // fall back to "unknown" if it is not valid ASCII metadata.
                let mut metadata = MetadataMap::new();
                metadata.insert(
                    AsciiMetadataKey::from_static(PERSIST_PUBSUB_CALLER_KEY),
                    AsciiMetadataValue::try_from(&config.caller_id)
                        .unwrap_or_else(|_| AsciiMetadataValue::from_static("unknown")),
                );
                GrpcPubSubClient::reconnect_to_server_forever(
                    send_requests,
                    &receiver_input,
                    pubsub_sender,
                    metadata,
                    config,
                    metrics,
                )
                .await;
            },
        );
        PubSubClientConnection {
            sender,
            receiver: Box::new(ReceiverStream::new(receiver_output)),
        }
    }
}
/// An internal sender that encodes messages onto the gRPC request broadcast
/// channel.
struct GrpcPubSubSender {
    // Metrics for send successes/failures and bytes sent.
    metrics: Arc<Metrics>,
    // Broadcast channel feeding the outgoing gRPC request stream.
    requests: tokio::sync::broadcast::Sender<ProtoPubSubMessage>,
}
impl Debug for GrpcPubSubSender {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructure so new fields force revisiting this impl;
        // neither field is included in the output.
        let GrpcPubSubSender {
            metrics: _,
            requests: _,
        } = self;
        f.write_str("GrpcPubSubSender")
    }
}
impl GrpcPubSubSender {
    /// Wraps `message` with a send timestamp and broadcasts it to the active
    /// connection's request stream, recording success/failure in `metrics`.
    ///
    /// A send fails (and is silently dropped, beyond metrics) when there is no
    /// active broadcast receiver, i.e. when the client is disconnected.
    fn send(&self, message: proto_pub_sub_message::Message, metrics: &PubSubClientCallMetrics) {
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("failed to get millis since epoch");
        let message = ProtoPubSubMessage {
            timestamp: Some(now.into_proto()),
            message: Some(message),
        };
        // Measure before sending; `send` consumes the message.
        let size = message.encoded_len();
        match self.requests.send(message) {
            Ok(_) => {
                metrics.succeeded.inc();
                metrics.bytes_sent.inc_by(u64::cast_from(size));
            }
            Err(err) => {
                metrics.failed.inc();
                debug!("error sending client message: {}", err);
            }
        }
    }
}
impl PubSubSenderInternal for GrpcPubSubSender {
    /// Encodes and broadcasts a diff for `shard_id`.
    fn push_diff(&self, shard_id: &ShardId, diff: &VersionedData) {
        let message = proto_pub_sub_message::Message::PushDiff(ProtoPushDiff {
            shard_id: shard_id.into_proto(),
            seqno: diff.seqno.into_proto(),
            diff: diff.data.clone(),
        });
        self.send(message, &self.metrics.pubsub_client.sender.push)
    }
    /// Encodes and broadcasts a subscribe request for `shard_id`.
    fn subscribe(&self, shard_id: &ShardId) {
        let message = proto_pub_sub_message::Message::Subscribe(ProtoSubscribe {
            shard_id: shard_id.into_proto(),
        });
        self.send(message, &self.metrics.pubsub_client.sender.subscribe)
    }
    /// Encodes and broadcasts an unsubscribe request for `shard_id`.
    fn unsubscribe(&self, shard_id: &ShardId) {
        let message = proto_pub_sub_message::Message::Unsubscribe(ProtoUnsubscribe {
            shard_id: shard_id.into_proto(),
        });
        self.send(message, &self.metrics.pubsub_client.sender.unsubscribe)
    }
}
/// Wraps an internal sender, tracking live subscriptions so they can be
/// replayed after a reconnect.
#[derive(Debug)]
struct SubscriptionTrackingSender {
    // The sender that actually transmits messages.
    delegate: Arc<dyn PubSubSenderInternal>,
    // Weak refs to outstanding tokens; a dead Weak means the subscription
    // has been dropped.
    subscribes: Arc<Mutex<BTreeMap<ShardId, Weak<ShardSubscriptionToken>>>>,
}
impl SubscriptionTrackingSender {
fn new(sender: Arc<dyn PubSubSenderInternal>) -> Self {
Self {
delegate: sender,
subscribes: Default::default(),
}
}
fn reconnect(&self) {
let mut subscribes = self.subscribes.lock().expect("lock");
subscribes.retain(|shard_id, token| {
if token.upgrade().is_none() {
false
} else {
debug!("reconnecting to: {}", shard_id);
self.delegate.subscribe(shard_id);
true
}
})
}
}
impl PubSubSender for SubscriptionTrackingSender {
    /// Diffs are forwarded unchanged; only subscriptions are tracked.
    fn push_diff(&self, shard_id: &ShardId, diff: &VersionedData) {
        self.delegate.push_diff(shard_id, diff)
    }
    /// Returns a token keeping the subscription to `shard_id` alive.
    ///
    /// If a live token for this shard already exists it is shared (no second
    /// subscribe is sent); a dead entry is replaced with a fresh token and a
    /// new subscribe call to the delegate.
    fn subscribe(self: Arc<Self>, shard_id: &ShardId) -> Arc<ShardSubscriptionToken> {
        let mut subscribes = self.subscribes.lock().expect("lock");
        if let Some(token) = subscribes.get(shard_id) {
            match token.upgrade() {
                // Stale entry: remove it and fall through to create anew.
                None => assert!(subscribes.remove(shard_id).is_some()),
                Some(token) => {
                    return Arc::clone(&token);
                }
            }
        }
        // Token holds the delegate directly, so its Drop unsubscribes via the
        // same sender even if this tracking wrapper is gone.
        let pubsub_sender = Arc::clone(&self.delegate);
        let token = Arc::new(ShardSubscriptionToken {
            shard_id: *shard_id,
            sender: pubsub_sender,
        });
        assert!(subscribes
            .insert(*shard_id, Arc::downgrade(&token))
            .is_none());
        self.delegate.subscribe(shard_id);
        token
    }
}
/// A sender for clients in the same process as the PubSub server, adding
/// metrics around an inner sender.
#[derive(Debug)]
pub struct MetricsSameProcessPubSubSender {
    // Snapshot of [PUBSUB_SAME_PROCESS_DELEGATE_ENABLED] taken at creation.
    delegate_subscribe: bool,
    metrics: Arc<Metrics>,
    delegate: Arc<dyn PubSubSender>,
}
impl MetricsSameProcessPubSubSender {
pub fn new(
cfg: &PersistConfig,
pubsub_sender: Arc<dyn PubSubSender>,
metrics: Arc<Metrics>,
) -> Self {
Self {
delegate_subscribe: PUBSUB_SAME_PROCESS_DELEGATE_ENABLED.get(cfg),
delegate: pubsub_sender,
metrics,
}
}
}
impl PubSubSender for MetricsSameProcessPubSubSender {
    /// Forwards the diff and counts the push as succeeded.
    fn push_diff(&self, shard_id: &ShardId, diff: &VersionedData) {
        self.delegate.push_diff(shard_id, diff);
        self.metrics.pubsub_client.sender.push.succeeded.inc();
    }
    /// Subscribes via the delegate when enabled; otherwise hands out a token
    /// wired to a no-op sender (so dropping it also does nothing).
    fn subscribe(self: Arc<Self>, shard_id: &ShardId) -> Arc<ShardSubscriptionToken> {
        if !self.delegate_subscribe {
            return Arc::new(ShardSubscriptionToken {
                shard_id: *shard_id,
                sender: Arc::new(NoopPubSubSender),
            });
        }
        Arc::clone(&self.delegate).subscribe(shard_id)
    }
}
/// A sender that does nothing; used when PubSub is disabled.
#[derive(Debug)]
pub(crate) struct NoopPubSubSender;
// All operations are deliberate no-ops.
impl PubSubSenderInternal for NoopPubSubSender {
    fn push_diff(&self, _shard_id: &ShardId, _diff: &VersionedData) {}
    fn subscribe(&self, _shard_id: &ShardId) {}
    fn unsubscribe(&self, _shard_id: &ShardId) {}
}
impl PubSubSender for NoopPubSubSender {
    fn push_diff(&self, _shard_id: &ShardId, _diff: &VersionedData) {}
    /// Returns a token whose eventual drop calls this sender's no-op
    /// `unsubscribe`, so the whole subscription lifecycle is inert.
    fn subscribe(self: Arc<Self>, shard_id: &ShardId) -> Arc<ShardSubscriptionToken> {
        Arc::new(ShardSubscriptionToken {
            shard_id: *shard_id,
            sender: self,
        })
    }
}
/// Spawns a task that applies diffs from `pubsub_receiver` to the matching
/// shard state in `cache`, maintaining a local cache of weak state refs as a
/// fast path. Runs until the receiver stream ends.
pub(crate) fn subscribe_state_cache_to_pubsub(
    cache: Arc<StateCache>,
    mut pubsub_receiver: Box<dyn PubSubReceiver>,
) -> JoinHandle<()> {
    // Fast-path cache of per-shard state refs, avoiding a StateCache lookup
    // for every diff.
    let mut state_refs: HashMap<ShardId, Weak<dyn DynState>> = HashMap::new();
    let receiver_metrics = cache.metrics.pubsub_client.receiver.clone();
    mz_ore::task::spawn(
        || "persist::rpc::client::state_cache_diff_apply",
        async move {
            while let Some(msg) = pubsub_receiver.next().await {
                match msg.message {
                    Some(proto_pub_sub_message::Message::PushDiff(diff)) => {
                        receiver_metrics.push_received.inc();
                        let shard_id = diff.shard_id.into_rust().expect("valid shard id");
                        let diff = VersionedData {
                            seqno: diff.seqno.into_rust().expect("valid SeqNo"),
                            data: diff.diff,
                        };
                        debug!(
                            "applying pubsub diff {} {} {}",
                            shard_id,
                            diff.seqno,
                            diff.data.len()
                        );
                        let mut pushed_diff = false;
                        // Fast path: a previously-cached weak ref that is
                        // still alive.
                        if let Some(state_ref) = state_refs.get(&shard_id) {
                            if let Some(state) = state_ref.upgrade() {
                                state.push_diff(diff.clone());
                                pushed_diff = true;
                                receiver_metrics.state_pushed_diff_fast_path.inc();
                            }
                        }
                        // Slow path: consult the StateCache and refresh (or
                        // evict) our local weak ref.
                        if !pushed_diff {
                            let state_ref = cache.get_state_weak(&shard_id);
                            match state_ref {
                                None => {
                                    state_refs.remove(&shard_id);
                                }
                                Some(state_ref) => {
                                    if let Some(state) = state_ref.upgrade() {
                                        state.push_diff(diff);
                                        pushed_diff = true;
                                        state_refs.insert(shard_id, state_ref);
                                    } else {
                                        state_refs.remove(&shard_id);
                                    }
                                }
                            }
                            if pushed_diff {
                                receiver_metrics.state_pushed_diff_slow_path_succeeded.inc();
                            } else {
                                receiver_metrics.state_pushed_diff_slow_path_failed.inc();
                            }
                        }
                        // Approximate one-way latency from the sender's
                        // wall-clock timestamp; clamped at zero for skew.
                        if let Some(send_timestamp) = msg.timestamp {
                            let send_timestamp =
                                send_timestamp.into_rust().expect("valid timestamp");
                            let now = SystemTime::now()
                                .duration_since(SystemTime::UNIX_EPOCH)
                                .expect("failed to get millis since epoch");
                            receiver_metrics
                                .approx_diff_latency_seconds
                                .observe((now.saturating_sub(send_timestamp)).as_secs_f64());
                        }
                    }
                    // Anything other than a PushDiff is unexpected on the
                    // client receive path.
                    ref msg @ None | ref msg @ Some(_) => {
                        warn!("pubsub client received unexpected message: {:?}", msg);
                        receiver_metrics.unknown_message_received.inc();
                    }
                }
            }
        },
    )
}
/// Server-side state shared by all PubSub connections.
#[derive(Debug)]
pub(crate) struct PubSubState {
    /// Assigns a unique id to each incoming connection.
    connection_id_counter: AtomicUsize,
    /// Maps a shard to the set of connections subscribed to it (by id), each
    /// with the channel used to push diffs back to that connection.
    shard_subscribers:
        Arc<RwLock<BTreeMap<ShardId, BTreeMap<usize, Sender<Result<ProtoPubSubMessage, Status>>>>>>,
    /// The set of all live connection ids.
    connections: Arc<RwLock<HashSet<usize>>>,
    /// Server-side metrics.
    metrics: Arc<PubSubServerMetrics>,
}
impl PubSubState {
    /// Registers a new connection, returning a [PubSubConnection] whose
    /// `Drop` deregisters it via [PubSubState::remove_connection].
    fn new_connection(
        self: Arc<Self>,
        notifier: Sender<Result<ProtoPubSubMessage, Status>>,
    ) -> PubSubConnection {
        // Hand out a fresh, never-reused id.
        let connection_id = self.connection_id_counter.fetch_add(1, Ordering::SeqCst);
        {
            debug!("inserting connid: {}", connection_id);
            let mut connections = self.connections.write().expect("lock");
            assert!(connections.insert(connection_id));
        }
        self.metrics.active_connections.inc();
        PubSubConnection {
            connection_id,
            notifier,
            state: self,
        }
    }
    /// Removes a connection and all of its shard subscriptions.
    ///
    /// # Panics
    /// Panics if `connection_id` is not a registered connection.
    fn remove_connection(&self, connection_id: usize) {
        let now = Instant::now();
        {
            debug!("removing connid: {}", connection_id);
            let mut connections = self.connections.write().expect("lock");
            assert!(
                connections.remove(&connection_id),
                "unknown connection id: {}",
                connection_id
            );
        }
        {
            // Strip this connection out of every shard, dropping shard
            // entries that become empty.
            let mut subscribers = self.shard_subscribers.write().expect("lock poisoned");
            subscribers.retain(|_shard, connections_for_shard| {
                connections_for_shard.remove(&connection_id);
                !connections_for_shard.is_empty()
            });
        }
        self.metrics
            .connection_cleanup_seconds
            .inc_by(now.elapsed().as_secs_f64());
        self.metrics.active_connections.dec();
    }
    /// Broadcasts `data` to every subscriber of `shard_id` except the pushing
    /// connection itself. Messages to subscribers with a full channel are
    /// dropped (and counted).
    ///
    /// # Panics
    /// Panics if `connection_id` is not a registered connection.
    fn push_diff(&self, connection_id: usize, shard_id: &ShardId, data: &VersionedData) {
        let now = Instant::now();
        self.metrics.push_call_count.inc();
        assert!(
            self.connections
                .read()
                .expect("lock")
                .contains(&connection_id),
            "unknown connection id: {}",
            connection_id
        );
        let subscribers = self.shard_subscribers.read().expect("lock poisoned");
        if let Some(subscribed_connections) = subscribers.get(shard_id) {
            let mut num_sent = 0;
            let mut data_size = 0;
            for (subscribed_conn_id, tx) in subscribed_connections {
                // Don't echo the diff back to its sender.
                if *subscribed_conn_id == connection_id {
                    continue;
                }
                debug!(
                    "server forwarding req to {} conns {} {} {}",
                    subscribed_conn_id,
                    &shard_id,
                    data.seqno,
                    data.data.len()
                );
                let req = ProtoPubSubMessage {
                    timestamp: Some(
                        SystemTime::now()
                            .duration_since(SystemTime::UNIX_EPOCH)
                            .expect("failed to get millis since epoch")
                            .into_proto(),
                    ),
                    message: Some(proto_pub_sub_message::Message::PushDiff(ProtoPushDiff {
                        seqno: data.seqno.into_proto(),
                        shard_id: shard_id.to_string(),
                        diff: Bytes::clone(&data.data),
                    })),
                };
                // Each loop iteration builds an equivalently-sized message,
                // so the last measured size stands in for all of them below.
                data_size = req.encoded_len();
                match tx.try_send(Ok(req)) {
                    Ok(_) => {
                        num_sent += 1;
                    }
                    Err(TrySendError::Full(_)) => {
                        self.metrics.broadcasted_diff_dropped_channel_full.inc();
                    }
                    // A closed receiver is cleaned up by remove_connection.
                    Err(TrySendError::Closed(_)) => {}
                };
            }
            self.metrics.broadcasted_diff_count.inc_by(num_sent);
            self.metrics
                .broadcasted_diff_bytes
                .inc_by(num_sent * u64::cast_from(data_size));
        }
        self.metrics
            .push_seconds
            .inc_by(now.elapsed().as_secs_f64());
    }
    /// Subscribes `connection_id` to `shard_id`, recording `notifier` as its
    /// push channel. Idempotent: re-subscribing replaces the notifier.
    ///
    /// # Panics
    /// Panics if `connection_id` is not a registered connection.
    fn subscribe(
        &self,
        connection_id: usize,
        notifier: Sender<Result<ProtoPubSubMessage, Status>>,
        shard_id: &ShardId,
    ) {
        let now = Instant::now();
        self.metrics.subscribe_call_count.inc();
        assert!(
            self.connections
                .read()
                .expect("lock")
                .contains(&connection_id),
            "unknown connection id: {}",
            connection_id
        );
        {
            let mut subscribed_shards = self.shard_subscribers.write().expect("lock poisoned");
            subscribed_shards
                .entry(*shard_id)
                .or_default()
                .insert(connection_id, notifier);
        }
        self.metrics
            .subscribe_seconds
            .inc_by(now.elapsed().as_secs_f64());
    }
    /// Removes `connection_id`'s subscription to `shard_id` (if any),
    /// dropping the shard entry when no subscribers remain.
    ///
    /// # Panics
    /// Panics if `connection_id` is not a registered connection.
    fn unsubscribe(&self, connection_id: usize, shard_id: &ShardId) {
        let now = Instant::now();
        self.metrics.unsubscribe_call_count.inc();
        assert!(
            self.connections
                .read()
                .expect("lock")
                .contains(&connection_id),
            "unknown connection id: {}",
            connection_id
        );
        {
            let mut subscribed_shards = self.shard_subscribers.write().expect("lock poisoned");
            if let Entry::Occupied(mut entry) = subscribed_shards.entry(*shard_id) {
                let subscribed_connections = entry.get_mut();
                subscribed_connections.remove(&connection_id);
                if subscribed_connections.is_empty() {
                    entry.remove_entry();
                }
            }
        }
        self.metrics
            .unsubscribe_seconds
            .inc_by(now.elapsed().as_secs_f64());
    }
    /// Empty state with throwaway metrics, for tests.
    #[cfg(test)]
    fn new_for_test() -> Self {
        Self {
            connection_id_counter: AtomicUsize::new(0),
            shard_subscribers: Default::default(),
            connections: Default::default(),
            metrics: Arc::new(PubSubServerMetrics::new(&MetricsRegistry::new())),
        }
    }
    /// Snapshot of currently-registered connection ids, for tests.
    #[cfg(test)]
    fn active_connections(&self) -> HashSet<usize> {
        self.connections.read().expect("lock").clone()
    }
    /// The set of shards `connection_id` is subscribed to, for tests.
    #[cfg(test)]
    fn subscriptions(&self, connection_id: usize) -> HashSet<ShardId> {
        let mut shards = HashSet::new();
        let subscribers = self.shard_subscribers.read().expect("lock");
        for (shard, subscribed_connections) in subscribers.iter() {
            if subscribed_connections.contains_key(&connection_id) {
                shards.insert(*shard);
            }
        }
        shards
    }
    /// Per-shard subscriber counts, for tests.
    #[cfg(test)]
    fn shard_subscription_counts(&self) -> mz_ore::collections::HashMap<ShardId, usize> {
        let mut shards = mz_ore::collections::HashMap::new();
        let subscribers = self.shard_subscribers.read().expect("lock");
        for (shard, subscribed_connections) in subscribers.iter() {
            shards.insert(*shard, subscribed_connections.len());
        }
        shards
    }
}
/// A gRPC-based server implementation of the Persist PubSub service.
#[derive(Debug)]
pub struct PersistGrpcPubSubServer {
    cfg: PersistConfig,
    // Connection/subscription state shared across all served connections.
    state: Arc<PubSubState>,
}
impl PersistGrpcPubSubServer {
    /// Creates a new server with fresh state and metrics registered into
    /// `metrics_registry`.
    pub fn new(cfg: &PersistConfig, metrics_registry: &MetricsRegistry) -> Self {
        let metrics = PubSubServerMetrics::new(metrics_registry);
        let state = Arc::new(PubSubState {
            connection_id_counter: AtomicUsize::new(0),
            shard_subscribers: Default::default(),
            connections: Default::default(),
            metrics: Arc::new(metrics),
        });
        PersistGrpcPubSubServer {
            cfg: cfg.clone(),
            state,
        }
    }
    /// Creates a client connection to this server that bypasses gRPC
    /// entirely, communicating over an in-process channel.
    pub fn new_same_process_connection(&self) -> PubSubClientConnection {
        let (tx, rx) = tokio::sync::mpsc::channel(self.cfg.pubsub_client_receiver_channel_size);
        let sender: Arc<dyn PubSubSender> = Arc::new(SubscriptionTrackingSender::new(Arc::new(
            Arc::clone(&self.state).new_connection(tx),
        )));
        PubSubClientConnection {
            sender,
            // No gRPC layer here, so a Status error should be impossible.
            receiver: Box::new(
                ReceiverStream::new(rx).map(|x| x.expect("cannot receive grpc errors locally")),
            ),
        }
    }
    /// Serves the gRPC endpoint on `listen_addr` until the server shuts down.
    pub async fn serve(self, listen_addr: SocketAddr) -> Result<(), anyhow::Error> {
        tonic::transport::Server::builder()
            .add_service(ProtoPersistPubSubServer::new(self).max_decoding_message_size(usize::MAX))
            .serve(listen_addr)
            .await?;
        Ok(())
    }
    /// Like [PersistGrpcPubSubServer::serve], but over a provided TCP
    /// listener stream (used by tests to bind an ephemeral port).
    pub async fn serve_with_stream(
        self,
        listener: tokio_stream::wrappers::TcpListenerStream,
    ) -> Result<(), anyhow::Error> {
        tonic::transport::Server::builder()
            .add_service(ProtoPersistPubSubServer::new(self))
            .serve_with_incoming(listener)
            .await?;
        Ok(())
    }
}
#[async_trait]
impl proto_persist_pub_sub_server::ProtoPersistPubSub for PersistGrpcPubSubServer {
    type PubSubStream = Pin<Box<dyn Stream<Item = Result<ProtoPubSubMessage, Status>> + Send>>;
    /// Handles one bidirectional PubSub stream: registers the connection,
    /// spawns a task that applies incoming requests to the shared state, and
    /// returns the outgoing stream of pushed messages.
    #[mz_ore::instrument(name = "persist::rpc::server", level = "info")]
    async fn pub_sub(
        &self,
        request: Request<Streaming<ProtoPubSubMessage>>,
    ) -> Result<Response<Self::PubSubStream>, Status> {
        // Caller identity (for logging only) from request metadata.
        let caller_id = request
            .metadata()
            .get(AsciiMetadataKey::from_static(PERSIST_PUBSUB_CALLER_KEY))
            .map(|key| key.to_str().ok())
            .flatten()
            .map(|key| key.to_string())
            .unwrap_or_else(|| "unknown".to_string());
        info!("Received Persist PubSub connection from: {:?}", caller_id);
        let mut in_stream = request.into_inner();
        let (tx, rx) = tokio::sync::mpsc::channel(self.cfg.pubsub_server_connection_channel_size);
        let caller = caller_id.clone();
        let cfg = Arc::clone(&self.cfg.configs);
        let server_state = Arc::clone(&self.state);
        let connection_span = info_span!("connection", caller_id);
        mz_ore::task::spawn(
            || format!("persist_pubsub_connection({})", caller),
            async move {
                // `connection`'s Drop deregisters it when this task exits.
                let connection = server_state.new_connection(tx);
                while let Some(result) = in_stream.next().await {
                    let req = match result {
                        Ok(req) => req,
                        Err(err) => {
                            warn!("pubsub connection err: {}", err);
                            break;
                        }
                    };
                    match req.message {
                        None => {
                            warn!("received empty message from: {}", caller_id);
                        }
                        Some(proto_pub_sub_message::Message::PushDiff(req)) => {
                            let shard_id = req.shard_id.parse().expect("valid shard id");
                            let diff = VersionedData {
                                seqno: req.seqno.into_rust().expect("valid seqno"),
                                data: req.diff.clone(),
                            };
                            // Broadcasting can be disabled dynamically.
                            if PUBSUB_PUSH_DIFF_ENABLED.get(&cfg) {
                                connection.push_diff(&shard_id, &diff);
                            }
                        }
                        Some(proto_pub_sub_message::Message::Subscribe(diff)) => {
                            let shard_id = diff.shard_id.parse().expect("valid shard id");
                            connection.subscribe(&shard_id);
                        }
                        Some(proto_pub_sub_message::Message::Unsubscribe(diff)) => {
                            let shard_id = diff.shard_id.parse().expect("valid shard id");
                            connection.unsubscribe(&shard_id);
                        }
                    }
                }
                info!("Persist PubSub connection ended: {:?}", caller_id);
            }
            .instrument(connection_span),
        );
        let out_stream: Self::PubSubStream = Box::pin(ReceiverStream::new(rx));
        Ok(Response::new(out_stream))
    }
}
/// A server-side handle for one registered connection; deregisters itself
/// from the shared [PubSubState] on drop.
#[derive(Debug)]
pub(crate) struct PubSubConnection {
    connection_id: usize,
    // Channel used by the server to push messages back to this connection.
    notifier: Sender<Result<ProtoPubSubMessage, Status>>,
    state: Arc<PubSubState>,
}
// All operations delegate to the shared state, tagged with this connection's
// id.
impl PubSubSenderInternal for PubSubConnection {
    fn push_diff(&self, shard_id: &ShardId, diff: &VersionedData) {
        self.state.push_diff(self.connection_id, shard_id, diff)
    }
    fn subscribe(&self, shard_id: &ShardId) {
        self.state
            .subscribe(self.connection_id, self.notifier.clone(), shard_id)
    }
    fn unsubscribe(&self, shard_id: &ShardId) {
        self.state.unsubscribe(self.connection_id, shard_id)
    }
}
impl Drop for PubSubConnection {
    /// Deregisters the connection (and all its subscriptions) from the state.
    fn drop(&mut self) {
        self.state.remove_connection(self.connection_id)
    }
}
/// Tests for the server-side [PubSubState] connection/subscription tracking.
#[cfg(test)]
mod pubsub_state {
    use std::str::FromStr;
    use std::sync::Arc;
    use std::sync::LazyLock;
    use bytes::Bytes;
    use mz_ore::collections::HashSet;
    use mz_persist::location::{SeqNo, VersionedData};
    use mz_proto::RustType;
    use tokio::sync::mpsc::error::TryRecvError;
    use tokio::sync::mpsc::Receiver;
    use tonic::Status;
    use crate::internal::service::proto_pub_sub_message::Message;
    use crate::internal::service::ProtoPubSubMessage;
    use crate::rpc::{PubSubSenderInternal, PubSubState};
    use crate::ShardId;
    // These must be `static`, not `const`: a `const` with interior mutability
    // (the `LazyLock` cell) is inlined at every use site, so each use would
    // get a fresh, separately-initialized cell and re-run the parse (clippy:
    // declare_interior_mutable_const). `static` guarantees one shared
    // instance, matching the sibling `grpc` test module.
    static SHARD_ID_0: LazyLock<ShardId> =
        LazyLock::new(|| ShardId::from_str("s00000000-0000-0000-0000-000000000000").unwrap());
    static SHARD_ID_1: LazyLock<ShardId> =
        LazyLock::new(|| ShardId::from_str("s11111111-1111-1111-1111-111111111111").unwrap());
    const VERSIONED_DATA_0: VersionedData = VersionedData {
        seqno: SeqNo(0),
        data: Bytes::from_static(&[0, 1, 2, 3]),
    };
    const VERSIONED_DATA_1: VersionedData = VersionedData {
        seqno: SeqNo(1),
        data: Bytes::from_static(&[4, 5, 6, 7]),
    };
    // Operations on unregistered connection ids must panic.
    #[mz_ore::test]
    #[should_panic(expected = "unknown connection id: 100")]
    fn test_zero_connections_push_diff() {
        let state = Arc::new(PubSubState::new_for_test());
        state.push_diff(100, &SHARD_ID_0, &VERSIONED_DATA_0);
    }
    #[mz_ore::test]
    #[should_panic(expected = "unknown connection id: 100")]
    fn test_zero_connections_subscribe() {
        let state = Arc::new(PubSubState::new_for_test());
        let (tx, _) = tokio::sync::mpsc::channel(100);
        state.subscribe(100, tx, &SHARD_ID_0);
    }
    #[mz_ore::test]
    #[should_panic(expected = "unknown connection id: 100")]
    fn test_zero_connections_unsubscribe() {
        let state = Arc::new(PubSubState::new_for_test());
        state.unsubscribe(100, &SHARD_ID_0);
    }
    #[mz_ore::test]
    #[should_panic(expected = "unknown connection id: 100")]
    fn test_zero_connections_remove() {
        let state = Arc::new(PubSubState::new_for_test());
        state.remove_connection(100)
    }
    // One connection: subscribe/unsubscribe bookkeeping, no self-echo of
    // pushed diffs, and full cleanup on drop.
    #[mz_ore::test]
    fn test_single_connection() {
        let state = Arc::new(PubSubState::new_for_test());
        let (tx, mut rx) = tokio::sync::mpsc::channel(100);
        let connection = Arc::clone(&state).new_connection(tx);
        assert_eq!(
            state.active_connections(),
            HashSet::from([connection.connection_id])
        );
        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
        // A connection never receives its own diffs.
        connection.push_diff(
            &SHARD_ID_0,
            &VersionedData {
                seqno: SeqNo::minimum(),
                data: Bytes::new(),
            },
        );
        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
        connection.subscribe(&SHARD_ID_0);
        assert_eq!(
            state.subscriptions(connection.connection_id),
            HashSet::from([*SHARD_ID_0])
        );
        connection.unsubscribe(&SHARD_ID_0);
        assert!(state.subscriptions(connection.connection_id).is_empty());
        connection.subscribe(&SHARD_ID_0);
        connection.subscribe(&SHARD_ID_1);
        assert_eq!(
            state.subscriptions(connection.connection_id),
            HashSet::from([*SHARD_ID_0, *SHARD_ID_1])
        );
        // Subscribing is idempotent.
        connection.subscribe(&SHARD_ID_0);
        connection.subscribe(&SHARD_ID_0);
        assert_eq!(
            state.subscriptions(connection.connection_id),
            HashSet::from([*SHARD_ID_0, *SHARD_ID_1])
        );
        // Dropping the connection removes it and its subscriptions.
        let connection_id = connection.connection_id;
        drop(connection);
        assert!(state.subscriptions(connection_id).is_empty());
        assert!(state.active_connections().is_empty());
    }
    // Several connections: diffs fan out to all other subscribers of the
    // shard, and a dropped connection stops receiving.
    #[mz_ore::test]
    fn test_many_connection() {
        let state = Arc::new(PubSubState::new_for_test());
        let (tx1, mut rx1) = tokio::sync::mpsc::channel(100);
        let conn1 = Arc::clone(&state).new_connection(tx1);
        let (tx2, mut rx2) = tokio::sync::mpsc::channel(100);
        let conn2 = Arc::clone(&state).new_connection(tx2);
        let (tx3, mut rx3) = tokio::sync::mpsc::channel(100);
        let conn3 = Arc::clone(&state).new_connection(tx3);
        conn1.subscribe(&SHARD_ID_0);
        conn2.subscribe(&SHARD_ID_0);
        conn2.subscribe(&SHARD_ID_1);
        assert_eq!(
            state.active_connections(),
            HashSet::from([
                conn1.connection_id,
                conn2.connection_id,
                conn3.connection_id
            ])
        );
        // Unsubscribed pusher: both subscribers receive, pusher does not.
        conn3.push_diff(&SHARD_ID_0, &VERSIONED_DATA_0);
        assert_push(&mut rx1, &SHARD_ID_0, &VERSIONED_DATA_0);
        assert_push(&mut rx2, &SHARD_ID_0, &VERSIONED_DATA_0);
        assert!(matches!(rx3.try_recv(), Err(TryRecvError::Empty)));
        // Subscribed pusher: everyone but the pusher receives.
        conn1.push_diff(&SHARD_ID_0, &VERSIONED_DATA_0);
        assert!(matches!(rx1.try_recv(), Err(TryRecvError::Empty)));
        assert_push(&mut rx2, &SHARD_ID_0, &VERSIONED_DATA_0);
        assert!(matches!(rx3.try_recv(), Err(TryRecvError::Empty)));
        conn3.push_diff(&SHARD_ID_1, &VERSIONED_DATA_1);
        assert!(matches!(rx1.try_recv(), Err(TryRecvError::Empty)));
        assert_push(&mut rx2, &SHARD_ID_1, &VERSIONED_DATA_1);
        assert!(matches!(rx3.try_recv(), Err(TryRecvError::Empty)));
        // After unsubscribe, nobody is listening to SHARD_ID_1.
        conn2.unsubscribe(&SHARD_ID_1);
        conn3.push_diff(&SHARD_ID_1, &VERSIONED_DATA_1);
        assert!(matches!(rx1.try_recv(), Err(TryRecvError::Empty)));
        assert!(matches!(rx2.try_recv(), Err(TryRecvError::Empty)));
        assert!(matches!(rx3.try_recv(), Err(TryRecvError::Empty)));
        // A dropped connection no longer receives pushes.
        let conn1_id = conn1.connection_id;
        drop(conn1);
        conn3.push_diff(&SHARD_ID_0, &VERSIONED_DATA_0);
        assert!(matches!(rx1.try_recv(), Err(TryRecvError::Disconnected)));
        assert_push(&mut rx2, &SHARD_ID_0, &VERSIONED_DATA_0);
        assert!(matches!(rx3.try_recv(), Err(TryRecvError::Empty)));
        assert!(state.subscriptions(conn1_id).is_empty());
        assert_eq!(
            state.subscriptions(conn2.connection_id),
            HashSet::from([*SHARD_ID_0])
        );
        assert_eq!(state.subscriptions(conn3.connection_id), HashSet::new());
        assert_eq!(
            state.active_connections(),
            HashSet::from([conn2.connection_id, conn3.connection_id])
        );
    }
    /// Asserts that the next message in `rx` is a PushDiff for `shard`
    /// carrying `data`.
    fn assert_push(
        rx: &mut Receiver<Result<ProtoPubSubMessage, Status>>,
        shard: &ShardId,
        data: &VersionedData,
    ) {
        let message = rx
            .try_recv()
            .expect("message in channel")
            .expect("pubsub")
            .message
            .expect("proto contains message");
        match message {
            Message::PushDiff(x) => {
                assert_eq!(x.shard_id, shard.into_proto());
                assert_eq!(x.seqno, data.seqno.into_proto());
                assert_eq!(x.diff, data.data);
            }
            Message::Subscribe(_) | Message::Unsubscribe(_) => panic!("unexpected message type"),
        };
    }
}
#[cfg(test)]
mod grpc {
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use bytes::Bytes;
use futures_util::FutureExt;
use mz_dyncfg::ConfigUpdates;
use mz_ore::assert_none;
use mz_ore::collections::HashMap;
use mz_ore::metrics::MetricsRegistry;
use mz_persist::location::{SeqNo, VersionedData};
use mz_proto::RustType;
use std::sync::LazyLock;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tokio_stream::StreamExt;
use crate::cfg::PersistConfig;
use crate::internal::service::proto_pub_sub_message::Message;
use crate::internal::service::ProtoPubSubMessage;
use crate::metrics::Metrics;
use crate::rpc::{
GrpcPubSubClient, PersistGrpcPubSubServer, PersistPubSubClient, PersistPubSubClientConfig,
PubSubState, PUBSUB_CLIENT_ENABLED,
};
use crate::ShardId;
static SHARD_ID_0: LazyLock<ShardId> =
LazyLock::new(|| ShardId::from_str("s00000000-0000-0000-0000-000000000000").unwrap());
static SHARD_ID_1: LazyLock<ShardId> =
LazyLock::new(|| ShardId::from_str("s11111111-1111-1111-1111-111111111111").unwrap());
const VERSIONED_DATA_0: VersionedData = VersionedData {
seqno: SeqNo(0),
data: Bytes::from_static(&[0, 1, 2, 3]),
};
const VERSIONED_DATA_1: VersionedData = VersionedData {
seqno: SeqNo(1),
data: Bytes::from_static(&[4, 5, 6, 7]),
};
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const SUBSCRIPTIONS_TIMEOUT: Duration = Duration::from_secs(3);
const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(2);
// Verifies basic server lifecycle: a client running on a separate runtime
// connects and subscribes, the server observes the connection and the
// subscription, and tearing down the client runtime removes both server-side.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] fn grpc_server() {
let metrics = Arc::new(Metrics::new(
&test_persist_config(),
&MetricsRegistry::new(),
));
// Separate runtimes so the client can be shut down independently of the
// server, simulating a client process exiting.
let server_runtime = tokio::runtime::Runtime::new().expect("server runtime");
let client_runtime = tokio::runtime::Runtime::new().expect("client runtime");
let (addr, tcp_listener_stream) = server_runtime.block_on(new_tcp_listener());
let server_state = server_runtime.block_on(spawn_server(tcp_listener_stream));
{
// Enter the client runtime so the spawned task runs on it.
let _guard = client_runtime.enter();
mz_ore::task::spawn(|| "client".to_string(), async move {
let client = GrpcPubSubClient::connect(
PersistPubSubClientConfig {
url: format!("http://{}", addr),
caller_id: "client".to_string(),
persist_cfg: test_persist_config(),
},
metrics,
);
// Hold the subscription token (and this task) alive indefinitely;
// the runtime shutdown below is what ends the connection.
let _token = client.sender.subscribe(&SHARD_ID_0);
tokio::time::sleep(Duration::MAX).await;
});
}
// The server should see exactly one connection with one subscription.
server_runtime.block_on(async {
poll_until_true(CONNECT_TIMEOUT, || {
server_state.active_connections().len() == 1
})
.await;
poll_until_true(SUBSCRIPTIONS_TIMEOUT, || {
server_state.shard_subscription_counts() == HashMap::from([(*SHARD_ID_0, 1)])
})
.await
});
// Killing the client runtime severs the connection...
client_runtime.shutdown_timeout(SERVER_SHUTDOWN_TIMEOUT);
// ...and the server should clean up the connection and its subscriptions.
server_runtime.block_on(async {
poll_until_true(CONNECT_TIMEOUT, || {
server_state.active_connections().is_empty()
})
.await;
poll_until_true(SUBSCRIPTIONS_TIMEOUT, || {
server_state.shard_subscription_counts() == HashMap::new()
})
.await
});
}
// Verifies the client sender survives server unavailability: subscriptions
// made before a server exists (or while it's down) are established once a
// server becomes reachable, while tokens dropped beforehand are not.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] fn grpc_client_sender_reconnects() {
let metrics = Arc::new(Metrics::new(
&test_persist_config(),
&MetricsRegistry::new(),
));
let server_runtime = tokio::runtime::Runtime::new().expect("server runtime");
let client_runtime = tokio::runtime::Runtime::new().expect("client runtime");
let (addr, tcp_listener_stream) = server_runtime.block_on(new_tcp_listener());
// Connect the client before anything is serving the address.
let client = client_runtime.block_on(async {
GrpcPubSubClient::connect(
PersistPubSubClientConfig {
url: format!("http://{}", addr),
caller_id: "client".to_string(),
persist_cfg: test_persist_config(),
},
metrics,
)
});
// Subscribe to both shards while disconnected, but immediately drop the
// shard 1 token: only shard 0 should be subscribed once a server appears.
let _token = Arc::clone(&client.sender).subscribe(&SHARD_ID_0);
let _token_2 = Arc::clone(&client.sender).subscribe(&SHARD_ID_1);
drop(_token_2);
let server_state = server_runtime.block_on(spawn_server(tcp_listener_stream));
server_runtime.block_on(async {
poll_until_true(CONNECT_TIMEOUT, || {
server_state.active_connections().len() == 1
})
.await;
poll_until_true(SUBSCRIPTIONS_TIMEOUT, || {
server_state.shard_subscription_counts() == HashMap::from([(*SHARD_ID_0, 1)])
})
.await;
});
// Take the server down, then subscribe to shard 1 while disconnected.
server_runtime.shutdown_timeout(SERVER_SHUTDOWN_TIMEOUT);
let _token_2 = Arc::clone(&client.sender).subscribe(&SHARD_ID_1);
// Bring up a fresh server on the same address; the client should reconnect
// and re-send both live subscriptions.
let server_runtime = tokio::runtime::Runtime::new().expect("server runtime");
let tcp_listener_stream = server_runtime.block_on(async {
TcpListenerStream::new(
TcpListener::bind(addr)
.await
.expect("can bind to previous addr"),
)
});
let server_state = server_runtime.block_on(spawn_server(tcp_listener_stream));
server_runtime.block_on(async {
poll_until_true(CONNECT_TIMEOUT, || {
server_state.active_connections().len() == 1
})
.await;
poll_until_true(SUBSCRIPTIONS_TIMEOUT, || {
server_state.shard_subscription_counts()
== HashMap::from([(*SHARD_ID_0, 1), (*SHARD_ID_1, 1)])
})
.await;
});
}
// Verifies subscription-token semantics: subscriptions to a shard are
// reference-counted client-side (repeated subscribes hand back clones of the
// same token), and dropping the last token unsubscribes on the server.
#[mz_ore::test(tokio::test(flavor = "multi_thread"))]
#[cfg_attr(miri, ignore)] async fn grpc_client_sender_subscription_tokens() {
let metrics = Arc::new(Metrics::new(
&test_persist_config(),
&MetricsRegistry::new(),
));
let (addr, tcp_listener_stream) = new_tcp_listener().await;
let server_state = spawn_server(tcp_listener_stream).await;
let client = GrpcPubSubClient::connect(
PersistPubSubClientConfig {
url: format!("http://{}", addr),
caller_id: "client".to_string(),
persist_cfg: test_persist_config(),
},
metrics,
);
poll_until_true(CONNECT_TIMEOUT, || {
server_state.active_connections().len() == 1
})
.await;
// A single subscribe creates exactly one server-side subscription...
let token = Arc::clone(&client.sender).subscribe(&SHARD_ID_0);
poll_until_true(SUBSCRIPTIONS_TIMEOUT, || {
server_state.shard_subscription_counts() == HashMap::from([(*SHARD_ID_0, 1)])
})
.await;
// ...and dropping its token removes it.
drop(token);
poll_until_true(SUBSCRIPTIONS_TIMEOUT, || {
server_state.shard_subscription_counts() == HashMap::new()
})
.await;
let token = Arc::clone(&client.sender).subscribe(&SHARD_ID_0);
poll_until_true(SUBSCRIPTIONS_TIMEOUT, || {
server_state.shard_subscription_counts() == HashMap::from([(*SHARD_ID_0, 1)])
})
.await;
// Re-subscribing to the same shard clones the existing token rather than
// creating additional server-side subscriptions.
let token2 = Arc::clone(&client.sender).subscribe(&SHARD_ID_0);
let token3 = Arc::clone(&client.sender).subscribe(&SHARD_ID_0);
assert_eq!(Arc::strong_count(&token), 3);
poll_until_true(SUBSCRIPTIONS_TIMEOUT, || {
server_state.shard_subscription_counts() == HashMap::from([(*SHARD_ID_0, 1)])
})
.await;
// All clones must be dropped before the subscription goes away.
drop(token);
drop(token2);
drop(token3);
poll_until_true(SUBSCRIPTIONS_TIMEOUT, || {
server_state.shard_subscription_counts() == HashMap::new()
})
.await;
// Distinct shards get distinct subscriptions.
let _token0 = Arc::clone(&client.sender).subscribe(&SHARD_ID_0);
let _token1 = Arc::clone(&client.sender).subscribe(&SHARD_ID_1);
poll_until_true(SUBSCRIPTIONS_TIMEOUT, || {
server_state.shard_subscription_counts()
== HashMap::from([(*SHARD_ID_0, 1), (*SHARD_ID_1, 1)])
})
.await;
}
// Verifies diff broadcast across two clients: a pushed diff is fanned out to
// the other subscriber of the shard but not echoed back to the pusher, and
// broadcasting still works after the server restarts on the same address.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] fn grpc_client_receiver() {
    // Build metrics from `test_persist_config()` for consistency with the
    // other tests in this module (previously `PersistConfig::new_for_tests()`),
    // so metrics and clients share the same config setup.
    let metrics = Arc::new(Metrics::new(
        &test_persist_config(),
        &MetricsRegistry::new(),
    ));
    let server_runtime = tokio::runtime::Runtime::new().expect("server runtime");
    let client_runtime = tokio::runtime::Runtime::new().expect("client runtime");
    let (addr, tcp_listener_stream) = server_runtime.block_on(new_tcp_listener());
    // Create both clients before the server exists; their receivers should
    // simply be empty while disconnected.
    let mut client_1 = client_runtime.block_on(async {
        GrpcPubSubClient::connect(
            PersistPubSubClientConfig {
                url: format!("http://{}", addr),
                caller_id: "client_1".to_string(),
                persist_cfg: test_persist_config(),
            },
            Arc::clone(&metrics),
        )
    });
    let mut client_2 = client_runtime.block_on(async {
        GrpcPubSubClient::connect(
            PersistPubSubClientConfig {
                url: format!("http://{}", addr),
                caller_id: "client_2".to_string(),
                persist_cfg: test_persist_config(),
            },
            metrics,
        )
    });
    // No server yet: nothing to receive.
    assert_none!(client_1.receiver.next().now_or_never());
    assert_none!(client_2.receiver.next().now_or_never());
    let server_state = server_runtime.block_on(spawn_server(tcp_listener_stream));
    server_runtime.block_on(poll_until_true(CONNECT_TIMEOUT, || {
        server_state.active_connections().len() == 2
    }));
    // Connected but not yet subscribed: still nothing to receive.
    assert_none!(client_1.receiver.next().now_or_never());
    assert_none!(client_2.receiver.next().now_or_never());
    let _token_client_1 = Arc::clone(&client_1.sender).subscribe(&SHARD_ID_0);
    let _token_client_2 = Arc::clone(&client_2.sender).subscribe(&SHARD_ID_0);
    server_runtime.block_on(poll_until_true(SUBSCRIPTIONS_TIMEOUT, || {
        server_state.shard_subscription_counts() == HashMap::from([(*SHARD_ID_0, 2)])
    }));
    // A diff pushed by client_1 is delivered to client_2 but not echoed back
    // to client_1 itself.
    client_1.sender.push_diff(&SHARD_ID_0, &VERSIONED_DATA_1);
    assert_none!(client_1.receiver.next().now_or_never());
    client_runtime.block_on(async {
        assert_push(
            client_2.receiver.next().await.expect("has diff"),
            &SHARD_ID_0,
            &VERSIONED_DATA_1,
        )
    });
    // Restart the server on the same address; clients should reconnect and
    // re-establish their subscriptions.
    server_runtime.shutdown_timeout(SERVER_SHUTDOWN_TIMEOUT);
    assert_none!(client_1.receiver.next().now_or_never());
    assert_none!(client_2.receiver.next().now_or_never());
    let server_runtime = tokio::runtime::Runtime::new().expect("server runtime");
    let tcp_listener_stream = server_runtime.block_on(async {
        TcpListenerStream::new(
            TcpListener::bind(addr)
                .await
                .expect("can bind to previous addr"),
        )
    });
    let server_state = server_runtime.block_on(spawn_server(tcp_listener_stream));
    server_runtime.block_on(async {
        poll_until_true(CONNECT_TIMEOUT, || {
            server_state.active_connections().len() == 2
        })
        .await;
        poll_until_true(SUBSCRIPTIONS_TIMEOUT, || {
            server_state.shard_subscription_counts() == HashMap::from([(*SHARD_ID_0, 2)])
        })
        .await;
    });
    // Broadcast works in the other direction after the restart, again without
    // echoing to the pusher.
    client_2.sender.push_diff(&SHARD_ID_0, &VERSIONED_DATA_0);
    client_runtime.block_on(async {
        assert_push(
            client_1.receiver.next().await.expect("has diff"),
            &SHARD_ID_0,
            &VERSIONED_DATA_0,
        )
    });
    assert_none!(client_2.receiver.next().now_or_never());
}
/// Binds a TCP listener on an OS-assigned localhost port, returning the
/// resolved address alongside a stream of its incoming connections.
async fn new_tcp_listener() -> (SocketAddr, TcpListenerStream) {
    // Port 0 asks the OS to pick any free port.
    let any_port = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0));
    let listener = TcpListener::bind(any_port).await.expect("tcp listener");
    let bound_addr = listener.local_addr().expect("bound to local address");
    (bound_addr, TcpListenerStream::new(listener))
}
/// Starts a `PersistGrpcPubSubServer` serving the given connection stream on
/// a detached background task, returning its shared state for assertions.
#[allow(clippy::unused_async)]
async fn spawn_server(tcp_listener_stream: TcpListenerStream) -> Arc<PubSubState> {
    let pubsub_server =
        PersistGrpcPubSubServer::new(&test_persist_config(), &MetricsRegistry::new());
    let state = Arc::clone(&pubsub_server.state);
    // Drive the server from its own task; the handle is intentionally dropped.
    let _task = mz_ore::task::spawn(|| "server".to_string(), async move {
        pubsub_server.serve_with_stream(tcp_listener_stream).await
    });
    state
}
/// Repeatedly evaluates `f`, sleeping 1ms between attempts, until it returns
/// true. Panics with "timed out" if `timeout` elapses first.
async fn poll_until_true<F>(timeout: Duration, f: F)
where
    F: Fn() -> bool,
{
    let started = Instant::now();
    while !f() {
        assert!(started.elapsed() <= timeout, "timed out");
        tokio::time::sleep(Duration::from_millis(1)).await;
    }
}
/// Asserts that `message` is a `PushDiff` for `shard` carrying exactly `data`.
fn assert_push(message: ProtoPubSubMessage, shard: &ShardId, data: &VersionedData) {
    let inner = message.message.expect("proto contains message");
    let diff = match inner {
        Message::PushDiff(diff) => diff,
        Message::Subscribe(_) | Message::Unsubscribe(_) => panic!("unexpected message type"),
    };
    assert_eq!(diff.shard_id, shard.into_proto());
    assert_eq!(diff.seqno, data.seqno.into_proto());
    assert_eq!(diff.diff, data.data);
}
/// Builds a `PersistConfig` for these tests: zero PubSub reconnect backoff
/// and the PubSub client dyncfg flag forced on.
fn test_persist_config() -> PersistConfig {
    let mut persist_cfg = PersistConfig::new_for_tests();
    // Reconnect immediately so the server-restart tests don't wait out a backoff.
    persist_cfg.pubsub_reconnect_backoff = Duration::ZERO;
    let mut flag_updates = ConfigUpdates::default();
    flag_updates.add(&PUBSUB_CLIENT_ENABLED, true);
    persist_cfg.apply_from(&flag_updates);
    persist_cfg
}
}