#![allow(unknown_lints)]
#![allow(clippy::style)]
#![allow(clippy::complexity)]
#![allow(clippy::large_enum_variant)]
#![allow(clippy::mutable_key_type)]
#![allow(clippy::stable_sort_primitive)]
#![allow(clippy::map_entry)]
#![allow(clippy::box_default)]
#![allow(clippy::drain_collect)]
#![warn(clippy::bool_comparison)]
#![warn(clippy::clone_on_ref_ptr)]
#![warn(clippy::no_effect)]
#![warn(clippy::unnecessary_unwrap)]
#![warn(clippy::dbg_macro)]
#![warn(clippy::todo)]
#![warn(clippy::wildcard_dependencies)]
#![warn(clippy::zero_prefixed_literal)]
#![warn(clippy::borrowed_box)]
#![warn(clippy::deref_addrof)]
#![warn(clippy::double_must_use)]
#![warn(clippy::double_parens)]
#![warn(clippy::extra_unused_lifetimes)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_question_mark)]
#![warn(clippy::needless_return)]
#![warn(clippy::redundant_pattern)]
#![warn(clippy::redundant_slicing)]
#![warn(clippy::redundant_static_lifetimes)]
#![warn(clippy::single_component_path_imports)]
#![warn(clippy::unnecessary_cast)]
#![warn(clippy::useless_asref)]
#![warn(clippy::useless_conversion)]
#![warn(clippy::builtin_type_shadow)]
#![warn(clippy::duplicate_underscore_argument)]
#![warn(clippy::double_neg)]
#![warn(clippy::unnecessary_mut_passed)]
#![warn(clippy::wildcard_in_or_patterns)]
#![warn(clippy::crosspointer_transmute)]
#![warn(clippy::excessive_precision)]
#![warn(clippy::overflow_check_conditional)]
#![warn(clippy::as_conversions)]
#![warn(clippy::match_overlapping_arm)]
#![warn(clippy::zero_divided_by_zero)]
#![warn(clippy::must_use_unit)]
#![warn(clippy::suspicious_assignment_formatting)]
#![warn(clippy::suspicious_else_formatting)]
#![warn(clippy::suspicious_unary_op_formatting)]
#![warn(clippy::mut_mutex_lock)]
#![warn(clippy::print_literal)]
#![warn(clippy::same_item_push)]
#![warn(clippy::useless_format)]
#![warn(clippy::write_literal)]
#![warn(clippy::redundant_closure)]
#![warn(clippy::redundant_closure_call)]
#![warn(clippy::unnecessary_lazy_evaluations)]
#![warn(clippy::partialeq_ne_impl)]
#![warn(clippy::redundant_field_names)]
#![warn(clippy::transmutes_expressible_as_ptr_casts)]
#![warn(clippy::unused_async)]
#![warn(clippy::disallowed_methods)]
#![warn(clippy::disallowed_macros)]
#![warn(clippy::disallowed_types)]
#![warn(clippy::from_over_into)]
use std::collections::BTreeMap;
use std::mem;
use std::num::NonZeroI64;
use std::sync::Arc;
use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::{Peekable, StreamExt};
use mz_build_info::BuildInfo;
use mz_cluster_client::ReplicaId;
use mz_compute_client::controller::{
ActiveComputeController, ComputeController, ComputeControllerResponse,
};
use mz_compute_client::protocol::response::{PeekResponse, SubscribeResponse};
use mz_compute_client::service::{ComputeClient, ComputeGrpcClient};
use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, ServiceProcessMetrics};
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::PersistLocation;
use mz_persist_types::Codec64;
use mz_proto::RustType;
use mz_repr::{GlobalId, TimestampManipulation};
use mz_service::secrets::SecretsReaderCliArgs;
use mz_stash_types::metrics::Metrics as StashMetrics;
use mz_storage_client::client::{
ProtoStorageCommand, ProtoStorageResponse, StorageCommand, StorageResponse,
};
use mz_storage_client::controller::StorageController;
use mz_storage_types::connections::ConnectionContext;
use serde::{Deserialize, Serialize};
use timely::order::TotalOrder;
use timely::progress::Timestamp;
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio::time::{self, Duration, Interval, MissedTickBehavior};
use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid;
pub mod clusters;
/// Configuration consumed by `Controller::new` to build a [`Controller`].
#[derive(Debug, Clone)]
pub struct ControllerConfig {
/// Build information; handed to both the storage controller and the compute
/// controller constructors.
pub build_info: &'static BuildInfo,
/// The orchestrator; `Controller::new` namespaces it with `"cluster"` and
/// keeps the namespaced handle.
pub orchestrator: Arc<dyn Orchestrator>,
/// Persist location; passed to the storage controller constructor.
pub persist_location: PersistLocation,
/// Shared cache of persist clients; passed to the storage controller
/// constructor.
pub persist_clients: Arc<PersistClientCache>,
/// Stash URL; passed to the storage controller constructor.
pub storage_stash_url: String,
/// Stored on the controller as `clusterd_image`.
// NOTE(review): presumably the image used to launch cluster processes; the
// consumer is not visible in this section — confirm.
pub clusterd_image: String,
/// Optional init container image; stored on the controller unchanged.
pub init_container_image: Option<String>,
/// Clock function; passed to the storage controller constructor.
pub now: NowFn,
/// Stash metrics; passed to the storage controller constructor.
pub stash_metrics: Arc<StashMetrics>,
/// Metrics registry; a clone is passed to each of the storage and compute
/// controller constructors.
pub metrics_registry: MetricsRegistry,
/// Persist PubSub URL; stored on the controller unchanged.
pub persist_pubsub_url: String,
/// Secrets reader CLI arguments; stored on the controller unchanged.
pub secrets_args: SecretsReaderCliArgs,
/// Connection context; stored on the controller and exposed through
/// `Controller::connection_context`.
pub connection_context: ConnectionContext,
}
/// Responses that `Controller::process` can hand back to its caller.
///
/// The timestamp type `T` defaults to `mz_repr::Timestamp`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ControllerResponse<T = mz_repr::Timestamp> {
/// A peek response, carried over field-for-field from
/// `ComputeControllerResponse::PeekResponse`.
PeekResponse(Uuid, PeekResponse, OpenTelemetryContext),
/// A subscribe response, carried over field-for-field from
/// `ComputeControllerResponse::SubscribeResponse`.
SubscribeResponse(GlobalId, SubscribeResponse<T>),
/// Process metrics for a replica, as received on the controller's metrics
/// channel.
ComputeReplicaMetrics(ReplicaId, Vec<ServiceProcessMetrics>),
}
impl<T> From<ComputeControllerResponse<T>> for ControllerResponse<T> {
    /// Lifts a compute controller response into the equivalent
    /// [`ControllerResponse`] variant, moving every payload field across
    /// unchanged.
    fn from(r: ComputeControllerResponse<T>) -> Self {
        match r {
            ComputeControllerResponse::PeekResponse(peek_uuid, peek_response, otel) => {
                Self::PeekResponse(peek_uuid, peek_response, otel)
            }
            ComputeControllerResponse::SubscribeResponse(subscribe_id, subscribe_response) => {
                Self::SubscribeResponse(subscribe_id, subscribe_response)
            }
        }
    }
}
/// Records which part of the controller has pending work.
///
/// `Controller::ready` sets this when one of its awaited sources fires, and
/// `Controller::process` takes it (resetting it to the default, `NotReady`)
/// to decide what to process.
#[derive(Default)]
enum Readiness {
/// No readiness has been recorded; `process` returns `Ok(None)` for it.
#[default]
NotReady,
/// The storage controller reported ready.
Storage,
/// The compute controller reported ready.
Compute,
/// An item is available on the metrics channel.
Metrics,
/// The frontiers ticker fired; frontiers should be recorded.
Frontiers,
}
/// A controller that bundles a storage controller and a compute controller
/// together with the state needed to drive them.
///
/// The timestamp type `T` defaults to `mz_repr::Timestamp`.
pub struct Controller<T = mz_repr::Timestamp> {
/// The storage controller, held as a boxed trait object.
pub storage: Box<dyn StorageController<Timestamp = T>>,
/// The compute controller.
pub compute: ComputeController<T>,
/// Copied from `ControllerConfig::clusterd_image`.
clusterd_image: String,
/// Copied from `ControllerConfig::init_container_image`.
init_container_image: Option<String>,
/// The configured orchestrator, namespaced with `"cluster"`.
orchestrator: Arc<dyn NamespacedOrchestrator>,
/// Which source of work became ready in the last `ready` call that has not
/// yet been consumed by `process`.
readiness: Readiness,
/// Task handles keyed by replica; starts out empty.
// NOTE(review): presumably one metrics-collection task per replica, aborted
// when its handle is dropped — the users of this map are not visible in this
// section; confirm.
metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
/// Sending half of the unbounded metrics channel created in `new`.
metrics_tx: UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
/// Receiving half of the metrics channel, wrapped as a peekable stream so
/// `ready` can wait for an item without consuming it.
metrics_rx: Peekable<UnboundedReceiverStream<(ReplicaId, Vec<ServiceProcessMetrics>)>>,
/// Ticks once per second (missed ticks are skipped) to trigger frontier
/// recording.
frontiers_ticker: Interval,
/// Copied from `ControllerConfig::persist_pubsub_url`.
persist_pubsub_url: String,
/// The flag passed to `new`; the same value is also handed to the storage
/// controller constructor.
enable_persist_txn_tables: bool,
/// Copied from `ControllerConfig::secrets_args`.
secrets_args: SecretsReaderCliArgs,
/// Copied from `ControllerConfig::connection_context`.
connection_context: ConnectionContext,
}
impl<T> Controller<T> {
    /// Activates the compute controller against this controller's storage
    /// controller and returns the resulting handle.
    ///
    /// The handle borrows both controllers mutably for as long as it lives.
    pub fn active_compute(&mut self) -> ActiveComputeController<T> {
        // Split the borrow so both fields can be lent out at the same time.
        let Self {
            compute, storage, ..
        } = self;
        compute.activate(&mut **storage)
    }

    /// Forwards `value` to the compute controller's setter of the same name.
    pub fn set_default_idle_arrangement_merge_effort(&mut self, value: u32) {
        let compute = &mut self.compute;
        compute.set_default_idle_arrangement_merge_effort(value);
    }

    /// Forwards `value` to the compute controller's setter of the same name.
    pub fn set_default_arrangement_exert_proportionality(&mut self, value: u32) {
        let compute = &mut self.compute;
        compute.set_default_arrangement_exert_proportionality(value);
    }
}
impl<T> Controller<T>
where
    T: Timestamp + Lattice,
    ComputeGrpcClient: ComputeClient<T>,
{
    /// Forwards an updated service scheduling configuration to the
    /// orchestrator.
    pub fn update_orchestrator_scheduling_config(
        &mut self,
        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
    ) {
        self.orchestrator.update_scheduling_config(config);
    }

    /// Notifies the storage controller and then the compute controller that
    /// initialization is complete.
    pub fn initialization_complete(&mut self) {
        self.storage.initialization_complete();
        self.compute.initialization_complete();
    }

    /// Waits until the controller has work for [`Controller::process`].
    ///
    /// If a readiness state recorded by an earlier call has not yet been
    /// consumed by `process`, this returns immediately without waiting.
    /// Otherwise it waits for the first of: the storage controller becoming
    /// ready, the compute controller becoming ready, an item becoming
    /// available on the metrics channel, or the frontiers ticker firing, and
    /// records which one it was.
    pub async fn ready(&mut self) {
        if let Readiness::NotReady = self.readiness {
            // NOTE(review): the branches that lose the race are dropped, so
            // this relies on each awaited future being cancellation safe —
            // confirm for `storage.ready()` and `compute.ready()`.
            tokio::select! {
                () = self.storage.ready() => {
                    self.readiness = Readiness::Storage;
                }
                () = self.compute.ready() => {
                    self.readiness = Readiness::Compute;
                }
                // `peek` takes a pinned receiver. `Pin` is spelled with its
                // full path because this file does not import it. Peeking
                // leaves the item in the stream for `process` to take.
                _ = std::pin::Pin::new(&mut self.metrics_rx).peek() => {
                    self.readiness = Readiness::Metrics;
                }
                _ = self.frontiers_ticker.tick() => {
                    self.readiness = Readiness::Frontiers;
                }
            }
        }
    }

    /// Processes the work identified by the last call to
    /// [`Controller::ready`], resetting the recorded readiness to
    /// `NotReady`.
    ///
    /// Returns `Ok(Some(_))` when the processed work yields a response for
    /// the caller (a converted compute response or a metrics item), and
    /// `Ok(None)` otherwise.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the storage controller's `process`.
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn process(&mut self) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
        // `mem::take` leaves `Readiness::NotReady` (the default) behind, so
        // each readiness state is handled at most once.
        match mem::take(&mut self.readiness) {
            Readiness::NotReady => Ok(None),
            Readiness::Storage => {
                self.storage.process().await?;
                Ok(None)
            }
            Readiness::Compute => {
                let response = self.active_compute().process().await;
                Ok(response.map(Into::into))
            }
            // `ready` only peeked at the stream; take the item here.
            Readiness::Metrics => Ok(self
                .metrics_rx
                .next()
                .await
                .map(|(id, metrics)| ControllerResponse::ComputeReplicaMetrics(id, metrics))),
            Readiness::Frontiers => {
                self.record_frontiers().await;
                Ok(None)
            }
        }
    }

    /// Hands the compute controller's collection frontiers, and then its
    /// replica write frontiers, to the storage controller for recording.
    async fn record_frontiers(&mut self) {
        let compute_frontiers = self.compute.collection_frontiers();
        self.storage.record_frontiers(compute_frontiers).await;
        let compute_replica_frontiers = self.compute.replica_write_frontiers();
        self.storage
            .record_replica_frontiers(compute_replica_frontiers)
            .await;
    }

    /// Returns a future that resolves to `T::minimum()`.
    ///
    /// This is a placeholder: `source_ids` is ignored and the result does not
    /// depend on any controller state.
    // `source_ids` is intentionally unused by this placeholder, and the
    // function returns a boxed future without itself being `async`, hence the
    // two lint allowances.
    #[allow(unused)]
    #[allow(clippy::unused_async)]
    pub fn recent_timestamp(
        &self,
        source_ids: impl Iterator<Item = GlobalId>,
    ) -> BoxFuture<'static, T> {
        Box::pin(async { T::minimum() })
    }
}
impl<T> Controller<T>
where
T: Timestamp
+ Lattice
+ TotalOrder
+ TryInto<i64>
+ TryFrom<i64>
+ Codec64
+ Unpin
+ From<EpochMillis>
+ TimestampManipulation,
<T as TryInto<i64>>::Error: std::fmt::Debug,
<T as TryFrom<i64>>::Error: std::fmt::Debug,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
mz_storage_controller::Controller<T>: StorageController<Timestamp = T>,
{
pub async fn new(
config: ControllerConfig,
envd_epoch: NonZeroI64,
enable_persist_txn_tables: bool,
) -> Self {
let storage_controller = mz_storage_controller::Controller::new(
config.build_info,
config.storage_stash_url,
config.persist_location,
config.persist_clients,
config.now,
config.stash_metrics,
envd_epoch,
config.metrics_registry.clone(),
enable_persist_txn_tables,
)
.await;
let compute_controller = ComputeController::new(
config.build_info,
envd_epoch,
config.metrics_registry.clone(),
);
let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();
let mut frontiers_ticker = time::interval(Duration::from_secs(1));
frontiers_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
Self {
storage: Box::new(storage_controller),
compute: compute_controller,
clusterd_image: config.clusterd_image,
init_container_image: config.init_container_image,
orchestrator: config.orchestrator.namespace("cluster"),
readiness: Readiness::NotReady,
metrics_tasks: BTreeMap::new(),
metrics_tx,
metrics_rx: UnboundedReceiverStream::new(metrics_rx).peekable(),
frontiers_ticker,
persist_pubsub_url: config.persist_pubsub_url,
enable_persist_txn_tables,
secrets_args: config.secrets_args,
connection_context: config.connection_context,
}
}
pub fn connection_context(&self) -> &ConnectionContext {
&self.connection_context
}
}