mz_controller/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A representative of STORAGE and COMPUTE that maintains summaries of the involved objects.
11//!
12//! The `Controller` provides the ability to create and manipulate storage and compute instances.
13//! Each of Storage and Compute provide their own controllers, accessed through the `storage()`
14//! and `compute(instance_id)` methods. It is an error to access a compute instance before it has
15//! been created.
16//!
//! The controller also provides a `ready()` method that waits for a response from the storage or
//! compute layers, and a `process()` method that handles it and returns any response that may
//! remain of value to the interested user. With time, these responses may be thinned down in an
//! effort to make the controller more self-contained.
20//!
21//! Consult the `StorageController` and `ComputeController` documentation for more information
22//! about each of these interfaces.
23
24use std::collections::btree_map::Entry;
25use std::collections::{BTreeMap, BTreeSet};
26use std::mem;
27use std::num::NonZeroI64;
28use std::sync::Arc;
29use std::time::Duration;
30
31use futures::future::BoxFuture;
32use mz_build_info::BuildInfo;
33use mz_cluster_client::metrics::ControllerMetrics;
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_compute_client::controller::{
36    ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification,
37};
38use mz_compute_client::protocol::response::SubscribeBatch;
39use mz_controller_types::WatchSetId;
40use mz_dyncfg::{ConfigSet, ConfigUpdates};
41use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, ServiceProcessMetrics};
42use mz_ore::cast::CastFrom;
43use mz_ore::id_gen::Gen;
44use mz_ore::instrument;
45use mz_ore::metrics::MetricsRegistry;
46use mz_ore::now::{EpochMillis, NowFn};
47use mz_ore::task::AbortOnDropHandle;
48use mz_ore::tracing::OpenTelemetryContext;
49use mz_persist_client::PersistLocation;
50use mz_persist_client::cache::PersistClientCache;
51use mz_persist_types::Codec64;
52use mz_repr::{Datum, GlobalId, Row, TimestampManipulation};
53use mz_service::secrets::SecretsReaderCliArgs;
54use mz_storage_client::controller::{
55    IntrospectionType, StorageController, StorageMetadata, StorageTxn,
56};
57use mz_storage_client::storage_collections::{self, StorageCollections};
58use mz_storage_types::configuration::StorageConfiguration;
59use mz_storage_types::connections::ConnectionContext;
60use mz_storage_types::controller::StorageError;
61use mz_txn_wal::metrics::Metrics as TxnMetrics;
62use timely::progress::{Antichain, Timestamp};
63use tokio::sync::mpsc;
64use uuid::Uuid;
65
66pub mod clusters;
67
68// Export this on behalf of the storage controller to provide a unified
69// interface, allowing other crates to depend on this crate alone.
70pub use mz_storage_controller::prepare_initialization;
71
/// Configures a controller.
///
/// A value of this type is consumed by [`Controller::new`].
#[derive(Debug, Clone)]
pub struct ControllerConfig {
    /// The build information for this process.
    pub build_info: &'static BuildInfo,
    /// The orchestrator implementation to use.
    ///
    /// The controller operates on the `cluster` namespace of this orchestrator.
    pub orchestrator: Arc<dyn Orchestrator>,
    /// The persist location where all storage collections will be written to.
    pub persist_location: PersistLocation,
    /// A process-global cache of (blob_uri, consensus_uri) ->
    /// PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// The clusterd image to use when starting new cluster processes.
    pub clusterd_image: String,
    /// The init container image to use for clusterd.
    pub init_container_image: Option<String>,
    /// A number representing the environment's generation.
    ///
    /// This is incremented to request that the new process perform a graceful
    /// transition of power from the prior generation.
    pub deploy_generation: u64,
    /// The now function to advance the controller's introspection collections.
    pub now: NowFn,
    /// The metrics registry.
    pub metrics_registry: MetricsRegistry,
    /// The URL for Persist PubSub.
    pub persist_pubsub_url: String,
    /// Arguments for secrets readers.
    pub secrets_args: SecretsReaderCliArgs,
    /// The connection context, to thread through to clusterd, with cli flags.
    pub connection_context: ConnectionContext,
}
105
/// Responses that [`Controller`] can produce.
///
/// `T` is the timestamp type and defaults to [`mz_repr::Timestamp`].
#[derive(Debug)]
pub enum ControllerResponse<T = mz_repr::Timestamp> {
    /// Notification of a worker's response to a specified (by connection id) peek.
    ///
    /// Additionally, an `OpenTelemetryContext` to forward trace information
    /// back into coord. This allows coord traces to be children of work
    /// done in compute!
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// The worker's next response to a specified subscribe.
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// The worker's next response to a specified copy to.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// Notification that one or more watch sets have finished. See
    /// [`Controller::install_compute_watch_set`] and
    /// [`Controller::install_storage_watch_set`] for details.
    WatchSetFinished(Vec<WatchSetId>),
}
124
/// Whether one of the underlying controllers is ready for their `process`
/// method to be called.
///
/// The value is set by [`Controller::ready`] and consumed (reset to
/// [`Readiness::NotReady`]) by [`Controller::process`].
#[derive(Debug, Default)]
pub enum Readiness<T> {
    /// No underlying controllers are ready.
    #[default]
    NotReady,
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// A batch of metric data is ready.
    ///
    /// Carries the replica the metrics belong to and one entry per reported process.
    Metrics((ReplicaId, Vec<ServiceProcessMetrics>)),
    /// An internally-generated message is ready to be returned.
    Internal(ControllerResponse<T>),
}
141
/// A client that maintains soft state and validates commands, in addition to forwarding them.
pub struct Controller<T: ComputeControllerTimestamp = mz_repr::Timestamp> {
    /// The storage controller.
    pub storage: Box<dyn StorageController<Timestamp = T>>,
    /// The storage collections handle.
    ///
    /// The same handle is also given to the storage and compute controllers when they are
    /// constructed in [`Controller::new`].
    pub storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// The compute controller.
    pub compute: ComputeController<T>,
    /// The clusterd image to use when starting new cluster processes.
    clusterd_image: String,
    /// The init container image to use for clusterd.
    init_container_image: Option<String>,
    /// A number representing the environment's generation.
    deploy_generation: u64,
    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to affect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// The cluster orchestrator.
    orchestrator: Arc<dyn NamespacedOrchestrator>,
    /// Tracks the readiness of the underlying controllers.
    readiness: Readiness<T>,
    /// Tasks for collecting replica metrics.
    metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
    /// Sender for the channel over which replica metrics are sent.
    metrics_tx: mpsc::UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Receiver for the channel over which replica metrics are sent.
    metrics_rx: mpsc::UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// A function providing the current wallclock time.
    now: NowFn,

    /// The URL for Persist PubSub.
    persist_pubsub_url: String,

    /// Arguments for secrets readers.
    secrets_args: SecretsReaderCliArgs,

    /// A map associating a global ID to the set of all the unfulfilled watch
    /// set ids that include it.
    ///
    /// See [`Controller::install_compute_watch_set`]/[`Controller::install_storage_watch_set`] for a description of watch sets.
    // When a watch set is fulfilled for a given object (that is, when
    // the object's frontier advances to at least the watch set's
    // timestamp), the corresponding entry will be removed from the set.
    unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>,
    /// A map of installed watch sets indexed by id.
    ///
    /// Each value holds the objects the watch set is still waiting on, together with the
    /// watch set's timestamp.
    unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>,
    /// A sequence of numbers used to mint unique WatchSetIds.
    watch_set_id_gen: Gen<WatchSetId>,

    /// A list of watch sets that were already fulfilled as soon as
    /// they were installed, and thus that must be returned to the
    /// client without waiting for a response from compute or storage.
    ///
    /// The list is drained by [`Controller::ready`] and its contents are handed to the client
    /// by the following call to [`Controller::process`].
    ///
    /// See [`Controller::install_compute_watch_set`]/[`Controller::install_storage_watch_set`] for a description of watch sets.
    immediate_watch_sets: Vec<WatchSetId>,

    /// Dynamic system configuration.
    dyncfg: ConfigSet,
}
201
202impl<T: ComputeControllerTimestamp> Controller<T> {
203    /// Update the controller configuration.
204    pub fn update_configuration(&mut self, updates: ConfigUpdates) {
205        updates.apply(&self.dyncfg);
206    }
207
208    /// Start sinking the compute controller's introspection data into storage.
209    ///
210    /// This method should be called once the introspection collections have been registered with
211    /// the storage controller. It will panic if invoked earlier than that.
212    pub fn start_compute_introspection_sink(&mut self) {
213        self.compute.start_introspection_sink(&*self.storage);
214    }
215
216    /// Returns the connection context installed in the controller.
217    ///
218    /// This is purely a helper, and can be obtained from `self.storage`.
219    pub fn connection_context(&self) -> &ConnectionContext {
220        &self.storage.config().connection_context
221    }
222
223    /// Returns the storage configuration installed in the storage controller.
224    ///
225    /// This is purely a helper, and can be obtained from `self.storage`.
226    pub fn storage_configuration(&self) -> &StorageConfiguration {
227        self.storage.config()
228    }
229
230    /// Returns the state of the [`Controller`] formatted as JSON.
231    ///
232    /// The returned value is not guaranteed to be stable and may change at any point in time.
233    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
234        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
235        // returned object as a tradeoff between usability and stability. `serde_json` will fail
236        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
237        // prevents a future unrelated change from silently breaking this method.
238
239        // Destructure `self` here so we don't forget to consider dumping newly added fields.
240        let Self {
241            storage_collections,
242            storage,
243            compute,
244            clusterd_image: _,
245            init_container_image: _,
246            deploy_generation,
247            read_only,
248            orchestrator: _,
249            readiness,
250            metrics_tasks: _,
251            metrics_tx: _,
252            metrics_rx: _,
253            now: _,
254            persist_pubsub_url: _,
255            secrets_args: _,
256            unfulfilled_watch_sets_by_object: _,
257            unfulfilled_watch_sets,
258            watch_set_id_gen: _,
259            immediate_watch_sets,
260            dyncfg: _,
261        } = self;
262
263        let storage_collections = storage_collections.dump()?;
264        let storage = storage.dump()?;
265        let compute = compute.dump().await?;
266
267        let unfulfilled_watch_sets: BTreeMap<_, _> = unfulfilled_watch_sets
268            .iter()
269            .map(|(ws_id, watches)| (format!("{ws_id:?}"), format!("{watches:?}")))
270            .collect();
271        let immediate_watch_sets: Vec<_> = immediate_watch_sets
272            .iter()
273            .map(|watch| format!("{watch:?}"))
274            .collect();
275
276        Ok(serde_json::json!({
277            "storage_collections": storage_collections,
278            "storage": storage,
279            "compute": compute,
280            "deploy_generation": deploy_generation,
281            "read_only": read_only,
282            "readiness": format!("{readiness:?}"),
283            "unfulfilled_watch_sets": unfulfilled_watch_sets,
284            "immediate_watch_sets": immediate_watch_sets,
285        }))
286    }
287}
288
289impl<T> Controller<T>
290where
291    T: ComputeControllerTimestamp,
292{
293    pub fn update_orchestrator_scheduling_config(
294        &self,
295        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
296    ) {
297        self.orchestrator.update_scheduling_config(config);
298    }
299    /// Marks the end of any initialization commands.
300    ///
301    /// The implementor may wait for this method to be called before implementing prior commands,
302    /// and so it is important for a user to invoke this method as soon as it is comfortable.
303    /// This method can be invoked immediately, at the potential expense of performance.
304    pub fn initialization_complete(&mut self) {
305        self.storage.initialization_complete();
306        self.compute.initialization_complete();
307    }
308
309    /// Reports whether the controller is in read only mode.
310    pub fn read_only(&self) -> bool {
311        self.read_only
312    }
313
314    /// Returns `Some` if there is an immediately available
315    /// internally-generated response that we need to return to the
316    /// client (as opposed to waiting for a response from compute or storage).
317    fn take_internal_response(&mut self) -> Option<ControllerResponse<T>> {
318        let ws = std::mem::take(&mut self.immediate_watch_sets);
319        (!ws.is_empty()).then_some(ControllerResponse::WatchSetFinished(ws))
320    }
321
322    /// Waits until the controller is ready to process a response.
323    ///
324    /// This method may block for an arbitrarily long time.
325    ///
326    /// When the method returns, the owner should call [`Controller::ready`] to
327    /// process the ready message.
328    ///
329    /// This method is cancellation safe.
330    pub async fn ready(&mut self) {
331        if let Readiness::NotReady = self.readiness {
332            // the coordinator wants to be able to make a simple
333            // sequence of ready, process, ready, process, .... calls,
334            // but the controller sometimes has responses immediately
335            // ready to be processed and should do so before calling
336            // into either of the lower-level controllers. This `if`
337            // statement handles that case.
338            if let Some(response) = self.take_internal_response() {
339                self.readiness = Readiness::Internal(response);
340            } else {
341                // The underlying `ready` methods are cancellation safe, so it is
342                // safe to construct this `select!`.
343                tokio::select! {
344                    () = self.storage.ready() => {
345                        self.readiness = Readiness::Storage;
346                    }
347                    () = self.compute.ready() => {
348                        self.readiness = Readiness::Compute;
349                    }
350                    Some(metrics) = self.metrics_rx.recv() => {
351                        self.readiness = Readiness::Metrics(metrics);
352                    }
353                }
354            }
355        }
356    }
357
358    /// Returns the [Readiness] status of this controller.
359    pub fn get_readiness(&self) -> &Readiness<T> {
360        &self.readiness
361    }
362
363    /// Install a _watch set_ in the controller.
364    ///
365    /// A _watch set_ is a request to be informed by the controller when
366    /// all of the frontiers of a particular set of objects have advanced at
367    /// least to a particular timestamp.
368    ///
369    /// When all the objects in `objects` have advanced to `t`, the watchset id
370    /// is returned to the client on the next call to [`Self::process`].
371    pub fn install_compute_watch_set(
372        &mut self,
373        mut objects: BTreeSet<GlobalId>,
374        t: T,
375    ) -> WatchSetId {
376        let ws_id = self.watch_set_id_gen.allocate_id();
377
378        objects.retain(|id| {
379            let frontier = self
380                .compute
381                .collection_frontiers(*id, None)
382                .map(|f| f.write_frontier)
383                .expect("missing compute dependency");
384            frontier.less_equal(&t)
385        });
386        if objects.is_empty() {
387            self.immediate_watch_sets.push(ws_id);
388        } else {
389            for id in objects.iter() {
390                self.unfulfilled_watch_sets_by_object
391                    .entry(*id)
392                    .or_default()
393                    .insert(ws_id);
394            }
395            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
396        }
397
398        ws_id
399    }
400
401    /// Install a _watch set_ in the controller.
402    ///
403    /// A _watch set_ is a request to be informed by the controller when
404    /// all of the frontiers of a particular set of objects have advanced at
405    /// least to a particular timestamp.
406    ///
407    /// When all the objects in `objects` have advanced to `t`, the watchset id
408    /// is returned to the client on the next call to [`Self::process`].
409    pub fn install_storage_watch_set(
410        &mut self,
411        mut objects: BTreeSet<GlobalId>,
412        t: T,
413    ) -> WatchSetId {
414        let ws_id = self.watch_set_id_gen.allocate_id();
415
416        let uppers = self
417            .storage
418            .collections_frontiers(objects.iter().cloned().collect())
419            .expect("missing storage dependencies")
420            .into_iter()
421            .map(|(id, _since, upper)| (id, upper))
422            .collect::<BTreeMap<_, _>>();
423
424        objects.retain(|id| {
425            let upper = uppers.get(id).expect("missing collection");
426            upper.less_equal(&t)
427        });
428        if objects.is_empty() {
429            self.immediate_watch_sets.push(ws_id);
430        } else {
431            for id in objects.iter() {
432                self.unfulfilled_watch_sets_by_object
433                    .entry(*id)
434                    .or_default()
435                    .insert(ws_id);
436            }
437            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
438        }
439        ws_id
440    }
441
442    /// Uninstalls a previously installed WatchSetId. The method is a no-op if the watch set has
443    /// already finished and therefore it's safe to call this function unconditionally.
444    ///
445    /// # Panics
446    /// This method panics if called with a WatchSetId that was never returned by the function.
447    pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId) {
448        if let Some((obj_ids, _)) = self.unfulfilled_watch_sets.remove(ws_id) {
449            for obj_id in obj_ids {
450                let mut entry = match self.unfulfilled_watch_sets_by_object.entry(obj_id) {
451                    Entry::Occupied(entry) => entry,
452                    Entry::Vacant(_) => panic!("corrupted watchset state"),
453                };
454                entry.get_mut().remove(ws_id);
455                if entry.get().is_empty() {
456                    entry.remove();
457                }
458            }
459        }
460    }
461
462    /// Process a pending response from the storage controller. If necessary,
463    /// return a higher-level response to our client.
464    fn process_storage_response(
465        &mut self,
466        storage_metadata: &StorageMetadata,
467    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
468        let maybe_response = self.storage.process(storage_metadata)?;
469        Ok(maybe_response.and_then(
470            |mz_storage_client::controller::Response::FrontierUpdates(r)| {
471                self.handle_frontier_updates(&r)
472            },
473        ))
474    }
475
476    /// Process a pending response from the compute controller. If necessary,
477    /// return a higher-level response to our client.
478    fn process_compute_response(&mut self) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
479        let response = self.compute.process();
480
481        let response = response.and_then(|r| match r {
482            ComputeControllerResponse::PeekNotification(uuid, peek, otel_ctx) => {
483                Some(ControllerResponse::PeekNotification(uuid, peek, otel_ctx))
484            }
485            ComputeControllerResponse::SubscribeResponse(id, tail) => {
486                Some(ControllerResponse::SubscribeResponse(id, tail))
487            }
488            ComputeControllerResponse::CopyToResponse(id, tail) => {
489                Some(ControllerResponse::CopyToResponse(id, tail))
490            }
491            ComputeControllerResponse::FrontierUpper { id, upper } => {
492                self.handle_frontier_updates(&[(id, upper)])
493            }
494        });
495        Ok(response)
496    }
497
498    /// Processes the work queued by [`Controller::ready`].
499    ///
500    /// This method is guaranteed to return "quickly" unless doing so would
501    /// compromise the correctness of the system.
502    ///
503    /// This method is **not** guaranteed to be cancellation safe. It **must**
504    /// be awaited to completion.
505    #[mz_ore::instrument(level = "debug")]
506    pub fn process(
507        &mut self,
508        storage_metadata: &StorageMetadata,
509    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
510        match mem::take(&mut self.readiness) {
511            Readiness::NotReady => Ok(None),
512            Readiness::Storage => self.process_storage_response(storage_metadata),
513            Readiness::Compute => self.process_compute_response(),
514            Readiness::Metrics((id, metrics)) => self.process_replica_metrics(id, metrics),
515            Readiness::Internal(message) => Ok(Some(message)),
516        }
517    }
518
519    /// Record updates to frontiers, and propagate any necessary responses.
520    /// As of this writing (2/29/2024), the only response that can be generated
521    /// from a frontier update is `WatchSetCompleted`.
522    fn handle_frontier_updates(
523        &mut self,
524        updates: &[(GlobalId, Antichain<T>)],
525    ) -> Option<ControllerResponse<T>> {
526        let mut finished = vec![];
527        for (obj_id, antichain) in updates {
528            let ws_ids = self.unfulfilled_watch_sets_by_object.entry(*obj_id);
529            if let Entry::Occupied(mut ws_ids) = ws_ids {
530                ws_ids.get_mut().retain(|ws_id| {
531                    let mut entry = match self.unfulfilled_watch_sets.entry(*ws_id) {
532                        Entry::Occupied(entry) => entry,
533                        Entry::Vacant(_) => panic!("corrupted watchset state"),
534                    };
535                    // If this object has made more progress than required by this watchset we:
536                    if !antichain.less_equal(&entry.get().1) {
537                        // 1. Remove the object from the set of pending objects for the watchset
538                        entry.get_mut().0.remove(obj_id);
539                        // 2. Mark the watchset as finished if this was the last watched object
540                        if entry.get().0.is_empty() {
541                            entry.remove();
542                            finished.push(*ws_id);
543                        }
544                        // 3. Remove the watchset from the set of pending watchsets for the object
545                        false
546                    } else {
547                        // Otherwise we keep the watchset around to re-check in the future
548                        true
549                    }
550                });
551                // Clear the entry if this was the last watchset that was interested in obj_id
552                if ws_ids.get().is_empty() {
553                    ws_ids.remove();
554                }
555            }
556        }
557        (!(finished.is_empty())).then(|| ControllerResponse::WatchSetFinished(finished))
558    }
559
560    fn process_replica_metrics(
561        &mut self,
562        id: ReplicaId,
563        metrics: Vec<ServiceProcessMetrics>,
564    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
565        self.record_replica_metrics(id, &metrics);
566        Ok(None)
567    }
568
569    fn record_replica_metrics(&mut self, replica_id: ReplicaId, metrics: &[ServiceProcessMetrics]) {
570        if self.read_only() {
571            return;
572        }
573
574        let now = mz_ore::now::to_datetime((self.now)());
575        let now_tz = now.try_into().expect("must fit");
576
577        let replica_id = replica_id.to_string();
578        let mut row = Row::default();
579        let updates = metrics
580            .iter()
581            .enumerate()
582            .map(|(process_id, m)| {
583                row.packer().extend(&[
584                    Datum::String(&replica_id),
585                    Datum::UInt64(u64::cast_from(process_id)),
586                    m.cpu_nano_cores.into(),
587                    m.memory_bytes.into(),
588                    m.disk_bytes.into(),
589                    Datum::TimestampTz(now_tz),
590                    m.heap_bytes.into(),
591                    m.heap_limit.into(),
592                ]);
593                (row.clone(), mz_repr::Diff::ONE)
594            })
595            .collect();
596
597        self.storage
598            .append_introspection_updates(IntrospectionType::ReplicaMetricsHistory, updates);
599    }
600
601    /// Determine the "real-time recency" timestamp for all `ids`.
602    ///
603    /// Real-time recency is defined as the minimum value of `T` that all
604    /// objects can be queried at to return all data visible in the upstream
605    /// system the query was issued. In this case, "the upstream systems" are
606    /// any user sources that connect to objects outside of Materialize, such as
607    /// Kafka sources.
608    ///
609    /// If no items in `ids` connect to external systems, this function will
610    /// return `Ok(T::minimum)`.
611    pub async fn determine_real_time_recent_timestamp(
612        &self,
613        ids: BTreeSet<GlobalId>,
614        timeout: Duration,
615    ) -> Result<BoxFuture<'static, Result<T, StorageError<T>>>, StorageError<T>> {
616        self.storage.real_time_recent_timestamp(ids, timeout).await
617    }
618}
619
620impl<T> Controller<T>
621where
622    // Bounds needed by `StorageController` and/or `Controller`:
623    T: Timestamp
624        + Codec64
625        + From<EpochMillis>
626        + TimestampManipulation
627        + std::fmt::Display
628        + Into<mz_repr::Timestamp>,
629    // Bounds needed by `ComputeController`:
630    T: ComputeControllerTimestamp,
631{
632    /// Creates a new controller.
633    ///
634    /// For correctness, this function expects to have access to the mutations
635    /// to the `storage_txn` that occurred in [`prepare_initialization`].
636    ///
637    /// # Panics
638    /// If this function is called before [`prepare_initialization`].
639    #[instrument(name = "controller::new")]
640    pub async fn new(
641        config: ControllerConfig,
642        envd_epoch: NonZeroI64,
643        read_only: bool,
644        storage_txn: &dyn StorageTxn<T>,
645    ) -> Self {
646        if read_only {
647            tracing::info!("starting controllers in read-only mode!");
648        }
649
650        let now_fn = config.now.clone();
651        let wallclock_lag_fn = WallclockLagFn::new(now_fn);
652
653        let controller_metrics = ControllerMetrics::new(&config.metrics_registry);
654
655        let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
656        let collections_ctl = storage_collections::StorageCollectionsImpl::new(
657            config.persist_location.clone(),
658            Arc::clone(&config.persist_clients),
659            &config.metrics_registry,
660            config.now.clone(),
661            Arc::clone(&txns_metrics),
662            envd_epoch,
663            read_only,
664            config.connection_context.clone(),
665            storage_txn,
666        )
667        .await;
668
669        let collections_ctl: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync> =
670            Arc::new(collections_ctl);
671
672        let storage_controller = mz_storage_controller::Controller::new(
673            config.build_info,
674            config.persist_location.clone(),
675            config.persist_clients,
676            config.now.clone(),
677            wallclock_lag_fn.clone(),
678            Arc::clone(&txns_metrics),
679            read_only,
680            &config.metrics_registry,
681            controller_metrics.clone(),
682            config.connection_context,
683            storage_txn,
684            Arc::clone(&collections_ctl),
685        )
686        .await;
687
688        let storage_collections = Arc::clone(&collections_ctl);
689        let compute_controller = ComputeController::new(
690            config.build_info,
691            storage_collections,
692            read_only,
693            &config.metrics_registry,
694            config.persist_location,
695            controller_metrics,
696            config.now.clone(),
697            wallclock_lag_fn,
698        );
699        let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();
700
701        let this = Self {
702            storage: Box::new(storage_controller),
703            storage_collections: collections_ctl,
704            compute: compute_controller,
705            clusterd_image: config.clusterd_image,
706            init_container_image: config.init_container_image,
707            deploy_generation: config.deploy_generation,
708            read_only,
709            orchestrator: config.orchestrator.namespace("cluster"),
710            readiness: Readiness::NotReady,
711            metrics_tasks: BTreeMap::new(),
712            metrics_tx,
713            metrics_rx,
714            now: config.now,
715            persist_pubsub_url: config.persist_pubsub_url,
716            secrets_args: config.secrets_args,
717            unfulfilled_watch_sets_by_object: BTreeMap::new(),
718            unfulfilled_watch_sets: BTreeMap::new(),
719            watch_set_id_gen: Gen::default(),
720            immediate_watch_sets: Vec::new(),
721            dyncfg: mz_dyncfgs::all_dyncfgs(),
722        };
723
724        if !this.read_only {
725            this.remove_past_generation_replicas_in_background();
726        }
727
728        this
729    }
730}