1use std::collections::btree_map::Entry;
25use std::collections::{BTreeMap, BTreeSet};
26use std::mem;
27use std::num::NonZeroI64;
28use std::sync::Arc;
29use std::time::Duration;
30
31use futures::future::BoxFuture;
32use mz_build_info::BuildInfo;
33use mz_cluster_client::metrics::ControllerMetrics;
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_compute_client::controller::{
36 ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification,
37};
38use mz_compute_client::protocol::response::SubscribeBatch;
39use mz_compute_client::service::{ComputeClient, ComputeGrpcClient};
40use mz_controller_types::WatchSetId;
41use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, ServiceProcessMetrics};
42use mz_ore::id_gen::Gen;
43use mz_ore::instrument;
44use mz_ore::metrics::MetricsRegistry;
45use mz_ore::now::{EpochMillis, NowFn};
46use mz_ore::task::AbortOnDropHandle;
47use mz_ore::tracing::OpenTelemetryContext;
48use mz_persist_client::PersistLocation;
49use mz_persist_client::cache::PersistClientCache;
50use mz_persist_types::Codec64;
51use mz_proto::RustType;
52use mz_repr::{Datum, GlobalId, Row, TimestampManipulation};
53use mz_service::secrets::SecretsReaderCliArgs;
54use mz_storage_client::client::{
55 ProtoStorageCommand, ProtoStorageResponse, StorageCommand, StorageResponse,
56};
57use mz_storage_client::controller::{
58 IntrospectionType, StorageController, StorageMetadata, StorageTxn,
59};
60use mz_storage_client::storage_collections::{self, StorageCollections};
61use mz_storage_types::configuration::StorageConfiguration;
62use mz_storage_types::connections::ConnectionContext;
63use mz_storage_types::controller::StorageError;
64use mz_txn_wal::metrics::Metrics as TxnMetrics;
65use serde::Serialize;
66use timely::progress::{Antichain, Timestamp};
67use tokio::sync::mpsc;
68use uuid::Uuid;
69
70pub mod clusters;
71
72pub use mz_storage_controller::prepare_initialization;
75
76#[derive(Debug, Clone)]
78pub struct ControllerConfig {
79 pub build_info: &'static BuildInfo,
81 pub orchestrator: Arc<dyn Orchestrator>,
83 pub persist_location: PersistLocation,
85 pub persist_clients: Arc<PersistClientCache>,
89 pub clusterd_image: String,
91 pub init_container_image: Option<String>,
93 pub deploy_generation: u64,
98 pub now: NowFn,
100 pub metrics_registry: MetricsRegistry,
102 pub persist_pubsub_url: String,
104 pub secrets_args: SecretsReaderCliArgs,
106 pub connection_context: ConnectionContext,
108}
109
110#[derive(Debug)]
112pub enum ControllerResponse<T = mz_repr::Timestamp> {
113 PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
119 SubscribeResponse(GlobalId, SubscribeBatch<T>),
121 CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
123 ComputeReplicaMetrics(ReplicaId, Vec<ServiceProcessMetrics>),
125 WatchSetFinished(Vec<WatchSetId>),
129}
130
131#[derive(Debug, Default)]
134enum Readiness<T> {
135 #[default]
137 NotReady,
138 Storage,
140 Compute,
142 Metrics((ReplicaId, Vec<ServiceProcessMetrics>)),
144 Internal(ControllerResponse<T>),
146}
147
148pub struct Controller<T: ComputeControllerTimestamp = mz_repr::Timestamp> {
150 pub storage: Box<dyn StorageController<Timestamp = T>>,
151 pub storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
152 pub compute: ComputeController<T>,
153 clusterd_image: String,
155 init_container_image: Option<String>,
157 deploy_generation: u64,
159 read_only: bool,
165 orchestrator: Arc<dyn NamespacedOrchestrator>,
167 readiness: Readiness<T>,
169 metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
171 metrics_tx: mpsc::UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
173 metrics_rx: mpsc::UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>,
175 now: NowFn,
177
178 persist_pubsub_url: String,
180
181 secrets_args: SecretsReaderCliArgs,
183
184 unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>,
192 unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>,
194 watch_set_id_gen: Gen<WatchSetId>,
196
197 immediate_watch_sets: Vec<WatchSetId>,
203}
204
205impl<T: ComputeControllerTimestamp> Controller<T> {
206 pub fn set_arrangement_exert_proportionality(&mut self, value: u32) {
207 self.compute.set_arrangement_exert_proportionality(value);
208 }
209
210 pub fn start_compute_introspection_sink(&mut self) {
215 self.compute.start_introspection_sink(&*self.storage);
216 }
217
218 pub fn connection_context(&self) -> &ConnectionContext {
222 &self.storage.config().connection_context
223 }
224
225 pub fn storage_configuration(&self) -> &StorageConfiguration {
229 self.storage.config()
230 }
231
232 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
236 let Self {
243 storage_collections: _,
244 storage: _,
245 compute,
246 clusterd_image: _,
247 init_container_image: _,
248 deploy_generation,
249 read_only,
250 orchestrator: _,
251 readiness,
252 metrics_tasks: _,
253 metrics_tx: _,
254 metrics_rx: _,
255 now: _,
256 persist_pubsub_url: _,
257 secrets_args: _,
258 unfulfilled_watch_sets_by_object: _,
259 unfulfilled_watch_sets,
260 watch_set_id_gen: _,
261 immediate_watch_sets,
262 } = self;
263
264 let compute = compute.dump().await?;
265
266 let unfulfilled_watch_sets: BTreeMap<_, _> = unfulfilled_watch_sets
267 .iter()
268 .map(|(ws_id, watches)| (format!("{ws_id:?}"), format!("{watches:?}")))
269 .collect();
270 let immediate_watch_sets: Vec<_> = immediate_watch_sets
271 .iter()
272 .map(|watch| format!("{watch:?}"))
273 .collect();
274
275 fn field(
276 key: &str,
277 value: impl Serialize,
278 ) -> Result<(String, serde_json::Value), anyhow::Error> {
279 let value = serde_json::to_value(value)?;
280 Ok((key.to_string(), value))
281 }
282
283 let map = serde_json::Map::from_iter([
284 field("compute", compute)?,
285 field("deploy_generation", deploy_generation)?,
286 field("read_only", read_only)?,
287 field("readiness", format!("{readiness:?}"))?,
288 field("unfulfilled_watch_sets", unfulfilled_watch_sets)?,
289 field("immediate_watch_sets", immediate_watch_sets)?,
290 ]);
291 Ok(serde_json::Value::Object(map))
292 }
293}
294
295impl<T> Controller<T>
296where
297 T: ComputeControllerTimestamp,
298 ComputeGrpcClient: ComputeClient<T>,
299{
300 pub fn update_orchestrator_scheduling_config(
301 &self,
302 config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
303 ) {
304 self.orchestrator.update_scheduling_config(config);
305 }
306 pub fn initialization_complete(&mut self) {
312 self.storage.initialization_complete();
313 self.compute.initialization_complete();
314 }
315
316 pub fn read_only(&self) -> bool {
318 self.read_only
319 }
320
321 fn take_internal_response(&mut self) -> Option<ControllerResponse<T>> {
325 let ws = std::mem::take(&mut self.immediate_watch_sets);
326 (!ws.is_empty()).then_some(ControllerResponse::WatchSetFinished(ws))
327 }
328
329 pub async fn ready(&mut self) {
338 if let Readiness::NotReady = self.readiness {
339 if let Some(response) = self.take_internal_response() {
346 self.readiness = Readiness::Internal(response);
347 } else {
348 tokio::select! {
351 () = self.storage.ready() => {
352 self.readiness = Readiness::Storage;
353 }
354 () = self.compute.ready() => {
355 self.readiness = Readiness::Compute;
356 }
357 Some(metrics) = self.metrics_rx.recv() => {
358 self.readiness = Readiness::Metrics(metrics);
359 }
360 }
361 }
362 }
363 }
364
365 pub fn install_compute_watch_set(
374 &mut self,
375 mut objects: BTreeSet<GlobalId>,
376 t: T,
377 ) -> WatchSetId {
378 let ws_id = self.watch_set_id_gen.allocate_id();
379
380 objects.retain(|id| {
381 let frontier = self
382 .compute
383 .collection_frontiers(*id, None)
384 .map(|f| f.write_frontier)
385 .expect("missing compute dependency");
386 frontier.less_equal(&t)
387 });
388 if objects.is_empty() {
389 self.immediate_watch_sets.push(ws_id);
390 } else {
391 for id in objects.iter() {
392 self.unfulfilled_watch_sets_by_object
393 .entry(*id)
394 .or_default()
395 .insert(ws_id);
396 }
397 self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
398 }
399
400 ws_id
401 }
402
403 pub fn install_storage_watch_set(
412 &mut self,
413 mut objects: BTreeSet<GlobalId>,
414 t: T,
415 ) -> WatchSetId {
416 let ws_id = self.watch_set_id_gen.allocate_id();
417
418 let uppers = self
419 .storage
420 .collections_frontiers(objects.iter().cloned().collect())
421 .expect("missing storage dependencies")
422 .into_iter()
423 .map(|(id, _since, upper)| (id, upper))
424 .collect::<BTreeMap<_, _>>();
425
426 objects.retain(|id| {
427 let upper = uppers.get(id).expect("missing collection");
428 upper.less_equal(&t)
429 });
430 if objects.is_empty() {
431 self.immediate_watch_sets.push(ws_id);
432 } else {
433 for id in objects.iter() {
434 self.unfulfilled_watch_sets_by_object
435 .entry(*id)
436 .or_default()
437 .insert(ws_id);
438 }
439 self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
440 }
441 ws_id
442 }
443
444 pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId) {
450 if let Some((obj_ids, _)) = self.unfulfilled_watch_sets.remove(ws_id) {
451 for obj_id in obj_ids {
452 let mut entry = match self.unfulfilled_watch_sets_by_object.entry(obj_id) {
453 Entry::Occupied(entry) => entry,
454 Entry::Vacant(_) => panic!("corrupted watchset state"),
455 };
456 entry.get_mut().remove(ws_id);
457 if entry.get().is_empty() {
458 entry.remove();
459 }
460 }
461 }
462 }
463
464 fn process_storage_response(
467 &mut self,
468 storage_metadata: &StorageMetadata,
469 ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
470 let maybe_response = self.storage.process(storage_metadata)?;
471 Ok(maybe_response.and_then(
472 |mz_storage_client::controller::Response::FrontierUpdates(r)| {
473 self.handle_frontier_updates(&r)
474 },
475 ))
476 }
477
478 fn process_compute_response(&mut self) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
481 let response = self.compute.process();
482
483 let response = response.and_then(|r| match r {
484 ComputeControllerResponse::PeekNotification(uuid, peek, otel_ctx) => {
485 Some(ControllerResponse::PeekNotification(uuid, peek, otel_ctx))
486 }
487 ComputeControllerResponse::SubscribeResponse(id, tail) => {
488 Some(ControllerResponse::SubscribeResponse(id, tail))
489 }
490 ComputeControllerResponse::CopyToResponse(id, tail) => {
491 Some(ControllerResponse::CopyToResponse(id, tail))
492 }
493 ComputeControllerResponse::FrontierUpper { id, upper } => {
494 self.handle_frontier_updates(&[(id, upper)])
495 }
496 });
497 Ok(response)
498 }
499
500 #[mz_ore::instrument(level = "debug")]
508 pub fn process(
509 &mut self,
510 storage_metadata: &StorageMetadata,
511 ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
512 match mem::take(&mut self.readiness) {
513 Readiness::NotReady => Ok(None),
514 Readiness::Storage => self.process_storage_response(storage_metadata),
515 Readiness::Compute => self.process_compute_response(),
516 Readiness::Metrics((id, metrics)) => self.process_replica_metrics(id, metrics),
517 Readiness::Internal(message) => Ok(Some(message)),
518 }
519 }
520
521 fn handle_frontier_updates(
525 &mut self,
526 updates: &[(GlobalId, Antichain<T>)],
527 ) -> Option<ControllerResponse<T>> {
528 let mut finished = vec![];
529 for (obj_id, antichain) in updates {
530 let ws_ids = self.unfulfilled_watch_sets_by_object.entry(*obj_id);
531 if let Entry::Occupied(mut ws_ids) = ws_ids {
532 ws_ids.get_mut().retain(|ws_id| {
533 let mut entry = match self.unfulfilled_watch_sets.entry(*ws_id) {
534 Entry::Occupied(entry) => entry,
535 Entry::Vacant(_) => panic!("corrupted watchset state"),
536 };
537 if !antichain.less_equal(&entry.get().1) {
539 entry.get_mut().0.remove(obj_id);
541 if entry.get().0.is_empty() {
543 entry.remove();
544 finished.push(*ws_id);
545 }
546 false
548 } else {
549 true
551 }
552 });
553 if ws_ids.get().is_empty() {
555 ws_ids.remove();
556 }
557 }
558 }
559 (!(finished.is_empty())).then(|| ControllerResponse::WatchSetFinished(finished))
560 }
561
562 fn process_replica_metrics(
563 &mut self,
564 id: ReplicaId,
565 metrics: Vec<ServiceProcessMetrics>,
566 ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
567 self.record_replica_metrics(id, &metrics);
568 Ok(Some(ControllerResponse::ComputeReplicaMetrics(id, metrics)))
569 }
570
571 fn record_replica_metrics(&mut self, replica_id: ReplicaId, metrics: &[ServiceProcessMetrics]) {
572 if self.read_only() {
573 return;
574 }
575
576 let now = mz_ore::now::to_datetime((self.now)());
577 let now_tz = now.try_into().expect("must fit");
578
579 let replica_id = replica_id.to_string();
580 let mut row = Row::default();
581 let updates = metrics
582 .iter()
583 .zip(0..)
584 .map(|(m, process_id)| {
585 row.packer().extend(&[
586 Datum::String(&replica_id),
587 Datum::UInt64(process_id),
588 m.cpu_nano_cores.into(),
589 m.memory_bytes.into(),
590 m.disk_usage_bytes.into(),
591 Datum::TimestampTz(now_tz),
592 ]);
593 (row.clone(), mz_repr::Diff::ONE)
594 })
595 .collect();
596
597 self.storage
598 .append_introspection_updates(IntrospectionType::ReplicaMetricsHistory, updates);
599 }
600
601 pub async fn determine_real_time_recent_timestamp(
612 &self,
613 ids: BTreeSet<GlobalId>,
614 timeout: Duration,
615 ) -> Result<BoxFuture<'static, Result<T, StorageError<T>>>, StorageError<T>> {
616 self.storage.real_time_recent_timestamp(ids, timeout).await
617 }
618}
619
620impl<T> Controller<T>
621where
622 T: Timestamp
624 + Codec64
625 + From<EpochMillis>
626 + TimestampManipulation
627 + std::fmt::Display
628 + Into<mz_repr::Timestamp>,
629 StorageCommand<T>: RustType<ProtoStorageCommand>,
630 StorageResponse<T>: RustType<ProtoStorageResponse>,
631 ComputeGrpcClient: ComputeClient<T>,
632 T: ComputeControllerTimestamp,
634{
635 #[instrument(name = "controller::new")]
643 pub async fn new(
644 config: ControllerConfig,
645 envd_epoch: NonZeroI64,
646 read_only: bool,
647 storage_txn: &dyn StorageTxn<T>,
648 ) -> Self {
649 if read_only {
650 tracing::info!("starting controllers in read-only mode!");
651 }
652
653 let now_fn = config.now.clone();
654 let wallclock_lag: WallclockLagFn<_> = Arc::new(move |time: &T| {
655 let now = mz_repr::Timestamp::new(now_fn());
656 let time_ts: mz_repr::Timestamp = time.clone().into();
657 let lag_ts = now.saturating_sub(time_ts);
658 Duration::from(lag_ts)
659 });
660
661 let controller_metrics = ControllerMetrics::new(&config.metrics_registry);
662
663 let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
664 let collections_ctl = storage_collections::StorageCollectionsImpl::new(
665 config.persist_location.clone(),
666 Arc::clone(&config.persist_clients),
667 &config.metrics_registry,
668 config.now.clone(),
669 Arc::clone(&txns_metrics),
670 envd_epoch,
671 read_only,
672 config.connection_context.clone(),
673 storage_txn,
674 )
675 .await;
676
677 let collections_ctl: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync> =
678 Arc::new(collections_ctl);
679
680 let storage_controller = mz_storage_controller::Controller::new(
681 config.build_info,
682 config.persist_location,
683 config.persist_clients,
684 config.now.clone(),
685 Arc::clone(&wallclock_lag),
686 Arc::clone(&txns_metrics),
687 envd_epoch,
688 read_only,
689 &config.metrics_registry,
690 controller_metrics.clone(),
691 config.connection_context,
692 storage_txn,
693 Arc::clone(&collections_ctl),
694 )
695 .await;
696
697 let storage_collections = Arc::clone(&collections_ctl);
698 let compute_controller = ComputeController::new(
699 config.build_info,
700 storage_collections,
701 envd_epoch,
702 read_only,
703 &config.metrics_registry,
704 controller_metrics,
705 config.now.clone(),
706 wallclock_lag,
707 );
708 let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();
709
710 let this = Self {
711 storage: Box::new(storage_controller),
712 storage_collections: collections_ctl,
713 compute: compute_controller,
714 clusterd_image: config.clusterd_image,
715 init_container_image: config.init_container_image,
716 deploy_generation: config.deploy_generation,
717 read_only,
718 orchestrator: config.orchestrator.namespace("cluster"),
719 readiness: Readiness::NotReady,
720 metrics_tasks: BTreeMap::new(),
721 metrics_tx,
722 metrics_rx,
723 now: config.now,
724 persist_pubsub_url: config.persist_pubsub_url,
725 secrets_args: config.secrets_args,
726 unfulfilled_watch_sets_by_object: BTreeMap::new(),
727 unfulfilled_watch_sets: BTreeMap::new(),
728 watch_set_id_gen: Gen::default(),
729 immediate_watch_sets: Vec::new(),
730 };
731
732 if !this.read_only {
733 this.remove_past_generation_replicas_in_background();
734 }
735
736 this
737 }
738}