use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroI64;
use std::time::Duration;
use anyhow::anyhow;
use differential_dataflow::lattice::Lattice;
use futures::{Stream, StreamExt};
use mz_build_info::BuildInfo;
use mz_cluster_client::client::{ClusterReplicaLocation, ClusterStartupEpoch, TimelyConfig};
use mz_ore::now::NowFn;
use mz_ore::retry::Retry;
use mz_ore::soft_panic_or_log;
use mz_ore::task::AbortOnDropHandle;
use mz_persist_types::Codec64;
use mz_repr::GlobalId;
use mz_service::client::{GenericClient, Partitioned};
use mz_service::params::GrpcClientParameters;
use mz_storage_client::client::{
RunIngestionCommand, RunSinkCommand, Status, StorageClient, StorageCommand, StorageGrpcClient,
StorageResponse,
};
use mz_storage_client::metrics::RehydratingStorageClientMetrics;
use mz_storage_types::parameters::StorageParameters;
use timely::progress::{Antichain, Timestamp};
use timely::PartialOrder;
use tokio::select;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::warn;
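/// A storage client that replays the command stream on failure.
///
/// The client records all commands it observes in a minimal form. If it
/// observes a send or receive failure while communicating with the underlying
/// client, it reconnects the client and replays the command stream.
///
/// A sketch of the intended lifecycle; `build_info`, `metrics`, `epoch`,
/// `params`, `now`, and `location` are placeholders for values the caller
/// must supply, not real constructors:
///
/// ```ignore
/// let mut client = RehydratingStorageClient::new(build_info, metrics, epoch, params, now);
/// client.connect(location); // point the client at a replica
/// client.send(StorageCommand::InitializationComplete);
/// let mut responses = client.response_stream(); // consume the replica's responses
/// ```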
#[derive(Debug)]
pub struct RehydratingStorageClient<T> {
command_tx: UnboundedSender<RehydrationCommand<T>>,
response_rx: UnboundedReceiverStream<StorageResponse<T>>,
_task: AbortOnDropHandle<()>,
}
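/// A [`StorageClient`] whose commands and responses are partitioned across
/// one gRPC connection per replica process.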
type PartitionedClient<T> = Partitioned<StorageGrpcClient, StorageCommand<T>, StorageResponse<T>>;
impl<T> RehydratingStorageClient<T>
where
T: Timestamp + Lattice + Codec64,
StorageGrpcClient: StorageClient<T>,
{
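    /// Creates a `RehydratingStorageClient` that is not yet connected to any
    /// storage replica. Use [`RehydratingStorageClient::connect`] to specify
    /// the replica to connect to.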
pub fn new(
build_info: &'static BuildInfo,
metrics: RehydratingStorageClientMetrics,
envd_epoch: NonZeroI64,
grpc_client_params: GrpcClientParameters,
now: NowFn,
) -> RehydratingStorageClient<T> {
let (command_tx, command_rx) = unbounded_channel();
let (response_tx, response_rx) = unbounded_channel();
let mut task = RehydrationTask {
build_info,
command_rx,
response_tx,
sources: BTreeMap::new(),
sinks: BTreeMap::new(),
uppers: BTreeMap::new(),
sinces: BTreeMap::new(),
initialized: false,
current_epoch: ClusterStartupEpoch::new(envd_epoch, 0),
config: Default::default(),
metrics,
grpc_client_params,
now,
};
let task = mz_ore::task::spawn(|| "rehydration", async move { task.run().await });
RehydratingStorageClient {
command_tx,
response_rx: UnboundedReceiverStream::new(response_rx),
_task: task.abort_on_drop(),
}
}
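    /// Connects to the storage replica at the given location, replacing any
    /// existing connection.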
pub fn connect(&mut self, location: ClusterReplicaLocation) {
self.command_tx
.send(RehydrationCommand::Connect { location })
.expect("rehydration task should not drop first");
}
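    /// Severs any existing connection and returns the client to the
    /// unconnected state.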
pub fn reset(&mut self) {
self.command_tx
.send(RehydrationCommand::Reset)
.expect("rehydration task should not drop first");
}
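    /// Sends a command to the storage replica, recording it so that it can be
    /// replayed if the connection needs to be rehydrated.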
pub fn send(&mut self, cmd: StorageCommand<T>) {
self.command_tx
.send(RehydrationCommand::Send(cmd))
.expect("rehydration task should not drop first");
}
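    /// Returns a stream that produces the responses from the storage replica.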
pub fn response_stream(&mut self) -> impl Stream<Item = StorageResponse<T>> + '_ {
&mut self.response_rx
}
}
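/// Commands sent from the `RehydratingStorageClient` to its rehydration task.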
#[derive(Debug, Clone)]
enum RehydrationCommand<T> {
    /// (Re)connect to the storage replica at the given location.
    Connect {
        /// The location of the replica to connect to.
        location: ClusterReplicaLocation,
    },
    /// Send the contained storage command to the replica.
    Send(StorageCommand<T>),
    /// Sever the connection and wait for a new address.
    Reset,
}
/// A task that manages the rehydration of a storage replica.
struct RehydrationTask<T>
where
    T: Timestamp + Lattice + Codec64,
{
    /// The build information for this process.
    build_info: &'static BuildInfo,
    /// A channel upon which commands intended for the storage replica are
    /// delivered.
    command_rx: UnboundedReceiver<RehydrationCommand<T>>,
    /// A channel upon which responses from the storage replica are delivered.
    response_tx: UnboundedSender<StorageResponse<T>>,
    /// The ingestions that have been observed.
    sources: BTreeMap<GlobalId, RunIngestionCommand>,
    /// The exports that have been observed.
    sinks: BTreeMap<GlobalId, RunSinkCommand<T>>,
    /// The upper frontier information received for each collection.
    uppers: BTreeMap<GlobalId, Antichain<T>>,
    /// The since frontiers that have been observed.
    sinces: BTreeMap<GlobalId, Antichain<T>>,
    /// Set to `true` once [`StorageCommand::InitializationComplete`] has been
    /// observed.
    initialized: bool,
    /// The current epoch for the replica we are connecting to.
    current_epoch: ClusterStartupEpoch,
    /// Storage configuration that has been observed.
    config: StorageParameters,
    /// Metrics for the client maintained by this task.
    metrics: RehydratingStorageClientMetrics,
    /// Configuration for the underlying gRPC client.
    grpc_client_params: GrpcClientParameters,
    /// A function that returns the current time.
    now: NowFn,
}
/// The state machine of the rehydration task.
enum RehydrationTaskState<T: Timestamp + Lattice> {
    /// Wait for the address of a storage replica to connect to.
    AwaitAddress,
    /// The storage replica should be (re)hydrated.
    Rehydrate {
        /// The location of the storage replica.
        location: ClusterReplicaLocation,
    },
    /// Communication with the storage replica is live. Commands and responses
    /// should be forwarded until an error occurs.
    Pump {
        /// The location of the storage replica.
        location: ClusterReplicaLocation,
        /// The connected client for the replica.
        client: PartitionedClient<T>,
    },
    /// The caller has asked the task to shut down.
    Done,
}
impl<T> RehydrationTask<T>
where
T: Timestamp + Lattice + Codec64,
StorageGrpcClient: StorageClient<T>,
{
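    /// Drives the task's state machine until the state reaches `Done`.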
async fn run(&mut self) {
let mut state = RehydrationTaskState::AwaitAddress;
loop {
state = match state {
RehydrationTaskState::AwaitAddress => self.step_await_address().await,
RehydrationTaskState::Rehydrate { location } => self.step_rehydrate(location).await,
RehydrationTaskState::Pump { location, client } => {
self.step_pump(location, client).await
}
RehydrationTaskState::Done => break,
}
}
}
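    /// Absorbs commands while no replica address is known, reporting sources
    /// and sinks as paused once the client is initialized, until a `Connect`
    /// command arrives.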
async fn step_await_address(&mut self) -> RehydrationTaskState<T> {
if self.initialized {
self.update_paused_statuses();
}
loop {
match self.command_rx.recv().await {
None => break RehydrationTaskState::Done,
Some(RehydrationCommand::Connect { location }) => {
break RehydrationTaskState::Rehydrate { location };
}
Some(RehydrationCommand::Send(command)) => {
self.absorb_command(&command);
if self.initialized {
self.update_paused_statuses();
}
}
Some(RehydrationCommand::Reset) => {}
}
}
}
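    /// (Re)connects to the replica at `location`, retrying with backoff, and
    /// then replays the recorded command stream to rehydrate it.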
async fn step_rehydrate(
&mut self,
location: ClusterReplicaLocation,
) -> RehydrationTaskState<T> {
let stream = Retry::default()
.clamp_backoff(Duration::from_secs(1))
.into_retry_stream();
tokio::pin!(stream);
let (client, timely_command) = loop {
let state = stream.next().await.expect("infinite stream");
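            // Drain any commands that arrived while we were backing off, so
            // that a new `Connect` or `Reset` command can interrupt the
            // rehydration attempt.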
loop {
match self.command_rx.try_recv() {
Ok(RehydrationCommand::Connect { location }) => {
return RehydrationTaskState::Rehydrate { location };
}
Ok(RehydrationCommand::Send(command)) => {
self.absorb_command(&command);
}
Ok(RehydrationCommand::Reset) => {
return RehydrationTaskState::AwaitAddress;
}
Err(TryRecvError::Disconnected) => return RehydrationTaskState::Done,
Err(TryRecvError::Empty) => break,
}
}
            let timely_config = TimelyConfig {
                workers: location.workers,
                process: 0,
                addresses: location.dataflow_addrs.clone(),
                // This value is not currently used by storage, so we just
                // choose an identifiable value.
                arrangement_exert_proportionality: 1337,
            };
let dests = location
.ctl_addrs
.clone()
.into_iter()
.map(|addr| (addr, self.metrics.clone()))
.collect();
let version = self.build_info.semver_version();
let client =
StorageGrpcClient::connect_partitioned(dests, version, &self.grpc_client_params)
.await;
let client = match client {
Ok(client) => client,
Err(e) => {
if state.i >= mz_service::retry::INFO_MIN_RETRIES {
tracing::info!(
"error connecting to {:?} for storage, retrying in {:?}: {e:#}",
location,
state.next_backoff.unwrap()
);
} else {
tracing::debug!(
"error connecting to {:?} for storage, retrying in {:?}: {e:#}",
location,
state.next_backoff.unwrap()
);
}
continue;
}
};
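            // Bump the replica portion of the epoch so that this connection
            // is distinguishable from any previous connection to a replica.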
let new_epoch = ClusterStartupEpoch::new(
self.current_epoch.envd(),
self.current_epoch.replica() + 1,
);
self.current_epoch = new_epoch;
let timely_command = StorageCommand::CreateTimely {
config: timely_config,
epoch: new_epoch,
};
break (client, timely_command);
};
let mut commands = vec![
timely_command,
StorageCommand::UpdateConfiguration(self.config.clone()),
StorageCommand::RunIngestions(self.sources.values().cloned().collect()),
StorageCommand::RunSinks(self.sinks.values().cloned().collect()),
StorageCommand::AllowCompaction(
self.sinces
.iter()
.map(|(id, since)| (*id, since.clone()))
.collect(),
),
];
if self.initialized {
commands.push(StorageCommand::InitializationComplete)
}
self.send_commands(location, client, commands).await
}
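    /// Forwards commands to, and responses from, the connected replica until
    /// an error or a state-changing command occurs.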
async fn step_pump(
&mut self,
location: ClusterReplicaLocation,
mut client: PartitionedClient<T>,
) -> RehydrationTaskState<T> {
select! {
command = self.command_rx.recv() => match command {
None => RehydrationTaskState::Done,
Some(RehydrationCommand::Connect { location }) => RehydrationTaskState::Rehydrate { location },
Some(RehydrationCommand::Send(command)) => {
self.absorb_command(&command);
self.send_commands(location, client, vec![command]).await
}
Some(RehydrationCommand::Reset) => {
RehydrationTaskState::AwaitAddress
}
},
response = client.recv() => {
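                // A `None` response means the replica closed the connection
                // cleanly, which we still treat as an error: the replica
                // should remain connected for as long as we are running.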
let response = match response.transpose() {
None => {
Err(anyhow!("storage cluster unexpectedly gracefully terminated connection"))
}
Some(response) => response,
};
self.send_response(location, client, response)
}
}
}
    /// Reports all known sources and sinks as `Paused`, since there is
    /// currently no replica running them.
    fn update_paused_statuses(&self) {
        let make_update = |id: GlobalId, object_type: &str| {
            mz_storage_client::client::StatusUpdate {
                id,
                status: Status::Paused,
                timestamp: mz_ore::now::to_datetime((self.now)()),
                error: None,
                hints: BTreeSet::from([format!(
                    "There is currently no replica running this {object_type}"
                )]),
                namespaced_errors: Default::default(),
            }
        };

        let source_updates = self
            .sources
            .keys()
            .map(|id| make_update(*id, "source"))
            .collect();
        let _ = self
            .response_tx
            .send(StorageResponse::StatusUpdates(source_updates));

        let sink_updates = self
            .sinks
            .keys()
            .map(|id| make_update(*id, "sink"))
            .collect();
        let _ = self
            .response_tx
            .send(StorageResponse::StatusUpdates(sink_updates));
    }
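    /// Sends each command to the replica, transitioning back to rehydration
    /// if any send fails.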
async fn send_commands(
&mut self,
location: ClusterReplicaLocation,
mut client: PartitionedClient<T>,
commands: impl IntoIterator<Item = StorageCommand<T>>,
) -> RehydrationTaskState<T> {
for command in commands {
if let Err(e) = client.send(command).await {
return self.send_response(location.clone(), client, Err(e));
}
}
RehydrationTaskState::Pump { location, client }
}
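    /// Forwards a response to the client. Errors from the replica trigger a
    /// reconnection and rehydration instead.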
fn send_response(
&mut self,
location: ClusterReplicaLocation,
client: PartitionedClient<T>,
response: Result<StorageResponse<T>, anyhow::Error>,
) -> RehydrationTaskState<T> {
match response {
Ok(response) => {
if let Some(response) = self.absorb_response(response) {
if self.response_tx.send(response).is_err() {
RehydrationTaskState::Done
} else {
RehydrationTaskState::Pump { location, client }
}
} else {
RehydrationTaskState::Pump { location, client }
}
}
Err(e) => {
warn!("storage cluster produced error, reconnecting: {e:#}");
RehydrationTaskState::Rehydrate { location }
}
}
}
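    /// Records the command in the task's minimal in-memory state so that the
    /// command stream can be replayed after a reconnection.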
fn absorb_command(&mut self, command: &StorageCommand<T>) {
match command {
            StorageCommand::CreateTimely { .. } => {
                // `CreateTimely` is constructed by the rehydration task itself
                // during reconnection, so it is not recorded for replay.
            }
StorageCommand::InitializationComplete => self.initialized = true,
StorageCommand::UpdateConfiguration(params) => {
self.config.update(params.clone());
}
StorageCommand::RunIngestions(ingestions) => {
for ingestion in ingestions {
self.sources.insert(ingestion.id, ingestion.clone());
for id in ingestion.description.subsource_ids() {
self.uppers
.entry(id)
.or_insert(Antichain::from_elem(T::minimum()));
}
}
}
StorageCommand::RunSinks(exports) => {
for export in exports {
self.sinks.insert(export.id, export.clone());
self.uppers
.entry(export.id)
.or_insert(Antichain::from_elem(T::minimum()));
}
}
            StorageCommand::AllowCompaction(frontiers) => {
                self.sinces.extend(frontiers.iter().cloned());
                for (id, frontier) in frontiers {
                    match self.sinks.get_mut(id) {
                        Some(export) => {
                            // Advance the sink's as-of so that a replayed
                            // `RunSinks` command resumes from the compacted
                            // frontier.
                            export.description.as_of.clone_from(frontier);
                        }
                        // Ingestions (and their subsources) have uppers
                        // recorded but no sink entry; compaction for them
                        // requires no further bookkeeping here.
                        None if self.uppers.contains_key(id) => continue,
                        None => soft_panic_or_log!("AllowCompaction command for non-existent {id}"),
                    }
                }
            }
}
}
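    /// Updates the task's state based on a response from the replica and
    /// returns the response that should be forwarded to the client, if any.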
fn absorb_response(&mut self, response: StorageResponse<T>) -> Option<StorageResponse<T>> {
match response {
StorageResponse::FrontierUppers(list) => {
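                // Only forward upper updates that strictly advance the
                // locally recorded frontier, so that uppers re-reported after
                // a reconnection are not forwarded twice.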
let mut new_uppers = Vec::new();
for (id, new_upper) in list {
let reported = match self.uppers.get_mut(&id) {
Some(reported) => reported,
None => {
soft_panic_or_log!("Reference to absent collection: {id}");
continue;
}
};
if PartialOrder::less_than(reported, &new_upper) {
reported.clone_from(&new_upper);
new_uppers.push((id, new_upper));
}
}
if !new_uppers.is_empty() {
Some(StorageResponse::FrontierUppers(new_uppers))
} else {
None
}
}
StorageResponse::DroppedIds(dropped_ids) => {
tracing::debug!("dropped IDs: {:?}", dropped_ids);
for id in dropped_ids.iter() {
self.sources.remove(id);
self.sinks.remove(id);
self.uppers.remove(id);
self.sinces.remove(id);
}
Some(StorageResponse::DroppedIds(dropped_ids))
}
StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
Some(StorageResponse::StatisticsUpdates(source_stats, sink_stats))
}
StorageResponse::StatusUpdates(updates) => {
Some(StorageResponse::StatusUpdates(updates))
}
}
}
}