Skip to main content

mz_controller/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A representative of STORAGE and COMPUTE that maintains summaries of the involved objects.
11//!
12//! The `Controller` provides the ability to create and manipulate storage and compute instances.
13//! Each of Storage and Compute provide their own controllers, accessed through the `storage()`
14//! and `compute(instance_id)` methods. It is an error to access a compute instance before it has
15//! been created.
16//!
17//! The controller also provides a `recv()` method that returns responses from the storage and
18//! compute layers, which may remain of value to the interested user. With time, these responses
19//! may be thinned down in an effort to make the controller more self contained.
20//!
21//! Consult the `StorageController` and `ComputeController` documentation for more information
22//! about each of these interfaces.
23
24use std::collections::btree_map::Entry;
25use std::collections::{BTreeMap, BTreeSet};
26use std::mem;
27use std::num::NonZeroI64;
28use std::sync::Arc;
29use std::time::Duration;
30
31use futures::future::BoxFuture;
32use mz_build_info::BuildInfo;
33use mz_cluster_client::metrics::ControllerMetrics;
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_compute_client::controller::error::CollectionLookupError;
36use mz_compute_client::controller::{
37    ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification,
38};
39use mz_compute_client::protocol::response::SubscribeBatch;
40use mz_controller_types::WatchSetId;
41use mz_dyncfg::{ConfigSet, ConfigUpdates};
42use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, ServiceProcessMetrics};
43use mz_ore::cast::CastFrom;
44use mz_ore::id_gen::Gen;
45use mz_ore::instrument;
46use mz_ore::metrics::MetricsRegistry;
47use mz_ore::now::{EpochMillis, NowFn};
48use mz_ore::task::AbortOnDropHandle;
49use mz_ore::tracing::OpenTelemetryContext;
50use mz_persist_client::PersistLocation;
51use mz_persist_client::cache::PersistClientCache;
52use mz_persist_types::Codec64;
53use mz_repr::{Datum, GlobalId, Row, TimestampManipulation};
54use mz_service::secrets::SecretsReaderCliArgs;
55use mz_storage_client::controller::{
56    IntrospectionType, StorageController, StorageMetadata, StorageTxn,
57};
58use mz_storage_client::storage_collections::{self, StorageCollections};
59use mz_storage_types::configuration::StorageConfiguration;
60use mz_storage_types::connections::ConnectionContext;
61use mz_storage_types::controller::StorageError;
62use mz_txn_wal::metrics::Metrics as TxnMetrics;
63use timely::progress::{Antichain, Timestamp};
64use tokio::sync::mpsc;
65use uuid::Uuid;
66
67pub mod clusters;
68pub mod replica_http_locator;
69
70// Export this on behalf of the storage controller to provide a unified
71// interface, allowing other crates to depend on this crate alone.
72pub use mz_storage_controller::prepare_initialization;
73pub use replica_http_locator::ReplicaHttpLocator;
74
/// Configures a controller.
///
/// A value of this type is consumed by [`Controller::new`], which hands the
/// individual settings to the storage and compute controllers it constructs.
#[derive(Debug, Clone)]
pub struct ControllerConfig {
    /// The build information for this process.
    pub build_info: &'static BuildInfo,
    /// The orchestrator implementation to use.
    pub orchestrator: Arc<dyn Orchestrator>,
    /// The persist location where all storage collections will be written to.
    pub persist_location: PersistLocation,
    /// A process-global cache of (blob_uri, consensus_uri) ->
    /// PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// The clusterd image to use when starting new cluster processes.
    pub clusterd_image: String,
    /// The init container image to use for clusterd.
    pub init_container_image: Option<String>,
    /// A number representing the environment's generation.
    ///
    /// This is incremented to request that the new process perform a graceful
    /// transition of power from the prior generation.
    pub deploy_generation: u64,
    /// The now function to advance the controller's introspection collections.
    pub now: NowFn,
    /// The metrics registry.
    pub metrics_registry: MetricsRegistry,
    /// The URL for Persist PubSub.
    pub persist_pubsub_url: String,
    /// Arguments for secrets readers.
    pub secrets_args: SecretsReaderCliArgs,
    /// The connection context, to thread through to clusterd, with cli flags.
    pub connection_context: ConnectionContext,
    /// Locator for HTTP addresses of cluster replicas, used to proxy HTTP
    /// requests from environmentd to clusterd.
    pub replica_http_locator: Arc<ReplicaHttpLocator>,
}
111
/// Responses that [`Controller`] can produce.
///
/// Values of this type are what [`Controller::process`] hands back to its
/// caller.
#[derive(Debug)]
pub enum ControllerResponse<T = mz_repr::Timestamp> {
    /// Notification of a worker's response to a specified (by connection id) peek.
    ///
    /// Additionally, an `OpenTelemetryContext` to forward trace information
    /// back into coord. This allows coord traces to be children of work
    /// done in compute!
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// The worker's next response to a specified subscribe.
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// The worker's next response to a specified copy to.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// Notification that one or more watch sets have finished. See
    /// [`Controller::install_compute_watch_set`] and
    /// [`Controller::install_storage_watch_set`] for details.
    WatchSetFinished(Vec<WatchSetId>),
}
130
/// Whether one of the underlying controllers is ready for their `process`
/// method to be called.
///
/// [`Controller::ready`] records one of these states and
/// [`Controller::process`] consumes it, resetting the state to
/// [`Readiness::NotReady`] (the `Default`).
#[derive(Debug, Default)]
pub enum Readiness<T> {
    /// No underlying controllers are ready.
    #[default]
    NotReady,
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// A batch of metric data is ready: the replica it belongs to and the
    /// metrics received for it.
    Metrics((ReplicaId, Vec<ServiceProcessMetrics>)),
    /// An internally-generated message is ready to be returned.
    Internal(ControllerResponse<T>),
}
147
/// A client that maintains soft state and validates commands, in addition to forwarding them.
///
/// It bundles a storage controller and a compute controller and multiplexes
/// their readiness and responses through [`Controller::ready`] and
/// [`Controller::process`].
pub struct Controller<T: ComputeControllerTimestamp = mz_repr::Timestamp> {
    /// The storage controller.
    pub storage: Box<dyn StorageController<Timestamp = T>>,
    /// Shared handle to the storage collections. The same handle is also
    /// given to the storage and compute controllers when they are constructed
    /// in [`Controller::new`].
    pub storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// The compute controller.
    pub compute: ComputeController<T>,
    /// The clusterd image to use when starting new cluster processes.
    clusterd_image: String,
    /// The init container image to use for clusterd.
    init_container_image: Option<String>,
    /// A number representing the environment's generation.
    deploy_generation: u64,
    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to effect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// The cluster orchestrator.
    orchestrator: Arc<dyn NamespacedOrchestrator>,
    /// Tracks the readiness of the underlying controllers.
    readiness: Readiness<T>,
    /// Tasks for collecting replica metrics.
    metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
    /// Sender for the channel over which replica metrics are sent.
    metrics_tx: mpsc::UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// Receiver for the channel over which replica metrics are sent.
    metrics_rx: mpsc::UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>,
    /// A function providing the current wallclock time.
    now: NowFn,

    /// The URL for Persist PubSub.
    persist_pubsub_url: String,

    /// Arguments for secrets readers.
    secrets_args: SecretsReaderCliArgs,

    /// A map associating a global ID to the set of all the unfulfilled watch
    /// set ids that include it.
    ///
    /// See [`Controller::install_compute_watch_set`]/[`Controller::install_storage_watch_set`] for a description of watch sets.
    // When a watch set is fulfilled for a given object (that is, when
    // the object's frontier advances beyond the watch set's
    // timestamp), the corresponding entry will be removed from the set.
    unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>,
    /// A map of installed watch sets indexed by id. Each value holds the
    /// objects the watch set is still waiting on and the watch set's timestamp.
    unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>,
    /// A sequence of numbers used to mint unique WatchSetIds.
    watch_set_id_gen: Gen<WatchSetId>,

    /// A list of watch sets that were already fulfilled as soon as
    /// they were installed. They are picked up by the next call to
    /// [`Controller::ready`] (as [`Readiness::Internal`]) and returned to the
    /// client by the following call to [`Controller::process`].
    ///
    /// See [`Controller::install_compute_watch_set`]/[`Controller::install_storage_watch_set`] for a description of watch sets.
    immediate_watch_sets: Vec<WatchSetId>,

    /// Dynamic system configuration.
    dyncfg: ConfigSet,

    /// Locator for HTTP addresses of cluster replicas.
    replica_http_locator: Arc<ReplicaHttpLocator>,
}
210
211impl<T: ComputeControllerTimestamp> Controller<T> {
212    /// Update the controller configuration.
213    pub fn update_configuration(&mut self, updates: ConfigUpdates) {
214        updates.apply(&self.dyncfg);
215    }
216
217    /// Start sinking the compute controller's introspection data into storage.
218    ///
219    /// This method should be called once the introspection collections have been registered with
220    /// the storage controller. It will panic if invoked earlier than that.
221    pub fn start_compute_introspection_sink(&mut self) {
222        self.compute.start_introspection_sink(&*self.storage);
223    }
224
225    /// Returns the connection context installed in the controller.
226    ///
227    /// This is purely a helper, and can be obtained from `self.storage`.
228    pub fn connection_context(&self) -> &ConnectionContext {
229        &self.storage.config().connection_context
230    }
231
232    /// Returns the storage configuration installed in the storage controller.
233    ///
234    /// This is purely a helper, and can be obtained from `self.storage`.
235    pub fn storage_configuration(&self) -> &StorageConfiguration {
236        self.storage.config()
237    }
238
239    /// Returns the state of the [`Controller`] formatted as JSON.
240    ///
241    /// The returned value is not guaranteed to be stable and may change at any point in time.
242    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
243        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
244        // returned object as a tradeoff between usability and stability. `serde_json` will fail
245        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
246        // prevents a future unrelated change from silently breaking this method.
247
248        // Destructure `self` here so we don't forget to consider dumping newly added fields.
249        let Self {
250            storage_collections,
251            storage,
252            compute,
253            clusterd_image: _,
254            init_container_image: _,
255            deploy_generation,
256            read_only,
257            orchestrator: _,
258            readiness,
259            metrics_tasks: _,
260            metrics_tx: _,
261            metrics_rx: _,
262            now: _,
263            persist_pubsub_url: _,
264            secrets_args: _,
265            unfulfilled_watch_sets_by_object: _,
266            unfulfilled_watch_sets,
267            watch_set_id_gen: _,
268            immediate_watch_sets,
269            dyncfg: _,
270            replica_http_locator: _,
271        } = self;
272
273        let storage_collections = storage_collections.dump()?;
274        let storage = storage.dump()?;
275        let compute = compute.dump().await?;
276
277        let unfulfilled_watch_sets: BTreeMap<_, _> = unfulfilled_watch_sets
278            .iter()
279            .map(|(ws_id, watches)| (format!("{ws_id:?}"), format!("{watches:?}")))
280            .collect();
281        let immediate_watch_sets: Vec<_> = immediate_watch_sets
282            .iter()
283            .map(|watch| format!("{watch:?}"))
284            .collect();
285
286        Ok(serde_json::json!({
287            "storage_collections": storage_collections,
288            "storage": storage,
289            "compute": compute,
290            "deploy_generation": deploy_generation,
291            "read_only": read_only,
292            "readiness": format!("{readiness:?}"),
293            "unfulfilled_watch_sets": unfulfilled_watch_sets,
294            "immediate_watch_sets": immediate_watch_sets,
295        }))
296    }
297}
298
299impl<T> Controller<T>
300where
301    T: ComputeControllerTimestamp,
302{
303    pub fn update_orchestrator_scheduling_config(
304        &self,
305        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
306    ) {
307        self.orchestrator.update_scheduling_config(config);
308    }
309    /// Marks the end of any initialization commands.
310    ///
311    /// The implementor may wait for this method to be called before implementing prior commands,
312    /// and so it is important for a user to invoke this method as soon as it is comfortable.
313    /// This method can be invoked immediately, at the potential expense of performance.
314    pub fn initialization_complete(&mut self) {
315        self.storage.initialization_complete();
316        self.compute.initialization_complete();
317    }
318
319    /// Reports whether the controller is in read only mode.
320    pub fn read_only(&self) -> bool {
321        self.read_only
322    }
323
324    /// Returns `Some` if there is an immediately available
325    /// internally-generated response that we need to return to the
326    /// client (as opposed to waiting for a response from compute or storage).
327    fn take_internal_response(&mut self) -> Option<ControllerResponse<T>> {
328        let ws = std::mem::take(&mut self.immediate_watch_sets);
329        (!ws.is_empty()).then_some(ControllerResponse::WatchSetFinished(ws))
330    }
331
332    /// Waits until the controller is ready to process a response.
333    ///
334    /// This method may block for an arbitrarily long time.
335    ///
336    /// When the method returns, the owner should call [`Controller::ready`] to
337    /// process the ready message.
338    ///
339    /// This method is cancellation safe.
340    pub async fn ready(&mut self) {
341        if let Readiness::NotReady = self.readiness {
342            // the coordinator wants to be able to make a simple
343            // sequence of ready, process, ready, process, .... calls,
344            // but the controller sometimes has responses immediately
345            // ready to be processed and should do so before calling
346            // into either of the lower-level controllers. This `if`
347            // statement handles that case.
348            if let Some(response) = self.take_internal_response() {
349                self.readiness = Readiness::Internal(response);
350            } else {
351                // The underlying `ready` methods are cancellation safe, so it is
352                // safe to construct this `select!`.
353                tokio::select! {
354                    () = self.storage.ready() => {
355                        self.readiness = Readiness::Storage;
356                    }
357                    () = self.compute.ready() => {
358                        self.readiness = Readiness::Compute;
359                    }
360                    Some(metrics) = self.metrics_rx.recv() => {
361                        self.readiness = Readiness::Metrics(metrics);
362                    }
363                }
364            }
365        }
366    }
367
368    /// Returns the [Readiness] status of this controller.
369    pub fn get_readiness(&self) -> &Readiness<T> {
370        &self.readiness
371    }
372
373    /// Install a _watch set_ in the controller.
374    ///
375    /// A _watch set_ is a request to be informed by the controller when
376    /// all of the frontiers of a particular set of objects have advanced at
377    /// least to a particular timestamp.
378    ///
379    /// When all the objects in `objects` have advanced to `t`, the watchset id
380    /// is returned to the client on the next call to [`Self::process`].
381    pub fn install_compute_watch_set(
382        &mut self,
383        mut objects: BTreeSet<GlobalId>,
384        t: T,
385    ) -> Result<WatchSetId, CollectionLookupError> {
386        let ws_id = self.watch_set_id_gen.allocate_id();
387
388        // Collect all frontiers first, returning any errors
389        let frontiers: BTreeMap<GlobalId, _> = objects
390            .iter()
391            .map(|id| {
392                self.compute
393                    .collection_frontiers(*id, None)
394                    .map(|f| (*id, f.write_frontier))
395            })
396            .collect::<Result<_, _>>()?;
397        objects.retain(|id| {
398            let frontier = frontiers.get(id).expect("just collected");
399            frontier.less_equal(&t)
400        });
401        if objects.is_empty() {
402            self.immediate_watch_sets.push(ws_id);
403        } else {
404            for id in objects.iter() {
405                self.unfulfilled_watch_sets_by_object
406                    .entry(*id)
407                    .or_default()
408                    .insert(ws_id);
409            }
410            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
411        }
412
413        Ok(ws_id)
414    }
415
416    /// Install a _watch set_ in the controller.
417    ///
418    /// A _watch set_ is a request to be informed by the controller when
419    /// all of the frontiers of a particular set of objects have advanced at
420    /// least to a particular timestamp.
421    ///
422    /// When all the objects in `objects` have advanced to `t`, the watchset id
423    /// is returned to the client on the next call to [`Self::process`].
424    pub fn install_storage_watch_set(
425        &mut self,
426        mut objects: BTreeSet<GlobalId>,
427        t: T,
428    ) -> Result<WatchSetId, CollectionLookupError> {
429        let ws_id = self.watch_set_id_gen.allocate_id();
430
431        let uppers = self
432            .storage
433            .collections_frontiers(objects.iter().cloned().collect())?
434            .into_iter()
435            .map(|(id, _since, upper)| (id, upper))
436            .collect::<BTreeMap<_, _>>();
437
438        objects.retain(|id| {
439            let upper = uppers.get(id).expect("missing collection");
440            upper.less_equal(&t)
441        });
442        if objects.is_empty() {
443            self.immediate_watch_sets.push(ws_id);
444        } else {
445            for id in objects.iter() {
446                self.unfulfilled_watch_sets_by_object
447                    .entry(*id)
448                    .or_default()
449                    .insert(ws_id);
450            }
451            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
452        }
453        Ok(ws_id)
454    }
455
456    /// Uninstalls a previously installed WatchSetId. The method is a no-op if the watch set has
457    /// already finished and therefore it's safe to call this function unconditionally.
458    ///
459    /// # Panics
460    /// This method panics if called with a WatchSetId that was never returned by the function.
461    pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId) {
462        if let Some((obj_ids, _)) = self.unfulfilled_watch_sets.remove(ws_id) {
463            for obj_id in obj_ids {
464                let mut entry = match self.unfulfilled_watch_sets_by_object.entry(obj_id) {
465                    Entry::Occupied(entry) => entry,
466                    Entry::Vacant(_) => panic!("corrupted watchset state"),
467                };
468                entry.get_mut().remove(ws_id);
469                if entry.get().is_empty() {
470                    entry.remove();
471                }
472            }
473        }
474    }
475
476    /// Process a pending response from the storage controller. If necessary,
477    /// return a higher-level response to our client.
478    fn process_storage_response(
479        &mut self,
480        storage_metadata: &StorageMetadata,
481    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
482        let maybe_response = self.storage.process(storage_metadata)?;
483        Ok(maybe_response.and_then(
484            |mz_storage_client::controller::Response::FrontierUpdates(r)| {
485                self.handle_frontier_updates(&r)
486            },
487        ))
488    }
489
490    /// Process a pending response from the compute controller. If necessary,
491    /// return a higher-level response to our client.
492    fn process_compute_response(&mut self) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
493        let response = self.compute.process();
494
495        let response = response.and_then(|r| match r {
496            ComputeControllerResponse::PeekNotification(uuid, peek, otel_ctx) => {
497                Some(ControllerResponse::PeekNotification(uuid, peek, otel_ctx))
498            }
499            ComputeControllerResponse::SubscribeResponse(id, tail) => {
500                Some(ControllerResponse::SubscribeResponse(id, tail))
501            }
502            ComputeControllerResponse::CopyToResponse(id, tail) => {
503                Some(ControllerResponse::CopyToResponse(id, tail))
504            }
505            ComputeControllerResponse::FrontierUpper { id, upper } => {
506                self.handle_frontier_updates(&[(id, upper)])
507            }
508        });
509        Ok(response)
510    }
511
512    /// Processes the work queued by [`Controller::ready`].
513    ///
514    /// This method is guaranteed to return "quickly" unless doing so would
515    /// compromise the correctness of the system.
516    ///
517    /// This method is **not** guaranteed to be cancellation safe. It **must**
518    /// be awaited to completion.
519    #[mz_ore::instrument(level = "debug")]
520    pub fn process(
521        &mut self,
522        storage_metadata: &StorageMetadata,
523    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
524        match mem::take(&mut self.readiness) {
525            Readiness::NotReady => Ok(None),
526            Readiness::Storage => self.process_storage_response(storage_metadata),
527            Readiness::Compute => self.process_compute_response(),
528            Readiness::Metrics((id, metrics)) => self.process_replica_metrics(id, metrics),
529            Readiness::Internal(message) => Ok(Some(message)),
530        }
531    }
532
533    /// Record updates to frontiers, and propagate any necessary responses.
534    /// As of this writing (2/29/2024), the only response that can be generated
535    /// from a frontier update is `WatchSetCompleted`.
536    fn handle_frontier_updates(
537        &mut self,
538        updates: &[(GlobalId, Antichain<T>)],
539    ) -> Option<ControllerResponse<T>> {
540        let mut finished = vec![];
541        for (obj_id, antichain) in updates {
542            let ws_ids = self.unfulfilled_watch_sets_by_object.entry(*obj_id);
543            if let Entry::Occupied(mut ws_ids) = ws_ids {
544                ws_ids.get_mut().retain(|ws_id| {
545                    let mut entry = match self.unfulfilled_watch_sets.entry(*ws_id) {
546                        Entry::Occupied(entry) => entry,
547                        Entry::Vacant(_) => panic!("corrupted watchset state"),
548                    };
549                    // If this object has made more progress than required by this watchset we:
550                    if !antichain.less_equal(&entry.get().1) {
551                        // 1. Remove the object from the set of pending objects for the watchset
552                        entry.get_mut().0.remove(obj_id);
553                        // 2. Mark the watchset as finished if this was the last watched object
554                        if entry.get().0.is_empty() {
555                            entry.remove();
556                            finished.push(*ws_id);
557                        }
558                        // 3. Remove the watchset from the set of pending watchsets for the object
559                        false
560                    } else {
561                        // Otherwise we keep the watchset around to re-check in the future
562                        true
563                    }
564                });
565                // Clear the entry if this was the last watchset that was interested in obj_id
566                if ws_ids.get().is_empty() {
567                    ws_ids.remove();
568                }
569            }
570        }
571        (!(finished.is_empty())).then(|| ControllerResponse::WatchSetFinished(finished))
572    }
573
574    fn process_replica_metrics(
575        &mut self,
576        id: ReplicaId,
577        metrics: Vec<ServiceProcessMetrics>,
578    ) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
579        self.record_replica_metrics(id, &metrics);
580        Ok(None)
581    }
582
583    fn record_replica_metrics(&mut self, replica_id: ReplicaId, metrics: &[ServiceProcessMetrics]) {
584        if self.read_only() {
585            return;
586        }
587
588        let now = mz_ore::now::to_datetime((self.now)());
589        let now_tz = now.try_into().expect("must fit");
590
591        let replica_id = replica_id.to_string();
592        let mut row = Row::default();
593        let updates = metrics
594            .iter()
595            .enumerate()
596            .map(|(process_id, m)| {
597                row.packer().extend(&[
598                    Datum::String(&replica_id),
599                    Datum::UInt64(u64::cast_from(process_id)),
600                    m.cpu_nano_cores.into(),
601                    m.memory_bytes.into(),
602                    m.disk_bytes.into(),
603                    Datum::TimestampTz(now_tz),
604                    m.heap_bytes.into(),
605                    m.heap_limit.into(),
606                ]);
607                (row.clone(), mz_repr::Diff::ONE)
608            })
609            .collect();
610
611        self.storage
612            .append_introspection_updates(IntrospectionType::ReplicaMetricsHistory, updates);
613    }
614
615    /// Determine the "real-time recency" timestamp for all `ids`.
616    ///
617    /// Real-time recency is defined as the minimum value of `T` that all
618    /// objects can be queried at to return all data visible in the upstream
619    /// system the query was issued. In this case, "the upstream systems" are
620    /// any user sources that connect to objects outside of Materialize, such as
621    /// Kafka sources.
622    ///
623    /// If no items in `ids` connect to external systems, this function will
624    /// return `Ok(T::minimum)`.
625    pub async fn determine_real_time_recent_timestamp(
626        &self,
627        ids: BTreeSet<GlobalId>,
628        timeout: Duration,
629    ) -> Result<BoxFuture<'static, Result<T, StorageError<T>>>, StorageError<T>> {
630        self.storage.real_time_recent_timestamp(ids, timeout).await
631    }
632}
633
634impl<T> Controller<T>
635where
636    // Bounds needed by `StorageController` and/or `Controller`:
637    T: Timestamp
638        + Codec64
639        + From<EpochMillis>
640        + TimestampManipulation
641        + std::fmt::Display
642        + Into<mz_repr::Timestamp>,
643    // Bounds needed by `ComputeController`:
644    T: ComputeControllerTimestamp,
645{
    /// Creates a new controller.
    ///
    /// Construction proceeds in order: the storage collections are created
    /// first, then the storage controller, then the compute controller; the
    /// latter two each receive a handle to the same storage collections.
    ///
    /// For correctness, this function expects to have access to the mutations
    /// to the `storage_txn` that occurred in [`prepare_initialization`].
    ///
    /// # Panics
    /// If this function is called before [`prepare_initialization`].
    #[instrument(name = "controller::new")]
    pub async fn new(
        config: ControllerConfig,
        envd_epoch: NonZeroI64,
        read_only: bool,
        storage_txn: &dyn StorageTxn<T>,
    ) -> Self {
        if read_only {
            tracing::info!("starting controllers in read-only mode!");
        }

        let now_fn = config.now.clone();
        let wallclock_lag_fn = WallclockLagFn::new(now_fn);

        let controller_metrics = ControllerMetrics::new(&config.metrics_registry);

        // The txn metrics are shared by the storage collections and the
        // storage controller.
        let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
        let collections_ctl = storage_collections::StorageCollectionsImpl::new(
            config.persist_location.clone(),
            Arc::clone(&config.persist_clients),
            &config.metrics_registry,
            config.now.clone(),
            Arc::clone(&txns_metrics),
            envd_epoch,
            read_only,
            config.connection_context.clone(),
            storage_txn,
        )
        .await;

        // Erase the concrete type so the handle can be shared with both
        // controllers and stored on `Self`.
        let collections_ctl: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync> =
            Arc::new(collections_ctl);

        // From here on, fields of `config` are moved out on their last use.
        let storage_controller = mz_storage_controller::Controller::new(
            config.build_info,
            config.persist_location.clone(),
            config.persist_clients,
            config.now.clone(),
            wallclock_lag_fn.clone(),
            Arc::clone(&txns_metrics),
            read_only,
            &config.metrics_registry,
            controller_metrics.clone(),
            config.connection_context,
            storage_txn,
            Arc::clone(&collections_ctl),
        )
        .await;

        let storage_collections = Arc::clone(&collections_ctl);
        let compute_controller = ComputeController::new(
            config.build_info,
            storage_collections,
            read_only,
            &config.metrics_registry,
            config.persist_location,
            controller_metrics,
            config.now.clone(),
            wallclock_lag_fn,
        );
        // Channel over which replica metrics batches reach `Controller::ready`.
        let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();

        let this = Self {
            storage: Box::new(storage_controller),
            storage_collections: collections_ctl,
            compute: compute_controller,
            clusterd_image: config.clusterd_image,
            init_container_image: config.init_container_image,
            deploy_generation: config.deploy_generation,
            read_only,
            orchestrator: config.orchestrator.namespace("cluster"),
            readiness: Readiness::NotReady,
            metrics_tasks: BTreeMap::new(),
            metrics_tx,
            metrics_rx,
            now: config.now,
            persist_pubsub_url: config.persist_pubsub_url,
            secrets_args: config.secrets_args,
            unfulfilled_watch_sets_by_object: BTreeMap::new(),
            unfulfilled_watch_sets: BTreeMap::new(),
            watch_set_id_gen: Gen::default(),
            immediate_watch_sets: Vec::new(),
            dyncfg: mz_dyncfgs::all_dyncfgs(),
            replica_http_locator: config.replica_http_locator,
        };

        // Only a controller that is allowed to write kicks off this cleanup.
        // NOTE(review): the helper is defined outside this view; presumably it
        // removes replicas left behind by earlier deploy generations — confirm.
        if !this.read_only {
            this.remove_past_generation_replicas_in_background();
        }

        this
    }
745}