1use std::collections::btree_map::Entry;
25use std::collections::{BTreeMap, BTreeSet};
26use std::mem;
27use std::num::NonZeroI64;
28use std::sync::Arc;
29use std::time::Duration;
30
31use futures::future::BoxFuture;
32use mz_build_info::BuildInfo;
33use mz_cluster_client::metrics::ControllerMetrics;
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_compute_client::controller::{
36 ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification,
37};
38use mz_compute_client::protocol::response::SubscribeBatch;
39use mz_controller_types::WatchSetId;
40use mz_dyncfg::{ConfigSet, ConfigUpdates};
41use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, ServiceProcessMetrics};
42use mz_ore::cast::CastFrom;
43use mz_ore::id_gen::Gen;
44use mz_ore::instrument;
45use mz_ore::metrics::MetricsRegistry;
46use mz_ore::now::{EpochMillis, NowFn};
47use mz_ore::task::AbortOnDropHandle;
48use mz_ore::tracing::OpenTelemetryContext;
49use mz_persist_client::PersistLocation;
50use mz_persist_client::cache::PersistClientCache;
51use mz_persist_types::Codec64;
52use mz_repr::{Datum, GlobalId, Row, TimestampManipulation};
53use mz_service::secrets::SecretsReaderCliArgs;
54use mz_storage_client::controller::{
55 IntrospectionType, StorageController, StorageMetadata, StorageTxn,
56};
57use mz_storage_client::storage_collections::{self, StorageCollections};
58use mz_storage_types::configuration::StorageConfiguration;
59use mz_storage_types::connections::ConnectionContext;
60use mz_storage_types::controller::StorageError;
61use mz_txn_wal::metrics::Metrics as TxnMetrics;
62use timely::progress::{Antichain, Timestamp};
63use tokio::sync::mpsc;
64use uuid::Uuid;
65
66pub mod clusters;
67
68pub use mz_storage_controller::prepare_initialization;
71
/// Configuration consumed by [`Controller::new`] to construct the storage and
/// compute controllers and the surrounding [`Controller`] state.
#[derive(Debug, Clone)]
pub struct ControllerConfig {
    /// Build information; `Controller::new` forwards it to both the storage
    /// controller and the compute controller.
    pub build_info: &'static BuildInfo,
    /// The orchestrator. `Controller::new` scopes it to the `"cluster"`
    /// namespace and keeps only the namespaced handle.
    pub orchestrator: Arc<dyn Orchestrator>,
    /// The persist location handed to the storage collections, the storage
    /// controller, and the compute controller.
    pub persist_location: PersistLocation,
    /// Shared cache of persist clients, handed to the storage collections and
    /// the storage controller.
    pub persist_clients: Arc<PersistClientCache>,
    /// Image name copied onto the controller as `clusterd_image`.
    /// NOTE(review): presumably the image used when launching `clusterd`
    /// processes — confirm against the `clusters` module.
    pub clusterd_image: String,
    /// Optional init-container image name, copied onto the controller.
    /// NOTE(review): presumably used alongside `clusterd_image` — confirm.
    pub init_container_image: Option<String>,
    /// Deploy generation number, copied onto the controller and reported by
    /// `Controller::dump`.
    pub deploy_generation: u64,
    /// Clock function. Used to build the wallclock-lag function, handed to
    /// the sub-controllers, and later used to timestamp replica metrics.
    pub now: NowFn,
    /// Registry with which the controller metrics and txn metrics are
    /// registered; also handed to the sub-controllers.
    pub metrics_registry: MetricsRegistry,
    /// Persist PubSub URL, copied onto the controller.
    pub persist_pubsub_url: String,
    /// Arguments for secrets readers, copied onto the controller.
    pub secrets_args: SecretsReaderCliArgs,
    /// Connection context handed to the storage collections and the storage
    /// controller.
    pub connection_context: ConnectionContext,
}
105
/// Responses that [`Controller::process`] can return to its caller.
#[derive(Debug)]
pub enum ControllerResponse<T = mz_repr::Timestamp> {
    /// A peek notification, forwarded unchanged from
    /// `ComputeControllerResponse::PeekNotification`.
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// A batch of subscribe results for the given ID, forwarded unchanged
    /// from `ComputeControllerResponse::SubscribeResponse`.
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// The outcome of a copy-to for the given ID, forwarded unchanged from
    /// `ComputeControllerResponse::CopyToResponse`.
    /// NOTE(review): the `u64` is presumably a row count — confirm.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// Watch sets for which no watched object remains pending: either none
    /// was pending at install time, or later frontier updates cleared them.
    WatchSetFinished(Vec<WatchSetId>),
}
124
/// Records which source [`Controller::ready`] found pending work for, so
/// that the following [`Controller::process`] call knows what to handle.
#[derive(Debug, Default)]
pub enum Readiness<T> {
    /// No pending work has been recorded. This is the default, and the state
    /// `process` resets the controller to.
    #[default]
    NotReady,
    /// The storage controller became ready.
    Storage,
    /// The compute controller became ready.
    Compute,
    /// Per-process metrics for a replica arrived on the metrics channel.
    Metrics((ReplicaId, Vec<ServiceProcessMetrics>)),
    /// A response generated by the controller itself; `process` returns it
    /// as-is.
    Internal(ControllerResponse<T>),
}
141
/// Bundles the storage and compute controllers with the shared state needed
/// to drive them: readiness tracking, replica metrics plumbing, and watch
/// sets.
pub struct Controller<T: ComputeControllerTimestamp = mz_repr::Timestamp> {
    /// The storage controller.
    pub storage: Box<dyn StorageController<Timestamp = T>>,
    /// The storage collections handle. `new` hands clones of the same `Arc`
    /// to the storage and compute controllers.
    pub storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// The compute controller.
    pub compute: ComputeController<T>,
    /// Copied from `ControllerConfig::clusterd_image`.
    /// NOTE(review): not read by the code shown here; presumably consumed by
    /// the `clusters` module — confirm.
    clusterd_image: String,
    /// Copied from `ControllerConfig::init_container_image`.
    /// NOTE(review): not read by the code shown here — confirm its consumer.
    init_container_image: Option<String>,
    /// Copied from `ControllerConfig::deploy_generation`; reported by `dump`.
    deploy_generation: u64,
    /// Whether the controller was started in read-only mode. While set,
    /// replica metrics are not recorded, and `new` skips the background
    /// removal of past-generation replicas.
    read_only: bool,
    /// The orchestrator, scoped to the `"cluster"` namespace.
    orchestrator: Arc<dyn NamespacedOrchestrator>,
    /// What `ready` found to be ready; taken and reset by `process`.
    readiness: Readiness<T>,
    /// Task handles keyed by replica; each task is aborted when its handle is
    /// dropped.
    /// NOTE(review): presumably the per-replica metrics collection tasks —
    /// confirm against the `clusters` module.
    metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
    /// Sending half of the replica metrics channel.
    metrics_tx: mpsc::UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Receiving half of the replica metrics channel; polled by `ready`.
    metrics_rx: mpsc::UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Clock function used to timestamp recorded replica metrics.
    now: NowFn,

    /// Copied from `ControllerConfig::persist_pubsub_url`.
    /// NOTE(review): not read by the code shown here — confirm its consumer.
    persist_pubsub_url: String,

    /// Copied from `ControllerConfig::secrets_args`.
    /// NOTE(review): not read by the code shown here — confirm its consumer.
    secrets_args: SecretsReaderCliArgs,

    /// For each watched object, the watch sets that are still waiting on it.
    /// Kept in sync with `unfulfilled_watch_sets`; an inconsistency between
    /// the two maps triggers a "corrupted watchset state" panic.
    unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>,
    /// For each unfinished watch set, the objects it is still waiting on and
    /// the timestamp it was installed with.
    unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>,
    /// Generator for fresh watch set IDs.
    watch_set_id_gen: Gen<WatchSetId>,

    /// Watch sets that had no pending objects at install time. They are
    /// drained and reported as `WatchSetFinished` by the next `ready` call
    /// that finds the controller `NotReady`.
    immediate_watch_sets: Vec<WatchSetId>,

    /// Dynamic configuration set, updated through `update_configuration`.
    dyncfg: ConfigSet,
}
201
202impl<T: ComputeControllerTimestamp> Controller<T> {
203 pub fn update_configuration(&mut self, updates: ConfigUpdates) {
205 updates.apply(&self.dyncfg);
206 }
207
208 pub fn start_compute_introspection_sink(&mut self) {
213 self.compute.start_introspection_sink(&*self.storage);
214 }
215
216 pub fn connection_context(&self) -> &ConnectionContext {
220 &self.storage.config().connection_context
221 }
222
223 pub fn storage_configuration(&self) -> &StorageConfiguration {
227 self.storage.config()
228 }
229
230 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
234 let Self {
241 storage_collections,
242 storage,
243 compute,
244 clusterd_image: _,
245 init_container_image: _,
246 deploy_generation,
247 read_only,
248 orchestrator: _,
249 readiness,
250 metrics_tasks: _,
251 metrics_tx: _,
252 metrics_rx: _,
253 now: _,
254 persist_pubsub_url: _,
255 secrets_args: _,
256 unfulfilled_watch_sets_by_object: _,
257 unfulfilled_watch_sets,
258 watch_set_id_gen: _,
259 immediate_watch_sets,
260 dyncfg: _,
261 } = self;
262
263 let storage_collections = storage_collections.dump()?;
264 let storage = storage.dump()?;
265 let compute = compute.dump().await?;
266
267 let unfulfilled_watch_sets: BTreeMap<_, _> = unfulfilled_watch_sets
268 .iter()
269 .map(|(ws_id, watches)| (format!("{ws_id:?}"), format!("{watches:?}")))
270 .collect();
271 let immediate_watch_sets: Vec<_> = immediate_watch_sets
272 .iter()
273 .map(|watch| format!("{watch:?}"))
274 .collect();
275
276 Ok(serde_json::json!({
277 "storage_collections": storage_collections,
278 "storage": storage,
279 "compute": compute,
280 "deploy_generation": deploy_generation,
281 "read_only": read_only,
282 "readiness": format!("{readiness:?}"),
283 "unfulfilled_watch_sets": unfulfilled_watch_sets,
284 "immediate_watch_sets": immediate_watch_sets,
285 }))
286 }
287}
288
289impl<T> Controller<T>
290where
291 T: ComputeControllerTimestamp,
292{
293 pub fn update_orchestrator_scheduling_config(
294 &self,
295 config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
296 ) {
297 self.orchestrator.update_scheduling_config(config);
298 }
299 pub fn initialization_complete(&mut self) {
305 self.storage.initialization_complete();
306 self.compute.initialization_complete();
307 }
308
309 pub fn read_only(&self) -> bool {
311 self.read_only
312 }
313
314 fn take_internal_response(&mut self) -> Option<ControllerResponse<T>> {
318 let ws = std::mem::take(&mut self.immediate_watch_sets);
319 (!ws.is_empty()).then_some(ControllerResponse::WatchSetFinished(ws))
320 }
321
322 pub async fn ready(&mut self) {
331 if let Readiness::NotReady = self.readiness {
332 if let Some(response) = self.take_internal_response() {
339 self.readiness = Readiness::Internal(response);
340 } else {
341 tokio::select! {
344 () = self.storage.ready() => {
345 self.readiness = Readiness::Storage;
346 }
347 () = self.compute.ready() => {
348 self.readiness = Readiness::Compute;
349 }
350 Some(metrics) = self.metrics_rx.recv() => {
351 self.readiness = Readiness::Metrics(metrics);
352 }
353 }
354 }
355 }
356 }
357
358 pub fn get_readiness(&self) -> &Readiness<T> {
360 &self.readiness
361 }
362
363 pub fn install_compute_watch_set(
372 &mut self,
373 mut objects: BTreeSet<GlobalId>,
374 t: T,
375 ) -> WatchSetId {
376 let ws_id = self.watch_set_id_gen.allocate_id();
377
378 objects.retain(|id| {
379 let frontier = self
380 .compute
381 .collection_frontiers(*id, None)
382 .map(|f| f.write_frontier)
383 .expect("missing compute dependency");
384 frontier.less_equal(&t)
385 });
386 if objects.is_empty() {
387 self.immediate_watch_sets.push(ws_id);
388 } else {
389 for id in objects.iter() {
390 self.unfulfilled_watch_sets_by_object
391 .entry(*id)
392 .or_default()
393 .insert(ws_id);
394 }
395 self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
396 }
397
398 ws_id
399 }
400
401 pub fn install_storage_watch_set(
410 &mut self,
411 mut objects: BTreeSet<GlobalId>,
412 t: T,
413 ) -> WatchSetId {
414 let ws_id = self.watch_set_id_gen.allocate_id();
415
416 let uppers = self
417 .storage
418 .collections_frontiers(objects.iter().cloned().collect())
419 .expect("missing storage dependencies")
420 .into_iter()
421 .map(|(id, _since, upper)| (id, upper))
422 .collect::<BTreeMap<_, _>>();
423
424 objects.retain(|id| {
425 let upper = uppers.get(id).expect("missing collection");
426 upper.less_equal(&t)
427 });
428 if objects.is_empty() {
429 self.immediate_watch_sets.push(ws_id);
430 } else {
431 for id in objects.iter() {
432 self.unfulfilled_watch_sets_by_object
433 .entry(*id)
434 .or_default()
435 .insert(ws_id);
436 }
437 self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
438 }
439 ws_id
440 }
441
442 pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId) {
448 if let Some((obj_ids, _)) = self.unfulfilled_watch_sets.remove(ws_id) {
449 for obj_id in obj_ids {
450 let mut entry = match self.unfulfilled_watch_sets_by_object.entry(obj_id) {
451 Entry::Occupied(entry) => entry,
452 Entry::Vacant(_) => panic!("corrupted watchset state"),
453 };
454 entry.get_mut().remove(ws_id);
455 if entry.get().is_empty() {
456 entry.remove();
457 }
458 }
459 }
460 }
461
462 fn process_storage_response(
465 &mut self,
466 storage_metadata: &StorageMetadata,
467 ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
468 let maybe_response = self.storage.process(storage_metadata)?;
469 Ok(maybe_response.and_then(
470 |mz_storage_client::controller::Response::FrontierUpdates(r)| {
471 self.handle_frontier_updates(&r)
472 },
473 ))
474 }
475
476 fn process_compute_response(&mut self) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
479 let response = self.compute.process();
480
481 let response = response.and_then(|r| match r {
482 ComputeControllerResponse::PeekNotification(uuid, peek, otel_ctx) => {
483 Some(ControllerResponse::PeekNotification(uuid, peek, otel_ctx))
484 }
485 ComputeControllerResponse::SubscribeResponse(id, tail) => {
486 Some(ControllerResponse::SubscribeResponse(id, tail))
487 }
488 ComputeControllerResponse::CopyToResponse(id, tail) => {
489 Some(ControllerResponse::CopyToResponse(id, tail))
490 }
491 ComputeControllerResponse::FrontierUpper { id, upper } => {
492 self.handle_frontier_updates(&[(id, upper)])
493 }
494 });
495 Ok(response)
496 }
497
498 #[mz_ore::instrument(level = "debug")]
506 pub fn process(
507 &mut self,
508 storage_metadata: &StorageMetadata,
509 ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
510 match mem::take(&mut self.readiness) {
511 Readiness::NotReady => Ok(None),
512 Readiness::Storage => self.process_storage_response(storage_metadata),
513 Readiness::Compute => self.process_compute_response(),
514 Readiness::Metrics((id, metrics)) => self.process_replica_metrics(id, metrics),
515 Readiness::Internal(message) => Ok(Some(message)),
516 }
517 }
518
519 fn handle_frontier_updates(
523 &mut self,
524 updates: &[(GlobalId, Antichain<T>)],
525 ) -> Option<ControllerResponse<T>> {
526 let mut finished = vec![];
527 for (obj_id, antichain) in updates {
528 let ws_ids = self.unfulfilled_watch_sets_by_object.entry(*obj_id);
529 if let Entry::Occupied(mut ws_ids) = ws_ids {
530 ws_ids.get_mut().retain(|ws_id| {
531 let mut entry = match self.unfulfilled_watch_sets.entry(*ws_id) {
532 Entry::Occupied(entry) => entry,
533 Entry::Vacant(_) => panic!("corrupted watchset state"),
534 };
535 if !antichain.less_equal(&entry.get().1) {
537 entry.get_mut().0.remove(obj_id);
539 if entry.get().0.is_empty() {
541 entry.remove();
542 finished.push(*ws_id);
543 }
544 false
546 } else {
547 true
549 }
550 });
551 if ws_ids.get().is_empty() {
553 ws_ids.remove();
554 }
555 }
556 }
557 (!(finished.is_empty())).then(|| ControllerResponse::WatchSetFinished(finished))
558 }
559
560 fn process_replica_metrics(
561 &mut self,
562 id: ReplicaId,
563 metrics: Vec<ServiceProcessMetrics>,
564 ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
565 self.record_replica_metrics(id, &metrics);
566 Ok(None)
567 }
568
569 fn record_replica_metrics(&mut self, replica_id: ReplicaId, metrics: &[ServiceProcessMetrics]) {
570 if self.read_only() {
571 return;
572 }
573
574 let now = mz_ore::now::to_datetime((self.now)());
575 let now_tz = now.try_into().expect("must fit");
576
577 let replica_id = replica_id.to_string();
578 let mut row = Row::default();
579 let updates = metrics
580 .iter()
581 .enumerate()
582 .map(|(process_id, m)| {
583 row.packer().extend(&[
584 Datum::String(&replica_id),
585 Datum::UInt64(u64::cast_from(process_id)),
586 m.cpu_nano_cores.into(),
587 m.memory_bytes.into(),
588 m.disk_bytes.into(),
589 Datum::TimestampTz(now_tz),
590 m.heap_bytes.into(),
591 m.heap_limit.into(),
592 ]);
593 (row.clone(), mz_repr::Diff::ONE)
594 })
595 .collect();
596
597 self.storage
598 .append_introspection_updates(IntrospectionType::ReplicaMetricsHistory, updates);
599 }
600
601 pub async fn determine_real_time_recent_timestamp(
612 &self,
613 ids: BTreeSet<GlobalId>,
614 timeout: Duration,
615 ) -> Result<BoxFuture<'static, Result<T, StorageError<T>>>, StorageError<T>> {
616 self.storage.real_time_recent_timestamp(ids, timeout).await
617 }
618}
619
620impl<T> Controller<T>
621where
622 T: Timestamp
624 + Codec64
625 + From<EpochMillis>
626 + TimestampManipulation
627 + std::fmt::Display
628 + Into<mz_repr::Timestamp>,
629 T: ComputeControllerTimestamp,
631{
632 #[instrument(name = "controller::new")]
640 pub async fn new(
641 config: ControllerConfig,
642 envd_epoch: NonZeroI64,
643 read_only: bool,
644 storage_txn: &dyn StorageTxn<T>,
645 ) -> Self {
646 if read_only {
647 tracing::info!("starting controllers in read-only mode!");
648 }
649
650 let now_fn = config.now.clone();
651 let wallclock_lag_fn = WallclockLagFn::new(now_fn);
652
653 let controller_metrics = ControllerMetrics::new(&config.metrics_registry);
654
655 let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
656 let collections_ctl = storage_collections::StorageCollectionsImpl::new(
657 config.persist_location.clone(),
658 Arc::clone(&config.persist_clients),
659 &config.metrics_registry,
660 config.now.clone(),
661 Arc::clone(&txns_metrics),
662 envd_epoch,
663 read_only,
664 config.connection_context.clone(),
665 storage_txn,
666 )
667 .await;
668
669 let collections_ctl: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync> =
670 Arc::new(collections_ctl);
671
672 let storage_controller = mz_storage_controller::Controller::new(
673 config.build_info,
674 config.persist_location.clone(),
675 config.persist_clients,
676 config.now.clone(),
677 wallclock_lag_fn.clone(),
678 Arc::clone(&txns_metrics),
679 read_only,
680 &config.metrics_registry,
681 controller_metrics.clone(),
682 config.connection_context,
683 storage_txn,
684 Arc::clone(&collections_ctl),
685 )
686 .await;
687
688 let storage_collections = Arc::clone(&collections_ctl);
689 let compute_controller = ComputeController::new(
690 config.build_info,
691 storage_collections,
692 read_only,
693 &config.metrics_registry,
694 config.persist_location,
695 controller_metrics,
696 config.now.clone(),
697 wallclock_lag_fn,
698 );
699 let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();
700
701 let this = Self {
702 storage: Box::new(storage_controller),
703 storage_collections: collections_ctl,
704 compute: compute_controller,
705 clusterd_image: config.clusterd_image,
706 init_container_image: config.init_container_image,
707 deploy_generation: config.deploy_generation,
708 read_only,
709 orchestrator: config.orchestrator.namespace("cluster"),
710 readiness: Readiness::NotReady,
711 metrics_tasks: BTreeMap::new(),
712 metrics_tx,
713 metrics_rx,
714 now: config.now,
715 persist_pubsub_url: config.persist_pubsub_url,
716 secrets_args: config.secrets_args,
717 unfulfilled_watch_sets_by_object: BTreeMap::new(),
718 unfulfilled_watch_sets: BTreeMap::new(),
719 watch_set_id_gen: Gen::default(),
720 immediate_watch_sets: Vec::new(),
721 dyncfg: mz_dyncfgs::all_dyncfgs(),
722 };
723
724 if !this.read_only {
725 this.remove_past_generation_replicas_in_background();
726 }
727
728 this
729 }
730}