mz_controller/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A representative of STORAGE and COMPUTE that maintains summaries of the involved objects.
11//!
12//! The `Controller` provides the ability to create and manipulate storage and compute instances.
13//! Each of Storage and Compute provide their own controllers, accessed through the `storage()`
14//! and `compute(instance_id)` methods. It is an error to access a compute instance before it has
15//! been created.
16//!
//! The controller also provides `ready()` and `process()` methods that surface responses from the
//! storage and compute layers, which may remain of value to the interested user. With time, these
//! responses may be thinned down in an effort to make the controller more self contained.
20//!
21//! Consult the `StorageController` and `ComputeController` documentation for more information
22//! about each of these interfaces.
23
24use std::collections::btree_map::Entry;
25use std::collections::{BTreeMap, BTreeSet};
26use std::mem;
27use std::num::NonZeroI64;
28use std::sync::Arc;
29use std::time::Duration;
30
31use futures::future::BoxFuture;
32use mz_build_info::BuildInfo;
33use mz_cluster_client::metrics::ControllerMetrics;
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_compute_client::controller::{
36    ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification,
37};
38use mz_compute_client::protocol::response::SubscribeBatch;
39use mz_compute_client::service::{ComputeClient, ComputeGrpcClient};
40use mz_controller_types::WatchSetId;
41use mz_dyncfg::{ConfigSet, ConfigUpdates};
42use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, ServiceProcessMetrics};
43use mz_ore::id_gen::Gen;
44use mz_ore::instrument;
45use mz_ore::metrics::MetricsRegistry;
46use mz_ore::now::{EpochMillis, NowFn};
47use mz_ore::task::AbortOnDropHandle;
48use mz_ore::tracing::OpenTelemetryContext;
49use mz_persist_client::PersistLocation;
50use mz_persist_client::cache::PersistClientCache;
51use mz_persist_types::Codec64;
52use mz_proto::RustType;
53use mz_repr::{Datum, GlobalId, Row, TimestampManipulation};
54use mz_service::secrets::SecretsReaderCliArgs;
55use mz_storage_client::client::{
56    ProtoStorageCommand, ProtoStorageResponse, StorageCommand, StorageResponse,
57};
58use mz_storage_client::controller::{
59    IntrospectionType, StorageController, StorageMetadata, StorageTxn,
60};
61use mz_storage_client::storage_collections::{self, StorageCollections};
62use mz_storage_types::configuration::StorageConfiguration;
63use mz_storage_types::connections::ConnectionContext;
64use mz_storage_types::controller::StorageError;
65use mz_txn_wal::metrics::Metrics as TxnMetrics;
66use serde::Serialize;
67use timely::progress::{Antichain, Timestamp};
68use tokio::sync::mpsc;
69use uuid::Uuid;
70
71pub mod clusters;
72
73// Export this on behalf of the storage controller to provide a unified
74// interface, allowing other crates to depend on this crate alone.
75pub use mz_storage_controller::prepare_initialization;
76
/// Configures a controller.
///
/// A value of this type is consumed by [`Controller::new`], which distributes its fields to the
/// storage collections, the storage controller, and the compute controller it constructs.
#[derive(Debug, Clone)]
pub struct ControllerConfig {
    /// The build information for this process.
    pub build_info: &'static BuildInfo,
    /// The orchestrator implementation to use.
    ///
    /// [`Controller::new`] scopes it to the `"cluster"` namespace before storing it.
    pub orchestrator: Arc<dyn Orchestrator>,
    /// The persist location where all storage collections will be written to.
    pub persist_location: PersistLocation,
    /// A process-global cache of (blob_uri, consensus_uri) ->
    /// PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// The clusterd image to use when starting new cluster processes.
    pub clusterd_image: String,
    /// The init container image to use for clusterd, if any.
    pub init_container_image: Option<String>,
    /// A number representing the environment's generation.
    ///
    /// This is incremented to request that the new process perform a graceful
    /// transition of power from the prior generation.
    pub deploy_generation: u64,
    /// The now function to advance the controller's introspection collections.
    ///
    /// [`Controller::new`] also derives the wallclock lag function from it.
    pub now: NowFn,
    /// The metrics registry.
    pub metrics_registry: MetricsRegistry,
    /// The URL for Persist PubSub.
    pub persist_pubsub_url: String,
    /// Arguments for secrets readers.
    pub secrets_args: SecretsReaderCliArgs,
    /// The connection context, to thread through to clusterd, with cli flags.
    pub connection_context: ConnectionContext,
}
110
/// Responses that [`Controller`] can produce.
///
/// The timestamp type `T` defaults to [`mz_repr::Timestamp`].
#[derive(Debug)]
pub enum ControllerResponse<T = mz_repr::Timestamp> {
    /// Notification of a worker's response to a specified (by connection id) peek.
    ///
    /// Additionally, an `OpenTelemetryContext` to forward trace information
    /// back into coord. This allows coord traces to be children of work
    /// done in compute!
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// The worker's next response to a specified subscribe.
    ///
    /// Carries the id of the subscribe and the next batch of its output.
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// The worker's next response to a specified copy to.
    ///
    /// Carries the id of the copy-to and its outcome.
    // NOTE(review): the `u64` is presumably a count of rows written — confirm against compute.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// Notification that one or more watch sets have finished. See
    /// [`Controller::install_compute_watch_set`] and
    /// [`Controller::install_storage_watch_set`] for details.
    WatchSetFinished(Vec<WatchSetId>),
}
129
/// Whether one of the underlying controllers is ready for their `process`
/// method to be called.
///
/// The value is set by [`Controller::ready`] and consumed (reset to
/// [`Readiness::NotReady`], the default) by [`Controller::process`].
#[derive(Debug, Default)]
pub enum Readiness<T> {
    /// No underlying controllers are ready.
    #[default]
    NotReady,
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// A batch of metric data is ready: the replica it belongs to and one
    /// metrics entry per process of that replica.
    Metrics((ReplicaId, Vec<ServiceProcessMetrics>)),
    /// An internally-generated message is ready to be returned.
    Internal(ControllerResponse<T>),
}
146
/// A client that maintains soft state and validates commands, in addition to forwarding them.
pub struct Controller<T: ComputeControllerTimestamp = mz_repr::Timestamp> {
    /// The storage controller.
    pub storage: Box<dyn StorageController<Timestamp = T>>,
    /// Shared handle to the storage collections; [`Controller::new`] hands clones of the same
    /// handle to the storage and compute controllers.
    pub storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// The compute controller.
    pub compute: ComputeController<T>,
    /// The clusterd image to use when starting new cluster processes.
    clusterd_image: String,
    /// The init container image to use for clusterd.
    init_container_image: Option<String>,
    /// A number representing the environment's generation.
    deploy_generation: u64,
    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to affect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// The cluster orchestrator.
    orchestrator: Arc<dyn NamespacedOrchestrator>,
    /// Tracks the readiness of the underlying controllers.
    readiness: Readiness<T>,
    /// Tasks for collecting replica metrics.
    metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
    /// Sender for the channel over which replica metrics are sent.
    metrics_tx: mpsc::UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Receiver for the channel over which replica metrics are sent.
    metrics_rx: mpsc::UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// A function providing the current wallclock time.
    now: NowFn,

    /// The URL for Persist PubSub.
    persist_pubsub_url: String,

    /// Arguments for secrets readers.
    secrets_args: SecretsReaderCliArgs,

    /// A map associating a global ID to the set of all the unfulfilled watch
    /// set ids that include it.
    ///
    /// See [`Controller::install_compute_watch_set`]/[`Controller::install_storage_watch_set`] for a description of watch sets.
    // When a watch set is fulfilled for a given object (that is, when
    // the object's frontier advances beyond the watch set's
    // timestamp), the corresponding entry will be removed from the set.
    unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>,
    /// A map of installed watch sets indexed by id.
    ///
    /// Each value holds the objects the watch set is still waiting on together with the
    /// timestamp their frontiers have to pass.
    unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>,
    /// A sequence of numbers used to mint unique WatchSetIds.
    watch_set_id_gen: Gen<WatchSetId>,

    /// A list of watch sets that were already fulfilled as soon as
    /// they were installed, and thus that must be returned to the
    /// client as an internally generated response: [`Controller::ready`] moves them into
    /// [`Readiness::Internal`], and the next call to [`Controller::process`] returns them.
    ///
    /// See [`Controller::install_compute_watch_set`]/[`Controller::install_storage_watch_set`] for a description of watch sets.
    immediate_watch_sets: Vec<WatchSetId>,

    /// Dynamic system configuration.
    dyncfg: ConfigSet,
}
206
207impl<T: ComputeControllerTimestamp> Controller<T> {
208    /// Update the controller configuration.
209    pub fn update_configuration(&mut self, updates: ConfigUpdates) {
210        updates.apply(&self.dyncfg);
211    }
212
213    /// Start sinking the compute controller's introspection data into storage.
214    ///
215    /// This method should be called once the introspection collections have been registered with
216    /// the storage controller. It will panic if invoked earlier than that.
217    pub fn start_compute_introspection_sink(&mut self) {
218        self.compute.start_introspection_sink(&*self.storage);
219    }
220
221    /// Returns the connection context installed in the controller.
222    ///
223    /// This is purely a helper, and can be obtained from `self.storage`.
224    pub fn connection_context(&self) -> &ConnectionContext {
225        &self.storage.config().connection_context
226    }
227
228    /// Returns the storage configuration installed in the storage controller.
229    ///
230    /// This is purely a helper, and can be obtained from `self.storage`.
231    pub fn storage_configuration(&self) -> &StorageConfiguration {
232        self.storage.config()
233    }
234
235    /// Returns the state of the [`Controller`] formatted as JSON.
236    ///
237    /// The returned value is not guaranteed to be stable and may change at any point in time.
238    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
239        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
240        // returned object as a tradeoff between usability and stability. `serde_json` will fail
241        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
242        // prevents a future unrelated change from silently breaking this method.
243
244        // Destructure `self` here so we don't forget to consider dumping newly added fields.
245        let Self {
246            storage_collections: _,
247            storage: _,
248            compute,
249            clusterd_image: _,
250            init_container_image: _,
251            deploy_generation,
252            read_only,
253            orchestrator: _,
254            readiness,
255            metrics_tasks: _,
256            metrics_tx: _,
257            metrics_rx: _,
258            now: _,
259            persist_pubsub_url: _,
260            secrets_args: _,
261            unfulfilled_watch_sets_by_object: _,
262            unfulfilled_watch_sets,
263            watch_set_id_gen: _,
264            immediate_watch_sets,
265            dyncfg: _,
266        } = self;
267
268        let compute = compute.dump().await?;
269
270        let unfulfilled_watch_sets: BTreeMap<_, _> = unfulfilled_watch_sets
271            .iter()
272            .map(|(ws_id, watches)| (format!("{ws_id:?}"), format!("{watches:?}")))
273            .collect();
274        let immediate_watch_sets: Vec<_> = immediate_watch_sets
275            .iter()
276            .map(|watch| format!("{watch:?}"))
277            .collect();
278
279        fn field(
280            key: &str,
281            value: impl Serialize,
282        ) -> Result<(String, serde_json::Value), anyhow::Error> {
283            let value = serde_json::to_value(value)?;
284            Ok((key.to_string(), value))
285        }
286
287        let map = serde_json::Map::from_iter([
288            field("compute", compute)?,
289            field("deploy_generation", deploy_generation)?,
290            field("read_only", read_only)?,
291            field("readiness", format!("{readiness:?}"))?,
292            field("unfulfilled_watch_sets", unfulfilled_watch_sets)?,
293            field("immediate_watch_sets", immediate_watch_sets)?,
294        ]);
295        Ok(serde_json::Value::Object(map))
296    }
297}
298
299impl<T> Controller<T>
300where
301    T: ComputeControllerTimestamp,
302    ComputeGrpcClient: ComputeClient<T>,
303{
304    pub fn update_orchestrator_scheduling_config(
305        &self,
306        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
307    ) {
308        self.orchestrator.update_scheduling_config(config);
309    }
310    /// Marks the end of any initialization commands.
311    ///
312    /// The implementor may wait for this method to be called before implementing prior commands,
313    /// and so it is important for a user to invoke this method as soon as it is comfortable.
314    /// This method can be invoked immediately, at the potential expense of performance.
315    pub fn initialization_complete(&mut self) {
316        self.storage.initialization_complete();
317        self.compute.initialization_complete();
318    }
319
320    /// Reports whether the controller is in read only mode.
321    pub fn read_only(&self) -> bool {
322        self.read_only
323    }
324
325    /// Returns `Some` if there is an immediately available
326    /// internally-generated response that we need to return to the
327    /// client (as opposed to waiting for a response from compute or storage).
328    fn take_internal_response(&mut self) -> Option<ControllerResponse<T>> {
329        let ws = std::mem::take(&mut self.immediate_watch_sets);
330        (!ws.is_empty()).then_some(ControllerResponse::WatchSetFinished(ws))
331    }
332
333    /// Waits until the controller is ready to process a response.
334    ///
335    /// This method may block for an arbitrarily long time.
336    ///
337    /// When the method returns, the owner should call [`Controller::ready`] to
338    /// process the ready message.
339    ///
340    /// This method is cancellation safe.
341    pub async fn ready(&mut self) {
342        if let Readiness::NotReady = self.readiness {
343            // the coordinator wants to be able to make a simple
344            // sequence of ready, process, ready, process, .... calls,
345            // but the controller sometimes has responses immediately
346            // ready to be processed and should do so before calling
347            // into either of the lower-level controllers. This `if`
348            // statement handles that case.
349            if let Some(response) = self.take_internal_response() {
350                self.readiness = Readiness::Internal(response);
351            } else {
352                // The underlying `ready` methods are cancellation safe, so it is
353                // safe to construct this `select!`.
354                tokio::select! {
355                    () = self.storage.ready() => {
356                        self.readiness = Readiness::Storage;
357                    }
358                    () = self.compute.ready() => {
359                        self.readiness = Readiness::Compute;
360                    }
361                    Some(metrics) = self.metrics_rx.recv() => {
362                        self.readiness = Readiness::Metrics(metrics);
363                    }
364                }
365            }
366        }
367    }
368
369    /// Returns the [Readiness] status of this controller.
370    pub fn get_readiness(&self) -> &Readiness<T> {
371        &self.readiness
372    }
373
374    /// Install a _watch set_ in the controller.
375    ///
376    /// A _watch set_ is a request to be informed by the controller when
377    /// all of the frontiers of a particular set of objects have advanced at
378    /// least to a particular timestamp.
379    ///
380    /// When all the objects in `objects` have advanced to `t`, the watchset id
381    /// is returned to the client on the next call to [`Self::process`].
382    pub fn install_compute_watch_set(
383        &mut self,
384        mut objects: BTreeSet<GlobalId>,
385        t: T,
386    ) -> WatchSetId {
387        let ws_id = self.watch_set_id_gen.allocate_id();
388
389        objects.retain(|id| {
390            let frontier = self
391                .compute
392                .collection_frontiers(*id, None)
393                .map(|f| f.write_frontier)
394                .expect("missing compute dependency");
395            frontier.less_equal(&t)
396        });
397        if objects.is_empty() {
398            self.immediate_watch_sets.push(ws_id);
399        } else {
400            for id in objects.iter() {
401                self.unfulfilled_watch_sets_by_object
402                    .entry(*id)
403                    .or_default()
404                    .insert(ws_id);
405            }
406            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
407        }
408
409        ws_id
410    }
411
412    /// Install a _watch set_ in the controller.
413    ///
414    /// A _watch set_ is a request to be informed by the controller when
415    /// all of the frontiers of a particular set of objects have advanced at
416    /// least to a particular timestamp.
417    ///
418    /// When all the objects in `objects` have advanced to `t`, the watchset id
419    /// is returned to the client on the next call to [`Self::process`].
420    pub fn install_storage_watch_set(
421        &mut self,
422        mut objects: BTreeSet<GlobalId>,
423        t: T,
424    ) -> WatchSetId {
425        let ws_id = self.watch_set_id_gen.allocate_id();
426
427        let uppers = self
428            .storage
429            .collections_frontiers(objects.iter().cloned().collect())
430            .expect("missing storage dependencies")
431            .into_iter()
432            .map(|(id, _since, upper)| (id, upper))
433            .collect::<BTreeMap<_, _>>();
434
435        objects.retain(|id| {
436            let upper = uppers.get(id).expect("missing collection");
437            upper.less_equal(&t)
438        });
439        if objects.is_empty() {
440            self.immediate_watch_sets.push(ws_id);
441        } else {
442            for id in objects.iter() {
443                self.unfulfilled_watch_sets_by_object
444                    .entry(*id)
445                    .or_default()
446                    .insert(ws_id);
447            }
448            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
449        }
450        ws_id
451    }
452
453    /// Uninstalls a previously installed WatchSetId. The method is a no-op if the watch set has
454    /// already finished and therefore it's safe to call this function unconditionally.
455    ///
456    /// # Panics
457    /// This method panics if called with a WatchSetId that was never returned by the function.
458    pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId) {
459        if let Some((obj_ids, _)) = self.unfulfilled_watch_sets.remove(ws_id) {
460            for obj_id in obj_ids {
461                let mut entry = match self.unfulfilled_watch_sets_by_object.entry(obj_id) {
462                    Entry::Occupied(entry) => entry,
463                    Entry::Vacant(_) => panic!("corrupted watchset state"),
464                };
465                entry.get_mut().remove(ws_id);
466                if entry.get().is_empty() {
467                    entry.remove();
468                }
469            }
470        }
471    }
472
473    /// Process a pending response from the storage controller. If necessary,
474    /// return a higher-level response to our client.
475    fn process_storage_response(
476        &mut self,
477        storage_metadata: &StorageMetadata,
478    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
479        let maybe_response = self.storage.process(storage_metadata)?;
480        Ok(maybe_response.and_then(
481            |mz_storage_client::controller::Response::FrontierUpdates(r)| {
482                self.handle_frontier_updates(&r)
483            },
484        ))
485    }
486
487    /// Process a pending response from the compute controller. If necessary,
488    /// return a higher-level response to our client.
489    fn process_compute_response(&mut self) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
490        let response = self.compute.process();
491
492        let response = response.and_then(|r| match r {
493            ComputeControllerResponse::PeekNotification(uuid, peek, otel_ctx) => {
494                Some(ControllerResponse::PeekNotification(uuid, peek, otel_ctx))
495            }
496            ComputeControllerResponse::SubscribeResponse(id, tail) => {
497                Some(ControllerResponse::SubscribeResponse(id, tail))
498            }
499            ComputeControllerResponse::CopyToResponse(id, tail) => {
500                Some(ControllerResponse::CopyToResponse(id, tail))
501            }
502            ComputeControllerResponse::FrontierUpper { id, upper } => {
503                self.handle_frontier_updates(&[(id, upper)])
504            }
505        });
506        Ok(response)
507    }
508
509    /// Processes the work queued by [`Controller::ready`].
510    ///
511    /// This method is guaranteed to return "quickly" unless doing so would
512    /// compromise the correctness of the system.
513    ///
514    /// This method is **not** guaranteed to be cancellation safe. It **must**
515    /// be awaited to completion.
516    #[mz_ore::instrument(level = "debug")]
517    pub fn process(
518        &mut self,
519        storage_metadata: &StorageMetadata,
520    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
521        match mem::take(&mut self.readiness) {
522            Readiness::NotReady => Ok(None),
523            Readiness::Storage => self.process_storage_response(storage_metadata),
524            Readiness::Compute => self.process_compute_response(),
525            Readiness::Metrics((id, metrics)) => self.process_replica_metrics(id, metrics),
526            Readiness::Internal(message) => Ok(Some(message)),
527        }
528    }
529
530    /// Record updates to frontiers, and propagate any necessary responses.
531    /// As of this writing (2/29/2024), the only response that can be generated
532    /// from a frontier update is `WatchSetCompleted`.
533    fn handle_frontier_updates(
534        &mut self,
535        updates: &[(GlobalId, Antichain<T>)],
536    ) -> Option<ControllerResponse<T>> {
537        let mut finished = vec![];
538        for (obj_id, antichain) in updates {
539            let ws_ids = self.unfulfilled_watch_sets_by_object.entry(*obj_id);
540            if let Entry::Occupied(mut ws_ids) = ws_ids {
541                ws_ids.get_mut().retain(|ws_id| {
542                    let mut entry = match self.unfulfilled_watch_sets.entry(*ws_id) {
543                        Entry::Occupied(entry) => entry,
544                        Entry::Vacant(_) => panic!("corrupted watchset state"),
545                    };
546                    // If this object has made more progress than required by this watchset we:
547                    if !antichain.less_equal(&entry.get().1) {
548                        // 1. Remove the object from the set of pending objects for the watchset
549                        entry.get_mut().0.remove(obj_id);
550                        // 2. Mark the watchset as finished if this was the last watched object
551                        if entry.get().0.is_empty() {
552                            entry.remove();
553                            finished.push(*ws_id);
554                        }
555                        // 3. Remove the watchset from the set of pending watchsets for the object
556                        false
557                    } else {
558                        // Otherwise we keep the watchset around to re-check in the future
559                        true
560                    }
561                });
562                // Clear the entry if this was the last watchset that was interested in obj_id
563                if ws_ids.get().is_empty() {
564                    ws_ids.remove();
565                }
566            }
567        }
568        (!(finished.is_empty())).then(|| ControllerResponse::WatchSetFinished(finished))
569    }
570
571    fn process_replica_metrics(
572        &mut self,
573        id: ReplicaId,
574        metrics: Vec<ServiceProcessMetrics>,
575    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
576        self.record_replica_metrics(id, &metrics);
577        Ok(None)
578    }
579
580    fn record_replica_metrics(&mut self, replica_id: ReplicaId, metrics: &[ServiceProcessMetrics]) {
581        if self.read_only() {
582            return;
583        }
584
585        let now = mz_ore::now::to_datetime((self.now)());
586        let now_tz = now.try_into().expect("must fit");
587
588        let replica_id = replica_id.to_string();
589        let mut row = Row::default();
590        let updates = metrics
591            .iter()
592            .zip(0..)
593            .map(|(m, process_id)| {
594                row.packer().extend(&[
595                    Datum::String(&replica_id),
596                    Datum::UInt64(process_id),
597                    m.cpu_nano_cores.into(),
598                    m.memory_bytes.into(),
599                    m.disk_usage_bytes.into(),
600                    Datum::TimestampTz(now_tz),
601                ]);
602                (row.clone(), mz_repr::Diff::ONE)
603            })
604            .collect();
605
606        self.storage
607            .append_introspection_updates(IntrospectionType::ReplicaMetricsHistory, updates);
608    }
609
610    /// Determine the "real-time recency" timestamp for all `ids`.
611    ///
612    /// Real-time recency is defined as the minimum value of `T` that all
613    /// objects can be queried at to return all data visible in the upstream
614    /// system the query was issued. In this case, "the upstream systems" are
615    /// any user sources that connect to objects outside of Materialize, such as
616    /// Kafka sources.
617    ///
618    /// If no items in `ids` connect to external systems, this function will
619    /// return `Ok(T::minimum)`.
620    pub async fn determine_real_time_recent_timestamp(
621        &self,
622        ids: BTreeSet<GlobalId>,
623        timeout: Duration,
624    ) -> Result<BoxFuture<'static, Result<T, StorageError<T>>>, StorageError<T>> {
625        self.storage.real_time_recent_timestamp(ids, timeout).await
626    }
627}
628
629impl<T> Controller<T>
630where
631    // Bounds needed by `StorageController` and/or `Controller`:
632    T: Timestamp
633        + Codec64
634        + From<EpochMillis>
635        + TimestampManipulation
636        + std::fmt::Display
637        + Into<mz_repr::Timestamp>,
638    StorageCommand<T>: RustType<ProtoStorageCommand>,
639    StorageResponse<T>: RustType<ProtoStorageResponse>,
640    ComputeGrpcClient: ComputeClient<T>,
641    // Bounds needed by `ComputeController`:
642    T: ComputeControllerTimestamp,
643{
644    /// Creates a new controller.
645    ///
646    /// For correctness, this function expects to have access to the mutations
647    /// to the `storage_txn` that occurred in [`prepare_initialization`].
648    ///
649    /// # Panics
650    /// If this function is called before [`prepare_initialization`].
651    #[instrument(name = "controller::new")]
652    pub async fn new(
653        config: ControllerConfig,
654        envd_epoch: NonZeroI64,
655        read_only: bool,
656        storage_txn: &dyn StorageTxn<T>,
657    ) -> Self {
658        if read_only {
659            tracing::info!("starting controllers in read-only mode!");
660        }
661
662        let now_fn = config.now.clone();
663        let wallclock_lag_fn = WallclockLagFn::new(now_fn);
664
665        let controller_metrics = ControllerMetrics::new(&config.metrics_registry);
666
667        let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
668        let collections_ctl = storage_collections::StorageCollectionsImpl::new(
669            config.persist_location.clone(),
670            Arc::clone(&config.persist_clients),
671            &config.metrics_registry,
672            config.now.clone(),
673            Arc::clone(&txns_metrics),
674            envd_epoch,
675            read_only,
676            config.connection_context.clone(),
677            storage_txn,
678        )
679        .await;
680
681        let collections_ctl: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync> =
682            Arc::new(collections_ctl);
683
684        let storage_controller = mz_storage_controller::Controller::new(
685            config.build_info,
686            config.persist_location.clone(),
687            config.persist_clients,
688            config.now.clone(),
689            wallclock_lag_fn.clone(),
690            Arc::clone(&txns_metrics),
691            read_only,
692            &config.metrics_registry,
693            controller_metrics.clone(),
694            config.connection_context,
695            storage_txn,
696            Arc::clone(&collections_ctl),
697        )
698        .await;
699
700        let storage_collections = Arc::clone(&collections_ctl);
701        let compute_controller = ComputeController::new(
702            config.build_info,
703            storage_collections,
704            read_only,
705            &config.metrics_registry,
706            config.persist_location,
707            controller_metrics,
708            config.now.clone(),
709            wallclock_lag_fn,
710        );
711        let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();
712
713        let this = Self {
714            storage: Box::new(storage_controller),
715            storage_collections: collections_ctl,
716            compute: compute_controller,
717            clusterd_image: config.clusterd_image,
718            init_container_image: config.init_container_image,
719            deploy_generation: config.deploy_generation,
720            read_only,
721            orchestrator: config.orchestrator.namespace("cluster"),
722            readiness: Readiness::NotReady,
723            metrics_tasks: BTreeMap::new(),
724            metrics_tx,
725            metrics_rx,
726            now: config.now,
727            persist_pubsub_url: config.persist_pubsub_url,
728            secrets_args: config.secrets_args,
729            unfulfilled_watch_sets_by_object: BTreeMap::new(),
730            unfulfilled_watch_sets: BTreeMap::new(),
731            watch_set_id_gen: Gen::default(),
732            immediate_watch_sets: Vec::new(),
733            dyncfg: mz_dyncfgs::all_dyncfgs(),
734        };
735
736        if !this.read_only {
737            this.remove_past_generation_replicas_in_background();
738        }
739
740        this
741    }
742}