mz_controller/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A representative of STORAGE and COMPUTE that maintains summaries of the involved objects.
11//!
12//! The `Controller` provides the ability to create and manipulate storage and compute instances.
13//! Each of Storage and Compute provide their own controllers, accessed through the `storage()`
14//! and `compute(instance_id)` methods. It is an error to access a compute instance before it has
15//! been created.
16//!
//! The controller also provides the `ready()` and `process()` methods, which together surface
//! responses from the storage and compute layers that may remain of value to the interested user.
//! With time, these responses may be thinned down in an effort to make the controller more
//! self-contained.
20//!
21//! Consult the `StorageController` and `ComputeController` documentation for more information
22//! about each of these interfaces.
23
24use std::collections::btree_map::Entry;
25use std::collections::{BTreeMap, BTreeSet};
26use std::mem;
27use std::num::NonZeroI64;
28use std::sync::Arc;
29use std::time::Duration;
30
31use futures::future::BoxFuture;
32use mz_build_info::BuildInfo;
33use mz_cluster_client::metrics::ControllerMetrics;
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_compute_client::controller::{
36    ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification,
37};
38use mz_compute_client::protocol::response::SubscribeBatch;
39use mz_compute_client::service::{ComputeClient, ComputeGrpcClient};
40use mz_controller_types::WatchSetId;
41use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, ServiceProcessMetrics};
42use mz_ore::id_gen::Gen;
43use mz_ore::instrument;
44use mz_ore::metrics::MetricsRegistry;
45use mz_ore::now::{EpochMillis, NowFn};
46use mz_ore::task::AbortOnDropHandle;
47use mz_ore::tracing::OpenTelemetryContext;
48use mz_persist_client::PersistLocation;
49use mz_persist_client::cache::PersistClientCache;
50use mz_persist_types::Codec64;
51use mz_proto::RustType;
52use mz_repr::{Datum, GlobalId, Row, TimestampManipulation};
53use mz_service::secrets::SecretsReaderCliArgs;
54use mz_storage_client::client::{
55    ProtoStorageCommand, ProtoStorageResponse, StorageCommand, StorageResponse,
56};
57use mz_storage_client::controller::{
58    IntrospectionType, StorageController, StorageMetadata, StorageTxn,
59};
60use mz_storage_client::storage_collections::{self, StorageCollections};
61use mz_storage_types::configuration::StorageConfiguration;
62use mz_storage_types::connections::ConnectionContext;
63use mz_storage_types::controller::StorageError;
64use mz_txn_wal::metrics::Metrics as TxnMetrics;
65use serde::Serialize;
66use timely::progress::{Antichain, Timestamp};
67use tokio::sync::mpsc;
68use uuid::Uuid;
69
70pub mod clusters;
71
72// Export this on behalf of the storage controller to provide a unified
73// interface, allowing other crates to depend on this crate alone.
74pub use mz_storage_controller::prepare_initialization;
75
/// Configures a controller.
///
/// An instance of this type is consumed by [`Controller::new`], which hands the
/// individual fields to the storage controller, the compute controller, and the
/// [`Controller`] itself.
#[derive(Debug, Clone)]
pub struct ControllerConfig {
    /// The build information for this process.
    pub build_info: &'static BuildInfo,
    /// The orchestrator implementation to use.
    ///
    /// [`Controller::new`] uses the `"cluster"` namespace of this orchestrator.
    pub orchestrator: Arc<dyn Orchestrator>,
    /// The persist location where all storage collections will be written to.
    pub persist_location: PersistLocation,
    /// A process-global cache of (blob_uri, consensus_uri) ->
    /// PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// The clusterd image to use when starting new cluster processes.
    pub clusterd_image: String,
    /// The init container image to use for clusterd, if any.
    pub init_container_image: Option<String>,
    /// A number representing the environment's generation.
    ///
    /// This is incremented to request that the new process perform a graceful
    /// transition of power from the prior generation.
    pub deploy_generation: u64,
    /// The now function to advance the controller's introspection collections.
    ///
    /// The same function is also used to compute wallclock lag and to timestamp
    /// recorded replica metrics.
    pub now: NowFn,
    /// The metrics registry.
    pub metrics_registry: MetricsRegistry,
    /// The URL for Persist PubSub.
    pub persist_pubsub_url: String,
    /// Arguments for secrets readers.
    pub secrets_args: SecretsReaderCliArgs,
    /// The connection context, to thread through to clusterd, with cli flags.
    pub connection_context: ConnectionContext,
}
109
/// Responses that [`Controller`] can produce.
///
/// Values of this type are returned from [`Controller::process`].
#[derive(Debug)]
pub enum ControllerResponse<T = mz_repr::Timestamp> {
    /// Notification of a worker's response to a specified (by connection id) peek.
    ///
    /// Additionally, an `OpenTelemetryContext` to forward trace information
    /// back into coord. This allows coord traces to be children of work
    /// done in compute!
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// The worker's next response to a specified subscribe.
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// The worker's next response to a specified copy to.
    // NOTE(review): the `u64` in the `Ok` case is presumably a row count; confirm against the
    // compute controller.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// Notification that new resource usage metrics are available for a given replica.
    ///
    /// Carries one [`ServiceProcessMetrics`] entry per reported process.
    ComputeReplicaMetrics(ReplicaId, Vec<ServiceProcessMetrics>),
    /// Notification that one or more watch sets have finished. See
    /// [`Controller::install_compute_watch_set`] and
    /// [`Controller::install_storage_watch_set`] for details.
    WatchSetFinished(Vec<WatchSetId>),
}
130
/// Whether one of the underlying controllers is ready for their `process`
/// method to be called.
///
/// The value is recorded by [`Controller::ready`] and consumed by
/// [`Controller::process`], which resets it to the default, [`Readiness::NotReady`].
#[derive(Debug, Default)]
enum Readiness<T> {
    /// No underlying controllers are ready.
    #[default]
    NotReady,
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// A batch of metric data is ready.
    ///
    /// Holds the replica the metrics belong to together with the metrics themselves.
    Metrics((ReplicaId, Vec<ServiceProcessMetrics>)),
    /// An internally-generated message is ready to be returned.
    Internal(ControllerResponse<T>),
}
147
/// A client that maintains soft state and validates commands, in addition to forwarding them.
///
/// The controller bundles a storage controller and a compute controller and additionally
/// tracks watch sets and replica metrics on their behalf.
pub struct Controller<T: ComputeControllerTimestamp = mz_repr::Timestamp> {
    /// The storage controller.
    pub storage: Box<dyn StorageController<Timestamp = T>>,
    /// A handle to the storage collections.
    ///
    /// Clones of the same handle are given to the storage and compute controllers when the
    /// controller is created.
    pub storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// The compute controller.
    pub compute: ComputeController<T>,
    /// The clusterd image to use when starting new cluster processes.
    clusterd_image: String,
    /// The init container image to use for clusterd.
    init_container_image: Option<String>,
    /// A number representing the environment's generation.
    deploy_generation: u64,
    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to affect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// The cluster orchestrator.
    orchestrator: Arc<dyn NamespacedOrchestrator>,
    /// Tracks the readiness of the underlying controllers.
    readiness: Readiness<T>,
    /// Tasks for collecting replica metrics.
    metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
    /// Sender for the channel over which replica metrics are sent.
    metrics_tx: mpsc::UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Receiver for the channel over which replica metrics are sent.
    metrics_rx: mpsc::UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// A function providing the current wallclock time.
    now: NowFn,

    /// The URL for Persist PubSub.
    persist_pubsub_url: String,

    /// Arguments for secrets readers.
    secrets_args: SecretsReaderCliArgs,

    /// A map associating a global ID to the set of all the unfulfilled watch
    /// set ids that include it.
    ///
    /// See [`Controller::install_compute_watch_set`] and
    /// [`Controller::install_storage_watch_set`] for a description of watch sets.
    // When a watch set is fulfilled for a given object (that is, when
    // the object's frontier advances to at least the watch set's
    // timestamp), the corresponding entry will be removed from the set.
    unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>,
    /// A map of installed watch sets indexed by id.
    ///
    /// Each entry holds the objects the watch set is still waiting on and the
    /// timestamp the watch set was installed with.
    unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>,
    /// A sequence of numbers used to mint unique WatchSetIds.
    watch_set_id_gen: Gen<WatchSetId>,

    /// A list of watch sets that were already fulfilled as soon as
    /// they were installed, and thus that must be returned to the
    /// client: they are picked up by the next call to [`Controller::ready`]
    /// and handed out by the following call to [`Controller::process`].
    ///
    /// See [`Controller::install_compute_watch_set`] and
    /// [`Controller::install_storage_watch_set`] for a description of watch sets.
    immediate_watch_sets: Vec<WatchSetId>,
}
204
205impl<T: ComputeControllerTimestamp> Controller<T> {
    /// Forwards `value` to the compute controller's
    /// `set_arrangement_exert_proportionality` setting.
    // NOTE(review): the meaning of `value` is defined by the compute controller; consult its
    // documentation before choosing a value.
    pub fn set_arrangement_exert_proportionality(&mut self, value: u32) {
        self.compute.set_arrangement_exert_proportionality(value);
    }
209
    /// Start sinking the compute controller's introspection data into storage.
    ///
    /// The compute controller is handed a reference to the storage controller for this purpose.
    ///
    /// # Panics
    ///
    /// This method should be called once the introspection collections have been registered with
    /// the storage controller. It will panic if invoked earlier than that.
    pub fn start_compute_introspection_sink(&mut self) {
        self.compute.start_introspection_sink(&*self.storage);
    }
217
    /// Returns the connection context installed in the controller.
    ///
    /// This is purely a helper: the context is read from the storage controller's
    /// configuration and can equally be obtained from `self.storage`.
    pub fn connection_context(&self) -> &ConnectionContext {
        &self.storage.config().connection_context
    }
224
    /// Returns the storage configuration installed in the storage controller.
    ///
    /// This is purely a helper: it is equivalent to calling `config()` on `self.storage`.
    pub fn storage_configuration(&self) -> &StorageConfiguration {
        self.storage.config()
    }
231
232    /// Returns the state of the [`Controller`] formatted as JSON.
233    ///
234    /// The returned value is not guaranteed to be stable and may change at any point in time.
235    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
236        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
237        // returned object as a tradeoff between usability and stability. `serde_json` will fail
238        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
239        // prevents a future unrelated change from silently breaking this method.
240
241        // Destructure `self` here so we don't forget to consider dumping newly added fields.
242        let Self {
243            storage_collections: _,
244            storage: _,
245            compute,
246            clusterd_image: _,
247            init_container_image: _,
248            deploy_generation,
249            read_only,
250            orchestrator: _,
251            readiness,
252            metrics_tasks: _,
253            metrics_tx: _,
254            metrics_rx: _,
255            now: _,
256            persist_pubsub_url: _,
257            secrets_args: _,
258            unfulfilled_watch_sets_by_object: _,
259            unfulfilled_watch_sets,
260            watch_set_id_gen: _,
261            immediate_watch_sets,
262        } = self;
263
264        let compute = compute.dump().await?;
265
266        let unfulfilled_watch_sets: BTreeMap<_, _> = unfulfilled_watch_sets
267            .iter()
268            .map(|(ws_id, watches)| (format!("{ws_id:?}"), format!("{watches:?}")))
269            .collect();
270        let immediate_watch_sets: Vec<_> = immediate_watch_sets
271            .iter()
272            .map(|watch| format!("{watch:?}"))
273            .collect();
274
275        fn field(
276            key: &str,
277            value: impl Serialize,
278        ) -> Result<(String, serde_json::Value), anyhow::Error> {
279            let value = serde_json::to_value(value)?;
280            Ok((key.to_string(), value))
281        }
282
283        let map = serde_json::Map::from_iter([
284            field("compute", compute)?,
285            field("deploy_generation", deploy_generation)?,
286            field("read_only", read_only)?,
287            field("readiness", format!("{readiness:?}"))?,
288            field("unfulfilled_watch_sets", unfulfilled_watch_sets)?,
289            field("immediate_watch_sets", immediate_watch_sets)?,
290        ]);
291        Ok(serde_json::Value::Object(map))
292    }
293}
294
295impl<T> Controller<T>
296where
297    T: ComputeControllerTimestamp,
298    ComputeGrpcClient: ComputeClient<T>,
299{
    /// Forwards an updated service scheduling configuration to the cluster orchestrator.
    pub fn update_orchestrator_scheduling_config(
        &self,
        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
    ) {
        self.orchestrator.update_scheduling_config(config);
    }
    /// Marks the end of any initialization commands.
    ///
    /// The notification is passed on to the storage controller first and then to the compute
    /// controller.
    ///
    /// The implementor may wait for this method to be called before implementing prior commands,
    /// and so it is important for a user to invoke this method as soon as it is comfortable.
    /// This method can be invoked immediately, at the potential expense of performance.
    pub fn initialization_complete(&mut self) {
        self.storage.initialization_complete();
        self.compute.initialization_complete();
    }
315
    /// Reports whether the controller is in read only mode.
    ///
    /// Returns the value the controller was created with.
    pub fn read_only(&self) -> bool {
        self.read_only
    }
320
321    /// Returns `Some` if there is an immediately available
322    /// internally-generated response that we need to return to the
323    /// client (as opposed to waiting for a response from compute or storage).
324    fn take_internal_response(&mut self) -> Option<ControllerResponse<T>> {
325        let ws = std::mem::take(&mut self.immediate_watch_sets);
326        (!ws.is_empty()).then_some(ControllerResponse::WatchSetFinished(ws))
327    }
328
329    /// Waits until the controller is ready to process a response.
330    ///
331    /// This method may block for an arbitrarily long time.
332    ///
333    /// When the method returns, the owner should call [`Controller::ready`] to
334    /// process the ready message.
335    ///
336    /// This method is cancellation safe.
337    pub async fn ready(&mut self) {
338        if let Readiness::NotReady = self.readiness {
339            // the coordinator wants to be able to make a simple
340            // sequence of ready, process, ready, process, .... calls,
341            // but the controller sometimes has responses immediately
342            // ready to be processed and should do so before calling
343            // into either of the lower-level controllers. This `if`
344            // statement handles that case.
345            if let Some(response) = self.take_internal_response() {
346                self.readiness = Readiness::Internal(response);
347            } else {
348                // The underlying `ready` methods are cancellation safe, so it is
349                // safe to construct this `select!`.
350                tokio::select! {
351                    () = self.storage.ready() => {
352                        self.readiness = Readiness::Storage;
353                    }
354                    () = self.compute.ready() => {
355                        self.readiness = Readiness::Compute;
356                    }
357                    Some(metrics) = self.metrics_rx.recv() => {
358                        self.readiness = Readiness::Metrics(metrics);
359                    }
360                }
361            }
362        }
363    }
364
365    /// Install a _watch set_ in the controller.
366    ///
367    /// A _watch set_ is a request to be informed by the controller when
368    /// all of the frontiers of a particular set of objects have advanced at
369    /// least to a particular timestamp.
370    ///
371    /// When all the objects in `objects` have advanced to `t`, the watchset id
372    /// is returned to the client on the next call to [`Self::process`].
373    pub fn install_compute_watch_set(
374        &mut self,
375        mut objects: BTreeSet<GlobalId>,
376        t: T,
377    ) -> WatchSetId {
378        let ws_id = self.watch_set_id_gen.allocate_id();
379
380        objects.retain(|id| {
381            let frontier = self
382                .compute
383                .collection_frontiers(*id, None)
384                .map(|f| f.write_frontier)
385                .expect("missing compute dependency");
386            frontier.less_equal(&t)
387        });
388        if objects.is_empty() {
389            self.immediate_watch_sets.push(ws_id);
390        } else {
391            for id in objects.iter() {
392                self.unfulfilled_watch_sets_by_object
393                    .entry(*id)
394                    .or_default()
395                    .insert(ws_id);
396            }
397            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
398        }
399
400        ws_id
401    }
402
403    /// Install a _watch set_ in the controller.
404    ///
405    /// A _watch set_ is a request to be informed by the controller when
406    /// all of the frontiers of a particular set of objects have advanced at
407    /// least to a particular timestamp.
408    ///
409    /// When all the objects in `objects` have advanced to `t`, the watchset id
410    /// is returned to the client on the next call to [`Self::process`].
411    pub fn install_storage_watch_set(
412        &mut self,
413        mut objects: BTreeSet<GlobalId>,
414        t: T,
415    ) -> WatchSetId {
416        let ws_id = self.watch_set_id_gen.allocate_id();
417
418        let uppers = self
419            .storage
420            .collections_frontiers(objects.iter().cloned().collect())
421            .expect("missing storage dependencies")
422            .into_iter()
423            .map(|(id, _since, upper)| (id, upper))
424            .collect::<BTreeMap<_, _>>();
425
426        objects.retain(|id| {
427            let upper = uppers.get(id).expect("missing collection");
428            upper.less_equal(&t)
429        });
430        if objects.is_empty() {
431            self.immediate_watch_sets.push(ws_id);
432        } else {
433            for id in objects.iter() {
434                self.unfulfilled_watch_sets_by_object
435                    .entry(*id)
436                    .or_default()
437                    .insert(ws_id);
438            }
439            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
440        }
441        ws_id
442    }
443
444    /// Uninstalls a previously installed WatchSetId. The method is a no-op if the watch set has
445    /// already finished and therefore it's safe to call this function unconditionally.
446    ///
447    /// # Panics
448    /// This method panics if called with a WatchSetId that was never returned by the function.
449    pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId) {
450        if let Some((obj_ids, _)) = self.unfulfilled_watch_sets.remove(ws_id) {
451            for obj_id in obj_ids {
452                let mut entry = match self.unfulfilled_watch_sets_by_object.entry(obj_id) {
453                    Entry::Occupied(entry) => entry,
454                    Entry::Vacant(_) => panic!("corrupted watchset state"),
455                };
456                entry.get_mut().remove(ws_id);
457                if entry.get().is_empty() {
458                    entry.remove();
459                }
460            }
461        }
462    }
463
464    /// Process a pending response from the storage controller. If necessary,
465    /// return a higher-level response to our client.
466    fn process_storage_response(
467        &mut self,
468        storage_metadata: &StorageMetadata,
469    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
470        let maybe_response = self.storage.process(storage_metadata)?;
471        Ok(maybe_response.and_then(
472            |mz_storage_client::controller::Response::FrontierUpdates(r)| {
473                self.handle_frontier_updates(&r)
474            },
475        ))
476    }
477
478    /// Process a pending response from the compute controller. If necessary,
479    /// return a higher-level response to our client.
480    fn process_compute_response(&mut self) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
481        let response = self.compute.process();
482
483        let response = response.and_then(|r| match r {
484            ComputeControllerResponse::PeekNotification(uuid, peek, otel_ctx) => {
485                Some(ControllerResponse::PeekNotification(uuid, peek, otel_ctx))
486            }
487            ComputeControllerResponse::SubscribeResponse(id, tail) => {
488                Some(ControllerResponse::SubscribeResponse(id, tail))
489            }
490            ComputeControllerResponse::CopyToResponse(id, tail) => {
491                Some(ControllerResponse::CopyToResponse(id, tail))
492            }
493            ComputeControllerResponse::FrontierUpper { id, upper } => {
494                self.handle_frontier_updates(&[(id, upper)])
495            }
496        });
497        Ok(response)
498    }
499
500    /// Processes the work queued by [`Controller::ready`].
501    ///
502    /// This method is guaranteed to return "quickly" unless doing so would
503    /// compromise the correctness of the system.
504    ///
505    /// This method is **not** guaranteed to be cancellation safe. It **must**
506    /// be awaited to completion.
507    #[mz_ore::instrument(level = "debug")]
508    pub fn process(
509        &mut self,
510        storage_metadata: &StorageMetadata,
511    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
512        match mem::take(&mut self.readiness) {
513            Readiness::NotReady => Ok(None),
514            Readiness::Storage => self.process_storage_response(storage_metadata),
515            Readiness::Compute => self.process_compute_response(),
516            Readiness::Metrics((id, metrics)) => self.process_replica_metrics(id, metrics),
517            Readiness::Internal(message) => Ok(Some(message)),
518        }
519    }
520
    /// Record updates to frontiers, and propagate any necessary responses.
    /// As of this writing (2/29/2024), the only response that can be generated
    /// from a frontier update is [`ControllerResponse::WatchSetFinished`].
    ///
    /// `updates` pairs each object with its new frontier. Returns
    /// `Some(WatchSetFinished(..))` listing every watch set whose last pending
    /// object was released by these updates, and `None` if no watch set finished.
    ///
    /// # Panics
    ///
    /// Panics if the per-object index references a watch set that is missing
    /// from the per-watch-set index.
    fn handle_frontier_updates(
        &mut self,
        updates: &[(GlobalId, Antichain<T>)],
    ) -> Option<ControllerResponse<T>> {
        let mut finished = vec![];
        for (obj_id, antichain) in updates {
            // Objects that no watch set is waiting on have no entry and are skipped.
            let ws_ids = self.unfulfilled_watch_sets_by_object.entry(*obj_id);
            if let Entry::Occupied(mut ws_ids) = ws_ids {
                ws_ids.get_mut().retain(|ws_id| {
                    let mut entry = match self.unfulfilled_watch_sets.entry(*ws_id) {
                        Entry::Occupied(entry) => entry,
                        Entry::Vacant(_) => panic!("corrupted watchset state"),
                    };
                    // If this object has made more progress than required by this watchset we:
                    if !antichain.less_equal(&entry.get().1) {
                        // 1. Remove the object from the set of pending objects for the watchset
                        entry.get_mut().0.remove(obj_id);
                        // 2. Mark the watchset as finished if this was the last watched object
                        if entry.get().0.is_empty() {
                            entry.remove();
                            finished.push(*ws_id);
                        }
                        // 3. Remove the watchset from the set of pending watchsets for the object
                        false
                    } else {
                        // Otherwise we keep the watchset around to re-check in the future
                        true
                    }
                });
                // Clear the entry if this was the last watchset that was interested in obj_id
                if ws_ids.get().is_empty() {
                    ws_ids.remove();
                }
            }
        }
        // Only produce a response if at least one watch set finished.
        (!(finished.is_empty())).then(|| ControllerResponse::WatchSetFinished(finished))
    }
561
    /// Processes a batch of resource usage metrics reported for replica `id`.
    ///
    /// The metrics are first handed to `record_replica_metrics` and then forwarded to the
    /// client as a [`ControllerResponse::ComputeReplicaMetrics`]. As written, this method
    /// always returns `Ok(Some(..))`.
    fn process_replica_metrics(
        &mut self,
        id: ReplicaId,
        metrics: Vec<ServiceProcessMetrics>,
    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
        self.record_replica_metrics(id, &metrics);
        Ok(Some(ControllerResponse::ComputeReplicaMetrics(id, metrics)))
    }
570
571    fn record_replica_metrics(&mut self, replica_id: ReplicaId, metrics: &[ServiceProcessMetrics]) {
572        if self.read_only() {
573            return;
574        }
575
576        let now = mz_ore::now::to_datetime((self.now)());
577        let now_tz = now.try_into().expect("must fit");
578
579        let replica_id = replica_id.to_string();
580        let mut row = Row::default();
581        let updates = metrics
582            .iter()
583            .zip(0..)
584            .map(|(m, process_id)| {
585                row.packer().extend(&[
586                    Datum::String(&replica_id),
587                    Datum::UInt64(process_id),
588                    m.cpu_nano_cores.into(),
589                    m.memory_bytes.into(),
590                    m.disk_usage_bytes.into(),
591                    Datum::TimestampTz(now_tz),
592                ]);
593                (row.clone(), mz_repr::Diff::ONE)
594            })
595            .collect();
596
597        self.storage
598            .append_introspection_updates(IntrospectionType::ReplicaMetricsHistory, updates);
599    }
600
    /// Determine the "real-time recency" timestamp for all `ids`.
    ///
    /// Real-time recency is defined as the minimum value of `T` that all
    /// objects can be queried at to return all data visible in the upstream
    /// system the query was issued. In this case, "the upstream systems" are
    /// any user sources that connect to objects outside of Materialize, such as
    /// Kafka sources.
    ///
    /// If no items in `ids` connect to external systems, this function will
    /// return `Ok(T::minimum)`.
    ///
    /// The request is delegated to the storage controller. On success, the
    /// returned value is a boxed future that resolves to the timestamp or to a
    /// [`StorageError`].
    // NOTE(review): `timeout` is passed through to the storage controller unchanged; which
    // part of the operation it bounds is defined there.
    pub async fn determine_real_time_recent_timestamp(
        &self,
        ids: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<BoxFuture<'static, Result<T, StorageError<T>>>, StorageError<T>> {
        self.storage.real_time_recent_timestamp(ids, timeout).await
    }
618}
619
620impl<T> Controller<T>
621where
622    // Bounds needed by `StorageController` and/or `Controller`:
623    T: Timestamp
624        + Codec64
625        + From<EpochMillis>
626        + TimestampManipulation
627        + std::fmt::Display
628        + Into<mz_repr::Timestamp>,
629    StorageCommand<T>: RustType<ProtoStorageCommand>,
630    StorageResponse<T>: RustType<ProtoStorageResponse>,
631    ComputeGrpcClient: ComputeClient<T>,
632    // Bounds needed by `ComputeController`:
633    T: ComputeControllerTimestamp,
634{
    /// Creates a new controller.
    ///
    /// Builds the storage collections handle, the storage controller and the compute
    /// controller from `config`, and wires them together. `envd_epoch` and `read_only`
    /// are passed on to all three of them.
    ///
    /// For correctness, this function expects to have access to the mutations
    /// to the `storage_txn` that occurred in [`prepare_initialization`].
    ///
    /// # Panics
    /// If this function is called before [`prepare_initialization`].
    #[instrument(name = "controller::new")]
    pub async fn new(
        config: ControllerConfig,
        envd_epoch: NonZeroI64,
        read_only: bool,
        storage_txn: &dyn StorageTxn<T>,
    ) -> Self {
        if read_only {
            tracing::info!("starting controllers in read-only mode!");
        }

        // Computes how far a given time lags behind the current wallclock time, saturating
        // at zero for times that are not in the past.
        let now_fn = config.now.clone();
        let wallclock_lag: WallclockLagFn<_> = Arc::new(move |time: &T| {
            let now = mz_repr::Timestamp::new(now_fn());
            let time_ts: mz_repr::Timestamp = time.clone().into();
            let lag_ts = now.saturating_sub(time_ts);
            Duration::from(lag_ts)
        });

        let controller_metrics = ControllerMetrics::new(&config.metrics_registry);

        // The storage collections handle is created first, because both the storage and the
        // compute controller receive a clone of it below.
        let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
        let collections_ctl = storage_collections::StorageCollectionsImpl::new(
            config.persist_location.clone(),
            Arc::clone(&config.persist_clients),
            &config.metrics_registry,
            config.now.clone(),
            Arc::clone(&txns_metrics),
            envd_epoch,
            read_only,
            config.connection_context.clone(),
            storage_txn,
        )
        .await;

        // Erase the concrete type so the handle can be shared as a trait object.
        let collections_ctl: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync> =
            Arc::new(collections_ctl);

        // From here on, fields of `config` are moved out where they are no longer needed.
        let storage_controller = mz_storage_controller::Controller::new(
            config.build_info,
            config.persist_location,
            config.persist_clients,
            config.now.clone(),
            Arc::clone(&wallclock_lag),
            Arc::clone(&txns_metrics),
            envd_epoch,
            read_only,
            &config.metrics_registry,
            controller_metrics.clone(),
            config.connection_context,
            storage_txn,
            Arc::clone(&collections_ctl),
        )
        .await;

        let storage_collections = Arc::clone(&collections_ctl);
        let compute_controller = ComputeController::new(
            config.build_info,
            storage_collections,
            envd_epoch,
            read_only,
            &config.metrics_registry,
            controller_metrics,
            config.now.clone(),
            wallclock_lag,
        );
        // Channel over which replica metrics are delivered; the controller keeps both ends.
        let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();

        let this = Self {
            storage: Box::new(storage_controller),
            storage_collections: collections_ctl,
            compute: compute_controller,
            clusterd_image: config.clusterd_image,
            init_container_image: config.init_container_image,
            deploy_generation: config.deploy_generation,
            read_only,
            // All services managed by this controller live in the "cluster" namespace.
            orchestrator: config.orchestrator.namespace("cluster"),
            readiness: Readiness::NotReady,
            metrics_tasks: BTreeMap::new(),
            metrics_tx,
            metrics_rx,
            now: config.now,
            persist_pubsub_url: config.persist_pubsub_url,
            secrets_args: config.secrets_args,
            unfulfilled_watch_sets_by_object: BTreeMap::new(),
            unfulfilled_watch_sets: BTreeMap::new(),
            watch_set_id_gen: Gen::default(),
            immediate_watch_sets: Vec::new(),
        };

        // Only a writable controller cleans up after earlier generations.
        // NOTE(review): `remove_past_generation_replicas_in_background` is defined outside this
        // view; judging by its name it removes replicas of past deploy generations. Confirm.
        if !this.read_only {
            this.remove_past_generation_replicas_in_background();
        }

        this
    }
738}