mz_compute_client/controller.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A controller that provides an interface to the compute layer, and the storage layer below it.
//!
//! The compute controller manages the creation, maintenance, and removal of compute instances.
//! This involves ensuring the intended service state with the orchestrator, as well as maintaining
//! a dedicated compute instance controller for each active compute instance.
//!
//! For each compute instance, the compute controller curates the creation of indexes and sinks
//! installed on the instance, the progress of readers through these collections, and their
//! eventual dropping and resource reclamation.
//!
//! The state maintained for a compute instance can be viewed as a partial map from `GlobalId` to
//! collection. It is an error to use an identifier before it has been "created" with
//! `create_dataflow()`. Once created, the controller holds a read capability for each output
//! collection of a dataflow, which is manipulated with `set_read_policy()`. Eventually, a
//! collection is dropped with `drop_collections()`.
//!
//! Created dataflows will prevent the compaction of their inputs, including other compute
//! collections but also collections managed by the storage layer. Each dataflow input is prevented
//! from compacting beyond the allowed compaction of each of its outputs, ensuring that we can
//! recover each dataflow to its current state in case of failure or other reconfiguration.
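//!
//! The sketch below illustrates how a client of this module might drive the controller. It is
//! not compiled here, and the `instance_id`, `replica_id`, `location`, `config`, and `dataflow`
//! values are assumed to be produced elsewhere.
//!
//! ```ignore
//! // Create an instance and give it a replica to run on.
//! controller.create_instance(instance_id, BTreeMap::new(), None)?;
//! controller.add_replica_to_instance(instance_id, replica_id, location, config, false)?;
//!
//! // Install a dataflow; the controller now holds read capabilities for its outputs.
//! controller.create_dataflow(instance_id, dataflow, None)?;
//!
//! // Drive the controller: wait until it has work to do, then process that work.
//! loop {
//!     controller.ready().await;
//!     if let Some(response) = controller.process() {
//!         // Handle the `ComputeControllerResponse` here.
//!     }
//! }
//! ```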

use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::config::ComputeReplicaConfig;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_types::PersistLocation;
use mz_repr::{Datum, GlobalId, RelationDesc, Row, TimestampManipulation};
use mz_storage_client::controller::StorageController;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use prometheus::proto::LabelPair;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::{self, MissedTickBehavior};
use tracing::debug_span;
use uuid::Uuid;

use crate::controller::error::{
    CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
    HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
    ReplicaCreationError, ReplicaDropError,
};
use crate::controller::instance::{Instance, SharedCollectionState};
use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
use crate::controller::replica::ReplicaConfig;
use crate::logging::{LogVariant, LoggingConfig};
use crate::metrics::ComputeControllerMetrics;
use crate::protocol::command::{ComputeParameters, PeekTarget};
use crate::protocol::response::{PeekResponse, SubscribeBatch};
use crate::service::{ComputeClient, ComputeGrpcClient};

mod instance;
mod introspection;
mod replica;
mod sequential_hydration;

pub mod error;

pub(crate) type StorageCollections<T> = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T> + Send + Sync,
>;

/// A composite trait for types that serve as timestamps in the Compute Controller.
/// `Into<Datum<'a>>` is needed for writing timestamps to introspection collections.
pub trait ComputeControllerTimestamp: TimestampManipulation + Into<Datum<'static>> + Sync {}

impl ComputeControllerTimestamp for mz_repr::Timestamp {}

/// Responses from the compute controller.
#[derive(Debug)]
pub enum ComputeControllerResponse<T> {
    /// See [`PeekNotification`].
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// See [`crate::protocol::response::ComputeResponse::SubscribeResponse`].
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// The response from a dataflow containing a `CopyToS3Oneshot` sink.
    ///
    /// The `GlobalId` identifies the sink. The `Result` is the response from
    /// the sink, where an `Ok(n)` indicates that `n` rows were successfully
    /// copied to S3 and an `Err` indicates that an error was encountered
    /// during the copy operation.
    ///
    /// For a given `CopyToS3Oneshot` sink, there will be at most one `CopyToResponse`
    /// produced. (The sink may produce no responses if its dataflow is dropped
    /// before completion.)
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// A response reporting advancement of a collection's upper frontier.
    ///
    /// Once a collection's upper (aka "write frontier") has advanced beyond a given time, the
    /// contents of the collection as of that time have been sealed and cannot change anymore.
    FrontierUpper {
        /// The ID of a compute collection.
        id: GlobalId,
        /// The new upper frontier of the identified compute collection.
        upper: Antichain<T>,
    },
}

/// Notification and summary of a received and forwarded [`crate::protocol::response::ComputeResponse::PeekResponse`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    /// Returned rows of a successful peek.
    Success {
        /// Number of rows in the returned peek result.
        rows: u64,
        /// Size of the returned peek result in bytes.
        result_size: u64,
    },
    /// Error of an unsuccessful peek, including the reason for the error.
    Error(String),
    /// The peek was canceled.
    Canceled,
}

impl PeekNotification {
    /// Construct a new [`PeekNotification`] from a [`PeekResponse`]. The `offset` and `limit`
    /// parameters are used to calculate the number of rows in the peek result.
    fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
        match peek_response {
            PeekResponse::Rows(rows) => {
                let num_rows = u64::cast_from(rows.count(offset, limit));
                let result_size = u64::cast_from(rows.byte_len());

                tracing::trace!(?num_rows, ?result_size, "inline peek result");

                Self::Success {
                    rows: num_rows,
                    result_size,
                }
            }
            PeekResponse::Stashed(stashed_response) => {
                let rows = stashed_response.num_rows(offset, limit);
                let result_size = stashed_response.size_bytes();

                tracing::trace!(?rows, ?result_size, "stashed peek result");

                Self::Success {
                    rows: u64::cast_from(rows),
                    result_size: u64::cast_from(result_size),
                }
            }
            PeekResponse::Error(err) => Self::Error(err.clone()),
            PeekResponse::Canceled => Self::Canceled,
        }
    }
}

/// A controller for the compute layer.
pub struct ComputeController<T: ComputeControllerTimestamp> {
    instances: BTreeMap<ComputeInstanceId, InstanceState<T>>,
    /// A map from an instance ID to an arbitrary string that describes the
    /// class of workload the compute instance is running (e.g., `production`
    /// or `staging`).
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections<T>,
    /// Set to `true` once `initialization_complete` has been called.
    initialized: bool,
    /// Whether this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to effect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// Compute configuration to apply to new instances.
    config: ComputeParameters,
    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,
    /// A controller response to be returned on the next call to [`ComputeController::process`].
    stashed_response: Option<ComputeControllerResponse<T>>,
    /// The compute controller metrics.
    metrics: ComputeControllerMetrics,
    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<T>,
    /// Dynamic system configuration.
    ///
    /// Updated through `ComputeController::update_configuration` calls and shared with all
    /// subcomponents of the compute controller.
    dyncfg: Arc<ConfigSet>,

    /// Receiver for responses produced by `Instance`s.
    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse<T>>,
    /// Response sender that's passed to new `Instance`s.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Receiver for introspection updates produced by `Instance`s.
    ///
    /// When [`ComputeController::start_introspection_sink`] is first called, this receiver is
    /// passed to the introspection sink task.
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    /// Introspection updates sender that's passed to new `Instance`s.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,
}

impl<T: ComputeControllerTimestamp> ComputeController<T> {
    /// Construct a new [`ComputeController`].
    pub fn new(
        build_info: &'static BuildInfo,
        storage_collections: StorageCollections<T>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        peek_stash_persist_location: PersistLocation,
        controller_metrics: ControllerMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
    ) -> Self {
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();

        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
            ComputeInstanceId,
            Option<String>,
        >::new()));

        // Apply a `workload_class` label to all metrics in the registry that
        // have an `instance_id` label for an instance whose workload class is
        // known.
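        //
        // For example, assuming instance `u1` is known to run the workload class `production`,
        // a metric exported with the labels `{instance_id="u1"}` is rewritten by this
        // postprocessor to carry `{instance_id="u1", workload_class="production"}`.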
        metrics_registry.register_postprocessor({
            let instance_workload_classes = Arc::clone(&instance_workload_classes);
            move |metrics| {
                let instance_workload_classes = instance_workload_classes
                    .lock()
                    .expect("lock poisoned")
                    .iter()
                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
                    .collect::<BTreeMap<String, Option<String>>>();
                for metric in metrics {
                    'metric: for metric in metric.mut_metric() {
                        for label in metric.get_label() {
                            if label.get_name() == "instance_id" {
                                if let Some(workload_class) = instance_workload_classes
                                    .get(label.get_value())
                                    .cloned()
                                    .flatten()
                                {
                                    let mut label = LabelPair::default();
                                    label.set_name("workload_class".into());
                                    label.set_value(workload_class.clone());

                                    let mut labels = metric.take_label();
                                    labels.push(label);
                                    metric.set_label(labels);
                                }
                                continue 'metric;
                            }
                        }
                    }
                }
            }
        });

        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

        Self {
            instances: BTreeMap::new(),
            instance_workload_classes,
            build_info,
            storage_collections,
            initialized: false,
            read_only,
            config: Default::default(),
            peek_stash_persist_location,
            stashed_response: None,
            metrics,
            now,
            wallclock_lag,
            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
            response_rx,
            response_tx,
            introspection_rx: Some(introspection_rx),
            introspection_tx,
            maintenance_ticker,
            maintenance_scheduled: false,
        }
    }

    /// Start sinking the compute controller's introspection data into storage.
    ///
    /// This method should be called once the introspection collections have been registered with
    /// the storage controller. It will panic if invoked earlier than that.
    pub fn start_introspection_sink(
        &mut self,
        storage_controller: &dyn StorageController<Timestamp = T>,
    ) {
        if let Some(rx) = self.introspection_rx.take() {
            spawn_introspection_sink(rx, storage_controller);
        }
    }

    /// TODO(database-issues#7533): Add documentation.
    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
        self.instances.contains_key(&id)
    }

    /// Return a reference to the indicated compute instance.
    fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState<T>, InstanceMissing> {
        self.instances.get(&id).ok_or(InstanceMissing(id))
    }

    /// Return a mutable reference to the indicated compute instance.
    fn instance_mut(
        &mut self,
        id: ComputeInstanceId,
    ) -> Result<&mut InstanceState<T>, InstanceMissing> {
        self.instances.get_mut(&id).ok_or(InstanceMissing(id))
    }

    /// List the IDs of all collections in the identified compute instance.
    pub fn collection_ids(
        &self,
        instance_id: ComputeInstanceId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let ids = instance.collections.keys().copied();
        Ok(ids)
    }

    /// Return the frontiers of the indicated collection.
    ///
    /// If an `instance_id` is provided, the collection is assumed to be installed on that
    /// instance. Otherwise all available instances are searched.
    pub fn collection_frontiers(
        &self,
        collection_id: GlobalId,
        instance_id: Option<ComputeInstanceId>,
    ) -> Result<CollectionFrontiers<T>, CollectionLookupError> {
        let collection = match instance_id {
            Some(id) => self.instance(id)?.collection(collection_id)?,
            None => self
                .instances
                .values()
                .find_map(|i| i.collections.get(&collection_id))
                .ok_or(CollectionMissing(collection_id))?,
        };

        Ok(collection.frontiers())
    }

    /// List compute collections that depend on the given collection.
    pub fn collection_reverse_dependencies(
        &self,
        instance_id: ComputeInstanceId,
        id: GlobalId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let collections = instance.collections.iter();
        let ids = collections
            .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
        Ok(ids)
    }

    /// Returns `true` if all non-transient, non-excluded collections on all clusters have been
    /// hydrated.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated but it's
    /// clearly intentional that they have no replicas.
    pub async fn clusters_hydrated(&self, exclude_collections: &BTreeSet<GlobalId>) -> bool {
        let instances = self.instances.iter();
        let mut pending: FuturesUnordered<_> = instances
            .map(|(id, instance)| {
                let exclude_collections = exclude_collections.clone();
                instance
                    .call_sync(move |i| i.collections_hydrated(&exclude_collections))
                    .map(move |x| (id, x))
            })
            .collect();

        let mut result = true;
        while let Some((id, hydrated)) = pending.next().await {
            if !hydrated {
                result = false;

                // We continue with our loop instead of breaking out early, so
                // that we log all non-hydrated clusters.
                tracing::info!("cluster {id} is not hydrated");
            }
        }

        result
    }

    /// Returns `true` iff the given collection has been hydrated.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated but it's
    /// clearly intentional that they have no replicas.
    pub async fn collection_hydrated(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<bool, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        let res = instance
            .call_sync(move |i| i.collection_hydrated(collection_id))
            .await?;

        Ok(res)
    }

    /// Returns `true` if all non-transient, non-excluded collections are hydrated on any of the
    /// provided replicas.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated but it's
    /// clearly intentional that they have no replicas.
    pub fn collections_hydrated_for_replicas(
        &self,
        instance_id: ComputeInstanceId,
        replicas: Vec<ReplicaId>,
        exclude_collections: BTreeSet<GlobalId>,
    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        // Validation
        if instance.replicas.is_empty() && !replicas.iter().any(|id| instance.replicas.contains(id))
        {
            return Err(HydrationCheckBadTarget(replicas).into());
        }

        let (tx, rx) = oneshot::channel();
        instance.call(move |i| {
            let result = i
                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
                .expect("validated");
            let _ = tx.send(result);
        });

        Ok(rx)
    }

    /// Returns the state of the [`ComputeController`] formatted as JSON.
    ///
    /// The returned value is not guaranteed to be stable and may change at any point in time.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
        // returned object as a tradeoff between usability and stability. `serde_json` will fail
        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
        // prevents a future unrelated change from silently breaking this method.

        // Destructure `self` here so we don't forget to consider dumping newly added fields.
        let Self {
            instances,
            instance_workload_classes,
            build_info: _,
            storage_collections: _,
            initialized,
            read_only,
            config: _,
            peek_stash_persist_location: _,
            stashed_response,
            metrics: _,
            now: _,
            wallclock_lag: _,
            dyncfg: _,
            response_rx: _,
            response_tx: _,
            introspection_rx: _,
            introspection_tx: _,
            maintenance_ticker: _,
            maintenance_scheduled,
        } = self;

        let mut instances_dump = BTreeMap::new();
        for (id, instance) in instances {
            let dump = instance.dump().await?;
            instances_dump.insert(id.to_string(), dump);
        }

        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .iter()
            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
            .collect();

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let map = serde_json::Map::from_iter([
            field("instances", instances_dump)?,
            field("instance_workload_classes", instance_workload_classes)?,
            field("initialized", initialized)?,
            field("read_only", read_only)?,
            field("stashed_response", format!("{stashed_response:?}"))?,
            field("maintenance_scheduled", maintenance_scheduled)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

impl<T> ComputeController<T>
where
    T: ComputeControllerTimestamp,
    ComputeGrpcClient: ComputeClient<T>,
{
    /// Create a compute instance.
    pub fn create_instance(
        &mut self,
        id: ComputeInstanceId,
        arranged_logs: BTreeMap<LogVariant, GlobalId>,
        workload_class: Option<String>,
    ) -> Result<(), InstanceExists> {
        if self.instances.contains_key(&id) {
            return Err(InstanceExists(id));
        }

        let mut collections = BTreeMap::new();
        let mut logs = Vec::with_capacity(arranged_logs.len());
        for (&log, &id) in &arranged_logs {
            let collection = Collection::new_log();
            let shared = collection.shared.clone();
            collections.insert(id, collection);
            logs.push((log, id, shared));
        }

        let client = instance::Client::spawn(
            id,
            self.build_info,
            Arc::clone(&self.storage_collections),
            self.peek_stash_persist_location.clone(),
            logs,
            self.metrics.for_instance(id),
            self.now.clone(),
            self.wallclock_lag.clone(),
            Arc::clone(&self.dyncfg),
            self.response_tx.clone(),
            self.introspection_tx.clone(),
        );

        let instance = InstanceState::new(client, collections);
        self.instances.insert(id, instance);

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class.clone());

        let instance = self.instances.get_mut(&id).expect("instance just added");
        if self.initialized {
            instance.call(Instance::initialization_complete);
        }

        if !self.read_only {
            instance.call(Instance::allow_writes);
        }

        let mut config_params = self.config.clone();
        config_params.workload_class = Some(workload_class);
        instance.call(|i| i.update_configuration(config_params));

        Ok(())
    }

    /// Updates a compute instance's workload class.
    pub fn update_instance_workload_class(
        &mut self,
        id: ComputeInstanceId,
        workload_class: Option<String>,
    ) -> Result<(), InstanceMissing> {
        // Ensure that the instance exists first.
        let _ = self.instance(id)?;

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class);

        // Cause a config update to notify the instance about its new workload class.
        self.update_configuration(Default::default());

        Ok(())
    }

    /// Remove a compute instance.
    ///
    /// # Panics
    ///
    /// Panics if the identified `instance` still has active replicas.
    pub fn drop_instance(&mut self, id: ComputeInstanceId) {
        if let Some(instance) = self.instances.remove(&id) {
            instance.call(|i| i.shutdown());
        }

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .remove(&id);
    }

    /// Returns the compute controller's config set.
    pub fn dyncfg(&self) -> &Arc<ConfigSet> {
        &self.dyncfg
    }

    /// Update compute configuration.
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        // Apply dyncfg updates.
        config_params.dyncfg_updates.apply(&self.dyncfg);

        let instance_workload_classes = self
            .instance_workload_classes
            .lock()
            .expect("lock poisoned");

        // Forward updates to existing clusters.
        // Workload classes are cluster-specific, so we need to overwrite them here.
        for (id, instance) in self.instances.iter_mut() {
            let mut params = config_params.clone();
            params.workload_class = Some(instance_workload_classes[id].clone());
            instance.call(|i| i.update_configuration(params));
        }

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                tracing::error!(
                    err,
                    overflowing_behavior,
                    "Invalid value for ore_overflowing_behavior"
                );
            }
        }

        // Remember updates for future clusters.
        self.config.update(config_params);
    }

    /// Mark the end of any initialization commands.
    ///
    /// The implementor may wait for this method to be called before acting on prior commands,
    /// so it is important to invoke it as soon as possible. This method can be invoked
    /// immediately, at the potential expense of performance.
    pub fn initialization_complete(&mut self) {
        self.initialized = true;
        for instance in self.instances.values_mut() {
            instance.call(Instance::initialization_complete);
        }
    }

    /// Wait until the controller is ready to do some processing.
    ///
    /// This method may block for an arbitrarily long time.
    ///
    /// When the method returns, the caller should call [`ComputeController::process`].
    ///
    /// This method is cancellation safe.
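    ///
    /// A minimal sketch (not compiled here) of how `ready` and `process` pair up when the
    /// controller is driven alongside other event sources; `command_rx` and the `handle_*`
    /// functions stand in for the caller's own event handling:
    ///
    /// ```ignore
    /// tokio::select! {
    ///     // `ready` is cancellation safe, so it can race against other branches.
    ///     () = compute_controller.ready() => {
    ///         if let Some(response) = compute_controller.process() {
    ///             handle_compute_response(response);
    ///         }
    ///     }
    ///     cmd = command_rx.recv() => handle_command(cmd),
    /// }
    /// ```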
    pub async fn ready(&mut self) {
        if self.stashed_response.is_some() {
            // We still have a response stashed, which we are immediately ready to process.
            return;
        }
        if self.maintenance_scheduled {
            // Maintenance work has been scheduled.
            return;
        }

        tokio::select! {
            resp = self.response_rx.recv() => {
                let resp = resp.expect("`self.response_tx` not dropped");
                self.stashed_response = Some(resp);
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        }
    }

    /// Adds a replica to an instance.
    pub fn add_replica_to_instance(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
        config: ComputeReplicaConfig,
        enable_ctp: bool,
    ) -> Result<(), ReplicaCreationError> {
        use ReplicaCreationError::*;

        let instance = self.instance(instance_id)?;

        // Validation
        if instance.replicas.contains(&replica_id) {
            return Err(ReplicaExists(replica_id));
        }

        let (enable_logging, interval) = match config.logging.interval {
            Some(interval) => (true, interval),
            None => (false, Duration::from_secs(1)),
        };

        let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);

        let replica_config = ReplicaConfig {
            location,
            logging: LoggingConfig {
                interval,
                enable_logging,
                log_logging: config.logging.log_logging,
                index_logs: Default::default(),
            },
            grpc_client: self.config.grpc_client.clone(),
            expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
            enable_ctp,
        };

        let instance = self.instance_mut(instance_id).expect("validated");
        instance.replicas.insert(replica_id);

        instance.call(move |i| {
            i.add_replica(replica_id, replica_config, None)
                .expect("validated")
        });

        Ok(())
    }

    /// Removes a replica from an instance, including its service in the orchestrator.
    pub fn drop_replica(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
    ) -> Result<(), ReplicaDropError> {
        use ReplicaDropError::*;

        let instance = self.instance_mut(instance_id)?;

        // Validation
        if !instance.replicas.contains(&replica_id) {
            return Err(ReplicaMissing(replica_id));
        }

        instance.replicas.remove(&replica_id);

        instance.call(move |i| i.remove_replica(replica_id).expect("validated"));

        Ok(())
    }

    /// Creates the described dataflow and initializes state for its output.
    ///
    /// If a `subscribe_target_replica` is given, any subscribes exported by the dataflow are
    /// configured to target that replica, i.e., only subscribe responses sent by that replica are
    /// considered.
    pub fn create_dataflow(
        &mut self,
        instance_id: ComputeInstanceId,
        mut dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        subscribe_target_replica: Option<ReplicaId>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        let instance = self.instance(instance_id)?;

        // Validation: target replica
        if let Some(replica_id) = subscribe_target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // Validation: as_of
        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        // Validation: input collections
        let storage_ids = dataflow.imported_source_ids().collect();
        let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
        for id in dataflow.imported_index_ids() {
            let read_hold = instance.acquire_read_hold(id)?;
            import_read_holds.push(read_hold);
        }
        for hold in &import_read_holds {
            if PartialOrder::less_than(as_of, hold.since()) {
                return Err(SinceViolation(hold.id()));
            }
        }

        // Validation: storage sink collections
        for id in dataflow.persist_sink_ids() {
            if self.storage_collections.check_exists(id).is_err() {
                return Err(CollectionMissing(id));
            }
        }
        let time_dependence = self
            .determine_time_dependence(instance_id, &dataflow)
            .expect("must exist");

        let instance = self.instance_mut(instance_id).expect("validated");

        let mut shared_collection_state = BTreeMap::new();
        for id in dataflow.export_ids() {
            let shared = SharedCollectionState::new(as_of.clone());
            let collection = Collection {
                write_only: dataflow.sink_exports.contains_key(&id),
                compute_dependencies: dataflow.imported_index_ids().collect(),
                shared: shared.clone(),
                time_dependence: time_dependence.clone(),
            };
            instance.collections.insert(id, collection);
            shared_collection_state.insert(id, shared);
        }

        dataflow.time_dependence = time_dependence;

        instance.call(move |i| {
            i.create_dataflow(
                dataflow,
                import_read_holds,
                subscribe_target_replica,
                shared_collection_state,
            )
            .expect("validated")
        });

        Ok(())
    }

    /// Drop the read capability for the given collections and allow their resources to be
    /// reclaimed.
    pub fn drop_collections(
        &mut self,
        instance_id: ComputeInstanceId,
        collection_ids: Vec<GlobalId>,
    ) -> Result<(), CollectionUpdateError> {
        let instance = self.instance_mut(instance_id)?;

        // Validation
        for id in &collection_ids {
            instance.collection(*id)?;
        }

        for id in &collection_ids {
            instance.collections.remove(id);
        }

        instance.call(|i| i.drop_collections(collection_ids).expect("validated"));

        Ok(())
    }

    /// Initiate a peek request for the contents of the given collection at `timestamp`.
    pub fn peek(
        &self,
        instance_id: ComputeInstanceId,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let instance = self.instance(instance_id)?;

        // Validation: target replica
        if let Some(replica_id) = target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // Validation: peek target
        let read_hold = match &peek_target {
            PeekTarget::Index { id } => instance.acquire_read_hold(*id)?,
            PeekTarget::Persist { id, .. } => self
                .storage_collections
                .acquire_read_holds(vec![*id])?
                .into_element(),
        };
        if !read_hold.since().less_equal(&timestamp) {
            return Err(SinceViolation(peek_target.id()));
        }

        instance.call(move |i| {
            i.peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing,
                map_filter_project,
                read_hold,
                target_replica,
                peek_response_tx,
            )
            .expect("validated")
        });

        Ok(())
    }

    /// Cancel an existing peek request.
    ///
    /// Canceling a peek is best effort. The caller may see any of the following
    /// after canceling a peek request:
    ///
    ///   * A `PeekResponse::Rows` indicating that the cancellation request did
    ///     not take effect in time and the query succeeded.
    ///   * A `PeekResponse::Canceled` affirming that the peek was canceled.
    ///   * No `PeekResponse` at all.
    pub fn cancel_peek(
        &self,
        instance_id: ComputeInstanceId,
        uuid: Uuid,
        reason: PeekResponse,
    ) -> Result<(), InstanceMissing> {
        self.instance(instance_id)?
            .call(move |i| i.cancel_peek(uuid, reason));
        Ok(())
    }

    /// Assign a read policy to specific identifiers.
    ///
    /// The policies are assigned in the order presented; a repeated identifier ends up with
    /// the last policy given for it. Changing a policy will immediately downgrade the read
    /// capability if appropriate, but it will not "recover" the read capability if the prior
    /// capability is already ahead of it.
    ///
    /// Identifiers not present in `policies` retain their existing read policies.
    ///
    /// It is an error to attempt to set a read policy for a collection that is not readable in the
    /// context of compute. At this time, only indexes are readable compute collections.
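    ///
    /// A hypothetical example (not compiled here): pinning an index so it stays readable as of
    /// a chosen timestamp `ts`, assuming `ReadPolicy::ValidFrom` as the policy constructor.
    ///
    /// ```ignore
    /// controller.set_read_policy(
    ///     instance_id,
    ///     vec![(index_id, ReadPolicy::ValidFrom(Antichain::from_elem(ts)))],
    /// )?;
    /// ```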
    pub fn set_read_policy(
        &self,
        instance_id: ComputeInstanceId,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) -> Result<(), ReadPolicyError> {
        use ReadPolicyError::*;

        let instance = self.instance(instance_id)?;

        // Validation
        for (id, _) in &policies {
            let collection = instance.collection(*id)?;
            if collection.write_only {
                return Err(WriteOnlyCollection(*id));
            }
        }

        self.instance(instance_id)?
            .call(|i| i.set_read_policy(policies).expect("validated"));

        Ok(())
    }

    /// Acquires a [`ReadHold`] for the identified compute collection.
    pub fn acquire_read_hold(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<ReadHold<T>, CollectionUpdateError> {
        let read_hold = self
            .instance(instance_id)?
            .acquire_read_hold(collection_id)?;
        Ok(read_hold)
    }

    /// Determine the time dependence for a dataflow.
    fn determine_time_dependence(
        &self,
        instance_id: ComputeInstanceId,
        dataflow: &DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        // TODO(ct3): Continual tasks don't support replica expiration
        let is_continual_task = dataflow.continual_task_ids().next().is_some();
        if is_continual_task {
            return Ok(None);
        }

        let instance = self
            .instance(instance_id)
            .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
        let mut time_dependencies = Vec::new();

        for id in dataflow.imported_index_ids() {
            let dependence = instance
                .get_time_dependence(id)
                .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
            time_dependencies.push(dependence);
        }

        'source: for id in dataflow.imported_source_ids() {
            // We first check whether the id is backed by a compute object, in which case we use
            // the time dependence we know. This is true for materialized views, continual tasks,
            // etc.
            for instance in self.instances.values() {
                if let Ok(dependence) = instance.get_time_dependence(id) {
                    time_dependencies.push(dependence);
                    continue 'source;
                }
            }

            // Not a compute object: Consult the storage collections controller.
            time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
        }

        Ok(TimeDependence::merge(
            time_dependencies,
            dataflow.refresh_schedule.as_ref(),
        ))
    }

    /// Processes the work queued by [`ComputeController::ready`].
    #[mz_ore::instrument(level = "debug")]
    pub fn process(&mut self) -> Option<ComputeControllerResponse<T>> {
        // Perform periodic maintenance work.
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        // Return a ready response, if any.
        self.stashed_response.take()
    }

    #[mz_ore::instrument(level = "debug")]
    fn maintain(&mut self) {
        // Perform instance maintenance work.
        for instance in self.instances.values_mut() {
            instance.call(Instance::maintain);
        }
    }
}

#[derive(Debug)]
struct InstanceState<T: ComputeControllerTimestamp> {
    client: instance::Client<T>,
    replicas: BTreeSet<ReplicaId>,
    collections: BTreeMap<GlobalId, Collection<T>>,
}

impl<T: ComputeControllerTimestamp> InstanceState<T> {
    fn new(client: instance::Client<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
        Self {
            client,
            replicas: Default::default(),
            collections,
        }
    }

    fn collection(&self, id: GlobalId) -> Result<&Collection<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    fn call<F>(&self, f: F)
    where
        F: FnOnce(&mut Instance<T>) + Send + 'static,
    {
        let otel_ctx = OpenTelemetryContext::obtain();
        self.client
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call").entered();
                otel_ctx.attach_as_parent();

                f(instance)
            }))
            .expect("instance not dropped");
    }

    async fn call_sync<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let otel_ctx = OpenTelemetryContext::obtain();
        self.client
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call_sync").entered();
                otel_ctx.attach_as_parent();

                let result = f(instance);
                let _ = tx.send(result);
            }))
            .expect("instance not dropped");

        rx.await.expect("instance not dropped")
    }

    /// Acquires a [`ReadHold`] for the identified compute collection.
    pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
        // We acquire read holds at the earliest possible time rather than returning a copy
        // of the implied read hold. This is so that in `create_dataflow` we can acquire read holds
        // on compute dependencies at frontiers that are held back by other read holds the caller
        // has previously taken.
        //
        // If/when we change the compute API to expect callers to pass in the `ReadHold`s rather
        // than acquiring them ourselves, we might tighten this up and instead acquire read holds
        // at the implied capability.

        let collection = self.collection(id)?;
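        // Register an additional read capability at the current read frontier; the returned
        // `ReadHold` communicates its release through `read_hold_tx` when dropped or downgraded.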
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });

        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
        Ok(hold)
    }

    /// Return the stored time dependence for a collection.
    fn get_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, CollectionMissing> {
        Ok(self.collection(id)?.time_dependence.clone())
    }

    /// Returns the [`InstanceState`] formatted as JSON.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` here so we don't forget to consider dumping newly added fields.
        let Self {
            client: _,
            replicas,
            collections,
        } = self;

        let instance = self.call_sync(|i| i.dump()).await?;
        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let map = serde_json::Map::from_iter([
            field("instance", instance)?,
            field("replicas", replicas)?,
            field("collections", collections)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

#[derive(Debug)]
struct Collection<T> {
    write_only: bool,
    compute_dependencies: BTreeSet<GlobalId>,
    shared: SharedCollectionState<T>,
    /// The computed time dependence for this collection. `None` indicates no specific
    /// information; a value describes how the collection relates to wall-clock time.
    time_dependence: Option<TimeDependence>,
}

impl<T: Timestamp> Collection<T> {
    fn new_log() -> Self {
        let as_of = Antichain::from_elem(T::minimum());
        Self {
            write_only: false,
            compute_dependencies: Default::default(),
            shared: SharedCollectionState::new(as_of),
            time_dependence: Some(TimeDependence::default()),
        }
    }

    fn frontiers(&self) -> CollectionFrontiers<T> {
        let read_frontier = self
            .shared
            .lock_read_capabilities(|c| c.frontier().to_owned());
        let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
        CollectionFrontiers {
            read_frontier,
            write_frontier,
        }
    }
}

/// The frontiers of a compute collection.
#[derive(Clone, Debug)]
pub struct CollectionFrontiers<T> {
    /// The read frontier.
    pub read_frontier: Antichain<T>,
    /// The write frontier.
    pub write_frontier: Antichain<T>,
}

impl<T: Timestamp> Default for CollectionFrontiers<T> {
    fn default() -> Self {
        Self {
            read_frontier: Antichain::from_elem(T::minimum()),
            write_frontier: Antichain::from_elem(T::minimum()),
        }
    }
}