1use std::collections::btree_map::Entry;
25use std::collections::{BTreeMap, BTreeSet};
26use std::mem;
27use std::num::NonZeroI64;
28use std::sync::Arc;
29use std::time::Duration;
30
31use futures::future::BoxFuture;
32use mz_build_info::BuildInfo;
33use mz_cluster_client::metrics::ControllerMetrics;
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_compute_client::controller::{
36 ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification,
37};
38use mz_compute_client::protocol::response::SubscribeBatch;
39use mz_controller_types::WatchSetId;
40use mz_dyncfg::{ConfigSet, ConfigUpdates};
41use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, ServiceProcessMetrics};
42use mz_ore::cast::CastFrom;
43use mz_ore::id_gen::Gen;
44use mz_ore::instrument;
45use mz_ore::metrics::MetricsRegistry;
46use mz_ore::now::{EpochMillis, NowFn};
47use mz_ore::task::AbortOnDropHandle;
48use mz_ore::tracing::OpenTelemetryContext;
49use mz_persist_client::PersistLocation;
50use mz_persist_client::cache::PersistClientCache;
51use mz_persist_types::Codec64;
52use mz_repr::{Datum, GlobalId, Row, TimestampManipulation};
53use mz_service::secrets::SecretsReaderCliArgs;
54use mz_storage_client::controller::{
55 IntrospectionType, StorageController, StorageMetadata, StorageTxn,
56};
57use mz_storage_client::storage_collections::{self, StorageCollections};
58use mz_storage_types::configuration::StorageConfiguration;
59use mz_storage_types::connections::ConnectionContext;
60use mz_storage_types::controller::StorageError;
61use mz_txn_wal::metrics::Metrics as TxnMetrics;
62use serde::Serialize;
63use timely::progress::{Antichain, Timestamp};
64use tokio::sync::mpsc;
65use uuid::Uuid;
66
67pub mod clusters;
68
69pub use mz_storage_controller::prepare_initialization;
72
/// Configuration consumed by [`Controller::new`] to construct a [`Controller`].
#[derive(Debug, Clone)]
pub struct ControllerConfig {
    /// Build information, passed to both the storage and compute controllers.
    pub build_info: &'static BuildInfo,
    /// The orchestrator. The controller keeps only its `"cluster"` namespace.
    pub orchestrator: Arc<dyn Orchestrator>,
    /// Persist location, passed to storage collections, the storage
    /// controller and the compute controller.
    pub persist_location: PersistLocation,
    /// Shared cache of persist clients, passed to storage collections and the
    /// storage controller.
    pub persist_clients: Arc<PersistClientCache>,
    /// Container image for `clusterd`. It is stored on the controller; its use
    /// is not visible here (presumably replica provisioning — TODO confirm).
    pub clusterd_image: String,
    /// Optional init-container image. It is stored on the controller; its use
    /// is not visible here (presumably replica provisioning — TODO confirm).
    pub init_container_image: Option<String>,
    /// Deploy generation of this environment. It is stored on the controller
    /// and included in `Controller::dump`.
    pub deploy_generation: u64,
    /// Clock source. It is shared with the storage/compute controllers and
    /// the wallclock-lag function, and is used to timestamp replica metrics.
    pub now: NowFn,
    /// Registry into which controller, txn-wal, storage and compute metrics
    /// are registered.
    pub metrics_registry: MetricsRegistry,
    /// Persist pubsub URL, stored on the controller (use not visible here —
    /// presumably handed to replicas; TODO confirm).
    pub persist_pubsub_url: String,
    /// Secrets-reader arguments, stored on the controller (use not visible
    /// here — presumably handed to replicas; TODO confirm).
    pub secrets_args: SecretsReaderCliArgs,
    /// Connection context, passed to storage collections and the storage
    /// controller.
    pub connection_context: ConnectionContext,
}
106
/// Responses that [`Controller::process`] can return to its owner.
#[derive(Debug)]
pub enum ControllerResponse<T = mz_repr::Timestamp> {
    /// A peek notification forwarded unchanged from the compute controller,
    /// together with the OpenTelemetry context it arrived with.
    ///
    /// NOTE(review): the `Uuid` presumably identifies the peek — confirm
    /// against the compute controller.
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// A batch for the subscribe with the given id, forwarded from the
    /// compute controller.
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// The outcome of the copy-to with the given id, forwarded from the
    /// compute controller.
    ///
    /// NOTE(review): the `u64` is presumably a row count — confirm.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// One or more watch sets have been fully satisfied.
    ///
    /// This includes watch sets that were already satisfied at install time
    /// and watch sets completed by frontier updates from storage or compute.
    WatchSetFinished(Vec<WatchSetId>),
}
125
/// What [`Controller::ready`] has staged for the next call to
/// [`Controller::process`].
#[derive(Debug, Default)]
pub enum Readiness<T> {
    /// Nothing is staged; `process` returns `Ok(None)`. This is the value
    /// `process` leaves behind after consuming a staged readiness.
    #[default]
    NotReady,
    /// The storage controller is ready to be processed.
    Storage,
    /// The compute controller is ready to be processed.
    Compute,
    /// A batch of process metrics for a replica was received on the metrics
    /// channel.
    Metrics((ReplicaId, Vec<ServiceProcessMetrics>)),
    /// An internally generated response, returned by `process` as-is.
    Internal(ControllerResponse<T>),
}
142
/// A client that coordinates the storage and compute controllers and tracks
/// watch sets over their collections' frontiers.
pub struct Controller<T: ComputeControllerTimestamp = mz_repr::Timestamp> {
    /// The storage controller. Its responses are handled by `process` when
    /// [`Readiness::Storage`] is staged.
    pub storage: Box<dyn StorageController<Timestamp = T>>,
    /// Storage collections handle. The same `Arc` is shared with the storage
    /// controller and the compute controller in `Controller::new`.
    pub storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// The compute controller. Its responses are handled by `process` when
    /// [`Readiness::Compute`] is staged.
    pub compute: ComputeController<T>,
    /// Container image for `clusterd` (use not visible here — presumably
    /// replica provisioning; TODO confirm).
    clusterd_image: String,
    /// Optional init-container image (use not visible here — presumably
    /// replica provisioning; TODO confirm).
    init_container_image: Option<String>,
    /// Deploy generation of this environment.
    deploy_generation: u64,
    /// Whether the controller runs in read-only mode. In this mode replica
    /// metrics are not recorded, and past-generation replicas are not
    /// removed at startup.
    read_only: bool,
    /// The orchestrator, scoped to the `"cluster"` namespace.
    orchestrator: Arc<dyn NamespacedOrchestrator>,
    /// What `ready` has staged for the next `process` call.
    readiness: Readiness<T>,
    /// Per-replica metrics task handles (populated outside this view —
    /// presumably aborted when dropped, per `AbortOnDropHandle`; TODO confirm).
    metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
    /// Sending half of the replica metrics channel (presumably cloned into
    /// `metrics_tasks`; TODO confirm).
    metrics_tx: mpsc::UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Receiving half of the replica metrics channel, polled by `ready`.
    metrics_rx: mpsc::UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Clock source; used to timestamp recorded replica metrics.
    now: NowFn,

    /// Persist pubsub URL (use not visible here — TODO confirm).
    persist_pubsub_url: String,

    /// Secrets-reader arguments (use not visible here — TODO confirm).
    secrets_args: SecretsReaderCliArgs,

    /// Index from an object to the pending watch sets that still wait on it.
    /// Kept in sync with `unfulfilled_watch_sets`; empty entries are removed.
    unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>,
    /// Pending watch sets. Each one maps to the objects it still waits on and
    /// the timestamp their frontiers must advance past.
    unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>,
    /// Allocator for watch set ids.
    watch_set_id_gen: Gen<WatchSetId>,

    /// Watch sets that were already satisfied when installed. They are
    /// reported as finished by the next `ready`/`process` cycle.
    immediate_watch_sets: Vec<WatchSetId>,

    /// Dynamic configuration, updated through `update_configuration`.
    dyncfg: ConfigSet,
}
202
203impl<T: ComputeControllerTimestamp> Controller<T> {
204 pub fn update_configuration(&mut self, updates: ConfigUpdates) {
206 updates.apply(&self.dyncfg);
207 }
208
209 pub fn start_compute_introspection_sink(&mut self) {
214 self.compute.start_introspection_sink(&*self.storage);
215 }
216
217 pub fn connection_context(&self) -> &ConnectionContext {
221 &self.storage.config().connection_context
222 }
223
224 pub fn storage_configuration(&self) -> &StorageConfiguration {
228 self.storage.config()
229 }
230
231 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
235 let Self {
242 storage_collections: _,
243 storage: _,
244 compute,
245 clusterd_image: _,
246 init_container_image: _,
247 deploy_generation,
248 read_only,
249 orchestrator: _,
250 readiness,
251 metrics_tasks: _,
252 metrics_tx: _,
253 metrics_rx: _,
254 now: _,
255 persist_pubsub_url: _,
256 secrets_args: _,
257 unfulfilled_watch_sets_by_object: _,
258 unfulfilled_watch_sets,
259 watch_set_id_gen: _,
260 immediate_watch_sets,
261 dyncfg: _,
262 } = self;
263
264 let compute = compute.dump().await?;
265
266 let unfulfilled_watch_sets: BTreeMap<_, _> = unfulfilled_watch_sets
267 .iter()
268 .map(|(ws_id, watches)| (format!("{ws_id:?}"), format!("{watches:?}")))
269 .collect();
270 let immediate_watch_sets: Vec<_> = immediate_watch_sets
271 .iter()
272 .map(|watch| format!("{watch:?}"))
273 .collect();
274
275 fn field(
276 key: &str,
277 value: impl Serialize,
278 ) -> Result<(String, serde_json::Value), anyhow::Error> {
279 let value = serde_json::to_value(value)?;
280 Ok((key.to_string(), value))
281 }
282
283 let map = serde_json::Map::from_iter([
284 field("compute", compute)?,
285 field("deploy_generation", deploy_generation)?,
286 field("read_only", read_only)?,
287 field("readiness", format!("{readiness:?}"))?,
288 field("unfulfilled_watch_sets", unfulfilled_watch_sets)?,
289 field("immediate_watch_sets", immediate_watch_sets)?,
290 ]);
291 Ok(serde_json::Value::Object(map))
292 }
293}
294
295impl<T> Controller<T>
296where
297 T: ComputeControllerTimestamp,
298{
299 pub fn update_orchestrator_scheduling_config(
300 &self,
301 config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
302 ) {
303 self.orchestrator.update_scheduling_config(config);
304 }
305 pub fn initialization_complete(&mut self) {
311 self.storage.initialization_complete();
312 self.compute.initialization_complete();
313 }
314
315 pub fn read_only(&self) -> bool {
317 self.read_only
318 }
319
320 fn take_internal_response(&mut self) -> Option<ControllerResponse<T>> {
324 let ws = std::mem::take(&mut self.immediate_watch_sets);
325 (!ws.is_empty()).then_some(ControllerResponse::WatchSetFinished(ws))
326 }
327
328 pub async fn ready(&mut self) {
337 if let Readiness::NotReady = self.readiness {
338 if let Some(response) = self.take_internal_response() {
345 self.readiness = Readiness::Internal(response);
346 } else {
347 tokio::select! {
350 () = self.storage.ready() => {
351 self.readiness = Readiness::Storage;
352 }
353 () = self.compute.ready() => {
354 self.readiness = Readiness::Compute;
355 }
356 Some(metrics) = self.metrics_rx.recv() => {
357 self.readiness = Readiness::Metrics(metrics);
358 }
359 }
360 }
361 }
362 }
363
364 pub fn get_readiness(&self) -> &Readiness<T> {
366 &self.readiness
367 }
368
369 pub fn install_compute_watch_set(
378 &mut self,
379 mut objects: BTreeSet<GlobalId>,
380 t: T,
381 ) -> WatchSetId {
382 let ws_id = self.watch_set_id_gen.allocate_id();
383
384 objects.retain(|id| {
385 let frontier = self
386 .compute
387 .collection_frontiers(*id, None)
388 .map(|f| f.write_frontier)
389 .expect("missing compute dependency");
390 frontier.less_equal(&t)
391 });
392 if objects.is_empty() {
393 self.immediate_watch_sets.push(ws_id);
394 } else {
395 for id in objects.iter() {
396 self.unfulfilled_watch_sets_by_object
397 .entry(*id)
398 .or_default()
399 .insert(ws_id);
400 }
401 self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
402 }
403
404 ws_id
405 }
406
407 pub fn install_storage_watch_set(
416 &mut self,
417 mut objects: BTreeSet<GlobalId>,
418 t: T,
419 ) -> WatchSetId {
420 let ws_id = self.watch_set_id_gen.allocate_id();
421
422 let uppers = self
423 .storage
424 .collections_frontiers(objects.iter().cloned().collect())
425 .expect("missing storage dependencies")
426 .into_iter()
427 .map(|(id, _since, upper)| (id, upper))
428 .collect::<BTreeMap<_, _>>();
429
430 objects.retain(|id| {
431 let upper = uppers.get(id).expect("missing collection");
432 upper.less_equal(&t)
433 });
434 if objects.is_empty() {
435 self.immediate_watch_sets.push(ws_id);
436 } else {
437 for id in objects.iter() {
438 self.unfulfilled_watch_sets_by_object
439 .entry(*id)
440 .or_default()
441 .insert(ws_id);
442 }
443 self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
444 }
445 ws_id
446 }
447
448 pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId) {
454 if let Some((obj_ids, _)) = self.unfulfilled_watch_sets.remove(ws_id) {
455 for obj_id in obj_ids {
456 let mut entry = match self.unfulfilled_watch_sets_by_object.entry(obj_id) {
457 Entry::Occupied(entry) => entry,
458 Entry::Vacant(_) => panic!("corrupted watchset state"),
459 };
460 entry.get_mut().remove(ws_id);
461 if entry.get().is_empty() {
462 entry.remove();
463 }
464 }
465 }
466 }
467
468 fn process_storage_response(
471 &mut self,
472 storage_metadata: &StorageMetadata,
473 ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
474 let maybe_response = self.storage.process(storage_metadata)?;
475 Ok(maybe_response.and_then(
476 |mz_storage_client::controller::Response::FrontierUpdates(r)| {
477 self.handle_frontier_updates(&r)
478 },
479 ))
480 }
481
482 fn process_compute_response(&mut self) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
485 let response = self.compute.process();
486
487 let response = response.and_then(|r| match r {
488 ComputeControllerResponse::PeekNotification(uuid, peek, otel_ctx) => {
489 Some(ControllerResponse::PeekNotification(uuid, peek, otel_ctx))
490 }
491 ComputeControllerResponse::SubscribeResponse(id, tail) => {
492 Some(ControllerResponse::SubscribeResponse(id, tail))
493 }
494 ComputeControllerResponse::CopyToResponse(id, tail) => {
495 Some(ControllerResponse::CopyToResponse(id, tail))
496 }
497 ComputeControllerResponse::FrontierUpper { id, upper } => {
498 self.handle_frontier_updates(&[(id, upper)])
499 }
500 });
501 Ok(response)
502 }
503
504 #[mz_ore::instrument(level = "debug")]
512 pub fn process(
513 &mut self,
514 storage_metadata: &StorageMetadata,
515 ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
516 match mem::take(&mut self.readiness) {
517 Readiness::NotReady => Ok(None),
518 Readiness::Storage => self.process_storage_response(storage_metadata),
519 Readiness::Compute => self.process_compute_response(),
520 Readiness::Metrics((id, metrics)) => self.process_replica_metrics(id, metrics),
521 Readiness::Internal(message) => Ok(Some(message)),
522 }
523 }
524
525 fn handle_frontier_updates(
529 &mut self,
530 updates: &[(GlobalId, Antichain<T>)],
531 ) -> Option<ControllerResponse<T>> {
532 let mut finished = vec![];
533 for (obj_id, antichain) in updates {
534 let ws_ids = self.unfulfilled_watch_sets_by_object.entry(*obj_id);
535 if let Entry::Occupied(mut ws_ids) = ws_ids {
536 ws_ids.get_mut().retain(|ws_id| {
537 let mut entry = match self.unfulfilled_watch_sets.entry(*ws_id) {
538 Entry::Occupied(entry) => entry,
539 Entry::Vacant(_) => panic!("corrupted watchset state"),
540 };
541 if !antichain.less_equal(&entry.get().1) {
543 entry.get_mut().0.remove(obj_id);
545 if entry.get().0.is_empty() {
547 entry.remove();
548 finished.push(*ws_id);
549 }
550 false
552 } else {
553 true
555 }
556 });
557 if ws_ids.get().is_empty() {
559 ws_ids.remove();
560 }
561 }
562 }
563 (!(finished.is_empty())).then(|| ControllerResponse::WatchSetFinished(finished))
564 }
565
566 fn process_replica_metrics(
567 &mut self,
568 id: ReplicaId,
569 metrics: Vec<ServiceProcessMetrics>,
570 ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
571 self.record_replica_metrics(id, &metrics);
572 Ok(None)
573 }
574
575 fn record_replica_metrics(&mut self, replica_id: ReplicaId, metrics: &[ServiceProcessMetrics]) {
576 if self.read_only() {
577 return;
578 }
579
580 let now = mz_ore::now::to_datetime((self.now)());
581 let now_tz = now.try_into().expect("must fit");
582
583 let replica_id = replica_id.to_string();
584 let mut row = Row::default();
585 let updates = metrics
586 .iter()
587 .enumerate()
588 .map(|(process_id, m)| {
589 row.packer().extend(&[
590 Datum::String(&replica_id),
591 Datum::UInt64(u64::cast_from(process_id)),
592 m.cpu_nano_cores.into(),
593 m.memory_bytes.into(),
594 m.disk_usage_bytes.into(),
595 Datum::TimestampTz(now_tz),
596 ]);
597 (row.clone(), mz_repr::Diff::ONE)
598 })
599 .collect();
600
601 self.storage
602 .append_introspection_updates(IntrospectionType::ReplicaMetricsHistory, updates);
603 }
604
605 pub async fn determine_real_time_recent_timestamp(
616 &self,
617 ids: BTreeSet<GlobalId>,
618 timeout: Duration,
619 ) -> Result<BoxFuture<'static, Result<T, StorageError<T>>>, StorageError<T>> {
620 self.storage.real_time_recent_timestamp(ids, timeout).await
621 }
622}
623
624impl<T> Controller<T>
625where
626 T: Timestamp
628 + Codec64
629 + From<EpochMillis>
630 + TimestampManipulation
631 + std::fmt::Display
632 + Into<mz_repr::Timestamp>,
633 T: ComputeControllerTimestamp,
635{
636 #[instrument(name = "controller::new")]
644 pub async fn new(
645 config: ControllerConfig,
646 envd_epoch: NonZeroI64,
647 read_only: bool,
648 storage_txn: &dyn StorageTxn<T>,
649 ) -> Self {
650 if read_only {
651 tracing::info!("starting controllers in read-only mode!");
652 }
653
654 let now_fn = config.now.clone();
655 let wallclock_lag_fn = WallclockLagFn::new(now_fn);
656
657 let controller_metrics = ControllerMetrics::new(&config.metrics_registry);
658
659 let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
660 let collections_ctl = storage_collections::StorageCollectionsImpl::new(
661 config.persist_location.clone(),
662 Arc::clone(&config.persist_clients),
663 &config.metrics_registry,
664 config.now.clone(),
665 Arc::clone(&txns_metrics),
666 envd_epoch,
667 read_only,
668 config.connection_context.clone(),
669 storage_txn,
670 )
671 .await;
672
673 let collections_ctl: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync> =
674 Arc::new(collections_ctl);
675
676 let storage_controller = mz_storage_controller::Controller::new(
677 config.build_info,
678 config.persist_location.clone(),
679 config.persist_clients,
680 config.now.clone(),
681 wallclock_lag_fn.clone(),
682 Arc::clone(&txns_metrics),
683 read_only,
684 &config.metrics_registry,
685 controller_metrics.clone(),
686 config.connection_context,
687 storage_txn,
688 Arc::clone(&collections_ctl),
689 )
690 .await;
691
692 let storage_collections = Arc::clone(&collections_ctl);
693 let compute_controller = ComputeController::new(
694 config.build_info,
695 storage_collections,
696 read_only,
697 &config.metrics_registry,
698 config.persist_location,
699 controller_metrics,
700 config.now.clone(),
701 wallclock_lag_fn,
702 );
703 let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();
704
705 let this = Self {
706 storage: Box::new(storage_controller),
707 storage_collections: collections_ctl,
708 compute: compute_controller,
709 clusterd_image: config.clusterd_image,
710 init_container_image: config.init_container_image,
711 deploy_generation: config.deploy_generation,
712 read_only,
713 orchestrator: config.orchestrator.namespace("cluster"),
714 readiness: Readiness::NotReady,
715 metrics_tasks: BTreeMap::new(),
716 metrics_tx,
717 metrics_rx,
718 now: config.now,
719 persist_pubsub_url: config.persist_pubsub_url,
720 secrets_args: config.secrets_args,
721 unfulfilled_watch_sets_by_object: BTreeMap::new(),
722 unfulfilled_watch_sets: BTreeMap::new(),
723 watch_set_id_gen: Gen::default(),
724 immediate_watch_sets: Vec::new(),
725 dyncfg: mz_dyncfgs::all_dyncfgs(),
726 };
727
728 if !this.read_only {
729 this.remove_past_generation_replicas_in_background();
730 }
731
732 this
733 }
734}