use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::mem;
use std::num::NonZeroI64;
use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;
use mz_build_info::BuildInfo;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_client::controller::error::CollectionLookupError;
use mz_compute_client::controller::{
    ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification,
};
use mz_compute_client::protocol::response::SubscribeBatch;
use mz_controller_types::WatchSetId;
use mz_dyncfg::{ConfigSet, ConfigUpdates};
use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, ServiceProcessMetrics};
use mz_ore::cast::CastFrom;
use mz_ore::id_gen::Gen;
use mz_ore::instrument;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::PersistLocation;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_types::Codec64;
use mz_repr::{Datum, GlobalId, Row, TimestampManipulation};
use mz_service::secrets::SecretsReaderCliArgs;
use mz_storage_client::controller::{
    IntrospectionType, StorageController, StorageMetadata, StorageTxn,
};
use mz_storage_client::storage_collections::{self, StorageCollections};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::StorageError;
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc;
use uuid::Uuid;

pub mod clusters;

pub use mz_storage_controller::prepare_initialization;

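/// Configuration for constructing a [`Controller`], bundling the build info,
/// orchestrator, persist clients, and connection details it needs at startup.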
#[derive(Debug, Clone)]
pub struct ControllerConfig {
    pub build_info: &'static BuildInfo,
    pub orchestrator: Arc<dyn Orchestrator>,
    pub persist_location: PersistLocation,
    pub persist_clients: Arc<PersistClientCache>,
    pub clusterd_image: String,
    pub init_container_image: Option<String>,
    pub deploy_generation: u64,
    pub now: NowFn,
    pub metrics_registry: MetricsRegistry,
    pub persist_pubsub_url: String,
    pub secrets_args: SecretsReaderCliArgs,
    pub connection_context: ConnectionContext,
}

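/// Responses that the [`Controller`] produces for its client, covering peek
/// notifications, subscribe batches, `COPY TO` results, and finished watch sets.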
#[derive(Debug)]
pub enum ControllerResponse<T = mz_repr::Timestamp> {
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    WatchSetFinished(Vec<WatchSetId>),
}

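/// Records whether the [`Controller`] has work ready to be processed by
/// [`Controller::process`] and, if so, which subsystem it originates from.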
#[derive(Debug, Default)]
pub enum Readiness<T> {
    #[default]
    NotReady,
    Storage,
    Compute,
    Metrics((ReplicaId, Vec<ServiceProcessMetrics>)),
    Internal(ControllerResponse<T>),
}

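/// A controller that bundles the storage and compute controllers behind one
/// interface, together with the auxiliary state used for cluster orchestration,
/// replica metrics collection, and watch-set tracking.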
pub struct Controller<T: ComputeControllerTimestamp = mz_repr::Timestamp> {
    pub storage: Box<dyn StorageController<Timestamp = T>>,
    pub storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    pub compute: ComputeController<T>,
    clusterd_image: String,
    init_container_image: Option<String>,
    deploy_generation: u64,
    read_only: bool,
    orchestrator: Arc<dyn NamespacedOrchestrator>,
    readiness: Readiness<T>,
    metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
    metrics_tx: mpsc::UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    metrics_rx: mpsc::UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    now: NowFn,

    persist_pubsub_url: String,

    secrets_args: SecretsReaderCliArgs,

    unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>,
    unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>,
    watch_set_id_gen: Gen<WatchSetId>,

    immediate_watch_sets: Vec<WatchSetId>,

    dyncfg: ConfigSet,
}

impl<T: ComputeControllerTimestamp> Controller<T> {
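    /// Applies the given updates to the controller's dynamic configuration.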
    pub fn update_configuration(&mut self, updates: ConfigUpdates) {
        updates.apply(&self.dyncfg);
    }

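    /// Starts sinking the compute controller's introspection data into storage.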
    pub fn start_compute_introspection_sink(&mut self) {
        self.compute.start_introspection_sink(&*self.storage);
    }

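    /// Returns the connection context installed in the storage configuration.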
    pub fn connection_context(&self) -> &ConnectionContext {
        &self.storage.config().connection_context
    }

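    /// Returns the storage configuration.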
    pub fn storage_configuration(&self) -> &StorageConfiguration {
        self.storage.config()
    }

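    /// Returns a JSON-formatted dump of the controller's internal state, for
    /// debugging and introspection.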
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            storage_collections,
            storage,
            compute,
            clusterd_image: _,
            init_container_image: _,
            deploy_generation,
            read_only,
            orchestrator: _,
            readiness,
            metrics_tasks: _,
            metrics_tx: _,
            metrics_rx: _,
            now: _,
            persist_pubsub_url: _,
            secrets_args: _,
            unfulfilled_watch_sets_by_object: _,
            unfulfilled_watch_sets,
            watch_set_id_gen: _,
            immediate_watch_sets,
            dyncfg: _,
        } = self;

        let storage_collections = storage_collections.dump()?;
        let storage = storage.dump()?;
        let compute = compute.dump().await?;

        let unfulfilled_watch_sets: BTreeMap<_, _> = unfulfilled_watch_sets
            .iter()
            .map(|(ws_id, watches)| (format!("{ws_id:?}"), format!("{watches:?}")))
            .collect();
        let immediate_watch_sets: Vec<_> = immediate_watch_sets
            .iter()
            .map(|watch| format!("{watch:?}"))
            .collect();

        Ok(serde_json::json!({
            "storage_collections": storage_collections,
            "storage": storage,
            "compute": compute,
            "deploy_generation": deploy_generation,
            "read_only": read_only,
            "readiness": format!("{readiness:?}"),
            "unfulfilled_watch_sets": unfulfilled_watch_sets,
            "immediate_watch_sets": immediate_watch_sets,
        }))
    }
}

impl<T> Controller<T>
where
    T: ComputeControllerTimestamp,
{
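    /// Updates the orchestrator's scheduling configuration.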
    pub fn update_orchestrator_scheduling_config(
        &self,
        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
    ) {
        self.orchestrator.update_scheduling_config(config);
    }
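
    /// Notifies the storage and compute controllers that all initialization
    /// commands have been issued and regular operation can begin.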
    pub fn initialization_complete(&mut self) {
        self.storage.initialization_complete();
        self.compute.initialization_complete();
    }

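    /// Reports whether the controller is running in read-only mode.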
    pub fn read_only(&self) -> bool {
        self.read_only
    }

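    /// Drains any watch sets that were already fulfilled when they were
    /// installed and, if there are any, wraps them in a
    /// [`ControllerResponse::WatchSetFinished`].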
    fn take_internal_response(&mut self) -> Option<ControllerResponse<T>> {
        let ws = std::mem::take(&mut self.immediate_watch_sets);
        (!ws.is_empty()).then_some(ControllerResponse::WatchSetFinished(ws))
    }

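    /// Waits until the controller has work ready to report, recording the source
    /// of that readiness so that a subsequent call to [`Controller::process`]
    /// can handle it.
    ///
    /// A sketch of a typical driver loop follows; the `handle_response` function
    /// and the `storage_metadata` value are assumed to be supplied by the
    /// embedding process:
    ///
    /// ```ignore
    /// loop {
    ///     // Wait for storage, compute, replica metrics, or an internal message
    ///     // to have something to report.
    ///     controller.ready().await;
    ///     // Consume the recorded readiness and translate it into a response.
    ///     if let Some(response) = controller.process(&storage_metadata)? {
    ///         handle_response(response);
    ///     }
    /// }
    /// ```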
    pub async fn ready(&mut self) {
        if let Readiness::NotReady = self.readiness {
            if let Some(response) = self.take_internal_response() {
                self.readiness = Readiness::Internal(response);
            } else {
                tokio::select! {
                    () = self.storage.ready() => {
                        self.readiness = Readiness::Storage;
                    }
                    () = self.compute.ready() => {
                        self.readiness = Readiness::Compute;
                    }
                    Some(metrics) = self.metrics_rx.recv() => {
                        self.readiness = Readiness::Metrics(metrics);
                    }
                }
            }
        }
    }

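    /// Returns the controller's current readiness state.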
    pub fn get_readiness(&self) -> &Readiness<T> {
        &self.readiness
    }

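    /// Installs a watch set that tracks the given compute collections reaching
    /// the timestamp `t`.
    ///
    /// Objects whose write frontier is already past `t` are dropped from the set
    /// immediately; if that leaves the set empty, the watch set is recorded as
    /// fulfilled and reported by a later call to [`Controller::process`] as a
    /// [`ControllerResponse::WatchSetFinished`].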
    pub fn install_compute_watch_set(
        &mut self,
        mut objects: BTreeSet<GlobalId>,
        t: T,
    ) -> Result<WatchSetId, CollectionLookupError> {
        let ws_id = self.watch_set_id_gen.allocate_id();

        let frontiers: BTreeMap<GlobalId, _> = objects
            .iter()
            .map(|id| {
                self.compute
                    .collection_frontiers(*id, None)
                    .map(|f| (*id, f.write_frontier))
            })
            .collect::<Result<_, _>>()?;
        objects.retain(|id| {
            let frontier = frontiers.get(id).expect("just collected");
            frontier.less_equal(&t)
        });
        if objects.is_empty() {
            self.immediate_watch_sets.push(ws_id);
        } else {
            for id in objects.iter() {
                self.unfulfilled_watch_sets_by_object
                    .entry(*id)
                    .or_default()
                    .insert(ws_id);
            }
            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
        }

        Ok(ws_id)
    }

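    /// Installs a watch set that tracks the given storage collections reaching
    /// the timestamp `t`.
    ///
    /// Behaves like [`Controller::install_compute_watch_set`], but consults the
    /// storage controller's collection uppers instead of compute write frontiers.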
    pub fn install_storage_watch_set(
        &mut self,
        mut objects: BTreeSet<GlobalId>,
        t: T,
    ) -> Result<WatchSetId, CollectionLookupError> {
        let ws_id = self.watch_set_id_gen.allocate_id();

        let uppers = self
            .storage
            .collections_frontiers(objects.iter().cloned().collect())?
            .into_iter()
            .map(|(id, _since, upper)| (id, upper))
            .collect::<BTreeMap<_, _>>();

        objects.retain(|id| {
            let upper = uppers.get(id).expect("missing collection");
            upper.less_equal(&t)
        });
        if objects.is_empty() {
            self.immediate_watch_sets.push(ws_id);
        } else {
            for id in objects.iter() {
                self.unfulfilled_watch_sets_by_object
                    .entry(*id)
                    .or_default()
                    .insert(ws_id);
            }
            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
        }
        Ok(ws_id)
    }

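    /// Removes a watch set from the controller's unfulfilled watch sets.
    ///
    /// Uninstalling a watch set that is not currently tracked (for example,
    /// because it already finished) is a no-op.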
    pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId) {
        if let Some((obj_ids, _)) = self.unfulfilled_watch_sets.remove(ws_id) {
            for obj_id in obj_ids {
                let mut entry = match self.unfulfilled_watch_sets_by_object.entry(obj_id) {
                    Entry::Occupied(entry) => entry,
                    Entry::Vacant(_) => panic!("corrupted watchset state"),
                };
                entry.get_mut().remove(ws_id);
                if entry.get().is_empty() {
                    entry.remove();
                }
            }
        }
    }

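    /// Processes a pending storage response, forwarding any reported frontier
    /// updates to the watch-set bookkeeping.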
    fn process_storage_response(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
        let maybe_response = self.storage.process(storage_metadata)?;
        Ok(maybe_response.and_then(
            |mz_storage_client::controller::Response::FrontierUpdates(r)| {
                self.handle_frontier_updates(&r)
            },
        ))
    }

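    /// Processes a pending compute response, translating it into a
    /// [`ControllerResponse`] where applicable.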
    fn process_compute_response(&mut self) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
        let response = self.compute.process();

        let response = response.and_then(|r| match r {
            ComputeControllerResponse::PeekNotification(uuid, peek, otel_ctx) => {
                Some(ControllerResponse::PeekNotification(uuid, peek, otel_ctx))
            }
            ComputeControllerResponse::SubscribeResponse(id, tail) => {
                Some(ControllerResponse::SubscribeResponse(id, tail))
            }
            ComputeControllerResponse::CopyToResponse(id, tail) => {
                Some(ControllerResponse::CopyToResponse(id, tail))
            }
            ComputeControllerResponse::FrontierUpper { id, upper } => {
                self.handle_frontier_updates(&[(id, upper)])
            }
        });
        Ok(response)
    }

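    /// Processes the work signaled by the most recent call to
    /// [`Controller::ready`], dispatching on the recorded readiness and
    /// resetting it to [`Readiness::NotReady`].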
    #[mz_ore::instrument(level = "debug")]
    pub fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
        match mem::take(&mut self.readiness) {
            Readiness::NotReady => Ok(None),
            Readiness::Storage => self.process_storage_response(storage_metadata),
            Readiness::Compute => self.process_compute_response(),
            Readiness::Metrics((id, metrics)) => self.process_replica_metrics(id, metrics),
            Readiness::Internal(message) => Ok(Some(message)),
        }
    }

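    /// Records the given frontier advancements and returns a
    /// [`ControllerResponse::WatchSetFinished`] for any watch sets whose tracked
    /// objects have all advanced past their target timestamp.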
    fn handle_frontier_updates(
        &mut self,
        updates: &[(GlobalId, Antichain<T>)],
    ) -> Option<ControllerResponse<T>> {
        let mut finished = vec![];
        for (obj_id, antichain) in updates {
            let ws_ids = self.unfulfilled_watch_sets_by_object.entry(*obj_id);
            if let Entry::Occupied(mut ws_ids) = ws_ids {
                ws_ids.get_mut().retain(|ws_id| {
                    let mut entry = match self.unfulfilled_watch_sets.entry(*ws_id) {
                        Entry::Occupied(entry) => entry,
                        Entry::Vacant(_) => panic!("corrupted watchset state"),
                    };
                    if !antichain.less_equal(&entry.get().1) {
                        entry.get_mut().0.remove(obj_id);
                        if entry.get().0.is_empty() {
                            entry.remove();
                            finished.push(*ws_id);
                        }
                        false
                    } else {
                        true
                    }
                });
                if ws_ids.get().is_empty() {
                    ws_ids.remove();
                }
            }
        }
        (!finished.is_empty()).then(|| ControllerResponse::WatchSetFinished(finished))
    }

    fn process_replica_metrics(
        &mut self,
        id: ReplicaId,
        metrics: Vec<ServiceProcessMetrics>,
    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
        self.record_replica_metrics(id, &metrics);
        Ok(None)
    }

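    /// Appends the given replica metrics to the replica metrics history
    /// introspection collection, unless the controller is in read-only mode.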
    fn record_replica_metrics(&mut self, replica_id: ReplicaId, metrics: &[ServiceProcessMetrics]) {
        if self.read_only() {
            return;
        }

        let now = mz_ore::now::to_datetime((self.now)());
        let now_tz = now.try_into().expect("must fit");

        let replica_id = replica_id.to_string();
        let mut row = Row::default();
        let updates = metrics
            .iter()
            .enumerate()
            .map(|(process_id, m)| {
                row.packer().extend(&[
                    Datum::String(&replica_id),
                    Datum::UInt64(u64::cast_from(process_id)),
                    m.cpu_nano_cores.into(),
                    m.memory_bytes.into(),
                    m.disk_bytes.into(),
                    Datum::TimestampTz(now_tz),
                    m.heap_bytes.into(),
                    m.heap_limit.into(),
                ]);
                (row.clone(), mz_repr::Diff::ONE)
            })
            .collect();

        self.storage
            .append_introspection_updates(IntrospectionType::ReplicaMetricsHistory, updates);
    }

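    /// Delegates to the storage controller to obtain a "real-time recent"
    /// timestamp for the given collections, i.e. one that reflects the data
    /// currently visible in their upstream systems, subject to `timeout`.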
    pub async fn determine_real_time_recent_timestamp(
        &self,
        ids: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<BoxFuture<'static, Result<T, StorageError<T>>>, StorageError<T>> {
        self.storage.real_time_recent_timestamp(ids, timeout).await
    }
}

impl<T> Controller<T>
where
    T: Timestamp
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + std::fmt::Display
        + Into<mz_repr::Timestamp>,
    T: ComputeControllerTimestamp,
{
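    /// Creates a new controller from the given configuration.
    ///
    /// Constructs the storage collections, storage controller, and compute
    /// controller and, when not in read-only mode, kicks off background removal
    /// of replicas left over from past deploy generations.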
    #[instrument(name = "controller::new")]
    pub async fn new(
        config: ControllerConfig,
        envd_epoch: NonZeroI64,
        read_only: bool,
        storage_txn: &dyn StorageTxn<T>,
    ) -> Self {
        if read_only {
            tracing::info!("starting controllers in read-only mode!");
        }

        let now_fn = config.now.clone();
        let wallclock_lag_fn = WallclockLagFn::new(now_fn);

        let controller_metrics = ControllerMetrics::new(&config.metrics_registry);

        let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
        let collections_ctl = storage_collections::StorageCollectionsImpl::new(
            config.persist_location.clone(),
            Arc::clone(&config.persist_clients),
            &config.metrics_registry,
            config.now.clone(),
            Arc::clone(&txns_metrics),
            envd_epoch,
            read_only,
            config.connection_context.clone(),
            storage_txn,
        )
        .await;

        let collections_ctl: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync> =
            Arc::new(collections_ctl);

        let storage_controller = mz_storage_controller::Controller::new(
            config.build_info,
            config.persist_location.clone(),
            config.persist_clients,
            config.now.clone(),
            wallclock_lag_fn.clone(),
            Arc::clone(&txns_metrics),
            read_only,
            &config.metrics_registry,
            controller_metrics.clone(),
            config.connection_context,
            storage_txn,
            Arc::clone(&collections_ctl),
        )
        .await;

        let storage_collections = Arc::clone(&collections_ctl);
        let compute_controller = ComputeController::new(
            config.build_info,
            storage_collections,
            read_only,
            &config.metrics_registry,
            config.persist_location,
            controller_metrics,
            config.now.clone(),
            wallclock_lag_fn,
        );
        let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();

        let this = Self {
            storage: Box::new(storage_controller),
            storage_collections: collections_ctl,
            compute: compute_controller,
            clusterd_image: config.clusterd_image,
            init_container_image: config.init_container_image,
            deploy_generation: config.deploy_generation,
            read_only,
            orchestrator: config.orchestrator.namespace("cluster"),
            readiness: Readiness::NotReady,
            metrics_tasks: BTreeMap::new(),
            metrics_tx,
            metrics_rx,
            now: config.now,
            persist_pubsub_url: config.persist_pubsub_url,
            secrets_args: config.secrets_args,
            unfulfilled_watch_sets_by_object: BTreeMap::new(),
            unfulfilled_watch_sets: BTreeMap::new(),
            watch_set_id_gen: Gen::default(),
            immediate_watch_sets: Vec::new(),
            dyncfg: mz_dyncfgs::all_dyncfgs(),
        };

        if !this.read_only {
            this.remove_past_generation_replicas_in_background();
        }

        this
    }
}