Skip to main content

mz_controller/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A representative of STORAGE and COMPUTE that maintains summaries of the involved objects.
11//!
//! The `Controller` provides the ability to create and manipulate storage and compute instances.
//! Storage and compute each provide their own controller, exposed through the public `storage`
//! and `compute` fields. It is an error to access a compute instance before it has been created.
//!
//! The controller also provides the `ready()` and `process()` methods, which together surface
//! responses from the storage and compute layers that may remain of value to the interested user.
//! With time, these responses may be thinned down in an effort to make the controller more
//! self-contained.
20//!
21//! Consult the `StorageController` and `ComputeController` documentation for more information
22//! about each of these interfaces.
23
24use std::collections::btree_map::Entry;
25use std::collections::{BTreeMap, BTreeSet};
26use std::mem;
27use std::num::NonZeroI64;
28use std::sync::Arc;
29use std::time::Duration;
30
31use futures::future::BoxFuture;
32use mz_build_info::BuildInfo;
33use mz_cluster_client::metrics::ControllerMetrics;
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_compute_client::controller::error::{CollectionLookupError, CollectionMissing};
36use mz_compute_client::controller::{
37    ComputeController, ComputeControllerResponse, PeekNotification,
38};
39use mz_compute_client::protocol::response::SubscribeBatch;
40use mz_controller_types::WatchSetId;
41use mz_dyncfg::{ConfigSet, ConfigUpdates};
42use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, ServiceProcessMetrics};
43use mz_ore::cast::CastFrom;
44use mz_ore::id_gen::Gen;
45use mz_ore::instrument;
46use mz_ore::metrics::MetricsRegistry;
47use mz_ore::now::NowFn;
48use mz_ore::task::AbortOnDropHandle;
49use mz_ore::tracing::OpenTelemetryContext;
50use mz_persist_client::PersistLocation;
51use mz_persist_client::cache::PersistClientCache;
52use mz_repr::{Datum, GlobalId, Row, Timestamp};
53use mz_service::secrets::SecretsReaderCliArgs;
54use mz_storage_client::controller::{
55    IntrospectionType, StorageController, StorageMetadata, StorageTxn,
56};
57use mz_storage_client::storage_collections::{self, StorageCollections};
58use mz_storage_types::configuration::StorageConfiguration;
59use mz_storage_types::connections::ConnectionContext;
60use mz_storage_types::controller::StorageError;
61use mz_txn_wal::metrics::Metrics as TxnMetrics;
62use timely::progress::Antichain;
63use tokio::sync::mpsc;
64use uuid::Uuid;
65
66pub mod clusters;
67pub mod replica_http_locator;
68
69// Export this on behalf of the storage controller to provide a unified
70// interface, allowing other crates to depend on this crate alone.
71pub use mz_storage_controller::prepare_initialization;
72pub use replica_http_locator::ReplicaHttpLocator;
73
/// Configures a controller.
///
/// Consumed by [`Controller::new`], which hands the individual fields to the
/// storage-collections, storage, and compute controllers it constructs.
#[derive(Debug, Clone)]
pub struct ControllerConfig {
    /// The build information for this process.
    pub build_info: &'static BuildInfo,
    /// The orchestrator implementation to use.
    ///
    /// The controller uses the `"cluster"` namespace of this orchestrator.
    pub orchestrator: Arc<dyn Orchestrator>,
    /// The persist location where all storage collections will be written to.
    pub persist_location: PersistLocation,
    /// A process-global cache of (blob_uri, consensus_uri) ->
    /// PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// The clusterd image to use when starting new cluster processes.
    pub clusterd_image: String,
    /// The init container image to use for clusterd, if any.
    pub init_container_image: Option<String>,
    /// A number representing the environment's generation.
    ///
    /// This is incremented to request that the new process perform a graceful
    /// transition of power from the prior generation.
    pub deploy_generation: u64,
    /// The now function used by the controllers, e.g. to advance the
    /// controller's introspection collections and timestamp replica metrics.
    pub now: NowFn,
    /// The metrics registry in which controller metrics are registered.
    pub metrics_registry: MetricsRegistry,
    /// The URL for Persist PubSub.
    pub persist_pubsub_url: String,
    /// Arguments for secrets readers.
    pub secrets_args: SecretsReaderCliArgs,
    /// The connection context, to thread through to clusterd, with cli flags.
    pub connection_context: ConnectionContext,
    /// Locator for HTTP addresses of cluster replicas, used to proxy HTTP
    /// requests from environmentd to clusterd.
    pub replica_http_locator: Arc<ReplicaHttpLocator>,
}
110
/// Responses that [`Controller`] can produce.
///
/// These are returned from [`Controller::process`].
#[derive(Debug)]
pub enum ControllerResponse {
    /// Notification of a worker's response to the peek identified by the given `Uuid`.
    ///
    /// Additionally, an `OpenTelemetryContext` to forward trace information
    /// back into coord. This allows coord traces to be children of work
    /// done in compute!
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// The worker's next response to the subscribe with the given id.
    SubscribeResponse(GlobalId, SubscribeBatch),
    /// The worker's response to the COPY TO with the given id.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// Notification that one or more watch sets have finished. See
    /// [`Controller::install_compute_watch_set`] and
    /// [`Controller::install_storage_watch_set`] for details.
    WatchSetFinished(Vec<WatchSetId>),
}
129
/// Whether one of the underlying controllers is ready for their `process`
/// method to be called.
///
/// Set by [`Controller::ready`] and consumed (reset to `NotReady`) by
/// [`Controller::process`].
#[derive(Debug, Default)]
pub enum Readiness {
    /// No underlying controllers are ready.
    #[default]
    NotReady,
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// A batch of metric data is ready: the replica it belongs to, and one
    /// entry per replica process.
    Metrics((ReplicaId, Vec<ServiceProcessMetrics>)),
    /// An internally-generated message is ready to be returned.
    Internal(ControllerResponse),
}
146
/// A client that maintains soft state and validates commands, in addition to forwarding them.
///
/// It owns the storage and compute controllers, multiplexes their readiness through
/// [`Controller::ready`] / [`Controller::process`], records replica metrics, and tracks
/// watch sets.
pub struct Controller {
    /// The storage controller.
    pub storage: Box<dyn StorageController>,
    /// The storage collections handle, shared with the storage and compute controllers.
    pub storage_collections: Arc<dyn StorageCollections + Send + Sync>,
    /// The compute controller.
    pub compute: ComputeController,
    /// The clusterd image to use when starting new cluster processes.
    clusterd_image: String,
    /// The init container image to use for clusterd.
    init_container_image: Option<String>,
    /// A number representing the environment's generation.
    deploy_generation: u64,
    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to affect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// The cluster orchestrator (the `"cluster"` namespace of the configured orchestrator).
    orchestrator: Arc<dyn NamespacedOrchestrator>,
    /// Tracks the readiness of the underlying controllers.
    readiness: Readiness,
    /// Tasks for collecting replica metrics.
    metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
    /// Sender for the channel over which replica metrics are sent.
    metrics_tx: mpsc::UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Receiver for the channel over which replica metrics are sent.
    metrics_rx: mpsc::UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// A function providing the current wallclock time.
    now: NowFn,

    /// The URL for Persist PubSub.
    persist_pubsub_url: String,

    /// Arguments for secrets readers.
    secrets_args: SecretsReaderCliArgs,

    /// A map associating a global ID to the set of all the unfulfilled watch
    /// set ids that include it.
    ///
    /// See [`Controller::install_compute_watch_set`]/[`Controller::install_storage_watch_set`] for a description of watch sets.
    // When a watch set is fulfilled for a given object (that is, when
    // the object's frontier advances strictly past the watch set's
    // timestamp), the corresponding entry will be removed from the set.
    unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>,
    /// A map of installed watch sets indexed by id, holding the objects that are
    /// still pending and the timestamp they must advance past.
    unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, Timestamp)>,
    /// A sequence of numbers used to mint unique WatchSetIds.
    watch_set_id_gen: Gen<WatchSetId>,

    /// A list of watch sets that were already fulfilled as soon as
    /// they were installed, and thus that must be returned to the
    /// client on the next [`Controller::ready`]/[`Controller::process`] cycle
    /// (as an internally-generated response).
    ///
    /// See [`Controller::install_compute_watch_set`]/[`Controller::install_storage_watch_set`] for a description of watch sets.
    immediate_watch_sets: Vec<WatchSetId>,

    /// Dynamic system configuration.
    dyncfg: ConfigSet,

    /// Locator for HTTP addresses of cluster replicas.
    replica_http_locator: Arc<ReplicaHttpLocator>,
}
209
210impl Controller {
211    /// Update the controller configuration.
212    pub fn update_configuration(&mut self, updates: ConfigUpdates) {
213        updates.apply(&self.dyncfg);
214    }
215
216    /// Start sinking the compute controller's introspection data into storage.
217    ///
218    /// This method should be called once the introspection collections have been registered with
219    /// the storage controller. It will panic if invoked earlier than that.
220    pub fn start_compute_introspection_sink(&mut self) {
221        self.compute.start_introspection_sink(&*self.storage);
222    }
223
224    /// Returns the connection context installed in the controller.
225    ///
226    /// This is purely a helper, and can be obtained from `self.storage`.
227    pub fn connection_context(&self) -> &ConnectionContext {
228        &self.storage.config().connection_context
229    }
230
231    /// Returns the storage configuration installed in the storage controller.
232    ///
233    /// This is purely a helper, and can be obtained from `self.storage`.
234    pub fn storage_configuration(&self) -> &StorageConfiguration {
235        self.storage.config()
236    }
237
238    /// Returns the state of the [`Controller`] formatted as JSON.
239    ///
240    /// The returned value is not guaranteed to be stable and may change at any point in time.
241    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
242        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
243        // returned object as a tradeoff between usability and stability. `serde_json` will fail
244        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
245        // prevents a future unrelated change from silently breaking this method.
246
247        // Destructure `self` here so we don't forget to consider dumping newly added fields.
248        let Self {
249            storage_collections,
250            storage,
251            compute,
252            clusterd_image: _,
253            init_container_image: _,
254            deploy_generation,
255            read_only,
256            orchestrator: _,
257            readiness,
258            metrics_tasks: _,
259            metrics_tx: _,
260            metrics_rx: _,
261            now: _,
262            persist_pubsub_url: _,
263            secrets_args: _,
264            unfulfilled_watch_sets_by_object: _,
265            unfulfilled_watch_sets,
266            watch_set_id_gen: _,
267            immediate_watch_sets,
268            dyncfg: _,
269            replica_http_locator: _,
270        } = self;
271
272        let storage_collections = storage_collections.dump()?;
273        let storage = storage.dump()?;
274        let compute = compute.dump().await?;
275
276        let unfulfilled_watch_sets: BTreeMap<_, _> = unfulfilled_watch_sets
277            .iter()
278            .map(|(ws_id, watches)| (format!("{ws_id:?}"), format!("{watches:?}")))
279            .collect();
280        let immediate_watch_sets: Vec<_> = immediate_watch_sets
281            .iter()
282            .map(|watch| format!("{watch:?}"))
283            .collect();
284
285        Ok(serde_json::json!({
286            "storage_collections": storage_collections,
287            "storage": storage,
288            "compute": compute,
289            "deploy_generation": deploy_generation,
290            "read_only": read_only,
291            "readiness": format!("{readiness:?}"),
292            "unfulfilled_watch_sets": unfulfilled_watch_sets,
293            "immediate_watch_sets": immediate_watch_sets,
294        }))
295    }
296}
297
298impl Controller {
299    pub fn update_orchestrator_scheduling_config(
300        &self,
301        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
302    ) {
303        self.orchestrator.update_scheduling_config(config);
304    }
305    /// Marks the end of any initialization commands.
306    ///
307    /// The implementor may wait for this method to be called before implementing prior commands,
308    /// and so it is important for a user to invoke this method as soon as it is comfortable.
309    /// This method can be invoked immediately, at the potential expense of performance.
310    pub fn initialization_complete(&mut self) {
311        self.storage.initialization_complete();
312        self.compute.initialization_complete();
313    }
314
315    /// Reports whether the controller is in read only mode.
316    pub fn read_only(&self) -> bool {
317        self.read_only
318    }
319
320    /// Returns `Some` if there is an immediately available
321    /// internally-generated response that we need to return to the
322    /// client (as opposed to waiting for a response from compute or storage).
323    fn take_internal_response(&mut self) -> Option<ControllerResponse> {
324        let ws = std::mem::take(&mut self.immediate_watch_sets);
325        (!ws.is_empty()).then_some(ControllerResponse::WatchSetFinished(ws))
326    }
327
328    /// Waits until the controller is ready to process a response.
329    ///
330    /// This method may block for an arbitrarily long time.
331    ///
332    /// When the method returns, the owner should call [`Controller::ready`] to
333    /// process the ready message.
334    ///
335    /// This method is cancellation safe.
336    pub async fn ready(&mut self) {
337        if let Readiness::NotReady = self.readiness {
338            // the coordinator wants to be able to make a simple
339            // sequence of ready, process, ready, process, .... calls,
340            // but the controller sometimes has responses immediately
341            // ready to be processed and should do so before calling
342            // into either of the lower-level controllers. This `if`
343            // statement handles that case.
344            if let Some(response) = self.take_internal_response() {
345                self.readiness = Readiness::Internal(response);
346            } else {
347                // The underlying `ready` methods are cancellation safe, so it is
348                // safe to construct this `select!`.
349                tokio::select! {
350                    () = self.storage.ready() => {
351                        self.readiness = Readiness::Storage;
352                    }
353                    () = self.compute.ready() => {
354                        self.readiness = Readiness::Compute;
355                    }
356                    Some(metrics) = self.metrics_rx.recv() => {
357                        self.readiness = Readiness::Metrics(metrics);
358                    }
359                }
360            }
361        }
362    }
363
364    /// Returns the [Readiness] status of this controller.
365    pub fn get_readiness(&self) -> &Readiness {
366        &self.readiness
367    }
368
369    /// Install a _watch set_ in the controller.
370    ///
371    /// A _watch set_ is a request to be informed by the controller when
372    /// all of the frontiers of a particular set of objects have advanced at
373    /// least to a particular timestamp.
374    ///
375    /// When all the objects in `objects` have advanced to `t`, the watchset id
376    /// is returned to the client on the next call to [`Self::process`].
377    pub fn install_compute_watch_set(
378        &mut self,
379        mut objects: BTreeSet<GlobalId>,
380        t: Timestamp,
381    ) -> Result<WatchSetId, CollectionLookupError> {
382        let ws_id = self.watch_set_id_gen.allocate_id();
383
384        // Collect all frontiers first, returning any errors
385        let frontiers: BTreeMap<GlobalId, _> = objects
386            .iter()
387            .map(|id| {
388                self.compute
389                    .collection_frontiers(*id, None)
390                    .map(|f| (*id, f.write_frontier))
391            })
392            .collect::<Result<_, _>>()?;
393        objects.retain(|id| {
394            let frontier = frontiers.get(id).expect("just collected");
395            frontier.less_equal(&t)
396        });
397        if objects.is_empty() {
398            self.immediate_watch_sets.push(ws_id);
399        } else {
400            for id in objects.iter() {
401                self.unfulfilled_watch_sets_by_object
402                    .entry(*id)
403                    .or_default()
404                    .insert(ws_id);
405            }
406            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
407        }
408
409        Ok(ws_id)
410    }
411
412    /// Install a _watch set_ in the controller.
413    ///
414    /// A _watch set_ is a request to be informed by the controller when
415    /// all of the frontiers of a particular set of objects have advanced at
416    /// least to a particular timestamp.
417    ///
418    /// When all the objects in `objects` have advanced to `t`, the watchset id
419    /// is returned to the client on the next call to [`Self::process`].
420    pub fn install_storage_watch_set(
421        &mut self,
422        mut objects: BTreeSet<GlobalId>,
423        t: Timestamp,
424    ) -> Result<WatchSetId, CollectionMissing> {
425        let ws_id = self.watch_set_id_gen.allocate_id();
426
427        let uppers = self
428            .storage
429            .collections_frontiers(objects.iter().cloned().collect())?
430            .into_iter()
431            .map(|(id, _since, upper)| (id, upper))
432            .collect::<BTreeMap<_, _>>();
433
434        objects.retain(|id| {
435            let upper = uppers.get(id).expect("missing collection");
436            upper.less_equal(&t)
437        });
438        if objects.is_empty() {
439            self.immediate_watch_sets.push(ws_id);
440        } else {
441            for id in objects.iter() {
442                self.unfulfilled_watch_sets_by_object
443                    .entry(*id)
444                    .or_default()
445                    .insert(ws_id);
446            }
447            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
448        }
449        Ok(ws_id)
450    }
451
452    /// Uninstalls a previously installed WatchSetId. The method is a no-op if the watch set has
453    /// already finished and therefore it's safe to call this function unconditionally.
454    ///
455    /// # Panics
456    /// This method panics if called with a WatchSetId that was never returned by the function.
457    pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId) {
458        if let Some((obj_ids, _)) = self.unfulfilled_watch_sets.remove(ws_id) {
459            for obj_id in obj_ids {
460                let mut entry = match self.unfulfilled_watch_sets_by_object.entry(obj_id) {
461                    Entry::Occupied(entry) => entry,
462                    Entry::Vacant(_) => panic!("corrupted watchset state"),
463                };
464                entry.get_mut().remove(ws_id);
465                if entry.get().is_empty() {
466                    entry.remove();
467                }
468            }
469        }
470    }
471
472    /// Process a pending response from the storage controller. If necessary,
473    /// return a higher-level response to our client.
474    fn process_storage_response(
475        &mut self,
476        storage_metadata: &StorageMetadata,
477    ) -> Result<Option<ControllerResponse>, anyhow::Error> {
478        let maybe_response = self.storage.process(storage_metadata)?;
479        Ok(maybe_response.and_then(
480            |mz_storage_client::controller::Response::FrontierUpdates(r)| {
481                self.handle_frontier_updates(&r)
482            },
483        ))
484    }
485
486    /// Process a pending response from the compute controller. If necessary,
487    /// return a higher-level response to our client.
488    fn process_compute_response(&mut self) -> Result<Option<ControllerResponse>, anyhow::Error> {
489        let response = self.compute.process();
490
491        let response = response.and_then(|r| match r {
492            ComputeControllerResponse::PeekNotification(uuid, peek, otel_ctx) => {
493                Some(ControllerResponse::PeekNotification(uuid, peek, otel_ctx))
494            }
495            ComputeControllerResponse::SubscribeResponse(id, tail) => {
496                Some(ControllerResponse::SubscribeResponse(id, tail))
497            }
498            ComputeControllerResponse::CopyToResponse(id, tail) => {
499                Some(ControllerResponse::CopyToResponse(id, tail))
500            }
501            ComputeControllerResponse::FrontierUpper { id, upper } => {
502                self.handle_frontier_updates(&[(id, upper)])
503            }
504        });
505        Ok(response)
506    }
507
508    /// Processes the work queued by [`Controller::ready`].
509    ///
510    /// This method is guaranteed to return "quickly" unless doing so would
511    /// compromise the correctness of the system.
512    ///
513    /// This method is **not** guaranteed to be cancellation safe. It **must**
514    /// be awaited to completion.
515    #[mz_ore::instrument(level = "debug")]
516    pub fn process(
517        &mut self,
518        storage_metadata: &StorageMetadata,
519    ) -> Result<Option<ControllerResponse>, anyhow::Error> {
520        match mem::take(&mut self.readiness) {
521            Readiness::NotReady => Ok(None),
522            Readiness::Storage => self.process_storage_response(storage_metadata),
523            Readiness::Compute => self.process_compute_response(),
524            Readiness::Metrics((id, metrics)) => self.process_replica_metrics(id, metrics),
525            Readiness::Internal(message) => Ok(Some(message)),
526        }
527    }
528
529    /// Record updates to frontiers, and propagate any necessary responses.
530    /// As of this writing (2/29/2024), the only response that can be generated
531    /// from a frontier update is `WatchSetCompleted`.
532    fn handle_frontier_updates(
533        &mut self,
534        updates: &[(GlobalId, Antichain<Timestamp>)],
535    ) -> Option<ControllerResponse> {
536        let mut finished = vec![];
537        for (obj_id, antichain) in updates {
538            let ws_ids = self.unfulfilled_watch_sets_by_object.entry(*obj_id);
539            if let Entry::Occupied(mut ws_ids) = ws_ids {
540                ws_ids.get_mut().retain(|ws_id| {
541                    let mut entry = match self.unfulfilled_watch_sets.entry(*ws_id) {
542                        Entry::Occupied(entry) => entry,
543                        Entry::Vacant(_) => panic!("corrupted watchset state"),
544                    };
545                    // If this object has made more progress than required by this watchset we:
546                    if !antichain.less_equal(&entry.get().1) {
547                        // 1. Remove the object from the set of pending objects for the watchset
548                        entry.get_mut().0.remove(obj_id);
549                        // 2. Mark the watchset as finished if this was the last watched object
550                        if entry.get().0.is_empty() {
551                            entry.remove();
552                            finished.push(*ws_id);
553                        }
554                        // 3. Remove the watchset from the set of pending watchsets for the object
555                        false
556                    } else {
557                        // Otherwise we keep the watchset around to re-check in the future
558                        true
559                    }
560                });
561                // Clear the entry if this was the last watchset that was interested in obj_id
562                if ws_ids.get().is_empty() {
563                    ws_ids.remove();
564                }
565            }
566        }
567        (!(finished.is_empty())).then(|| ControllerResponse::WatchSetFinished(finished))
568    }
569
570    fn process_replica_metrics(
571        &mut self,
572        id: ReplicaId,
573        metrics: Vec<ServiceProcessMetrics>,
574    ) -> Result<Option<ControllerResponse>, anyhow::Error> {
575        self.record_replica_metrics(id, &metrics);
576        Ok(None)
577    }
578
579    fn record_replica_metrics(&mut self, replica_id: ReplicaId, metrics: &[ServiceProcessMetrics]) {
580        if self.read_only() {
581            return;
582        }
583
584        let now = mz_ore::now::to_datetime((self.now)());
585        let now_tz = now.try_into().expect("must fit");
586
587        let replica_id = replica_id.to_string();
588        let mut row = Row::default();
589        let updates = metrics
590            .iter()
591            .enumerate()
592            .map(|(process_id, m)| {
593                row.packer().extend(&[
594                    Datum::String(&replica_id),
595                    Datum::UInt64(u64::cast_from(process_id)),
596                    m.cpu_nano_cores.into(),
597                    m.memory_bytes.into(),
598                    m.disk_bytes.into(),
599                    Datum::TimestampTz(now_tz),
600                    m.heap_bytes.into(),
601                    m.heap_limit.into(),
602                ]);
603                (row.clone(), mz_repr::Diff::ONE)
604            })
605            .collect();
606
607        self.storage
608            .append_introspection_updates(IntrospectionType::ReplicaMetricsHistory, updates);
609    }
610
611    /// Determine the "real-time recency" timestamp for all `ids`.
612    ///
613    /// Real-time recency is defined as the minimum value of `T` that all
614    /// objects can be queried at to return all data visible in the upstream
615    /// system the query was issued. In this case, "the upstream systems" are
616    /// any user sources that connect to objects outside of Materialize, such as
617    /// Kafka sources.
618    ///
619    /// If no items in `ids` connect to external systems, this function will
620    /// return `Ok(T::minimum)`.
621    pub async fn determine_real_time_recent_timestamp(
622        &self,
623        ids: BTreeSet<GlobalId>,
624        timeout: Duration,
625    ) -> Result<BoxFuture<'static, Result<Timestamp, StorageError>>, StorageError> {
626        self.storage.real_time_recent_timestamp(ids, timeout).await
627    }
628}
629
630impl Controller {
631    /// Creates a new controller.
632    ///
633    /// For correctness, this function expects to have access to the mutations
634    /// to the `storage_txn` that occurred in [`prepare_initialization`].
635    ///
636    /// # Panics
637    /// If this function is called before [`prepare_initialization`].
638    #[instrument(name = "controller::new")]
639    pub async fn new(
640        config: ControllerConfig,
641        envd_epoch: NonZeroI64,
642        read_only: bool,
643        storage_txn: &dyn StorageTxn,
644    ) -> Self {
645        if read_only {
646            tracing::info!("starting controllers in read-only mode!");
647        }
648
649        let now_fn = config.now.clone();
650        let wallclock_lag_fn = WallclockLagFn::new(now_fn);
651
652        let controller_metrics = ControllerMetrics::new(&config.metrics_registry);
653
654        let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
655        let collections_ctl = storage_collections::StorageCollectionsImpl::new(
656            config.persist_location.clone(),
657            Arc::clone(&config.persist_clients),
658            &config.metrics_registry,
659            config.now.clone(),
660            Arc::clone(&txns_metrics),
661            envd_epoch,
662            read_only,
663            config.connection_context.clone(),
664            storage_txn,
665        )
666        .await;
667
668        let collections_ctl: Arc<dyn StorageCollections + Send + Sync> = Arc::new(collections_ctl);
669
670        let storage_controller = mz_storage_controller::Controller::new(
671            config.build_info,
672            config.persist_location.clone(),
673            config.persist_clients,
674            config.now.clone(),
675            wallclock_lag_fn.clone(),
676            Arc::clone(&txns_metrics),
677            read_only,
678            &config.metrics_registry,
679            controller_metrics.clone(),
680            config.connection_context,
681            storage_txn,
682            Arc::clone(&collections_ctl),
683        )
684        .await;
685
686        let storage_collections = Arc::clone(&collections_ctl);
687        let compute_controller = ComputeController::new(
688            config.build_info,
689            storage_collections,
690            read_only,
691            &config.metrics_registry,
692            config.persist_location,
693            controller_metrics,
694            config.now.clone(),
695            wallclock_lag_fn,
696        );
697        let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();
698
699        let this = Self {
700            storage: Box::new(storage_controller),
701            storage_collections: collections_ctl,
702            compute: compute_controller,
703            clusterd_image: config.clusterd_image,
704            init_container_image: config.init_container_image,
705            deploy_generation: config.deploy_generation,
706            read_only,
707            orchestrator: config.orchestrator.namespace("cluster"),
708            readiness: Readiness::NotReady,
709            metrics_tasks: BTreeMap::new(),
710            metrics_tx,
711            metrics_rx,
712            now: config.now,
713            persist_pubsub_url: config.persist_pubsub_url,
714            secrets_args: config.secrets_args,
715            unfulfilled_watch_sets_by_object: BTreeMap::new(),
716            unfulfilled_watch_sets: BTreeMap::new(),
717            watch_set_id_gen: Gen::default(),
718            immediate_watch_sets: Vec::new(),
719            dyncfg: mz_dyncfgs::all_dyncfgs(),
720            replica_http_locator: config.replica_http_locator,
721        };
722
723        if !this.read_only {
724            this.remove_past_generation_replicas_in_background();
725        }
726
727        this
728    }
729}