use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::mem;
use std::num::NonZeroI64;
use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;
use mz_build_info::BuildInfo;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_client::controller::{
    ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification,
};
use mz_compute_client::protocol::response::SubscribeBatch;
use mz_compute_client::service::{ComputeClient, ComputeGrpcClient};
use mz_controller_types::WatchSetId;
use mz_dyncfg::{ConfigSet, ConfigUpdates};
use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, ServiceProcessMetrics};
use mz_ore::id_gen::Gen;
use mz_ore::instrument;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::PersistLocation;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_types::Codec64;
use mz_proto::RustType;
use mz_repr::{Datum, GlobalId, Row, TimestampManipulation};
use mz_service::secrets::SecretsReaderCliArgs;
use mz_storage_client::client::{
    ProtoStorageCommand, ProtoStorageResponse, StorageCommand, StorageResponse,
};
use mz_storage_client::controller::{
    IntrospectionType, StorageController, StorageMetadata, StorageTxn,
};
use mz_storage_client::storage_collections::{self, StorageCollections};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::StorageError;
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use serde::Serialize;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc;
use uuid::Uuid;

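/// Cluster provisioning and management.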
pub mod clusters;

pub use mz_storage_controller::prepare_initialization;

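/// Configures a controller.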
#[derive(Debug, Clone)]
pub struct ControllerConfig {
    /// The build information for this process.
    pub build_info: &'static BuildInfo,
    /// The orchestrator implementation to use.
    pub orchestrator: Arc<dyn Orchestrator>,
    /// The persist location where storage collections are written.
    pub persist_location: PersistLocation,
    /// A process-global cache of persist clients.
    pub persist_clients: Arc<PersistClientCache>,
    /// The clusterd image to use when starting new cluster processes.
    pub clusterd_image: String,
    /// An optional init container image to use for cluster processes.
    pub init_container_image: Option<String>,
    /// The deploy generation of this environment.
    pub deploy_generation: u64,
    /// A function that returns the current time.
    pub now: NowFn,
    /// The metrics registry.
    pub metrics_registry: MetricsRegistry,
    /// The URL of the persist PubSub service.
    pub persist_pubsub_url: String,
    /// Arguments for configuring secrets readers.
    pub secrets_args: SecretsReaderCliArgs,
    /// Configuration for connections to external services.
    pub connection_context: ConnectionContext,
}

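/// Responses that the controller can provide back to its client.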
#[derive(Debug)]
pub enum ControllerResponse<T = mz_repr::Timestamp> {
    /// A notification that the peek with the given UUID has finished, along with the
    /// OpenTelemetry context under which it was issued.
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// A batch of updates for the subscribe identified by the `GlobalId`.
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// The result of a copy-to operation for the given `GlobalId`: the number of rows copied, or
    /// an error.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// The watch sets with the given IDs have finished: every object they track has advanced
    /// past the watched timestamp.
    WatchSetFinished(Vec<WatchSetId>),
}

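/// Tracks whether one of the underlying controllers, or the controller itself, has work that is
/// ready to be processed.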
#[derive(Debug, Default)]
enum Readiness<T> {
    /// No controller is ready.
    #[default]
    NotReady,
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// A batch of metrics for a replica is ready to be recorded.
    Metrics((ReplicaId, Vec<ServiceProcessMetrics>)),
    /// An internally-generated response is ready to be returned.
    Internal(ControllerResponse<T>),
}

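/// A controller that bundles the storage and compute controllers, multiplexes their responses,
/// and tracks watch sets over the frontiers of their collections.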
pub struct Controller<T: ComputeControllerTimestamp = mz_repr::Timestamp> {
    /// The client for the storage controller.
    pub storage: Box<dyn StorageController<Timestamp = T>>,
    /// A shared handle to the storage collections.
    pub storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// The client for the compute controller.
    pub compute: ComputeController<T>,
    /// The clusterd image to use when starting new cluster processes.
    clusterd_image: String,
    /// An optional init container image to use for cluster processes.
    init_container_image: Option<String>,
    /// The deploy generation of this environment.
    deploy_generation: u64,
    /// Whether the controller runs in read-only mode.
    read_only: bool,
    /// The cluster orchestrator.
    orchestrator: Arc<dyn NamespacedOrchestrator>,
    /// Tracks which sub-controller, if any, is ready to be processed.
    readiness: Readiness<T>,
    /// Tasks that collect replica metrics, keyed by replica ID and aborted on drop.
    metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
    /// Sender for the channel over which replica metrics are reported.
    metrics_tx: mpsc::UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Receiver for the channel over which replica metrics are reported.
    metrics_rx: mpsc::UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// A function that returns the current time.
    now: NowFn,

    /// The URL of the persist PubSub service.
    persist_pubsub_url: String,

    /// Arguments for configuring secrets readers.
    secrets_args: SecretsReaderCliArgs,

    /// A map from objects to the watch sets that are still waiting on them.
    /// Kept in sync with `unfulfilled_watch_sets`.
    unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>,
    /// The objects each unfulfilled watch set is still waiting on, and the timestamp it is
    /// waiting for them to reach.
    unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>,
    /// Generator for unique watch set IDs.
    watch_set_id_gen: Gen<WatchSetId>,

    /// Watch sets that were already fulfilled when they were installed and still need to be
    /// reported as finished.
    immediate_watch_sets: Vec<WatchSetId>,

    /// Dynamic configuration for the controller.
    dyncfg: ConfigSet,
}

impl<T: ComputeControllerTimestamp> Controller<T> {
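    /// Applies the given updates to the controller's dynamic configuration.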
    pub fn update_configuration(&mut self, updates: ConfigUpdates) {
        updates.apply(&self.dyncfg);
    }

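    /// Starts the compute controller's introspection sink, which writes compute introspection
    /// data through the storage controller.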
    pub fn start_compute_introspection_sink(&mut self) {
        self.compute.start_introspection_sink(&*self.storage);
    }

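    /// Returns the connection context installed in the controller.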
    pub fn connection_context(&self) -> &ConnectionContext {
        &self.storage.config().connection_context
    }

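    /// Returns the storage configuration installed in the storage controller.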
    pub fn storage_configuration(&self) -> &StorageConfiguration {
        self.storage.config()
    }

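    /// Returns the state of the controller formatted as JSON, for debugging and introspection.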
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` so that newly added fields are not accidentally omitted from the
        // dump.
        let Self {
            storage_collections: _,
            storage: _,
            compute,
            clusterd_image: _,
            init_container_image: _,
            deploy_generation,
            read_only,
            orchestrator: _,
            readiness,
            metrics_tasks: _,
            metrics_tx: _,
            metrics_rx: _,
            now: _,
            persist_pubsub_url: _,
            secrets_args: _,
            unfulfilled_watch_sets_by_object: _,
            unfulfilled_watch_sets,
            watch_set_id_gen: _,
            immediate_watch_sets,
            dyncfg: _,
        } = self;

        let compute = compute.dump().await?;

        let unfulfilled_watch_sets: BTreeMap<_, _> = unfulfilled_watch_sets
            .iter()
            .map(|(ws_id, watches)| (format!("{ws_id:?}"), format!("{watches:?}")))
            .collect();
        let immediate_watch_sets: Vec<_> = immediate_watch_sets
            .iter()
            .map(|watch| format!("{watch:?}"))
            .collect();

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let map = serde_json::Map::from_iter([
            field("compute", compute)?,
            field("deploy_generation", deploy_generation)?,
            field("read_only", read_only)?,
            field("readiness", format!("{readiness:?}"))?,
            field("unfulfilled_watch_sets", unfulfilled_watch_sets)?,
            field("immediate_watch_sets", immediate_watch_sets)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

impl<T> Controller<T>
where
    T: ComputeControllerTimestamp,
    ComputeGrpcClient: ComputeClient<T>,
{
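    /// Passes an updated service scheduling configuration on to the orchestrator.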
    pub fn update_orchestrator_scheduling_config(
        &self,
        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
    ) {
        self.orchestrator.update_scheduling_config(config);
    }

    /// Marks the end of any initialization commands, forwarding the notification to both the
    /// storage and compute controllers.
    pub fn initialization_complete(&mut self) {
        self.storage.initialization_complete();
        self.compute.initialization_complete();
    }

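    /// Reports whether the controller is in read-only mode.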
    pub fn read_only(&self) -> bool {
        self.read_only
    }

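    /// Returns the next pending internally-generated response, if any. Currently these are only
    /// [`ControllerResponse::WatchSetFinished`] responses for watch sets that were already
    /// fulfilled when they were installed.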
    fn take_internal_response(&mut self) -> Option<ControllerResponse<T>> {
        let ws = std::mem::take(&mut self.immediate_watch_sets);
        (!ws.is_empty()).then_some(ControllerResponse::WatchSetFinished(ws))
    }

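    /// Waits until the controller is ready to process a response.
    ///
    /// This method may block for an arbitrarily long time. When it returns, the owner should
    /// call [`Controller::process`] to process the ready message.
    ///
    /// This method is cancellation safe.
    ///
    /// An illustrative driver loop (a sketch only; `storage_metadata` and the error handling are
    /// assumed to be provided by the caller):
    ///
    /// ```ignore
    /// loop {
    ///     controller.ready().await;
    ///     if let Some(response) = controller.process(&storage_metadata)? {
    ///         // Handle the `ControllerResponse`.
    ///     }
    /// }
    /// ```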
    pub async fn ready(&mut self) {
        if let Readiness::NotReady = self.readiness {
            if let Some(response) = self.take_internal_response() {
                self.readiness = Readiness::Internal(response);
            } else {
                tokio::select! {
                    () = self.storage.ready() => {
                        self.readiness = Readiness::Storage;
                    }
                    () = self.compute.ready() => {
                        self.readiness = Readiness::Compute;
                    }
                    Some(metrics) = self.metrics_rx.recv() => {
                        self.readiness = Readiness::Metrics(metrics);
                    }
                }
            }
        }
    }

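    /// Installs a watch set in the controller.
    ///
    /// A watch set is a set of compute objects, paired with a timestamp `t`. The controller
    /// reports the returned [`WatchSetId`] in a [`ControllerResponse::WatchSetFinished`] once the
    /// write frontier of every object in the set has advanced beyond `t`. Objects whose frontier
    /// is already beyond `t` are dropped from the set immediately.
    ///
    /// A sketch of typical usage (the IDs and timestamp are hypothetical):
    ///
    /// ```ignore
    /// let ws_id = controller.install_compute_watch_set(
    ///     BTreeSet::from([index_id, mv_id]),
    ///     as_of_ts,
    /// );
    /// // Later, match `ws_id` against `ControllerResponse::WatchSetFinished` responses.
    /// ```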
    pub fn install_compute_watch_set(
        &mut self,
        mut objects: BTreeSet<GlobalId>,
        t: T,
    ) -> WatchSetId {
        let ws_id = self.watch_set_id_gen.allocate_id();

        // Keep only the objects whose write frontier has not yet advanced past `t`.
        objects.retain(|id| {
            let frontier = self
                .compute
                .collection_frontiers(*id, None)
                .map(|f| f.write_frontier)
                .expect("missing compute dependency");
            frontier.less_equal(&t)
        });
        if objects.is_empty() {
            // Every object is already past `t`; the watch set will be reported as finished via
            // the next `ready`/`process` cycle.
            self.immediate_watch_sets.push(ws_id);
        } else {
            for id in objects.iter() {
                self.unfulfilled_watch_sets_by_object
                    .entry(*id)
                    .or_default()
                    .insert(ws_id);
            }
            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
        }

        ws_id
    }

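    /// Installs a watch set over storage objects in the controller.
    ///
    /// Analogous to [`Self::install_compute_watch_set`], but watches the write frontiers of
    /// storage collections instead of compute collections.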
    pub fn install_storage_watch_set(
        &mut self,
        mut objects: BTreeSet<GlobalId>,
        t: T,
    ) -> WatchSetId {
        let ws_id = self.watch_set_id_gen.allocate_id();

        let uppers = self
            .storage
            .collections_frontiers(objects.iter().cloned().collect())
            .expect("missing storage dependencies")
            .into_iter()
            .map(|(id, _since, upper)| (id, upper))
            .collect::<BTreeMap<_, _>>();

        // Keep only the objects whose upper has not yet advanced past `t`.
        objects.retain(|id| {
            let upper = uppers.get(id).expect("missing collection");
            upper.less_equal(&t)
        });
        if objects.is_empty() {
            // Every object is already past `t`; the watch set will be reported as finished via
            // the next `ready`/`process` cycle.
            self.immediate_watch_sets.push(ws_id);
        } else {
            for id in objects.iter() {
                self.unfulfilled_watch_sets_by_object
                    .entry(*id)
                    .or_default()
                    .insert(ws_id);
            }
            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
        }
        ws_id
    }

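    /// Removes a watch set from the controller, so that no
    /// [`ControllerResponse::WatchSetFinished`] will ever be produced for it. Removing a watch
    /// set that has already finished (or that was never installed) is a no-op.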
    pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId) {
        if let Some((obj_ids, _)) = self.unfulfilled_watch_sets.remove(ws_id) {
            for obj_id in obj_ids {
                let mut entry = match self.unfulfilled_watch_sets_by_object.entry(obj_id) {
                    Entry::Occupied(entry) => entry,
                    Entry::Vacant(_) => panic!("corrupted watchset state"),
                };
                entry.get_mut().remove(ws_id);
                if entry.get().is_empty() {
                    entry.remove();
                }
            }
        }
    }

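    /// Processes a pending response from the storage controller, translating any frontier
    /// updates into watch set progress.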
    fn process_storage_response(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
        let maybe_response = self.storage.process(storage_metadata)?;
        Ok(maybe_response.and_then(
            |mz_storage_client::controller::Response::FrontierUpdates(r)| {
                self.handle_frontier_updates(&r)
            },
        ))
    }

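    /// Processes a pending response from the compute controller, translating it into a
    /// [`ControllerResponse`] where applicable.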
    fn process_compute_response(&mut self) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
        let response = self.compute.process();

        let response = response.and_then(|r| match r {
            ComputeControllerResponse::PeekNotification(uuid, peek, otel_ctx) => {
                Some(ControllerResponse::PeekNotification(uuid, peek, otel_ctx))
            }
            ComputeControllerResponse::SubscribeResponse(id, tail) => {
                Some(ControllerResponse::SubscribeResponse(id, tail))
            }
            ComputeControllerResponse::CopyToResponse(id, tail) => {
                Some(ControllerResponse::CopyToResponse(id, tail))
            }
            ComputeControllerResponse::FrontierUpper { id, upper } => {
                self.handle_frontier_updates(&[(id, upper)])
            }
        });
        Ok(response)
    }

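    /// Processes the work queued by [`Controller::ready`]. Returns `Ok(None)` if there is
    /// currently nothing to process.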
    #[mz_ore::instrument(level = "debug")]
    pub fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
        match mem::take(&mut self.readiness) {
            Readiness::NotReady => Ok(None),
            Readiness::Storage => self.process_storage_response(storage_metadata),
            Readiness::Compute => self.process_compute_response(),
            Readiness::Metrics((id, metrics)) => self.process_replica_metrics(id, metrics),
            Readiness::Internal(message) => Ok(Some(message)),
        }
    }

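    /// Records new write frontiers for the given objects and checks whether any unfulfilled
    /// watch sets are thereby completed, returning a
    /// [`ControllerResponse::WatchSetFinished`] for the completed ones.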
    fn handle_frontier_updates(
        &mut self,
        updates: &[(GlobalId, Antichain<T>)],
    ) -> Option<ControllerResponse<T>> {
        let mut finished = vec![];
        for (obj_id, antichain) in updates {
            let ws_ids = self.unfulfilled_watch_sets_by_object.entry(*obj_id);
            if let Entry::Occupied(mut ws_ids) = ws_ids {
                ws_ids.get_mut().retain(|ws_id| {
                    let mut entry = match self.unfulfilled_watch_sets.entry(*ws_id) {
                        Entry::Occupied(entry) => entry,
                        Entry::Vacant(_) => panic!("corrupted watchset state"),
                    };
                    if !antichain.less_equal(&entry.get().1) {
                        // The object's frontier has advanced past the watched timestamp, so it
                        // no longer blocks this watch set.
                        entry.get_mut().0.remove(obj_id);
                        if entry.get().0.is_empty() {
                            // No objects remain: the watch set is finished.
                            entry.remove();
                            finished.push(*ws_id);
                        }
                        // Drop this watch set from the object's entry.
                        false
                    } else {
                        // The watch set is still waiting on this object.
                        true
                    }
                });
                if ws_ids.get().is_empty() {
                    // No watch sets are waiting on this object anymore.
                    ws_ids.remove();
                }
            }
        }
        (!(finished.is_empty())).then(|| ControllerResponse::WatchSetFinished(finished))
    }

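    /// Handles a batch of metrics reported for a replica.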
    fn process_replica_metrics(
        &mut self,
        id: ReplicaId,
        metrics: Vec<ServiceProcessMetrics>,
    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
        self.record_replica_metrics(id, &metrics);
        Ok(None)
    }

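    /// Appends the given per-process replica metrics to the replica metrics history
    /// introspection collection. Does nothing in read-only mode.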
    fn record_replica_metrics(&mut self, replica_id: ReplicaId, metrics: &[ServiceProcessMetrics]) {
        if self.read_only() {
            // In read-only mode we must not write to introspection collections.
            return;
        }

        let now = mz_ore::now::to_datetime((self.now)());
        let now_tz = now.try_into().expect("must fit");

        let replica_id = replica_id.to_string();
        let mut row = Row::default();
        let updates = metrics
            .iter()
            .zip(0..)
            .map(|(m, process_id)| {
                row.packer().extend(&[
                    Datum::String(&replica_id),
                    Datum::UInt64(process_id),
                    m.cpu_nano_cores.into(),
                    m.memory_bytes.into(),
                    m.disk_usage_bytes.into(),
                    Datum::TimestampTz(now_tz),
                ]);
                (row.clone(), mz_repr::Diff::ONE)
            })
            .collect();

        self.storage
            .append_introspection_updates(IntrospectionType::ReplicaMetricsHistory, updates);
    }

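    /// Determines a "real-time recent" timestamp for the given storage object IDs: a timestamp
    /// that reflects the data available in the objects' upstream systems at the time of the
    /// call.
    ///
    /// Returns a future that resolves to the timestamp, or to an error if it cannot be
    /// determined within `timeout`.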
    pub async fn determine_real_time_recent_timestamp(
        &self,
        ids: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<BoxFuture<'static, Result<T, StorageError<T>>>, StorageError<T>> {
        self.storage.real_time_recent_timestamp(ids, timeout).await
    }
}

impl<T> Controller<T>
where
    T: Timestamp
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + std::fmt::Display
        + Into<mz_repr::Timestamp>,
    StorageCommand<T>: RustType<ProtoStorageCommand>,
    StorageResponse<T>: RustType<ProtoStorageResponse>,
    ComputeGrpcClient: ComputeClient<T>,
    T: ComputeControllerTimestamp,
{
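    /// Creates a new controller, wiring up the storage and compute controllers from the given
    /// configuration and the durable state in `storage_txn`.
    ///
    /// When `read_only` is set, the sub-controllers are also started in read-only mode.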
    #[instrument(name = "controller::new")]
    pub async fn new(
        config: ControllerConfig,
        envd_epoch: NonZeroI64,
        read_only: bool,
        storage_txn: &dyn StorageTxn<T>,
    ) -> Self {
        if read_only {
            tracing::info!("starting controllers in read-only mode!");
        }

        let now_fn = config.now.clone();
        let wallclock_lag_fn = WallclockLagFn::new(now_fn);

        let controller_metrics = ControllerMetrics::new(&config.metrics_registry);

        let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
        let collections_ctl = storage_collections::StorageCollectionsImpl::new(
            config.persist_location.clone(),
            Arc::clone(&config.persist_clients),
            &config.metrics_registry,
            config.now.clone(),
            Arc::clone(&txns_metrics),
            envd_epoch,
            read_only,
            config.connection_context.clone(),
            storage_txn,
        )
        .await;

        let collections_ctl: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync> =
            Arc::new(collections_ctl);

        let storage_controller = mz_storage_controller::Controller::new(
            config.build_info,
            config.persist_location.clone(),
            config.persist_clients,
            config.now.clone(),
            wallclock_lag_fn.clone(),
            Arc::clone(&txns_metrics),
            read_only,
            &config.metrics_registry,
            controller_metrics.clone(),
            config.connection_context,
            storage_txn,
            Arc::clone(&collections_ctl),
        )
        .await;

        let storage_collections = Arc::clone(&collections_ctl);
        let compute_controller = ComputeController::new(
            config.build_info,
            storage_collections,
            read_only,
            &config.metrics_registry,
            config.persist_location,
            controller_metrics,
            config.now.clone(),
            wallclock_lag_fn,
        );
        let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();

        let this = Self {
            storage: Box::new(storage_controller),
            storage_collections: collections_ctl,
            compute: compute_controller,
            clusterd_image: config.clusterd_image,
            init_container_image: config.init_container_image,
            deploy_generation: config.deploy_generation,
            read_only,
            orchestrator: config.orchestrator.namespace("cluster"),
            readiness: Readiness::NotReady,
            metrics_tasks: BTreeMap::new(),
            metrics_tx,
            metrics_rx,
            now: config.now,
            persist_pubsub_url: config.persist_pubsub_url,
            secrets_args: config.secrets_args,
            unfulfilled_watch_sets_by_object: BTreeMap::new(),
            unfulfilled_watch_sets: BTreeMap::new(),
            watch_set_id_gen: Gen::default(),
            immediate_watch_sets: Vec::new(),
            dyncfg: mz_dyncfgs::all_dyncfgs(),
        };

        if !this.read_only {
            // In read-write mode, clean up replicas left over from previous deploy generations.
            this.remove_past_generation_replicas_in_background();
        }

        this
    }
}