mz_controller/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A representative of STORAGE and COMPUTE that maintains summaries of the involved objects.
11//!
12//! The `Controller` provides the ability to create and manipulate storage and compute instances.
13//! Each of Storage and Compute provide their own controllers, accessed through the `storage()`
14//! and `compute(instance_id)` methods. It is an error to access a compute instance before it has
15//! been created.
16//!
//! The controller also provides `ready()` and `process()` methods that together surface
//! responses from the storage and compute layers, which may remain of value to the interested
//! user. With time, these responses may be thinned down in an effort to make the controller more
//! self-contained.
20//!
21//! Consult the `StorageController` and `ComputeController` documentation for more information
22//! about each of these interfaces.
23
24use std::collections::btree_map::Entry;
25use std::collections::{BTreeMap, BTreeSet};
26use std::mem;
27use std::num::NonZeroI64;
28use std::sync::Arc;
29use std::time::Duration;
30
31use futures::future::BoxFuture;
32use mz_build_info::BuildInfo;
33use mz_cluster_client::metrics::ControllerMetrics;
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_compute_client::controller::error::CollectionLookupError;
36use mz_compute_client::controller::{
37    ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification,
38};
39use mz_compute_client::protocol::response::SubscribeBatch;
40use mz_controller_types::WatchSetId;
41use mz_dyncfg::{ConfigSet, ConfigUpdates};
42use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, ServiceProcessMetrics};
43use mz_ore::cast::CastFrom;
44use mz_ore::id_gen::Gen;
45use mz_ore::instrument;
46use mz_ore::metrics::MetricsRegistry;
47use mz_ore::now::{EpochMillis, NowFn};
48use mz_ore::task::AbortOnDropHandle;
49use mz_ore::tracing::OpenTelemetryContext;
50use mz_persist_client::PersistLocation;
51use mz_persist_client::cache::PersistClientCache;
52use mz_persist_types::Codec64;
53use mz_repr::{Datum, GlobalId, Row, TimestampManipulation};
54use mz_service::secrets::SecretsReaderCliArgs;
55use mz_storage_client::controller::{
56    IntrospectionType, StorageController, StorageMetadata, StorageTxn,
57};
58use mz_storage_client::storage_collections::{self, StorageCollections};
59use mz_storage_types::configuration::StorageConfiguration;
60use mz_storage_types::connections::ConnectionContext;
61use mz_storage_types::controller::StorageError;
62use mz_txn_wal::metrics::Metrics as TxnMetrics;
63use timely::progress::{Antichain, Timestamp};
64use tokio::sync::mpsc;
65use uuid::Uuid;
66
67pub mod clusters;
68
69// Export this on behalf of the storage controller to provide a unified
70// interface, allowing other crates to depend on this crate alone.
71pub use mz_storage_controller::prepare_initialization;
72
/// Configures a controller.
///
/// A value of this type is consumed by [`Controller::new`], which distributes the individual
/// settings to the storage controller, the compute controller, and the [`Controller`] itself.
#[derive(Debug, Clone)]
pub struct ControllerConfig {
    /// The build information for this process.
    pub build_info: &'static BuildInfo,
    /// The orchestrator implementation to use.
    ///
    /// [`Controller::new`] namespaces it under `"cluster"` before storing it.
    pub orchestrator: Arc<dyn Orchestrator>,
    /// The persist location where all storage collections will be written to.
    pub persist_location: PersistLocation,
    /// A process-global cache of (blob_uri, consensus_uri) ->
    /// PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// The clusterd image to use when starting new cluster processes.
    pub clusterd_image: String,
    /// The init container image to use for clusterd.
    pub init_container_image: Option<String>,
    /// A number representing the environment's generation.
    ///
    /// This is incremented to request that the new process perform a graceful
    /// transition of power from the prior generation.
    pub deploy_generation: u64,
    /// The now function to advance the controller's introspection collections.
    pub now: NowFn,
    /// The metrics registry.
    pub metrics_registry: MetricsRegistry,
    /// The URL for Persist PubSub.
    pub persist_pubsub_url: String,
    /// Arguments for secrets readers.
    pub secrets_args: SecretsReaderCliArgs,
    /// The connection context, to thread through to clusterd, with cli flags.
    pub connection_context: ConnectionContext,
}
106
/// Responses that [`Controller`] can produce.
///
/// These are the values returned (wrapped in `Ok(Some(..))`) by [`Controller::process`].
#[derive(Debug)]
pub enum ControllerResponse<T = mz_repr::Timestamp> {
    /// Notification of a worker's response to a specified (by connection id) peek.
    ///
    /// Additionally, an `OpenTelemetryContext` to forward trace information
    /// back into coord. This allows coord traces to be children of work
    /// done in compute!
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// The worker's next response to a specified subscribe.
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// The worker's next response to a specified copy to.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// Notification that one or more watch sets have finished; the payload lists
    /// their ids. See
    /// [`Controller::install_compute_watch_set`] and
    /// [`Controller::install_storage_watch_set`] for details.
    WatchSetFinished(Vec<WatchSetId>),
}
125
/// Whether one of the underlying controllers is ready for their `process`
/// method to be called.
///
/// The value is computed by [`Controller::ready`] and consumed by
/// [`Controller::process`], which resets it to the default, [`Readiness::NotReady`].
#[derive(Debug, Default)]
pub enum Readiness<T> {
    /// No underlying controllers are ready.
    #[default]
    NotReady,
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// A batch of metric data is ready: the reporting replica and one entry per process.
    Metrics((ReplicaId, Vec<ServiceProcessMetrics>)),
    /// An internally-generated message is ready to be returned.
    Internal(ControllerResponse<T>),
}
142
/// A client that maintains soft state and validates commands, in addition to forwarding them.
pub struct Controller<T: ComputeControllerTimestamp = mz_repr::Timestamp> {
    /// The storage controller.
    pub storage: Box<dyn StorageController<Timestamp = T>>,
    /// Handle to the storage collections. [`Controller::new`] hands clones of this same handle
    /// to the storage controller and the compute controller.
    pub storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// The compute controller.
    pub compute: ComputeController<T>,
    /// The clusterd image to use when starting new cluster processes.
    clusterd_image: String,
    /// The init container image to use for clusterd.
    init_container_image: Option<String>,
    /// A number representing the environment's generation.
    deploy_generation: u64,
    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to affect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// The cluster orchestrator.
    orchestrator: Arc<dyn NamespacedOrchestrator>,
    /// Tracks the readiness of the underlying controllers.
    ///
    /// Set by [`Controller::ready`] and reset to `NotReady` by [`Controller::process`].
    readiness: Readiness<T>,
    /// Tasks for collecting replica metrics.
    metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
    /// Sender for the channel over which replica metrics are sent.
    metrics_tx: mpsc::UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Receiver for the channel over which replica metrics are sent.
    metrics_rx: mpsc::UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// A function providing the current wallclock time.
    now: NowFn,

    /// The URL for Persist PubSub.
    persist_pubsub_url: String,

    /// Arguments for secrets readers.
    secrets_args: SecretsReaderCliArgs,

    /// A map associating a global ID to the set of all the unfulfilled watch
    /// set ids that include it.
    ///
    /// See [`Controller::install_compute_watch_set`]/[`Controller::install_storage_watch_set`] for a description of watch sets.
    // When a watch set is fulfilled for a given object (that is, when
    // the object's frontier advances to at least the watch set's
    // timestamp), the corresponding entry will be removed from the set.
    unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>,
    /// A map of installed watch sets indexed by id.
    ///
    /// Each value holds the objects the watch set is still waiting on, together with the
    /// timestamp their frontiers have to pass.
    unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>,
    /// A sequence of numbers used to mint unique WatchSetIds.
    watch_set_id_gen: Gen<WatchSetId>,

    /// A list of watch sets that were already fulfilled as soon as
    /// they were installed, and thus that must be returned to the
    /// client on the next call to [`Controller::process_compute_response`]/[`Controller::process_storage_response`].
    ///
    /// See [`Controller::install_compute_watch_set`]/[`Controller::install_storage_watch_set`] for a description of watch sets.
    immediate_watch_sets: Vec<WatchSetId>,

    /// Dynamic system configuration.
    dyncfg: ConfigSet,
}
202
203impl<T: ComputeControllerTimestamp> Controller<T> {
204    /// Update the controller configuration.
205    pub fn update_configuration(&mut self, updates: ConfigUpdates) {
206        updates.apply(&self.dyncfg);
207    }
208
209    /// Start sinking the compute controller's introspection data into storage.
210    ///
211    /// This method should be called once the introspection collections have been registered with
212    /// the storage controller. It will panic if invoked earlier than that.
213    pub fn start_compute_introspection_sink(&mut self) {
214        self.compute.start_introspection_sink(&*self.storage);
215    }
216
217    /// Returns the connection context installed in the controller.
218    ///
219    /// This is purely a helper, and can be obtained from `self.storage`.
220    pub fn connection_context(&self) -> &ConnectionContext {
221        &self.storage.config().connection_context
222    }
223
224    /// Returns the storage configuration installed in the storage controller.
225    ///
226    /// This is purely a helper, and can be obtained from `self.storage`.
227    pub fn storage_configuration(&self) -> &StorageConfiguration {
228        self.storage.config()
229    }
230
231    /// Returns the state of the [`Controller`] formatted as JSON.
232    ///
233    /// The returned value is not guaranteed to be stable and may change at any point in time.
234    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
235        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
236        // returned object as a tradeoff between usability and stability. `serde_json` will fail
237        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
238        // prevents a future unrelated change from silently breaking this method.
239
240        // Destructure `self` here so we don't forget to consider dumping newly added fields.
241        let Self {
242            storage_collections,
243            storage,
244            compute,
245            clusterd_image: _,
246            init_container_image: _,
247            deploy_generation,
248            read_only,
249            orchestrator: _,
250            readiness,
251            metrics_tasks: _,
252            metrics_tx: _,
253            metrics_rx: _,
254            now: _,
255            persist_pubsub_url: _,
256            secrets_args: _,
257            unfulfilled_watch_sets_by_object: _,
258            unfulfilled_watch_sets,
259            watch_set_id_gen: _,
260            immediate_watch_sets,
261            dyncfg: _,
262        } = self;
263
264        let storage_collections = storage_collections.dump()?;
265        let storage = storage.dump()?;
266        let compute = compute.dump().await?;
267
268        let unfulfilled_watch_sets: BTreeMap<_, _> = unfulfilled_watch_sets
269            .iter()
270            .map(|(ws_id, watches)| (format!("{ws_id:?}"), format!("{watches:?}")))
271            .collect();
272        let immediate_watch_sets: Vec<_> = immediate_watch_sets
273            .iter()
274            .map(|watch| format!("{watch:?}"))
275            .collect();
276
277        Ok(serde_json::json!({
278            "storage_collections": storage_collections,
279            "storage": storage,
280            "compute": compute,
281            "deploy_generation": deploy_generation,
282            "read_only": read_only,
283            "readiness": format!("{readiness:?}"),
284            "unfulfilled_watch_sets": unfulfilled_watch_sets,
285            "immediate_watch_sets": immediate_watch_sets,
286        }))
287    }
288}
289
290impl<T> Controller<T>
291where
292    T: ComputeControllerTimestamp,
293{
294    pub fn update_orchestrator_scheduling_config(
295        &self,
296        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
297    ) {
298        self.orchestrator.update_scheduling_config(config);
299    }
300    /// Marks the end of any initialization commands.
301    ///
302    /// The implementor may wait for this method to be called before implementing prior commands,
303    /// and so it is important for a user to invoke this method as soon as it is comfortable.
304    /// This method can be invoked immediately, at the potential expense of performance.
305    pub fn initialization_complete(&mut self) {
306        self.storage.initialization_complete();
307        self.compute.initialization_complete();
308    }
309
310    /// Reports whether the controller is in read only mode.
311    pub fn read_only(&self) -> bool {
312        self.read_only
313    }
314
315    /// Returns `Some` if there is an immediately available
316    /// internally-generated response that we need to return to the
317    /// client (as opposed to waiting for a response from compute or storage).
318    fn take_internal_response(&mut self) -> Option<ControllerResponse<T>> {
319        let ws = std::mem::take(&mut self.immediate_watch_sets);
320        (!ws.is_empty()).then_some(ControllerResponse::WatchSetFinished(ws))
321    }
322
323    /// Waits until the controller is ready to process a response.
324    ///
325    /// This method may block for an arbitrarily long time.
326    ///
327    /// When the method returns, the owner should call [`Controller::ready`] to
328    /// process the ready message.
329    ///
330    /// This method is cancellation safe.
331    pub async fn ready(&mut self) {
332        if let Readiness::NotReady = self.readiness {
333            // the coordinator wants to be able to make a simple
334            // sequence of ready, process, ready, process, .... calls,
335            // but the controller sometimes has responses immediately
336            // ready to be processed and should do so before calling
337            // into either of the lower-level controllers. This `if`
338            // statement handles that case.
339            if let Some(response) = self.take_internal_response() {
340                self.readiness = Readiness::Internal(response);
341            } else {
342                // The underlying `ready` methods are cancellation safe, so it is
343                // safe to construct this `select!`.
344                tokio::select! {
345                    () = self.storage.ready() => {
346                        self.readiness = Readiness::Storage;
347                    }
348                    () = self.compute.ready() => {
349                        self.readiness = Readiness::Compute;
350                    }
351                    Some(metrics) = self.metrics_rx.recv() => {
352                        self.readiness = Readiness::Metrics(metrics);
353                    }
354                }
355            }
356        }
357    }
358
359    /// Returns the [Readiness] status of this controller.
360    pub fn get_readiness(&self) -> &Readiness<T> {
361        &self.readiness
362    }
363
364    /// Install a _watch set_ in the controller.
365    ///
366    /// A _watch set_ is a request to be informed by the controller when
367    /// all of the frontiers of a particular set of objects have advanced at
368    /// least to a particular timestamp.
369    ///
370    /// When all the objects in `objects` have advanced to `t`, the watchset id
371    /// is returned to the client on the next call to [`Self::process`].
372    pub fn install_compute_watch_set(
373        &mut self,
374        mut objects: BTreeSet<GlobalId>,
375        t: T,
376    ) -> Result<WatchSetId, CollectionLookupError> {
377        let ws_id = self.watch_set_id_gen.allocate_id();
378
379        // Collect all frontiers first, returning any errors
380        let frontiers: BTreeMap<GlobalId, _> = objects
381            .iter()
382            .map(|id| {
383                self.compute
384                    .collection_frontiers(*id, None)
385                    .map(|f| (*id, f.write_frontier))
386            })
387            .collect::<Result<_, _>>()?;
388        objects.retain(|id| {
389            let frontier = frontiers.get(id).expect("just collected");
390            frontier.less_equal(&t)
391        });
392        if objects.is_empty() {
393            self.immediate_watch_sets.push(ws_id);
394        } else {
395            for id in objects.iter() {
396                self.unfulfilled_watch_sets_by_object
397                    .entry(*id)
398                    .or_default()
399                    .insert(ws_id);
400            }
401            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
402        }
403
404        Ok(ws_id)
405    }
406
407    /// Install a _watch set_ in the controller.
408    ///
409    /// A _watch set_ is a request to be informed by the controller when
410    /// all of the frontiers of a particular set of objects have advanced at
411    /// least to a particular timestamp.
412    ///
413    /// When all the objects in `objects` have advanced to `t`, the watchset id
414    /// is returned to the client on the next call to [`Self::process`].
415    pub fn install_storage_watch_set(
416        &mut self,
417        mut objects: BTreeSet<GlobalId>,
418        t: T,
419    ) -> Result<WatchSetId, CollectionLookupError> {
420        let ws_id = self.watch_set_id_gen.allocate_id();
421
422        let uppers = self
423            .storage
424            .collections_frontiers(objects.iter().cloned().collect())?
425            .into_iter()
426            .map(|(id, _since, upper)| (id, upper))
427            .collect::<BTreeMap<_, _>>();
428
429        objects.retain(|id| {
430            let upper = uppers.get(id).expect("missing collection");
431            upper.less_equal(&t)
432        });
433        if objects.is_empty() {
434            self.immediate_watch_sets.push(ws_id);
435        } else {
436            for id in objects.iter() {
437                self.unfulfilled_watch_sets_by_object
438                    .entry(*id)
439                    .or_default()
440                    .insert(ws_id);
441            }
442            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
443        }
444        Ok(ws_id)
445    }
446
447    /// Uninstalls a previously installed WatchSetId. The method is a no-op if the watch set has
448    /// already finished and therefore it's safe to call this function unconditionally.
449    ///
450    /// # Panics
451    /// This method panics if called with a WatchSetId that was never returned by the function.
452    pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId) {
453        if let Some((obj_ids, _)) = self.unfulfilled_watch_sets.remove(ws_id) {
454            for obj_id in obj_ids {
455                let mut entry = match self.unfulfilled_watch_sets_by_object.entry(obj_id) {
456                    Entry::Occupied(entry) => entry,
457                    Entry::Vacant(_) => panic!("corrupted watchset state"),
458                };
459                entry.get_mut().remove(ws_id);
460                if entry.get().is_empty() {
461                    entry.remove();
462                }
463            }
464        }
465    }
466
467    /// Process a pending response from the storage controller. If necessary,
468    /// return a higher-level response to our client.
469    fn process_storage_response(
470        &mut self,
471        storage_metadata: &StorageMetadata,
472    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
473        let maybe_response = self.storage.process(storage_metadata)?;
474        Ok(maybe_response.and_then(
475            |mz_storage_client::controller::Response::FrontierUpdates(r)| {
476                self.handle_frontier_updates(&r)
477            },
478        ))
479    }
480
481    /// Process a pending response from the compute controller. If necessary,
482    /// return a higher-level response to our client.
483    fn process_compute_response(&mut self) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
484        let response = self.compute.process();
485
486        let response = response.and_then(|r| match r {
487            ComputeControllerResponse::PeekNotification(uuid, peek, otel_ctx) => {
488                Some(ControllerResponse::PeekNotification(uuid, peek, otel_ctx))
489            }
490            ComputeControllerResponse::SubscribeResponse(id, tail) => {
491                Some(ControllerResponse::SubscribeResponse(id, tail))
492            }
493            ComputeControllerResponse::CopyToResponse(id, tail) => {
494                Some(ControllerResponse::CopyToResponse(id, tail))
495            }
496            ComputeControllerResponse::FrontierUpper { id, upper } => {
497                self.handle_frontier_updates(&[(id, upper)])
498            }
499        });
500        Ok(response)
501    }
502
503    /// Processes the work queued by [`Controller::ready`].
504    ///
505    /// This method is guaranteed to return "quickly" unless doing so would
506    /// compromise the correctness of the system.
507    ///
508    /// This method is **not** guaranteed to be cancellation safe. It **must**
509    /// be awaited to completion.
510    #[mz_ore::instrument(level = "debug")]
511    pub fn process(
512        &mut self,
513        storage_metadata: &StorageMetadata,
514    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
515        match mem::take(&mut self.readiness) {
516            Readiness::NotReady => Ok(None),
517            Readiness::Storage => self.process_storage_response(storage_metadata),
518            Readiness::Compute => self.process_compute_response(),
519            Readiness::Metrics((id, metrics)) => self.process_replica_metrics(id, metrics),
520            Readiness::Internal(message) => Ok(Some(message)),
521        }
522    }
523
524    /// Record updates to frontiers, and propagate any necessary responses.
525    /// As of this writing (2/29/2024), the only response that can be generated
526    /// from a frontier update is `WatchSetCompleted`.
527    fn handle_frontier_updates(
528        &mut self,
529        updates: &[(GlobalId, Antichain<T>)],
530    ) -> Option<ControllerResponse<T>> {
531        let mut finished = vec![];
532        for (obj_id, antichain) in updates {
533            let ws_ids = self.unfulfilled_watch_sets_by_object.entry(*obj_id);
534            if let Entry::Occupied(mut ws_ids) = ws_ids {
535                ws_ids.get_mut().retain(|ws_id| {
536                    let mut entry = match self.unfulfilled_watch_sets.entry(*ws_id) {
537                        Entry::Occupied(entry) => entry,
538                        Entry::Vacant(_) => panic!("corrupted watchset state"),
539                    };
540                    // If this object has made more progress than required by this watchset we:
541                    if !antichain.less_equal(&entry.get().1) {
542                        // 1. Remove the object from the set of pending objects for the watchset
543                        entry.get_mut().0.remove(obj_id);
544                        // 2. Mark the watchset as finished if this was the last watched object
545                        if entry.get().0.is_empty() {
546                            entry.remove();
547                            finished.push(*ws_id);
548                        }
549                        // 3. Remove the watchset from the set of pending watchsets for the object
550                        false
551                    } else {
552                        // Otherwise we keep the watchset around to re-check in the future
553                        true
554                    }
555                });
556                // Clear the entry if this was the last watchset that was interested in obj_id
557                if ws_ids.get().is_empty() {
558                    ws_ids.remove();
559                }
560            }
561        }
562        (!(finished.is_empty())).then(|| ControllerResponse::WatchSetFinished(finished))
563    }
564
565    fn process_replica_metrics(
566        &mut self,
567        id: ReplicaId,
568        metrics: Vec<ServiceProcessMetrics>,
569    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
570        self.record_replica_metrics(id, &metrics);
571        Ok(None)
572    }
573
574    fn record_replica_metrics(&mut self, replica_id: ReplicaId, metrics: &[ServiceProcessMetrics]) {
575        if self.read_only() {
576            return;
577        }
578
579        let now = mz_ore::now::to_datetime((self.now)());
580        let now_tz = now.try_into().expect("must fit");
581
582        let replica_id = replica_id.to_string();
583        let mut row = Row::default();
584        let updates = metrics
585            .iter()
586            .enumerate()
587            .map(|(process_id, m)| {
588                row.packer().extend(&[
589                    Datum::String(&replica_id),
590                    Datum::UInt64(u64::cast_from(process_id)),
591                    m.cpu_nano_cores.into(),
592                    m.memory_bytes.into(),
593                    m.disk_bytes.into(),
594                    Datum::TimestampTz(now_tz),
595                    m.heap_bytes.into(),
596                    m.heap_limit.into(),
597                ]);
598                (row.clone(), mz_repr::Diff::ONE)
599            })
600            .collect();
601
602        self.storage
603            .append_introspection_updates(IntrospectionType::ReplicaMetricsHistory, updates);
604    }
605
606    /// Determine the "real-time recency" timestamp for all `ids`.
607    ///
608    /// Real-time recency is defined as the minimum value of `T` that all
609    /// objects can be queried at to return all data visible in the upstream
610    /// system the query was issued. In this case, "the upstream systems" are
611    /// any user sources that connect to objects outside of Materialize, such as
612    /// Kafka sources.
613    ///
614    /// If no items in `ids` connect to external systems, this function will
615    /// return `Ok(T::minimum)`.
616    pub async fn determine_real_time_recent_timestamp(
617        &self,
618        ids: BTreeSet<GlobalId>,
619        timeout: Duration,
620    ) -> Result<BoxFuture<'static, Result<T, StorageError<T>>>, StorageError<T>> {
621        self.storage.real_time_recent_timestamp(ids, timeout).await
622    }
623}
624
625impl<T> Controller<T>
626where
627    // Bounds needed by `StorageController` and/or `Controller`:
628    T: Timestamp
629        + Codec64
630        + From<EpochMillis>
631        + TimestampManipulation
632        + std::fmt::Display
633        + Into<mz_repr::Timestamp>,
634    // Bounds needed by `ComputeController`:
635    T: ComputeControllerTimestamp,
636{
637    /// Creates a new controller.
638    ///
639    /// For correctness, this function expects to have access to the mutations
640    /// to the `storage_txn` that occurred in [`prepare_initialization`].
641    ///
642    /// # Panics
643    /// If this function is called before [`prepare_initialization`].
644    #[instrument(name = "controller::new")]
645    pub async fn new(
646        config: ControllerConfig,
647        envd_epoch: NonZeroI64,
648        read_only: bool,
649        storage_txn: &dyn StorageTxn<T>,
650    ) -> Self {
651        if read_only {
652            tracing::info!("starting controllers in read-only mode!");
653        }
654
655        let now_fn = config.now.clone();
656        let wallclock_lag_fn = WallclockLagFn::new(now_fn);
657
658        let controller_metrics = ControllerMetrics::new(&config.metrics_registry);
659
660        let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
661        let collections_ctl = storage_collections::StorageCollectionsImpl::new(
662            config.persist_location.clone(),
663            Arc::clone(&config.persist_clients),
664            &config.metrics_registry,
665            config.now.clone(),
666            Arc::clone(&txns_metrics),
667            envd_epoch,
668            read_only,
669            config.connection_context.clone(),
670            storage_txn,
671        )
672        .await;
673
674        let collections_ctl: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync> =
675            Arc::new(collections_ctl);
676
677        let storage_controller = mz_storage_controller::Controller::new(
678            config.build_info,
679            config.persist_location.clone(),
680            config.persist_clients,
681            config.now.clone(),
682            wallclock_lag_fn.clone(),
683            Arc::clone(&txns_metrics),
684            read_only,
685            &config.metrics_registry,
686            controller_metrics.clone(),
687            config.connection_context,
688            storage_txn,
689            Arc::clone(&collections_ctl),
690        )
691        .await;
692
693        let storage_collections = Arc::clone(&collections_ctl);
694        let compute_controller = ComputeController::new(
695            config.build_info,
696            storage_collections,
697            read_only,
698            &config.metrics_registry,
699            config.persist_location,
700            controller_metrics,
701            config.now.clone(),
702            wallclock_lag_fn,
703        );
704        let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();
705
706        let this = Self {
707            storage: Box::new(storage_controller),
708            storage_collections: collections_ctl,
709            compute: compute_controller,
710            clusterd_image: config.clusterd_image,
711            init_container_image: config.init_container_image,
712            deploy_generation: config.deploy_generation,
713            read_only,
714            orchestrator: config.orchestrator.namespace("cluster"),
715            readiness: Readiness::NotReady,
716            metrics_tasks: BTreeMap::new(),
717            metrics_tx,
718            metrics_rx,
719            now: config.now,
720            persist_pubsub_url: config.persist_pubsub_url,
721            secrets_args: config.secrets_args,
722            unfulfilled_watch_sets_by_object: BTreeMap::new(),
723            unfulfilled_watch_sets: BTreeMap::new(),
724            watch_set_id_gen: Gen::default(),
725            immediate_watch_sets: Vec::new(),
726            dyncfg: mz_dyncfgs::all_dyncfgs(),
727        };
728
729        if !this.read_only {
730            this.remove_past_generation_replicas_in_background();
731        }
732
733        this
734    }
735}