use std::collections::{btree_map, BTreeMap, BTreeSet};
use std::time::{Duration, Instant};

use futures::FutureExt;
use maplit::btreemap;
use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
use mz_controller::clusters::{ClusterEvent, ClusterStatus};
use mz_controller::ControllerResponse;
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_ore::option::OptionExt;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, task};
use mz_persist_client::usage::ShardsUsageReferenced;
use mz_repr::{Datum, Row};
use mz_sql::ast::Statement;
use mz_sql::pure::PurifiedStatement;
use mz_storage_client::controller::IntrospectionType;
use mz_storage_types::controller::CollectionMetadata;
use opentelemetry::trace::TraceContextExt;
use rand::{rngs, Rng, SeedableRng};
use serde_json::json;
use tracing::{event, info_span, warn, Instrument, Level};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{BuiltinTableUpdate, Op};
use crate::command::Command;
use crate::coord::{
AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
};
use crate::telemetry::{EventDetails, SegmentClientExt};
use crate::{AdapterNotice, TimestampContext};

impl Coordinator {
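    /// Handles the next message from the Coordinator's internal message stream,
    /// dispatching it to the appropriate handler.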
#[instrument]
    pub(crate) async fn handle_message(&mut self, msg: Message) {
match msg {
Message::Command(otel_ctx, cmd) => {
let span = tracing::info_span!("message_command").or_current();
span.in_scope(|| otel_ctx.attach_as_parent());
self.message_command(cmd).instrument(span).await
}
Message::ControllerReady => {
let Coordinator {
controller,
catalog,
..
} = self;
let storage_metadata = catalog.state().storage_metadata();
if let Some(m) = controller
.process(storage_metadata)
.expect("`process` never returns an error")
{
self.message_controller(m).boxed_local().await
}
}
Message::PurifiedStatementReady(ready) => {
self.message_purified_statement_ready(ready)
.boxed_local()
.await
}
Message::CreateConnectionValidationReady(ready) => {
self.message_create_connection_validation_ready(ready)
.boxed_local()
.await
}
Message::AlterConnectionValidationReady(ready) => {
self.message_alter_connection_validation_ready(ready)
.boxed_local()
.await
}
Message::TryDeferred {
conn_id,
acquired_lock,
} => self.try_deferred(conn_id, acquired_lock).await,
Message::GroupCommitInitiate(span, permit) => {
tracing::Span::current().add_link(span.context().span().span_context().clone());
self.try_group_commit(permit)
.instrument(span)
.boxed_local()
.await
}
Message::AdvanceTimelines => {
self.advance_timelines().boxed_local().await;
}
Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
Message::CancelPendingPeeks { conn_id } => {
self.cancel_pending_peeks(&conn_id);
}
Message::LinearizeReads => {
self.message_linearize_reads().boxed_local().await;
}
Message::StorageUsageSchedule => {
self.schedule_storage_usage_collection().boxed_local().await;
}
Message::StorageUsageFetch => {
self.storage_usage_fetch().boxed_local().await;
}
Message::StorageUsageUpdate(sizes) => {
self.storage_usage_update(sizes).boxed_local().await;
}
Message::StorageUsagePrune(expired) => {
self.storage_usage_prune(expired).boxed_local().await;
}
Message::RetireExecute {
otel_ctx,
data,
reason,
} => {
otel_ctx.attach_as_parent();
self.retire_execution(reason, data);
}
Message::ExecuteSingleStatementTransaction {
ctx,
otel_ctx,
stmt,
params,
} => {
otel_ctx.attach_as_parent();
self.sequence_execute_single_statement_transaction(ctx, stmt, params)
.boxed_local()
.await;
}
Message::PeekStageReady { ctx, span, stage } => {
self.sequence_staged(ctx, span, stage).boxed_local().await;
}
Message::CreateIndexStageReady { ctx, span, stage } => {
self.sequence_staged(ctx, span, stage).boxed_local().await;
}
Message::CreateViewStageReady { ctx, span, stage } => {
self.sequence_staged(ctx, span, stage).boxed_local().await;
}
Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
self.sequence_staged(ctx, span, stage).boxed_local().await;
}
Message::SubscribeStageReady { ctx, span, stage } => {
self.sequence_staged(ctx, span, stage).boxed_local().await;
}
Message::IntrospectionSubscribeStageReady { span, stage } => {
self.sequence_staged((), span, stage).boxed_local().await;
}
Message::ExplainTimestampStageReady { ctx, span, stage } => {
self.sequence_staged(ctx, span, stage).boxed_local().await;
}
Message::SecretStageReady { ctx, span, stage } => {
self.sequence_staged(ctx, span, stage).boxed_local().await;
}
Message::ClusterStageReady { ctx, span, stage } => {
self.sequence_staged(ctx, span, stage).boxed_local().await;
}
Message::DrainStatementLog => {
self.drain_statement_log();
}
Message::PrivateLinkVpcEndpointEvents(events) => {
if !self.controller.read_only() {
                    self.controller.storage.append_introspection_updates(
                        IntrospectionType::PrivatelinkConnectionStatusHistory,
                        events.into_iter().map(|e| (Row::from(e), 1)).collect(),
                    );
}
}
Message::CheckSchedulingPolicies => {
self.check_scheduling_policies().boxed_local().await;
}
Message::SchedulingDecisions(decisions) => {
self.handle_scheduling_decisions(decisions)
.boxed_local()
.await;
}
Message::DeferredStatementReady => {
self.handle_deferred_statement().boxed_local().await;
}
}
}
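
    /// Spawns a task that fetches the size of every persist shard referenced by an
    /// active storage collection and reports the results back to the Coordinator via
    /// `Message::StorageUsageUpdate`.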
#[mz_ore::instrument(level = "debug")]
pub async fn storage_usage_fetch(&mut self) {
let internal_cmd_tx = self.internal_cmd_tx.clone();
let client = self.storage_usage_client.clone();
let live_shards: BTreeSet<_> = self
.controller
.storage
.active_collection_metadatas()
.into_iter()
.flat_map(|(_id, collection_metadata)| {
let CollectionMetadata {
data_shard,
remap_shard,
status_shard,
persist_location: _,
relation_desc: _,
txns_shard: _,
} = collection_metadata;
                [remap_shard, status_shard, Some(data_shard)].into_iter()
            })
            .flatten()
.collect();
let collection_metric = self
.metrics
.storage_usage_collection_time_seconds
.with_label_values(&[]);
task::spawn(|| "storage_usage_fetch", async move {
let collection_metric_timer = collection_metric.start_timer();
let shard_sizes = client.shards_usage_referenced(live_shards).await;
collection_metric_timer.observe_duration();
if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
warn!("internal_cmd_rx dropped before we could send: {:?}", e);
}
});
}
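
    /// Records the freshly collected shard sizes in the storage usage collection and,
    /// once the resulting builtin table updates have been applied, schedules the next
    /// collection via `Message::StorageUsageSchedule`.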
#[mz_ore::instrument(level = "debug")]
async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
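        // In read-only mode the coordinator must not mint a new local write timestamp,
        // so peek the current one instead of acquiring a fresh one.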
let collection_timestamp = if self.controller.read_only() {
self.peek_local_write_ts().await.into()
} else {
self.get_local_write_ts().await.timestamp.into()
};
let ops = shards_usage
.by_shard
.into_iter()
.map(|(shard_id, shard_usage)| Op::WeirdStorageUsageUpdates {
object_id: Some(shard_id.to_string()),
size_bytes: shard_usage.size_bytes(),
collection_timestamp,
})
.collect();
match self.catalog_transact_inner(None, ops).await {
Ok(table_updates) => {
let internal_cmd_tx = self.internal_cmd_tx.clone();
let task_span =
info_span!(parent: None, "coord::storage_usage_update::table_updates");
OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
task::spawn(|| "storage_usage_update_table_updates", async move {
table_updates.instrument(task_span).await;
if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
warn!("internal_cmd_rx dropped before we could send: {e:?}");
}
});
}
Err(err) => tracing::warn!("Failed to update storage metrics: {:?}", err),
}
}
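
    /// Applies the retractions for expired storage usage entries in a background task.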
#[mz_ore::instrument(level = "debug")]
async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
let (fut, _) = self.builtin_table_update().execute(expired).await;
task::spawn(|| "storage_usage_pruning_apply", async move {
fut.await;
});
}
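
    /// Schedules the next storage usage collection by sending `Message::StorageUsageFetch`
    /// after the appropriate delay. Collections are aligned to the configured interval,
    /// shifted by a per-organization offset so that environments do not all collect at
    /// the same moment.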
pub async fn schedule_storage_usage_collection(&self) {
const SEED_LEN: usize = 32;
let mut seed = [0; SEED_LEN];
for (i, byte) in self
.catalog()
.state()
.config()
.environment_id
.organization_id()
.as_bytes()
.into_iter()
.take(SEED_LEN)
.enumerate()
{
seed[i] = *byte;
}
let storage_usage_collection_interval_ms: EpochMillis =
EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
.expect("storage usage collection interval must fit into u64");
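        // Pick a deterministic, per-organization offset within the collection interval
        // (seeded above from the organization ID) so that different environments do not
        // all collect storage usage at the same instant.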
let offset =
rngs::SmallRng::from_seed(seed).gen_range(0..storage_usage_collection_interval_ms);
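        // Find the collection time in the current interval; if it is still in the
        // future, schedule it, otherwise schedule the corresponding time in the next
        // interval.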
let now_ts: EpochMillis = self.peek_local_write_ts().await.into();
let previous_collection_ts =
(now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
let next_collection_ts = if previous_collection_ts > now_ts {
previous_collection_ts
} else {
previous_collection_ts + storage_usage_collection_interval_ms
};
let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);
let internal_cmd_tx = self.internal_cmd_tx.clone();
task::spawn(|| "storage_usage_collection", async move {
tokio::time::sleep(next_collection_interval).await;
if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
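                // If the send fails, the internal command receiver has been dropped
                // (the coordinator is shutting down), so there is nothing left to do.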
}
});
}
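
    /// Handles a command submitted by a client by forwarding it to `handle_command`.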
#[mz_ore::instrument(level = "debug")]
async fn message_command(&mut self, cmd: Command) {
self.handle_command(cmd).await;
}
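
    /// Handles a response from the controller: peek notifications, subscribe and
    /// `COPY TO` results, replica metrics, and completed watch sets.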
#[mz_ore::instrument(level = "debug")]
async fn message_controller(&mut self, message: ControllerResponse) {
event!(Level::TRACE, message = format!("{:?}", message));
match message {
ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
self.handle_peek_notification(uuid, response, otel_ctx);
}
ControllerResponse::SubscribeResponse(sink_id, response) => {
if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
self.active_compute_sinks.get_mut(&sink_id)
{
let finished = active_subscribe.process_response(response);
if finished {
self.retire_compute_sinks(btreemap! {
sink_id => ActiveComputeSinkRetireReason::Finished,
})
.await;
}
soft_assert_or_log!(
!self.introspection_subscribes.contains_key(&sink_id),
"`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
and `introspection_subscribes`",
);
} else if self.introspection_subscribes.contains_key(&sink_id) {
self.handle_introspection_subscribe_batch(sink_id, response)
.await;
} else {
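                    // The subscribe is no longer active (e.g., it was canceled or the
                    // session disconnected), so the response can be dropped.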
}
}
ControllerResponse::CopyToResponse(sink_id, response) => {
match self.drop_compute_sink(sink_id).await {
Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
active_copy_to.retire_with_response(response);
}
_ => {
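                        // There is no active `COPY TO` sink with this ID; it was
                        // already retired, so there is nothing to respond to.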
}
}
}
ControllerResponse::ComputeReplicaMetrics(replica_id, new) => {
let m = match self
.transient_replica_metadata
.entry(replica_id)
.or_insert_with(|| Some(Default::default()))
{
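                    // A `None` entry marks a replica that has already been dropped;
                    // ignore metrics that arrive after the fact.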
None => return,
Some(md) => &mut md.metrics,
};
let old = std::mem::replace(m, Some(new.clone()));
if old.as_ref() != Some(&new) {
let retractions = old.map(|old| {
self.catalog()
.state()
.pack_replica_metric_updates(replica_id, &old, -1)
});
let insertions = self
.catalog()
.state()
.pack_replica_metric_updates(replica_id, &new, 1);
let updates = if let Some(retractions) = retractions {
retractions
.into_iter()
.chain(insertions.into_iter())
.collect()
} else {
insertions
};
let updates = self
.catalog()
.state()
.resolve_builtin_table_updates(updates);
self.builtin_table_update().background(updates);
}
}
ControllerResponse::WatchSetFinished(ws_ids) => {
let now = self.now();
for ws_id in ws_ids {
let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
continue;
};
self.connection_watch_sets
.get_mut(&conn_id)
.expect("corrupted coordinator state: unknown connection id")
.remove(&ws_id);
if self.connection_watch_sets[&conn_id].is_empty() {
self.connection_watch_sets.remove(&conn_id);
}
match rsp {
WatchSetResponse::StatementDependenciesReady(id, ev) => {
self.record_statement_lifecycle_event(&id, &ev, now);
}
WatchSetResponse::AlterSinkReady(ctx) => {
self.sequence_alter_sink_finish(ctx).await;
}
}
}
}
}
}
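
    /// Finishes sequencing a statement whose purification completed on a separate task.
    /// If the catalog changed in the meantime such that the purified statement is no
    /// longer valid, execution restarts from the original statement.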
#[mz_ore::instrument(level = "debug")]
async fn message_purified_statement_ready(
&mut self,
PurifiedStatementReady {
ctx,
result,
params,
mut plan_validity,
original_stmt,
otel_ctx,
}: PurifiedStatementReady,
) {
otel_ctx.attach_as_parent();
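        // If the catalog changed while purification ran on another task such that the
        // purified statement is no longer valid, replan from the original statement.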
if plan_validity.check(self.catalog()).is_err() {
self.handle_execute_inner(original_stmt, params, ctx).await;
return;
}
let purified_statement = match result {
Ok(ok) => ok,
Err(e) => return ctx.retire(Err(e)),
};
let plan = match purified_statement {
PurifiedStatement::PurifiedCreateSource {
create_progress_subsource_stmt,
create_source_stmt,
subsources,
available_source_references,
} => {
self.plan_purified_create_source(
&ctx,
params,
create_progress_subsource_stmt,
create_source_stmt,
subsources,
available_source_references,
)
.await
}
PurifiedStatement::PurifiedAlterSourceAddSubsources {
source_name,
options,
subsources,
} => {
self.plan_purified_alter_source_add_subsource(
ctx.session(),
params,
source_name,
options,
subsources,
)
.await
}
PurifiedStatement::PurifiedAlterSourceRefreshReferences {
source_name,
available_source_references,
} => self.plan_purified_alter_source_refresh_references(
ctx.session(),
params,
source_name,
available_source_references,
),
o @ (PurifiedStatement::PurifiedAlterSource { .. }
| PurifiedStatement::PurifiedCreateSink(..)
| PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
let stmt = match o {
PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
Statement::AlterSource(alter_source_stmt)
}
PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
Statement::CreateTableFromSource(stmt)
}
PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
PurifiedStatement::PurifiedCreateSource { .. }
| PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
| PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
unreachable!("not part of exterior match stmt")
}
};
let catalog = self.catalog().for_session(ctx.session());
let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
                self.plan_statement(ctx.session(), stmt, &params, &resolved_ids)
.map(|plan| (plan, resolved_ids))
}
};
match plan {
Ok((plan, resolved_ids)) => self.sequence_plan(ctx, plan, resolved_ids).await,
Err(e) => ctx.retire(Err(e)),
}
}
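
    /// Finishes creating a connection once its background validation has completed,
    /// cleaning up the connection's secret if validation failed or the plan became stale.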
#[mz_ore::instrument(level = "debug")]
async fn message_create_connection_validation_ready(
&mut self,
CreateConnectionValidationReady {
mut ctx,
result,
connection_id,
connection_gid,
mut plan_validity,
otel_ctx,
resolved_ids,
}: CreateConnectionValidationReady,
) {
otel_ctx.attach_as_parent();
if let Err(e) = plan_validity.check(self.catalog()) {
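            // The plan is no longer valid; clean up any secret that was created for
            // this connection before retiring the request.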
let _ = self.secrets_controller.delete(connection_id).await;
return ctx.retire(Err(e));
}
let plan = match result {
Ok(ok) => ok,
Err(e) => {
let _ = self.secrets_controller.delete(connection_id).await;
return ctx.retire(Err(e));
}
};
let result = self
.sequence_create_connection_stage_finish(
ctx.session_mut(),
connection_id,
connection_gid,
plan,
resolved_ids,
)
.await;
ctx.retire(result);
}
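
    /// Finishes altering a connection once its background validation has completed.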
#[mz_ore::instrument(level = "debug")]
async fn message_alter_connection_validation_ready(
&mut self,
AlterConnectionValidationReady {
mut ctx,
result,
connection_id,
connection_gid: _,
mut plan_validity,
otel_ctx,
resolved_ids: _,
}: AlterConnectionValidationReady,
) {
otel_ctx.attach_as_parent();
if let Err(e) = plan_validity.check(self.catalog()) {
return ctx.retire(Err(e));
}
let conn = match result {
Ok(ok) => ok,
Err(e) => {
return ctx.retire(Err(e));
}
};
let result = self
.sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
.await;
ctx.retire(result);
}
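
    /// Handles a cluster replica status change reported by the controller: emits a
    /// telemetry event, records the change in the replica status history, updates the
    /// in-memory and builtin-table statuses, and notifies sessions if the overall
    /// replica status changed.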
#[mz_ore::instrument(level = "debug")]
async fn message_cluster_event(&mut self, event: ClusterEvent) {
event!(Level::TRACE, event = format!("{:?}", event));
if let Some(segment_client) = &self.segment_client {
let env_id = &self.catalog().config().environment_id;
let mut properties = json!({
"cluster_id": event.cluster_id.to_string(),
"replica_id": event.replica_id.to_string(),
"process_id": event.process_id,
"status": event.status.as_kebab_case_str(),
});
match event.status {
ClusterStatus::Online => (),
ClusterStatus::Offline(reason) => {
let properties = match &mut properties {
serde_json::Value::Object(map) => map,
_ => unreachable!(),
};
properties.insert(
"reason".into(),
json!(reason.display_or("unknown").to_string()),
);
}
};
segment_client.environment_track(
env_id,
"Cluster Changed Status",
properties,
EventDetails {
timestamp: Some(event.time),
..Default::default()
},
);
}
        let Some(replica_statuses) = self
.cluster_replica_statuses
.try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
else {
return;
};
        if event.status != replica_statuses[&event.process_id].status {
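            // Only write to the replica status history introspection collection when
            // this instance is allowed to write (i.e., it is not running in read-only
            // mode).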
if !self.controller.read_only() {
let offline_reason = match event.status {
ClusterStatus::Online => None,
ClusterStatus::Offline(None) => None,
ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
};
let row = Row::pack_slice(&[
Datum::String(&event.replica_id.to_string()),
Datum::UInt64(event.process_id),
Datum::String(event.status.as_kebab_case_str()),
Datum::from(offline_reason.as_deref()),
Datum::TimestampTz(event.time.try_into().expect("must fit")),
]);
self.controller.storage.append_introspection_updates(
IntrospectionType::ReplicaStatusHistory,
vec![(row, 1)],
);
}
let old_replica_status =
                    ClusterReplicaStatuses::cluster_replica_status(replica_statuses);
                let old_process_status = replica_statuses
.get(&event.process_id)
.expect("Process exists");
let builtin_table_retraction =
self.catalog().state().pack_cluster_replica_status_update(
event.replica_id,
event.process_id,
old_process_status,
-1,
);
let builtin_table_retraction = self
.catalog()
.state()
.resolve_builtin_table_update(builtin_table_retraction);
let new_process_status = ClusterReplicaProcessStatus {
status: event.status,
time: event.time,
};
let builtin_table_addition = self.catalog().state().pack_cluster_replica_status_update(
event.replica_id,
event.process_id,
&new_process_status,
1,
);
let builtin_table_addition = self
.catalog()
.state()
.resolve_builtin_table_update(builtin_table_addition);
self.cluster_replica_statuses.ensure_cluster_status(
event.cluster_id,
event.replica_id,
event.process_id,
new_process_status,
);
let builtin_table_updates = vec![builtin_table_retraction, builtin_table_addition];
self.builtin_table_update()
.execute(builtin_table_updates)
.await
.0
.instrument(info_span!("coord::message_cluster_event::table_updates"))
.await;
let cluster = self.catalog().get_cluster(event.cluster_id);
let replica = cluster.replica(event.replica_id).expect("Replica exists");
let new_replica_status = self
.cluster_replica_statuses
.get_cluster_replica_status(event.cluster_id, event.replica_id);
if old_replica_status != new_replica_status {
self.broadcast_notice(AdapterNotice::ClusterReplicaStatusChanged {
cluster: cluster.name.clone(),
replica: replica.name.clone(),
status: new_replica_status,
time: event.time,
});
}
}
}
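
    /// Checks pending linearized read transactions against the timestamp oracle,
    /// retiring those whose chosen timestamp the oracle has reached and scheduling
    /// another `Message::LinearizeReads` wake-up for the rest.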
#[mz_ore::instrument(level = "debug")]
async fn message_linearize_reads(&mut self) {
let mut shortest_wait = Duration::MAX;
let mut ready_txns = Vec::new();
let mut cached_oracle_ts = BTreeMap::new();
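        // Cache a single `read_ts` per timeline so the oracle is queried at most once
        // per timeline while draining the pending reads.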
for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
if let TimestampContext::TimelineTimestamp {
timeline,
chosen_ts,
oracle_ts,
} = read_txn.timestamp_context()
{
let oracle_ts = match oracle_ts {
Some(oracle_ts) => oracle_ts,
None => {
ready_txns.push(read_txn);
continue;
}
};
if chosen_ts <= oracle_ts {
ready_txns.push(read_txn);
continue;
}
let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
let current_oracle_ts = match current_oracle_ts {
btree_map::Entry::Vacant(entry) => {
let timestamp_oracle = self.get_timestamp_oracle(timeline);
let read_ts = timestamp_oracle.read_ts().await;
entry.insert(read_ts.clone());
read_ts
}
btree_map::Entry::Occupied(entry) => entry.get().clone(),
};
if *chosen_ts <= current_oracle_ts {
ready_txns.push(read_txn);
} else {
let wait =
Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
if wait < shortest_wait {
shortest_wait = wait;
}
read_txn.num_requeues += 1;
self.pending_linearize_read_txns.insert(conn_id, read_txn);
}
} else {
ready_txns.push(read_txn);
}
}
if !ready_txns.is_empty() {
let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
let span = tracing::debug_span!("message_linearize_reads");
otel_ctx.attach_as_parent_to(&span);
let now = Instant::now();
for ready_txn in ready_txns {
let span = tracing::debug_span!("retire_read_results");
ready_txn.otel_ctx.attach_as_parent_to(&span);
let _entered = span.enter();
self.metrics
.linearize_message_seconds
.with_label_values(&[
ready_txn.txn.label(),
if ready_txn.num_requeues == 0 {
"true"
} else {
"false"
},
])
.observe((now - ready_txn.created).as_secs_f64());
if let Some((ctx, result)) = ready_txn.txn.finish() {
ctx.retire(result);
}
}
}
if !self.pending_linearize_read_txns.is_empty() {
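            // Wake up again after the shortest remaining wait, capped at one second.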
let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
let internal_cmd_tx = self.internal_cmd_tx.clone();
task::spawn(|| "deferred_read_txns", async move {
tokio::time::sleep(remaining_ms).await;
let result = internal_cmd_tx.send(Message::LinearizeReads);
if let Err(e) = result {
warn!("internal_cmd_rx dropped before we could send: {:?}", e);
}
});
}
}
}