1use std::collections::btree_map::Entry;
25use std::collections::{BTreeMap, BTreeSet};
26use std::mem;
27use std::num::NonZeroI64;
28use std::sync::Arc;
29use std::time::Duration;
30
31use futures::future::BoxFuture;
32use mz_build_info::BuildInfo;
33use mz_cluster_client::metrics::ControllerMetrics;
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_compute_client::controller::{
36 ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification,
37};
38use mz_compute_client::protocol::response::SubscribeBatch;
39use mz_compute_client::service::{ComputeClient, ComputeGrpcClient};
40use mz_controller_types::WatchSetId;
41use mz_dyncfg::{ConfigSet, ConfigUpdates};
42use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, ServiceProcessMetrics};
43use mz_ore::id_gen::Gen;
44use mz_ore::instrument;
45use mz_ore::metrics::MetricsRegistry;
46use mz_ore::now::{EpochMillis, NowFn};
47use mz_ore::task::AbortOnDropHandle;
48use mz_ore::tracing::OpenTelemetryContext;
49use mz_persist_client::PersistLocation;
50use mz_persist_client::cache::PersistClientCache;
51use mz_persist_types::Codec64;
52use mz_proto::RustType;
53use mz_repr::{Datum, GlobalId, Row, TimestampManipulation};
54use mz_service::secrets::SecretsReaderCliArgs;
55use mz_storage_client::client::{
56 ProtoStorageCommand, ProtoStorageResponse, StorageCommand, StorageResponse,
57};
58use mz_storage_client::controller::{
59 IntrospectionType, StorageController, StorageMetadata, StorageTxn,
60};
61use mz_storage_client::storage_collections::{self, StorageCollections};
62use mz_storage_types::configuration::StorageConfiguration;
63use mz_storage_types::connections::ConnectionContext;
64use mz_storage_types::controller::StorageError;
65use mz_txn_wal::metrics::Metrics as TxnMetrics;
66use serde::Serialize;
67use timely::progress::{Antichain, Timestamp};
68use tokio::sync::mpsc;
69use uuid::Uuid;
70
71pub mod clusters;
72
73pub use mz_storage_controller::prepare_initialization;
76
/// Configuration needed to construct a [`Controller`] via `Controller::new`.
#[derive(Debug, Clone)]
pub struct ControllerConfig {
    /// Build information, handed to both the storage and the compute controller.
    pub build_info: &'static BuildInfo,
    /// The orchestrator. The controller keeps only its `"cluster"` namespace.
    pub orchestrator: Arc<dyn Orchestrator>,
    /// Persist location, handed to storage collections and the storage and compute controllers.
    pub persist_location: PersistLocation,
    /// Shared persist client cache, handed to storage collections and the storage controller.
    pub persist_clients: Arc<PersistClientCache>,
    /// Stored on the controller. NOTE(review): presumably the image used for `clusterd`
    /// replica processes — confirm.
    pub clusterd_image: String,
    /// Optional image stored on the controller. NOTE(review): presumably used as an init
    /// container for replica processes — confirm.
    pub init_container_image: Option<String>,
    /// The deploy generation this controller runs as (included in `Controller::dump`).
    pub deploy_generation: u64,
    /// Clock function, used for wallclock-lag tracking, the sub-controllers, and timestamps of
    /// recorded replica metrics.
    pub now: NowFn,
    /// Registry that controller, storage and compute metrics are registered in.
    pub metrics_registry: MetricsRegistry,
    /// Stored on the controller. NOTE(review): presumably the persist PubSub address passed to
    /// replicas — confirm.
    pub persist_pubsub_url: String,
    /// Stored on the controller. NOTE(review): presumably forwarded to replicas so they can read
    /// secrets — confirm.
    pub secrets_args: SecretsReaderCliArgs,
    /// Connection context, handed to storage collections and the storage controller.
    pub connection_context: ConnectionContext,
}
110
/// Responses that the [`Controller`] returns to its caller from `Controller::process`.
///
/// `T` is the timestamp type and defaults to [`mz_repr::Timestamp`].
#[derive(Debug)]
pub enum ControllerResponse<T = mz_repr::Timestamp> {
    /// A peek notification forwarded unchanged from the compute controller, keyed by a `Uuid`
    /// and carrying the associated OpenTelemetry context.
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// A batch of subscribe output for the collection identified by the `GlobalId`, forwarded
    /// from the compute controller.
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// The result of a copy-to operation identified by the `GlobalId`, forwarded from the
    /// compute controller. NOTE(review): the `u64` is presumably a row count — confirm.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// The listed watch sets have finished: every object they were waiting on has advanced
    /// past the watch set's target time (or already had at install time).
    WatchSetFinished(Vec<WatchSetId>),
}
129
/// What `Controller::ready` found to be ready; consumed (and reset to `NotReady`) by
/// `Controller::process`.
#[derive(Debug, Default)]
pub enum Readiness<T> {
    /// Nothing is ready; `ready` has not observed any work since the last `process`.
    #[default]
    NotReady,
    /// The storage controller's `ready` future completed.
    Storage,
    /// The compute controller's `ready` future completed.
    Compute,
    /// A batch of process metrics arrived for the given replica.
    Metrics((ReplicaId, Vec<ServiceProcessMetrics>)),
    /// A response generated by the controller itself (immediately finished watch sets).
    Internal(ControllerResponse<T>),
}
146
/// Combines a storage controller and a compute controller behind one `ready`/`process` loop,
/// and layers watch sets and replica-metrics recording on top of them.
pub struct Controller<T: ComputeControllerTimestamp = mz_repr::Timestamp> {
    /// The storage controller.
    pub storage: Box<dyn StorageController<Timestamp = T>>,
    /// Storage collections handle, shared with both the storage and the compute controller.
    pub storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// The compute controller.
    pub compute: ComputeController<T>,
    /// NOTE(review): presumably the image for `clusterd` replica processes — confirm.
    clusterd_image: String,
    /// NOTE(review): presumably an init-container image for replica processes — confirm.
    init_container_image: Option<String>,
    /// The deploy generation this controller was started with.
    deploy_generation: u64,
    /// Whether the controller runs in read-only mode. When set, replica metrics are not
    /// recorded and past-generation replica removal is not started.
    read_only: bool,
    /// The orchestrator, scoped to the `"cluster"` namespace.
    orchestrator: Arc<dyn NamespacedOrchestrator>,
    /// Work observed by `ready` that `process` has not consumed yet.
    readiness: Readiness<T>,
    /// Per-replica metrics task handles. NOTE(review): `AbortOnDropHandle` presumably aborts the
    /// task when dropped — confirm.
    metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
    /// Sending half of the replica metrics channel (presumably cloned into metrics tasks).
    metrics_tx: mpsc::UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Receiving half of the replica metrics channel, polled by `ready`.
    metrics_rx: mpsc::UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Clock function; used to timestamp recorded replica metrics.
    now: NowFn,

    /// NOTE(review): presumably the persist PubSub address given to replicas — confirm.
    persist_pubsub_url: String,

    /// NOTE(review): presumably secrets-reader arguments given to replicas — confirm.
    secrets_args: SecretsReaderCliArgs,

    /// Index from each watched object to the pending watch sets that still wait on it.
    /// Kept in sync with `unfulfilled_watch_sets`.
    unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>,
    /// Pending watch sets: the objects not yet past the target time, and that target time.
    unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>,
    /// Allocator for new watch set ids.
    watch_set_id_gen: Gen<WatchSetId>,

    /// Watch sets that were already satisfied when installed; reported by the next `ready`.
    immediate_watch_sets: Vec<WatchSetId>,

    /// Dynamic configuration, updated through `update_configuration`.
    dyncfg: ConfigSet,
}
206
207impl<T: ComputeControllerTimestamp> Controller<T> {
208 pub fn update_configuration(&mut self, updates: ConfigUpdates) {
210 updates.apply(&self.dyncfg);
211 }
212
213 pub fn start_compute_introspection_sink(&mut self) {
218 self.compute.start_introspection_sink(&*self.storage);
219 }
220
221 pub fn connection_context(&self) -> &ConnectionContext {
225 &self.storage.config().connection_context
226 }
227
228 pub fn storage_configuration(&self) -> &StorageConfiguration {
232 self.storage.config()
233 }
234
235 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
239 let Self {
246 storage_collections: _,
247 storage: _,
248 compute,
249 clusterd_image: _,
250 init_container_image: _,
251 deploy_generation,
252 read_only,
253 orchestrator: _,
254 readiness,
255 metrics_tasks: _,
256 metrics_tx: _,
257 metrics_rx: _,
258 now: _,
259 persist_pubsub_url: _,
260 secrets_args: _,
261 unfulfilled_watch_sets_by_object: _,
262 unfulfilled_watch_sets,
263 watch_set_id_gen: _,
264 immediate_watch_sets,
265 dyncfg: _,
266 } = self;
267
268 let compute = compute.dump().await?;
269
270 let unfulfilled_watch_sets: BTreeMap<_, _> = unfulfilled_watch_sets
271 .iter()
272 .map(|(ws_id, watches)| (format!("{ws_id:?}"), format!("{watches:?}")))
273 .collect();
274 let immediate_watch_sets: Vec<_> = immediate_watch_sets
275 .iter()
276 .map(|watch| format!("{watch:?}"))
277 .collect();
278
279 fn field(
280 key: &str,
281 value: impl Serialize,
282 ) -> Result<(String, serde_json::Value), anyhow::Error> {
283 let value = serde_json::to_value(value)?;
284 Ok((key.to_string(), value))
285 }
286
287 let map = serde_json::Map::from_iter([
288 field("compute", compute)?,
289 field("deploy_generation", deploy_generation)?,
290 field("read_only", read_only)?,
291 field("readiness", format!("{readiness:?}"))?,
292 field("unfulfilled_watch_sets", unfulfilled_watch_sets)?,
293 field("immediate_watch_sets", immediate_watch_sets)?,
294 ]);
295 Ok(serde_json::Value::Object(map))
296 }
297}
298
299impl<T> Controller<T>
300where
301 T: ComputeControllerTimestamp,
302 ComputeGrpcClient: ComputeClient<T>,
303{
304 pub fn update_orchestrator_scheduling_config(
305 &self,
306 config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
307 ) {
308 self.orchestrator.update_scheduling_config(config);
309 }
310 pub fn initialization_complete(&mut self) {
316 self.storage.initialization_complete();
317 self.compute.initialization_complete();
318 }
319
320 pub fn read_only(&self) -> bool {
322 self.read_only
323 }
324
325 fn take_internal_response(&mut self) -> Option<ControllerResponse<T>> {
329 let ws = std::mem::take(&mut self.immediate_watch_sets);
330 (!ws.is_empty()).then_some(ControllerResponse::WatchSetFinished(ws))
331 }
332
333 pub async fn ready(&mut self) {
342 if let Readiness::NotReady = self.readiness {
343 if let Some(response) = self.take_internal_response() {
350 self.readiness = Readiness::Internal(response);
351 } else {
352 tokio::select! {
355 () = self.storage.ready() => {
356 self.readiness = Readiness::Storage;
357 }
358 () = self.compute.ready() => {
359 self.readiness = Readiness::Compute;
360 }
361 Some(metrics) = self.metrics_rx.recv() => {
362 self.readiness = Readiness::Metrics(metrics);
363 }
364 }
365 }
366 }
367 }
368
369 pub fn get_readiness(&self) -> &Readiness<T> {
371 &self.readiness
372 }
373
374 pub fn install_compute_watch_set(
383 &mut self,
384 mut objects: BTreeSet<GlobalId>,
385 t: T,
386 ) -> WatchSetId {
387 let ws_id = self.watch_set_id_gen.allocate_id();
388
389 objects.retain(|id| {
390 let frontier = self
391 .compute
392 .collection_frontiers(*id, None)
393 .map(|f| f.write_frontier)
394 .expect("missing compute dependency");
395 frontier.less_equal(&t)
396 });
397 if objects.is_empty() {
398 self.immediate_watch_sets.push(ws_id);
399 } else {
400 for id in objects.iter() {
401 self.unfulfilled_watch_sets_by_object
402 .entry(*id)
403 .or_default()
404 .insert(ws_id);
405 }
406 self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
407 }
408
409 ws_id
410 }
411
412 pub fn install_storage_watch_set(
421 &mut self,
422 mut objects: BTreeSet<GlobalId>,
423 t: T,
424 ) -> WatchSetId {
425 let ws_id = self.watch_set_id_gen.allocate_id();
426
427 let uppers = self
428 .storage
429 .collections_frontiers(objects.iter().cloned().collect())
430 .expect("missing storage dependencies")
431 .into_iter()
432 .map(|(id, _since, upper)| (id, upper))
433 .collect::<BTreeMap<_, _>>();
434
435 objects.retain(|id| {
436 let upper = uppers.get(id).expect("missing collection");
437 upper.less_equal(&t)
438 });
439 if objects.is_empty() {
440 self.immediate_watch_sets.push(ws_id);
441 } else {
442 for id in objects.iter() {
443 self.unfulfilled_watch_sets_by_object
444 .entry(*id)
445 .or_default()
446 .insert(ws_id);
447 }
448 self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
449 }
450 ws_id
451 }
452
453 pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId) {
459 if let Some((obj_ids, _)) = self.unfulfilled_watch_sets.remove(ws_id) {
460 for obj_id in obj_ids {
461 let mut entry = match self.unfulfilled_watch_sets_by_object.entry(obj_id) {
462 Entry::Occupied(entry) => entry,
463 Entry::Vacant(_) => panic!("corrupted watchset state"),
464 };
465 entry.get_mut().remove(ws_id);
466 if entry.get().is_empty() {
467 entry.remove();
468 }
469 }
470 }
471 }
472
473 fn process_storage_response(
476 &mut self,
477 storage_metadata: &StorageMetadata,
478 ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
479 let maybe_response = self.storage.process(storage_metadata)?;
480 Ok(maybe_response.and_then(
481 |mz_storage_client::controller::Response::FrontierUpdates(r)| {
482 self.handle_frontier_updates(&r)
483 },
484 ))
485 }
486
487 fn process_compute_response(&mut self) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
490 let response = self.compute.process();
491
492 let response = response.and_then(|r| match r {
493 ComputeControllerResponse::PeekNotification(uuid, peek, otel_ctx) => {
494 Some(ControllerResponse::PeekNotification(uuid, peek, otel_ctx))
495 }
496 ComputeControllerResponse::SubscribeResponse(id, tail) => {
497 Some(ControllerResponse::SubscribeResponse(id, tail))
498 }
499 ComputeControllerResponse::CopyToResponse(id, tail) => {
500 Some(ControllerResponse::CopyToResponse(id, tail))
501 }
502 ComputeControllerResponse::FrontierUpper { id, upper } => {
503 self.handle_frontier_updates(&[(id, upper)])
504 }
505 });
506 Ok(response)
507 }
508
509 #[mz_ore::instrument(level = "debug")]
517 pub fn process(
518 &mut self,
519 storage_metadata: &StorageMetadata,
520 ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
521 match mem::take(&mut self.readiness) {
522 Readiness::NotReady => Ok(None),
523 Readiness::Storage => self.process_storage_response(storage_metadata),
524 Readiness::Compute => self.process_compute_response(),
525 Readiness::Metrics((id, metrics)) => self.process_replica_metrics(id, metrics),
526 Readiness::Internal(message) => Ok(Some(message)),
527 }
528 }
529
530 fn handle_frontier_updates(
534 &mut self,
535 updates: &[(GlobalId, Antichain<T>)],
536 ) -> Option<ControllerResponse<T>> {
537 let mut finished = vec![];
538 for (obj_id, antichain) in updates {
539 let ws_ids = self.unfulfilled_watch_sets_by_object.entry(*obj_id);
540 if let Entry::Occupied(mut ws_ids) = ws_ids {
541 ws_ids.get_mut().retain(|ws_id| {
542 let mut entry = match self.unfulfilled_watch_sets.entry(*ws_id) {
543 Entry::Occupied(entry) => entry,
544 Entry::Vacant(_) => panic!("corrupted watchset state"),
545 };
546 if !antichain.less_equal(&entry.get().1) {
548 entry.get_mut().0.remove(obj_id);
550 if entry.get().0.is_empty() {
552 entry.remove();
553 finished.push(*ws_id);
554 }
555 false
557 } else {
558 true
560 }
561 });
562 if ws_ids.get().is_empty() {
564 ws_ids.remove();
565 }
566 }
567 }
568 (!(finished.is_empty())).then(|| ControllerResponse::WatchSetFinished(finished))
569 }
570
571 fn process_replica_metrics(
572 &mut self,
573 id: ReplicaId,
574 metrics: Vec<ServiceProcessMetrics>,
575 ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
576 self.record_replica_metrics(id, &metrics);
577 Ok(None)
578 }
579
580 fn record_replica_metrics(&mut self, replica_id: ReplicaId, metrics: &[ServiceProcessMetrics]) {
581 if self.read_only() {
582 return;
583 }
584
585 let now = mz_ore::now::to_datetime((self.now)());
586 let now_tz = now.try_into().expect("must fit");
587
588 let replica_id = replica_id.to_string();
589 let mut row = Row::default();
590 let updates = metrics
591 .iter()
592 .zip(0..)
593 .map(|(m, process_id)| {
594 row.packer().extend(&[
595 Datum::String(&replica_id),
596 Datum::UInt64(process_id),
597 m.cpu_nano_cores.into(),
598 m.memory_bytes.into(),
599 m.disk_usage_bytes.into(),
600 Datum::TimestampTz(now_tz),
601 ]);
602 (row.clone(), mz_repr::Diff::ONE)
603 })
604 .collect();
605
606 self.storage
607 .append_introspection_updates(IntrospectionType::ReplicaMetricsHistory, updates);
608 }
609
610 pub async fn determine_real_time_recent_timestamp(
621 &self,
622 ids: BTreeSet<GlobalId>,
623 timeout: Duration,
624 ) -> Result<BoxFuture<'static, Result<T, StorageError<T>>>, StorageError<T>> {
625 self.storage.real_time_recent_timestamp(ids, timeout).await
626 }
627}
628
629impl<T> Controller<T>
630where
631 T: Timestamp
633 + Codec64
634 + From<EpochMillis>
635 + TimestampManipulation
636 + std::fmt::Display
637 + Into<mz_repr::Timestamp>,
638 StorageCommand<T>: RustType<ProtoStorageCommand>,
639 StorageResponse<T>: RustType<ProtoStorageResponse>,
640 ComputeGrpcClient: ComputeClient<T>,
641 T: ComputeControllerTimestamp,
643{
644 #[instrument(name = "controller::new")]
652 pub async fn new(
653 config: ControllerConfig,
654 envd_epoch: NonZeroI64,
655 read_only: bool,
656 storage_txn: &dyn StorageTxn<T>,
657 ) -> Self {
658 if read_only {
659 tracing::info!("starting controllers in read-only mode!");
660 }
661
662 let now_fn = config.now.clone();
663 let wallclock_lag_fn = WallclockLagFn::new(now_fn);
664
665 let controller_metrics = ControllerMetrics::new(&config.metrics_registry);
666
667 let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
668 let collections_ctl = storage_collections::StorageCollectionsImpl::new(
669 config.persist_location.clone(),
670 Arc::clone(&config.persist_clients),
671 &config.metrics_registry,
672 config.now.clone(),
673 Arc::clone(&txns_metrics),
674 envd_epoch,
675 read_only,
676 config.connection_context.clone(),
677 storage_txn,
678 )
679 .await;
680
681 let collections_ctl: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync> =
682 Arc::new(collections_ctl);
683
684 let storage_controller = mz_storage_controller::Controller::new(
685 config.build_info,
686 config.persist_location.clone(),
687 config.persist_clients,
688 config.now.clone(),
689 wallclock_lag_fn.clone(),
690 Arc::clone(&txns_metrics),
691 read_only,
692 &config.metrics_registry,
693 controller_metrics.clone(),
694 config.connection_context,
695 storage_txn,
696 Arc::clone(&collections_ctl),
697 )
698 .await;
699
700 let storage_collections = Arc::clone(&collections_ctl);
701 let compute_controller = ComputeController::new(
702 config.build_info,
703 storage_collections,
704 read_only,
705 &config.metrics_registry,
706 config.persist_location,
707 controller_metrics,
708 config.now.clone(),
709 wallclock_lag_fn,
710 );
711 let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();
712
713 let this = Self {
714 storage: Box::new(storage_controller),
715 storage_collections: collections_ctl,
716 compute: compute_controller,
717 clusterd_image: config.clusterd_image,
718 init_container_image: config.init_container_image,
719 deploy_generation: config.deploy_generation,
720 read_only,
721 orchestrator: config.orchestrator.namespace("cluster"),
722 readiness: Readiness::NotReady,
723 metrics_tasks: BTreeMap::new(),
724 metrics_tx,
725 metrics_rx,
726 now: config.now,
727 persist_pubsub_url: config.persist_pubsub_url,
728 secrets_args: config.secrets_args,
729 unfulfilled_watch_sets_by_object: BTreeMap::new(),
730 unfulfilled_watch_sets: BTreeMap::new(),
731 watch_set_id_gen: Gen::default(),
732 immediate_watch_sets: Vec::new(),
733 dyncfg: mz_dyncfgs::all_dyncfgs(),
734 };
735
736 if !this.read_only {
737 this.remove_past_generation_replicas_in_background();
738 }
739
740 this
741 }
742}