use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::mem;
use std::num::NonZeroI64;
use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;
use mz_build_info::BuildInfo;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_client::controller::error::CollectionLookupError;
use mz_compute_client::controller::{
    ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification,
};
use mz_compute_client::protocol::response::SubscribeBatch;
use mz_controller_types::WatchSetId;
use mz_dyncfg::{ConfigSet, ConfigUpdates};
use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, ServiceProcessMetrics};
use mz_ore::cast::CastFrom;
use mz_ore::id_gen::Gen;
use mz_ore::instrument;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::PersistLocation;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_types::Codec64;
use mz_repr::{Datum, GlobalId, Row, TimestampManipulation};
use mz_service::secrets::SecretsReaderCliArgs;
use mz_storage_client::controller::{
    IntrospectionType, StorageController, StorageMetadata, StorageTxn,
};
use mz_storage_client::storage_collections::{self, StorageCollections};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::StorageError;
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc;
use uuid::Uuid;

pub mod clusters;
pub mod replica_http_locator;

pub use mz_storage_controller::prepare_initialization;
pub use replica_http_locator::ReplicaHttpLocator;

/// Configures a controller.
#[derive(Debug, Clone)]
pub struct ControllerConfig {
    /// The build information for this process.
    pub build_info: &'static BuildInfo,
    /// The orchestrator implementation to use.
    pub orchestrator: Arc<dyn Orchestrator>,
    /// The persist location where all storage collections are managed.
    pub persist_location: PersistLocation,
    /// A process-global cache of persist clients.
    pub persist_clients: Arc<PersistClientCache>,
    /// The clusterd image to use when starting new cluster processes.
    pub clusterd_image: String,
    /// The init container image to use for clusterd, if any.
    pub init_container_image: Option<String>,
    /// The deploy generation of this environment.
    pub deploy_generation: u64,
    /// A function that returns the current time.
    pub now: NowFn,
    /// The metrics registry.
    pub metrics_registry: MetricsRegistry,
    /// The URL for Persist PubSub.
    pub persist_pubsub_url: String,
    /// Arguments for secrets readers.
    pub secrets_args: SecretsReaderCliArgs,
    /// The connection context to thread through to cluster processes.
    pub connection_context: ConnectionContext,
    /// Shared state for mapping replicas to their HTTP endpoints.
    pub replica_http_locator: Arc<ReplicaHttpLocator>,
}

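/// Responses that a [`Controller`] can produce, multiplexed from the storage
/// and compute controllers and from the controller's own watch sets.
///
/// A consumer typically matches on the variants; a sketch (handler bodies are
/// placeholders):
///
/// ```ignore
/// match response {
///     ControllerResponse::PeekNotification(uuid, notification, otel_ctx) => { /* ... */ }
///     ControllerResponse::SubscribeResponse(id, batch) => { /* ... */ }
///     ControllerResponse::CopyToResponse(id, result) => { /* ... */ }
///     ControllerResponse::WatchSetFinished(ws_ids) => { /* ... */ }
/// }
/// ```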
#[derive(Debug)]
pub enum ControllerResponse<T = mz_repr::Timestamp> {
    /// A notification that a peek has completed, identified by its UUID and
    /// carrying the OpenTelemetry context it was initiated under.
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// The next batch of responses for the subscribe with the given ID.
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// The response to a `COPY TO` command: the number of rows written, or an
    /// error.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// Notification that the given watch sets have finished, i.e. every
    /// watched frontier has advanced past the watched timestamp.
    WatchSetFinished(Vec<WatchSetId>),
}

/// Tracks which underlying controller has work ready for processing.
#[derive(Debug, Default)]
pub enum Readiness<T> {
    /// No underlying controller is ready.
    #[default]
    NotReady,
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// Replica metrics are ready.
    Metrics((ReplicaId, Vec<ServiceProcessMetrics>)),
    /// An internally-generated response is ready.
    Internal(ControllerResponse<T>),
}

/// A client that forwards commands to, and multiplexes responses from, the
/// storage and compute controllers.
pub struct Controller<T: ComputeControllerTimestamp = mz_repr::Timestamp> {
    pub storage: Box<dyn StorageController<Timestamp = T>>,
    pub storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    pub compute: ComputeController<T>,
    /// The clusterd image to use when starting new cluster processes.
    clusterd_image: String,
    /// The init container image to use for clusterd, if any.
    init_container_image: Option<String>,
    /// The deploy generation of this environment.
    deploy_generation: u64,
    /// Whether this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to affect changes to external systems.
    read_only: bool,
    /// The cluster orchestrator.
    orchestrator: Arc<dyn NamespacedOrchestrator>,
    /// Tracks the readiness of the underlying controllers.
    readiness: Readiness<T>,
    /// Tasks for collecting replica metrics, keyed by replica ID.
    metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
    /// Sender for the channel over which replica metrics are sent.
    metrics_tx: mpsc::UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Receiver for the channel over which replica metrics are sent.
    metrics_rx: mpsc::UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// A function that returns the current time.
    now: NowFn,

    /// The URL for Persist PubSub.
    persist_pubsub_url: String,

    /// Arguments for secrets readers.
    secrets_args: SecretsReaderCliArgs,

    /// A map from global IDs to the not-yet-fulfilled watch sets watching
    /// that ID.
    unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>,
    /// A map from watch set IDs to the objects still being waited on and the
    /// timestamp being watched for.
    unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>,
    /// Generates IDs for new watch sets.
    watch_set_id_gen: Gen<WatchSetId>,

    /// Watch sets that were already fulfilled when they were installed and
    /// whose completion is reported on the next call to `process`.
    immediate_watch_sets: Vec<WatchSetId>,

    /// Dynamic system configuration.
    dyncfg: ConfigSet,

    /// Shared state for mapping replicas to their HTTP endpoints.
    replica_http_locator: Arc<ReplicaHttpLocator>,
}

impl<T: ComputeControllerTimestamp> Controller<T> {
    /// Applies the given dynamic configuration updates.
    pub fn update_configuration(&mut self, updates: ConfigUpdates) {
        updates.apply(&self.dyncfg);
    }

    /// Starts sinking compute introspection data to storage-managed
    /// collections.
    pub fn start_compute_introspection_sink(&mut self) {
        self.compute.start_introspection_sink(&*self.storage);
    }

    /// Returns the connection context installed in the controller.
    pub fn connection_context(&self) -> &ConnectionContext {
        &self.storage.config().connection_context
    }

    /// Returns the storage configuration installed in the storage controller.
    pub fn storage_configuration(&self) -> &StorageConfiguration {
        self.storage.config()
    }

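    /// Returns the controller's state as a JSON document, for debugging.
    ///
    /// A sketch of rendering the dump (assumes a `controller` binding):
    ///
    /// ```ignore
    /// let state = controller.dump().await?;
    /// println!("{}", serde_json::to_string_pretty(&state)?);
    /// ```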
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` here so we don't forget to consider dumping
        // newly added fields.
        let Self {
            storage_collections,
            storage,
            compute,
            clusterd_image: _,
            init_container_image: _,
            deploy_generation,
            read_only,
            orchestrator: _,
            readiness,
            metrics_tasks: _,
            metrics_tx: _,
            metrics_rx: _,
            now: _,
            persist_pubsub_url: _,
            secrets_args: _,
            unfulfilled_watch_sets_by_object: _,
            unfulfilled_watch_sets,
            watch_set_id_gen: _,
            immediate_watch_sets,
            dyncfg: _,
            replica_http_locator: _,
        } = self;

        let storage_collections = storage_collections.dump()?;
        let storage = storage.dump()?;
        let compute = compute.dump().await?;

        let unfulfilled_watch_sets: BTreeMap<_, _> = unfulfilled_watch_sets
            .iter()
            .map(|(ws_id, watches)| (format!("{ws_id:?}"), format!("{watches:?}")))
            .collect();
        let immediate_watch_sets: Vec<_> = immediate_watch_sets
            .iter()
            .map(|watch| format!("{watch:?}"))
            .collect();

        Ok(serde_json::json!({
            "storage_collections": storage_collections,
            "storage": storage,
            "compute": compute,
            "deploy_generation": deploy_generation,
            "read_only": read_only,
            "readiness": format!("{readiness:?}"),
            "unfulfilled_watch_sets": unfulfilled_watch_sets,
            "immediate_watch_sets": immediate_watch_sets,
        }))
    }
}

impl<T> Controller<T>
where
    T: ComputeControllerTimestamp,
{
    /// Passes an updated scheduling config through to the orchestrator.
    pub fn update_orchestrator_scheduling_config(
        &self,
        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
    ) {
        self.orchestrator.update_scheduling_config(config);
    }

    /// Marks the end of any initialization commands.
    ///
    /// The underlying controllers may defer work until this method is called,
    /// so the owner should invoke it as soon as it is appropriate.
    pub fn initialization_complete(&mut self) {
        self.storage.initialization_complete();
        self.compute.initialization_complete();
    }

    /// Reports whether the controller is in read-only mode.
    pub fn read_only(&self) -> bool {
        self.read_only
    }

    /// Returns an internally-generated response, if one is queued.
    ///
    /// Currently this is the case only when a watch set was already fulfilled
    /// at the time it was installed.
    fn take_internal_response(&mut self) -> Option<ControllerResponse<T>> {
        let ws = mem::take(&mut self.immediate_watch_sets);
        (!ws.is_empty()).then_some(ControllerResponse::WatchSetFinished(ws))
    }

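    /// Waits until the controller is ready to process a response.
    ///
    /// This method may block for an arbitrarily long time. When it returns,
    /// the owner should call [`Controller::process`] to handle the ready
    /// message.
    ///
    /// A sketch of the intended driver loop (the source of `storage_metadata`
    /// and the `handle` function are placeholders):
    ///
    /// ```ignore
    /// loop {
    ///     controller.ready().await;
    ///     if let Some(response) = controller.process(&storage_metadata)? {
    ///         handle(response);
    ///     }
    /// }
    /// ```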
    pub async fn ready(&mut self) {
        if let Readiness::NotReady = self.readiness {
            // If an internal response is queued, report readiness
            // immediately; otherwise wait on the underlying controllers.
            if let Some(response) = self.take_internal_response() {
                self.readiness = Readiness::Internal(response);
            } else {
                tokio::select! {
                    () = self.storage.ready() => {
                        self.readiness = Readiness::Storage;
                    }
                    () = self.compute.ready() => {
                        self.readiness = Readiness::Compute;
                    }
                    Some(metrics) = self.metrics_rx.recv() => {
                        self.readiness = Readiness::Metrics(metrics);
                    }
                }
            }
        }
    }

    /// Returns the controller's current readiness state.
    pub fn get_readiness(&self) -> &Readiness<T> {
        &self.readiness
    }

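    /// Installs a watch set in the controller.
    ///
    /// A watch set is a request to be informed when the write frontiers of
    /// every compute object in `objects` have advanced past the timestamp
    /// `t`. When that happens, a [`ControllerResponse::WatchSetFinished`]
    /// containing the returned [`WatchSetId`] is delivered via
    /// [`Controller::process`].
    ///
    /// A sketch of the flow (`objects`, `t`, and `storage_metadata` are
    /// assumed; error handling elided):
    ///
    /// ```ignore
    /// let ws_id = controller.install_compute_watch_set(objects, t)?;
    /// // ... later, in the driver loop ...
    /// if let Some(ControllerResponse::WatchSetFinished(finished)) =
    ///     controller.process(&storage_metadata)?
    /// {
    ///     // `finished` will include `ws_id` once every watched frontier
    ///     // has advanced past `t`.
    /// }
    /// ```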
    pub fn install_compute_watch_set(
        &mut self,
        mut objects: BTreeSet<GlobalId>,
        t: T,
    ) -> Result<WatchSetId, CollectionLookupError> {
        let ws_id = self.watch_set_id_gen.allocate_id();

        let frontiers: BTreeMap<GlobalId, _> = objects
            .iter()
            .map(|id| {
                self.compute
                    .collection_frontiers(*id, None)
                    .map(|f| (*id, f.write_frontier))
            })
            .collect::<Result<_, _>>()?;
        // Keep only the objects whose write frontiers have not yet advanced
        // past the watched timestamp.
        objects.retain(|id| {
            let frontier = frontiers.get(id).expect("just collected");
            frontier.less_equal(&t)
        });
        if objects.is_empty() {
            // Everything has already advanced; report completion immediately.
            self.immediate_watch_sets.push(ws_id);
        } else {
            for id in objects.iter() {
                self.unfulfilled_watch_sets_by_object
                    .entry(*id)
                    .or_default()
                    .insert(ws_id);
            }
            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
        }

        Ok(ws_id)
    }

    /// Installs a watch set over storage collections, as
    /// [`Controller::install_compute_watch_set`] does for compute
    /// collections.
    pub fn install_storage_watch_set(
        &mut self,
        mut objects: BTreeSet<GlobalId>,
        t: T,
    ) -> Result<WatchSetId, CollectionLookupError> {
        let ws_id = self.watch_set_id_gen.allocate_id();

        let uppers = self
            .storage
            .collections_frontiers(objects.iter().cloned().collect())?
            .into_iter()
            .map(|(id, _since, upper)| (id, upper))
            .collect::<BTreeMap<_, _>>();

        // Keep only the objects whose uppers have not yet advanced past the
        // watched timestamp.
        objects.retain(|id| {
            let upper = uppers.get(id).expect("missing collection");
            upper.less_equal(&t)
        });
        if objects.is_empty() {
            self.immediate_watch_sets.push(ws_id);
        } else {
            for id in objects.iter() {
                self.unfulfilled_watch_sets_by_object
                    .entry(*id)
                    .or_default()
                    .insert(ws_id);
            }
            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
        }
        Ok(ws_id)
    }

    /// Uninstalls a previously installed watch set. Uninstalling a watch set
    /// that has already finished is a no-op.
    pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId) {
        if let Some((obj_ids, _)) = self.unfulfilled_watch_sets.remove(ws_id) {
            for obj_id in obj_ids {
                let mut entry = match self.unfulfilled_watch_sets_by_object.entry(obj_id) {
                    Entry::Occupied(entry) => entry,
                    Entry::Vacant(_) => panic!("corrupted watchset state"),
                };
                entry.get_mut().remove(ws_id);
                if entry.get().is_empty() {
                    entry.remove();
                }
            }
        }
    }

    /// Processes the next response from the storage controller, turning
    /// frontier updates into watch set notifications.
    fn process_storage_response(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
        let maybe_response = self.storage.process(storage_metadata)?;
        Ok(maybe_response.and_then(
            |mz_storage_client::controller::Response::FrontierUpdates(r)| {
                self.handle_frontier_updates(&r)
            },
        ))
    }

    /// Processes the next response from the compute controller, translating
    /// it into a [`ControllerResponse`] where applicable.
    fn process_compute_response(&mut self) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
        let response = self.compute.process();

        let response = response.and_then(|r| match r {
            ComputeControllerResponse::PeekNotification(uuid, peek, otel_ctx) => {
                Some(ControllerResponse::PeekNotification(uuid, peek, otel_ctx))
            }
            ComputeControllerResponse::SubscribeResponse(id, tail) => {
                Some(ControllerResponse::SubscribeResponse(id, tail))
            }
            ComputeControllerResponse::CopyToResponse(id, tail) => {
                Some(ControllerResponse::CopyToResponse(id, tail))
            }
            ComputeControllerResponse::FrontierUpper { id, upper } => {
                self.handle_frontier_updates(&[(id, upper)])
            }
        });
        Ok(response)
    }

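    /// Processes the work made ready by [`Controller::ready`], dispatching to
    /// the storage controller, compute controller, or replica-metrics
    /// handling as appropriate, and returns a response if one is warranted.
    ///
    /// The readiness state is consumed by this call (reset to
    /// [`Readiness::NotReady`]).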
    #[mz_ore::instrument(level = "debug")]
    pub fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
        match mem::take(&mut self.readiness) {
            Readiness::NotReady => Ok(None),
            Readiness::Storage => self.process_storage_response(storage_metadata),
            Readiness::Compute => self.process_compute_response(),
            Readiness::Metrics((id, metrics)) => self.process_replica_metrics(id, metrics),
            Readiness::Internal(message) => Ok(Some(message)),
        }
    }

    /// Records frontier updates and reports any watch sets they complete.
    fn handle_frontier_updates(
        &mut self,
        updates: &[(GlobalId, Antichain<T>)],
    ) -> Option<ControllerResponse<T>> {
        let mut finished = vec![];
        for (obj_id, antichain) in updates {
            let ws_ids = self.unfulfilled_watch_sets_by_object.entry(*obj_id);
            if let Entry::Occupied(mut ws_ids) = ws_ids {
                ws_ids.get_mut().retain(|ws_id| {
                    let mut entry = match self.unfulfilled_watch_sets.entry(*ws_id) {
                        Entry::Occupied(entry) => entry,
                        Entry::Vacant(_) => panic!("corrupted watchset state"),
                    };
                    if !antichain.less_equal(&entry.get().1) {
                        // The frontier has advanced past the watched
                        // timestamp, so this object no longer blocks the
                        // watch set.
                        entry.get_mut().0.remove(obj_id);
                        // If that was the last watched object, the watch set
                        // is finished.
                        if entry.get().0.is_empty() {
                            entry.remove();
                            finished.push(*ws_id);
                        }
                        false
                    } else {
                        true
                    }
                });
                if ws_ids.get().is_empty() {
                    ws_ids.remove();
                }
            }
        }
        (!finished.is_empty()).then(|| ControllerResponse::WatchSetFinished(finished))
    }

    fn process_replica_metrics(
        &mut self,
        id: ReplicaId,
        metrics: Vec<ServiceProcessMetrics>,
    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
        self.record_replica_metrics(id, &metrics);
        Ok(None)
    }

    /// Appends the given replica metrics to the replica metrics history
    /// introspection collection, unless the controller is in read-only mode.
    fn record_replica_metrics(&mut self, replica_id: ReplicaId, metrics: &[ServiceProcessMetrics]) {
        if self.read_only() {
            return;
        }

        let now = mz_ore::now::to_datetime((self.now)());
        let now_tz = now.try_into().expect("must fit");

        // Pack one row per replica process.
        let replica_id = replica_id.to_string();
        let mut row = Row::default();
        let updates = metrics
            .iter()
            .enumerate()
            .map(|(process_id, m)| {
                row.packer().extend(&[
                    Datum::String(&replica_id),
                    Datum::UInt64(u64::cast_from(process_id)),
                    m.cpu_nano_cores.into(),
                    m.memory_bytes.into(),
                    m.disk_bytes.into(),
                    Datum::TimestampTz(now_tz),
                    m.heap_bytes.into(),
                    m.heap_limit.into(),
                ]);
                (row.clone(), mz_repr::Diff::ONE)
            })
            .collect();

        self.storage
            .append_introspection_updates(IntrospectionType::ReplicaMetricsHistory, updates);
    }

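    /// Determines a "real-time recent" timestamp for the given collections:
    /// a timestamp that reflects their upstream sources as of approximately
    /// the current wall-clock time, bounded by `timeout`.
    ///
    /// The `BoxFuture<'static, _>` return type lets the caller release the
    /// controller borrow before awaiting the (possibly slow) determination.
    /// A sketch of the two-phase await (`ids` and `timeout` assumed):
    ///
    /// ```ignore
    /// let pending = controller.determine_real_time_recent_timestamp(ids, timeout).await?;
    /// // `pending` is 'static: the controller borrow ends here.
    /// let ts = pending.await?;
    /// ```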
    pub async fn determine_real_time_recent_timestamp(
        &self,
        ids: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<BoxFuture<'static, Result<T, StorageError<T>>>, StorageError<T>> {
        self.storage.real_time_recent_timestamp(ids, timeout).await
    }
}

impl<T> Controller<T>
where
    T: Timestamp
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + std::fmt::Display
        + Into<mz_repr::Timestamp>,
    T: ComputeControllerTimestamp,
{
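    /// Creates a new controller.
    ///
    /// For correctness, this function expects to have access to the mutations
    /// to the given [`StorageTxn`] that occurred in [`prepare_initialization`].
    ///
    /// A sketch of a call site (assumes `config`, `envd_epoch`, and
    /// `storage_txn` are already in hand):
    ///
    /// ```ignore
    /// let controller =
    ///     Controller::new(config, envd_epoch, /* read_only */ false, &storage_txn).await;
    /// ```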
    #[instrument(name = "controller::new")]
    pub async fn new(
        config: ControllerConfig,
        envd_epoch: NonZeroI64,
        read_only: bool,
        storage_txn: &dyn StorageTxn<T>,
    ) -> Self {
        if read_only {
            tracing::info!("starting controllers in read-only mode!");
        }

        let now_fn = config.now.clone();
        let wallclock_lag_fn = WallclockLagFn::new(now_fn);

        let controller_metrics = ControllerMetrics::new(&config.metrics_registry);

        let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
        let collections_ctl = storage_collections::StorageCollectionsImpl::new(
            config.persist_location.clone(),
            Arc::clone(&config.persist_clients),
            &config.metrics_registry,
            config.now.clone(),
            Arc::clone(&txns_metrics),
            envd_epoch,
            read_only,
            config.connection_context.clone(),
            storage_txn,
        )
        .await;

        let collections_ctl: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync> =
            Arc::new(collections_ctl);

        let storage_controller = mz_storage_controller::Controller::new(
            config.build_info,
            config.persist_location.clone(),
            config.persist_clients,
            config.now.clone(),
            wallclock_lag_fn.clone(),
            Arc::clone(&txns_metrics),
            read_only,
            &config.metrics_registry,
            controller_metrics.clone(),
            config.connection_context,
            storage_txn,
            Arc::clone(&collections_ctl),
        )
        .await;

        let storage_collections = Arc::clone(&collections_ctl);
        let compute_controller = ComputeController::new(
            config.build_info,
            storage_collections,
            read_only,
            &config.metrics_registry,
            config.persist_location,
            controller_metrics,
            config.now.clone(),
            wallclock_lag_fn,
        );
        let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();

        let this = Self {
            storage: Box::new(storage_controller),
            storage_collections: collections_ctl,
            compute: compute_controller,
            clusterd_image: config.clusterd_image,
            init_container_image: config.init_container_image,
            deploy_generation: config.deploy_generation,
            read_only,
            orchestrator: config.orchestrator.namespace("cluster"),
            readiness: Readiness::NotReady,
            metrics_tasks: BTreeMap::new(),
            metrics_tx,
            metrics_rx,
            now: config.now,
            persist_pubsub_url: config.persist_pubsub_url,
            secrets_args: config.secrets_args,
            unfulfilled_watch_sets_by_object: BTreeMap::new(),
            unfulfilled_watch_sets: BTreeMap::new(),
            watch_set_id_gen: Gen::default(),
            immediate_watch_sets: Vec::new(),
            dyncfg: mz_dyncfgs::all_dyncfgs(),
            replica_http_locator: config.replica_http_locator,
        };

        if !this.read_only {
            this.remove_past_generation_replicas_in_background();
        }

        this
    }
}