1use std::collections::btree_map::Entry;
25use std::collections::{BTreeMap, BTreeSet};
26use std::mem;
27use std::num::NonZeroI64;
28use std::sync::Arc;
29use std::time::Duration;
30
31use futures::future::BoxFuture;
32use mz_build_info::BuildInfo;
33use mz_cluster_client::metrics::ControllerMetrics;
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_compute_client::controller::error::{CollectionLookupError, CollectionMissing};
36use mz_compute_client::controller::{
37 ComputeController, ComputeControllerResponse, PeekNotification,
38};
39use mz_compute_client::protocol::response::SubscribeBatch;
40use mz_controller_types::WatchSetId;
41use mz_dyncfg::{ConfigSet, ConfigUpdates};
42use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, ServiceProcessMetrics};
43use mz_ore::cast::CastFrom;
44use mz_ore::id_gen::Gen;
45use mz_ore::instrument;
46use mz_ore::metrics::MetricsRegistry;
47use mz_ore::now::NowFn;
48use mz_ore::task::AbortOnDropHandle;
49use mz_ore::tracing::OpenTelemetryContext;
50use mz_persist_client::PersistLocation;
51use mz_persist_client::cache::PersistClientCache;
52use mz_repr::{Datum, GlobalId, Row, Timestamp};
53use mz_service::secrets::SecretsReaderCliArgs;
54use mz_storage_client::controller::{
55 IntrospectionType, StorageController, StorageMetadata, StorageTxn,
56};
57use mz_storage_client::storage_collections::{self, StorageCollections};
58use mz_storage_types::configuration::StorageConfiguration;
59use mz_storage_types::connections::ConnectionContext;
60use mz_storage_types::controller::StorageError;
61use mz_txn_wal::metrics::Metrics as TxnMetrics;
62use timely::progress::Antichain;
63use tokio::sync::mpsc;
64use uuid::Uuid;
65
/// Cluster-related controller functionality.
pub mod clusters;
/// Lookup of HTTP endpoints for cluster replicas.
pub mod replica_http_locator;

pub use mz_storage_controller::prepare_initialization;
pub use replica_http_locator::ReplicaHttpLocator;
73
/// Configuration for [`Controller::new`].
#[derive(Debug, Clone)]
pub struct ControllerConfig {
    /// Build information for this binary; forwarded to the storage and
    /// compute controllers.
    pub build_info: &'static BuildInfo,
    /// The orchestrator used to provision cluster services. The controller
    /// namespaces it under `"cluster"`.
    pub orchestrator: Arc<dyn Orchestrator>,
    /// The location of the persist instance backing storage collections.
    pub persist_location: PersistLocation,
    /// A shared cache of persist clients.
    pub persist_clients: Arc<PersistClientCache>,
    /// The `clusterd` container image to run.
    pub clusterd_image: String,
    /// An optional init-container image for orchestrated services.
    pub init_container_image: Option<String>,
    /// The deploy generation of this environment.
    pub deploy_generation: u64,
    /// The source of wall-clock time; also used to derive the wallclock-lag
    /// function shared by the storage and compute controllers.
    pub now: NowFn,
    /// The registry into which controller metrics are registered.
    pub metrics_registry: MetricsRegistry,
    /// The URL of the persist PubSub service.
    pub persist_pubsub_url: String,
    /// CLI arguments for constructing a secrets reader.
    pub secrets_args: SecretsReaderCliArgs,
    /// Context for establishing connections to external systems; installed
    /// into the storage configuration.
    pub connection_context: ConnectionContext,
    /// Locator for replica HTTP endpoints.
    pub replica_http_locator: Arc<ReplicaHttpLocator>,
}
110
/// Responses that the [`Controller`] returns from [`Controller::process`].
#[derive(Debug)]
pub enum ControllerResponse {
    /// A notification about a peek, keyed by the peek's UUID, carrying the
    /// notification payload and the OpenTelemetry context for tracing.
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// A batch of results for an active subscribe on the given collection.
    SubscribeResponse(GlobalId, SubscribeBatch),
    /// The result of a `COPY TO` on the given collection: a count on success
    /// (presumably rows written — confirm with the caller), or an error.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// The given watch sets have been fulfilled.
    WatchSetFinished(Vec<WatchSetId>),
}
129
/// The readiness state observed by [`Controller::ready`] and consumed by
/// [`Controller::process`].
#[derive(Debug, Default)]
pub enum Readiness {
    /// No readiness recorded yet; [`Controller::ready`] must be awaited.
    #[default]
    NotReady,
    /// The storage controller is ready to be processed.
    Storage,
    /// The compute controller is ready to be processed.
    Compute,
    /// Metrics have arrived for the given replica's processes.
    Metrics((ReplicaId, Vec<ServiceProcessMetrics>)),
    /// An internally-generated response is ready to be returned.
    Internal(ControllerResponse),
}
146
/// A client providing unified access to the storage controller, the compute
/// controller, and the cluster orchestrator.
pub struct Controller {
    /// The storage controller.
    pub storage: Box<dyn StorageController>,
    /// A shared handle to the storage collections.
    pub storage_collections: Arc<dyn StorageCollections + Send + Sync>,
    /// The compute controller.
    pub compute: ComputeController,
    /// The `clusterd` container image (from `ControllerConfig::clusterd_image`).
    clusterd_image: String,
    /// Optional init-container image (from `ControllerConfig`).
    init_container_image: Option<String>,
    /// The deploy generation of this environment.
    deploy_generation: u64,
    /// Whether the controllers run in read-only mode. Gates, e.g., recording
    /// of replica metrics and past-generation replica cleanup.
    read_only: bool,
    /// The orchestrator, namespaced to `"cluster"` services.
    orchestrator: Arc<dyn NamespacedOrchestrator>,
    /// The readiness state last observed by `ready` and not yet consumed by
    /// `process`.
    readiness: Readiness,
    /// Per-replica metrics-collection task handles; tasks abort on drop.
    metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
    /// Sender half of the replica-metrics channel, handed to metrics tasks.
    metrics_tx: mpsc::UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Receiver half of the replica-metrics channel, polled in `ready`.
    metrics_rx: mpsc::UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Source of wall-clock time.
    now: NowFn,

    // NOTE(review): presumably forwarded to spawned cluster processes; the
    // consuming code is not visible in this part of the file.
    persist_pubsub_url: String,

    // NOTE(review): presumably forwarded to spawned cluster processes; the
    // consuming code is not visible in this part of the file.
    secrets_args: SecretsReaderCliArgs,

    /// Reverse index of `unfulfilled_watch_sets`: maps each watched object to
    /// the ids of the watch sets still waiting on it.
    unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>,
    /// Installed watch sets that have not yet been fulfilled: for each watch
    /// set id, the objects still being waited on and the target timestamp.
    unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, Timestamp)>,
    /// Generator of unique watch set ids.
    watch_set_id_gen: Gen<WatchSetId>,

    /// Watch sets that were already fulfilled at installation time; drained
    /// into a `WatchSetFinished` response by `take_internal_response`.
    immediate_watch_sets: Vec<WatchSetId>,

    /// Dynamic configuration knobs for the controller.
    dyncfg: ConfigSet,

    /// Locator for replica HTTP endpoints.
    replica_http_locator: Arc<ReplicaHttpLocator>,
}
209
impl Controller {
    /// Applies the given dynamic configuration updates to this controller's
    /// config set.
    pub fn update_configuration(&mut self, updates: ConfigUpdates) {
        updates.apply(&self.dyncfg);
    }

    /// Starts the compute controller's introspection sink, backed by the
    /// storage controller.
    pub fn start_compute_introspection_sink(&mut self) {
        self.compute.start_introspection_sink(&*self.storage);
    }

    /// Returns the connection context installed in the storage configuration.
    pub fn connection_context(&self) -> &ConnectionContext {
        &self.storage.config().connection_context
    }

    /// Returns the storage controller's configuration.
    pub fn storage_configuration(&self) -> &StorageConfiguration {
        self.storage.config()
    }

    /// Returns a JSON dump of the controller's internal state, for
    /// debugging and introspection purposes.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` exhaustively (rather than accessing fields ad
        // hoc) so that adding a field to `Controller` forces a decision about
        // whether it should be included in the dump.
        let Self {
            storage_collections,
            storage,
            compute,
            clusterd_image: _,
            init_container_image: _,
            deploy_generation,
            read_only,
            orchestrator: _,
            readiness,
            metrics_tasks: _,
            metrics_tx: _,
            metrics_rx: _,
            now: _,
            persist_pubsub_url: _,
            secrets_args: _,
            unfulfilled_watch_sets_by_object: _,
            unfulfilled_watch_sets,
            watch_set_id_gen: _,
            immediate_watch_sets,
            dyncfg: _,
            replica_http_locator: _,
        } = self;

        let storage_collections = storage_collections.dump()?;
        let storage = storage.dump()?;
        let compute = compute.dump().await?;

        // Watch-set state is rendered via `Debug` since its contents are not
        // directly serializable.
        let unfulfilled_watch_sets: BTreeMap<_, _> = unfulfilled_watch_sets
            .iter()
            .map(|(ws_id, watches)| (format!("{ws_id:?}"), format!("{watches:?}")))
            .collect();
        let immediate_watch_sets: Vec<_> = immediate_watch_sets
            .iter()
            .map(|watch| format!("{watch:?}"))
            .collect();

        Ok(serde_json::json!({
            "storage_collections": storage_collections,
            "storage": storage,
            "compute": compute,
            "deploy_generation": deploy_generation,
            "read_only": read_only,
            "readiness": format!("{readiness:?}"),
            "unfulfilled_watch_sets": unfulfilled_watch_sets,
            "immediate_watch_sets": immediate_watch_sets,
        }))
    }
}
297
298impl Controller {
299 pub fn update_orchestrator_scheduling_config(
300 &self,
301 config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
302 ) {
303 self.orchestrator.update_scheduling_config(config);
304 }
305 pub fn initialization_complete(&mut self) {
311 self.storage.initialization_complete();
312 self.compute.initialization_complete();
313 }
314
315 pub fn read_only(&self) -> bool {
317 self.read_only
318 }
319
320 fn take_internal_response(&mut self) -> Option<ControllerResponse> {
324 let ws = std::mem::take(&mut self.immediate_watch_sets);
325 (!ws.is_empty()).then_some(ControllerResponse::WatchSetFinished(ws))
326 }
327
328 pub async fn ready(&mut self) {
337 if let Readiness::NotReady = self.readiness {
338 if let Some(response) = self.take_internal_response() {
345 self.readiness = Readiness::Internal(response);
346 } else {
347 tokio::select! {
350 () = self.storage.ready() => {
351 self.readiness = Readiness::Storage;
352 }
353 () = self.compute.ready() => {
354 self.readiness = Readiness::Compute;
355 }
356 Some(metrics) = self.metrics_rx.recv() => {
357 self.readiness = Readiness::Metrics(metrics);
358 }
359 }
360 }
361 }
362 }
363
364 pub fn get_readiness(&self) -> &Readiness {
366 &self.readiness
367 }
368
369 pub fn install_compute_watch_set(
378 &mut self,
379 mut objects: BTreeSet<GlobalId>,
380 t: Timestamp,
381 ) -> Result<WatchSetId, CollectionLookupError> {
382 let ws_id = self.watch_set_id_gen.allocate_id();
383
384 let frontiers: BTreeMap<GlobalId, _> = objects
386 .iter()
387 .map(|id| {
388 self.compute
389 .collection_frontiers(*id, None)
390 .map(|f| (*id, f.write_frontier))
391 })
392 .collect::<Result<_, _>>()?;
393 objects.retain(|id| {
394 let frontier = frontiers.get(id).expect("just collected");
395 frontier.less_equal(&t)
396 });
397 if objects.is_empty() {
398 self.immediate_watch_sets.push(ws_id);
399 } else {
400 for id in objects.iter() {
401 self.unfulfilled_watch_sets_by_object
402 .entry(*id)
403 .or_default()
404 .insert(ws_id);
405 }
406 self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
407 }
408
409 Ok(ws_id)
410 }
411
412 pub fn install_storage_watch_set(
421 &mut self,
422 mut objects: BTreeSet<GlobalId>,
423 t: Timestamp,
424 ) -> Result<WatchSetId, CollectionMissing> {
425 let ws_id = self.watch_set_id_gen.allocate_id();
426
427 let uppers = self
428 .storage
429 .collections_frontiers(objects.iter().cloned().collect())?
430 .into_iter()
431 .map(|(id, _since, upper)| (id, upper))
432 .collect::<BTreeMap<_, _>>();
433
434 objects.retain(|id| {
435 let upper = uppers.get(id).expect("missing collection");
436 upper.less_equal(&t)
437 });
438 if objects.is_empty() {
439 self.immediate_watch_sets.push(ws_id);
440 } else {
441 for id in objects.iter() {
442 self.unfulfilled_watch_sets_by_object
443 .entry(*id)
444 .or_default()
445 .insert(ws_id);
446 }
447 self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
448 }
449 Ok(ws_id)
450 }
451
452 pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId) {
458 if let Some((obj_ids, _)) = self.unfulfilled_watch_sets.remove(ws_id) {
459 for obj_id in obj_ids {
460 let mut entry = match self.unfulfilled_watch_sets_by_object.entry(obj_id) {
461 Entry::Occupied(entry) => entry,
462 Entry::Vacant(_) => panic!("corrupted watchset state"),
463 };
464 entry.get_mut().remove(ws_id);
465 if entry.get().is_empty() {
466 entry.remove();
467 }
468 }
469 }
470 }
471
472 fn process_storage_response(
475 &mut self,
476 storage_metadata: &StorageMetadata,
477 ) -> Result<Option<ControllerResponse>, anyhow::Error> {
478 let maybe_response = self.storage.process(storage_metadata)?;
479 Ok(maybe_response.and_then(
480 |mz_storage_client::controller::Response::FrontierUpdates(r)| {
481 self.handle_frontier_updates(&r)
482 },
483 ))
484 }
485
486 fn process_compute_response(&mut self) -> Result<Option<ControllerResponse>, anyhow::Error> {
489 let response = self.compute.process();
490
491 let response = response.and_then(|r| match r {
492 ComputeControllerResponse::PeekNotification(uuid, peek, otel_ctx) => {
493 Some(ControllerResponse::PeekNotification(uuid, peek, otel_ctx))
494 }
495 ComputeControllerResponse::SubscribeResponse(id, tail) => {
496 Some(ControllerResponse::SubscribeResponse(id, tail))
497 }
498 ComputeControllerResponse::CopyToResponse(id, tail) => {
499 Some(ControllerResponse::CopyToResponse(id, tail))
500 }
501 ComputeControllerResponse::FrontierUpper { id, upper } => {
502 self.handle_frontier_updates(&[(id, upper)])
503 }
504 });
505 Ok(response)
506 }
507
508 #[mz_ore::instrument(level = "debug")]
516 pub fn process(
517 &mut self,
518 storage_metadata: &StorageMetadata,
519 ) -> Result<Option<ControllerResponse>, anyhow::Error> {
520 match mem::take(&mut self.readiness) {
521 Readiness::NotReady => Ok(None),
522 Readiness::Storage => self.process_storage_response(storage_metadata),
523 Readiness::Compute => self.process_compute_response(),
524 Readiness::Metrics((id, metrics)) => self.process_replica_metrics(id, metrics),
525 Readiness::Internal(message) => Ok(Some(message)),
526 }
527 }
528
529 fn handle_frontier_updates(
533 &mut self,
534 updates: &[(GlobalId, Antichain<Timestamp>)],
535 ) -> Option<ControllerResponse> {
536 let mut finished = vec![];
537 for (obj_id, antichain) in updates {
538 let ws_ids = self.unfulfilled_watch_sets_by_object.entry(*obj_id);
539 if let Entry::Occupied(mut ws_ids) = ws_ids {
540 ws_ids.get_mut().retain(|ws_id| {
541 let mut entry = match self.unfulfilled_watch_sets.entry(*ws_id) {
542 Entry::Occupied(entry) => entry,
543 Entry::Vacant(_) => panic!("corrupted watchset state"),
544 };
545 if !antichain.less_equal(&entry.get().1) {
547 entry.get_mut().0.remove(obj_id);
549 if entry.get().0.is_empty() {
551 entry.remove();
552 finished.push(*ws_id);
553 }
554 false
556 } else {
557 true
559 }
560 });
561 if ws_ids.get().is_empty() {
563 ws_ids.remove();
564 }
565 }
566 }
567 (!(finished.is_empty())).then(|| ControllerResponse::WatchSetFinished(finished))
568 }
569
570 fn process_replica_metrics(
571 &mut self,
572 id: ReplicaId,
573 metrics: Vec<ServiceProcessMetrics>,
574 ) -> Result<Option<ControllerResponse>, anyhow::Error> {
575 self.record_replica_metrics(id, &metrics);
576 Ok(None)
577 }
578
579 fn record_replica_metrics(&mut self, replica_id: ReplicaId, metrics: &[ServiceProcessMetrics]) {
580 if self.read_only() {
581 return;
582 }
583
584 let now = mz_ore::now::to_datetime((self.now)());
585 let now_tz = now.try_into().expect("must fit");
586
587 let replica_id = replica_id.to_string();
588 let mut row = Row::default();
589 let updates = metrics
590 .iter()
591 .enumerate()
592 .map(|(process_id, m)| {
593 row.packer().extend(&[
594 Datum::String(&replica_id),
595 Datum::UInt64(u64::cast_from(process_id)),
596 m.cpu_nano_cores.into(),
597 m.memory_bytes.into(),
598 m.disk_bytes.into(),
599 Datum::TimestampTz(now_tz),
600 m.heap_bytes.into(),
601 m.heap_limit.into(),
602 ]);
603 (row.clone(), mz_repr::Diff::ONE)
604 })
605 .collect();
606
607 self.storage
608 .append_introspection_updates(IntrospectionType::ReplicaMetricsHistory, updates);
609 }
610
611 pub async fn determine_real_time_recent_timestamp(
622 &self,
623 ids: BTreeSet<GlobalId>,
624 timeout: Duration,
625 ) -> Result<BoxFuture<'static, Result<Timestamp, StorageError>>, StorageError> {
626 self.storage.real_time_recent_timestamp(ids, timeout).await
627 }
628}
629
impl Controller {
    /// Creates a new controller from the given configuration.
    ///
    /// Constructs, in order, the storage collections, the storage controller,
    /// and the compute controller, wiring them together with shared txn-wal
    /// metrics, controller metrics, and a wallclock-lag function derived from
    /// `config.now`.
    ///
    /// When `read_only` is true, all sub-controllers are started in read-only
    /// mode and past-generation replicas are not cleaned up.
    #[instrument(name = "controller::new")]
    pub async fn new(
        config: ControllerConfig,
        envd_epoch: NonZeroI64,
        read_only: bool,
        storage_txn: &dyn StorageTxn,
    ) -> Self {
        if read_only {
            tracing::info!("starting controllers in read-only mode!");
        }

        let now_fn = config.now.clone();
        let wallclock_lag_fn = WallclockLagFn::new(now_fn);

        let controller_metrics = ControllerMetrics::new(&config.metrics_registry);

        // Txn-wal metrics are shared between the storage collections and the
        // storage controller.
        let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
        let collections_ctl = storage_collections::StorageCollectionsImpl::new(
            config.persist_location.clone(),
            Arc::clone(&config.persist_clients),
            &config.metrics_registry,
            config.now.clone(),
            Arc::clone(&txns_metrics),
            envd_epoch,
            read_only,
            config.connection_context.clone(),
            storage_txn,
        )
        .await;

        let collections_ctl: Arc<dyn StorageCollections + Send + Sync> = Arc::new(collections_ctl);

        // The storage controller shares the storage collections handle.
        let storage_controller = mz_storage_controller::Controller::new(
            config.build_info,
            config.persist_location.clone(),
            config.persist_clients,
            config.now.clone(),
            wallclock_lag_fn.clone(),
            Arc::clone(&txns_metrics),
            read_only,
            &config.metrics_registry,
            controller_metrics.clone(),
            config.connection_context,
            storage_txn,
            Arc::clone(&collections_ctl),
        )
        .await;

        // The compute controller shares the same storage collections handle.
        let storage_collections = Arc::clone(&collections_ctl);
        let compute_controller = ComputeController::new(
            config.build_info,
            storage_collections,
            read_only,
            &config.metrics_registry,
            config.persist_location,
            controller_metrics,
            config.now.clone(),
            wallclock_lag_fn,
        );
        // Channel over which per-replica metrics tasks report measurements;
        // the receiver is polled in `Controller::ready`.
        let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();

        let this = Self {
            storage: Box::new(storage_controller),
            storage_collections: collections_ctl,
            compute: compute_controller,
            clusterd_image: config.clusterd_image,
            init_container_image: config.init_container_image,
            deploy_generation: config.deploy_generation,
            read_only,
            orchestrator: config.orchestrator.namespace("cluster"),
            readiness: Readiness::NotReady,
            metrics_tasks: BTreeMap::new(),
            metrics_tx,
            metrics_rx,
            now: config.now,
            persist_pubsub_url: config.persist_pubsub_url,
            secrets_args: config.secrets_args,
            unfulfilled_watch_sets_by_object: BTreeMap::new(),
            unfulfilled_watch_sets: BTreeMap::new(),
            watch_set_id_gen: Gen::default(),
            immediate_watch_sets: Vec::new(),
            dyncfg: mz_dyncfgs::all_dyncfgs(),
            replica_http_locator: config.replica_http_locator,
        };

        if !this.read_only {
            // In read-write mode, clean up replicas left over from previous
            // deploy generations.
            this.remove_past_generation_replicas_in_background();
        }

        this
    }
}
729}