mz_compute_client/controller.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A controller that provides an interface to the compute layer, and the storage layer below it.
//!
//! The compute controller manages the creation, maintenance, and removal of compute instances.
//! This involves ensuring the intended service state with the orchestrator, as well as maintaining
//! a dedicated compute instance controller for each active compute instance.
//!
//! For each compute instance, the compute controller curates the creation of indexes and sinks
//! installed on the instance, the progress of readers through these collections, and their
//! eventual dropping and resource reclamation.
//!
//! The state maintained for a compute instance can be viewed as a partial map from `GlobalId` to
//! collection. It is an error to use an identifier before it has been "created" with
//! `create_dataflow()`. Once created, the controller holds a read capability for each output
//! collection of a dataflow, which is manipulated with `set_read_policy()`. Eventually, a
//! collection is dropped with `drop_collections()`.
//!
//! Created dataflows will prevent the compaction of their inputs, both other compute collections
//! and collections managed by the storage layer. Each dataflow input is prevented from compacting
//! beyond the allowed compaction of each of its outputs, ensuring that we can recover each
//! dataflow to its current state in case of failure or other reconfiguration.
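//!
//! A sketch of the intended collection lifecycle, as driven through this API (identifiers,
//! the dataflow description, and surrounding setup are assumed):
//!
//! ```ignore
//! // Install a dataflow; the controller now holds a read capability for each of the
//! // dataflow's outputs and holds back compaction of its inputs accordingly.
//! controller.create_dataflow(instance_id, dataflow, None)?;
//! // Adjust how the output's read capability follows its write frontier.
//! controller.set_read_policy(instance_id, vec![(index_id, ReadPolicy::ValidFrom(as_of))])?;
//! // Eventually, release the read capability and reclaim resources.
//! controller.drop_collections(instance_id, vec![index_id])?;
//! ```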

use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::config::ComputeReplicaConfig;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_types::PersistLocation;
use mz_repr::{Datum, GlobalId, RelationDesc, Row, TimestampManipulation};
use mz_storage_client::controller::StorageController;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use prometheus::proto::LabelPair;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::{self, MissedTickBehavior};
use tracing::debug_span;
use uuid::Uuid;

use crate::controller::error::{
    CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
    HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
    ReplicaCreationError, ReplicaDropError,
};
use crate::controller::instance::{Instance, SharedCollectionState};
use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
use crate::controller::replica::ReplicaConfig;
use crate::logging::{LogVariant, LoggingConfig};
use crate::metrics::ComputeControllerMetrics;
use crate::protocol::command::{ComputeParameters, PeekTarget};
use crate::protocol::response::{PeekResponse, SubscribeBatch};

mod instance;
mod introspection;
mod replica;
mod sequential_hydration;

pub mod error;

pub(crate) type StorageCollections<T> = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T> + Send + Sync,
>;

/// A composite trait for types that serve as timestamps in the compute controller.
/// `Into<Datum<'static>>` is needed for writing timestamps to introspection collections.
pub trait ComputeControllerTimestamp: TimestampManipulation + Into<Datum<'static>> + Sync {}

impl ComputeControllerTimestamp for mz_repr::Timestamp {}

/// Responses from the compute controller.
#[derive(Debug)]
pub enum ComputeControllerResponse<T> {
    /// See [`PeekNotification`].
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// See [`crate::protocol::response::ComputeResponse::SubscribeResponse`].
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// The response from a dataflow containing a `CopyToS3Oneshot` sink.
    ///
    /// The `GlobalId` identifies the sink. The `Result` is the response from
    /// the sink, where an `Ok(n)` indicates that `n` rows were successfully
    /// copied to S3 and an `Err` indicates that an error was encountered
    /// during the copy operation.
    ///
    /// For a given `CopyToS3Oneshot` sink, there will be at most one `CopyToResponse`
    /// produced. (The sink may produce no responses if its dataflow is dropped
    /// before completion.)
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// A response reporting advancement of a collection's upper frontier.
    ///
    /// Once a collection's upper (aka "write frontier") has advanced beyond a given time, the
    /// contents of the collection as of that time have been sealed and cannot change anymore.
    FrontierUpper {
        /// The ID of a compute collection.
        id: GlobalId,
        /// The new upper frontier of the identified compute collection.
        upper: Antichain<T>,
    },
}

/// Notification and summary of a received and forwarded [`crate::protocol::response::ComputeResponse::PeekResponse`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    /// Returned rows of a successful peek.
    Success {
        /// Number of rows in the returned peek result.
        rows: u64,
        /// Size of the returned peek result in bytes.
        result_size: u64,
    },
    /// Error of an unsuccessful peek, including the reason for the error.
    Error(String),
    /// The peek was canceled.
    Canceled,
}

impl PeekNotification {
    /// Construct a new [`PeekNotification`] from a [`PeekResponse`]. The `offset` and `limit`
    /// parameters are used to calculate the number of rows in the peek result.
    fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
        match peek_response {
            PeekResponse::Rows(rows) => {
                let num_rows = u64::cast_from(rows.count(offset, limit));
                let result_size = u64::cast_from(rows.byte_len());

                tracing::trace!(?num_rows, ?result_size, "inline peek result");

                Self::Success {
                    rows: num_rows,
                    result_size,
                }
            }
            PeekResponse::Stashed(stashed_response) => {
                let rows = stashed_response.num_rows(offset, limit);
                let result_size = stashed_response.size_bytes();

                tracing::trace!(?rows, ?result_size, "stashed peek result");

                Self::Success {
                    rows: u64::cast_from(rows),
                    result_size: u64::cast_from(result_size),
                }
            }
            PeekResponse::Error(err) => Self::Error(err.clone()),
            PeekResponse::Canceled => Self::Canceled,
        }
    }
}

/// A controller for the compute layer.
pub struct ComputeController<T: ComputeControllerTimestamp> {
    instances: BTreeMap<ComputeInstanceId, InstanceState<T>>,
    /// A map from an instance ID to an arbitrary string that describes the
    /// class of the workload that compute instance is running (e.g.,
    /// `production` or `staging`).
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections<T>,
    /// Set to `true` once `initialization_complete` has been called.
    initialized: bool,
    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to effect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// Compute configuration to apply to new instances.
    config: ComputeParameters,
    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,
    /// A controller response to be returned on the next call to [`ComputeController::process`].
    stashed_response: Option<ComputeControllerResponse<T>>,
    /// The compute controller metrics.
    metrics: ComputeControllerMetrics,
    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<T>,
    /// Dynamic system configuration.
    ///
    /// Updated through `ComputeController::update_configuration` calls and shared with all
    /// subcomponents of the compute controller.
    dyncfg: Arc<ConfigSet>,

    /// Receiver for responses produced by `Instance`s.
    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse<T>>,
    /// Response sender that's passed to new `Instance`s.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Receiver for introspection updates produced by `Instance`s.
    ///
    /// When [`ComputeController::start_introspection_sink`] is first called, this receiver is
    /// passed to the introspection sink task.
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    /// Introspection updates sender that's passed to new `Instance`s.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,
}

impl<T: ComputeControllerTimestamp> ComputeController<T> {
    /// Construct a new [`ComputeController`].
    pub fn new(
        build_info: &'static BuildInfo,
        storage_collections: StorageCollections<T>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        peek_stash_persist_location: PersistLocation,
        controller_metrics: ControllerMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
    ) -> Self {
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();

        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
            ComputeInstanceId,
            Option<String>,
        >::new()));

        // Apply a `workload_class` label to all metrics in the registry that
        // have an `instance_id` label for an instance whose workload class is
        // known.
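        //
        // For example (illustrative metric name and values): a series like
        //   some_compute_metric{instance_id="u1"}
        // is exported as
        //   some_compute_metric{instance_id="u1",workload_class="production"}
        // once instance `u1` has a known workload class of `production`.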
        metrics_registry.register_postprocessor({
            let instance_workload_classes = Arc::clone(&instance_workload_classes);
            move |metrics| {
                let instance_workload_classes = instance_workload_classes
                    .lock()
                    .expect("lock poisoned")
                    .iter()
                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
                    .collect::<BTreeMap<String, Option<String>>>();
                for metric in metrics {
                    'metric: for metric in metric.mut_metric() {
                        for label in metric.get_label() {
                            if label.get_name() == "instance_id" {
                                if let Some(workload_class) = instance_workload_classes
                                    .get(label.get_value())
                                    .cloned()
                                    .flatten()
                                {
                                    let mut label = LabelPair::default();
                                    label.set_name("workload_class".into());
                                    label.set_value(workload_class.clone());

                                    let mut labels = metric.take_label();
                                    labels.push(label);
                                    metric.set_label(labels);
                                }
                                continue 'metric;
                            }
                        }
                    }
                }
            }
        });

        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

        Self {
            instances: BTreeMap::new(),
            instance_workload_classes,
            build_info,
            storage_collections,
            initialized: false,
            read_only,
            config: Default::default(),
            peek_stash_persist_location,
            stashed_response: None,
            metrics,
            now,
            wallclock_lag,
            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
            response_rx,
            response_tx,
            introspection_rx: Some(introspection_rx),
            introspection_tx,
            maintenance_ticker,
            maintenance_scheduled: false,
        }
    }

    /// Start sinking the compute controller's introspection data into storage.
    ///
    /// This method should be called once the introspection collections have been registered with
    /// the storage controller. It will panic if invoked earlier than that.
    pub fn start_introspection_sink(
        &mut self,
        storage_controller: &dyn StorageController<Timestamp = T>,
    ) {
        if let Some(rx) = self.introspection_rx.take() {
            spawn_introspection_sink(rx, storage_controller);
        }
    }

    /// TODO(database-issues#7533): Add documentation.
    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
        self.instances.contains_key(&id)
    }

    /// Return a reference to the indicated compute instance.
    fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState<T>, InstanceMissing> {
        self.instances.get(&id).ok_or(InstanceMissing(id))
    }

    /// Return a mutable reference to the indicated compute instance.
    fn instance_mut(
        &mut self,
        id: ComputeInstanceId,
    ) -> Result<&mut InstanceState<T>, InstanceMissing> {
        self.instances.get_mut(&id).ok_or(InstanceMissing(id))
    }

    /// List the IDs of all collections in the identified compute instance.
    pub fn collection_ids(
        &self,
        instance_id: ComputeInstanceId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let ids = instance.collections.keys().copied();
        Ok(ids)
    }

    /// Return the frontiers of the indicated collection.
    ///
    /// If an `instance_id` is provided, the collection is assumed to be installed on that
    /// instance. Otherwise all available instances are searched.
    pub fn collection_frontiers(
        &self,
        collection_id: GlobalId,
        instance_id: Option<ComputeInstanceId>,
    ) -> Result<CollectionFrontiers<T>, CollectionLookupError> {
        let collection = match instance_id {
            Some(id) => self.instance(id)?.collection(collection_id)?,
            None => self
                .instances
                .values()
                .find_map(|i| i.collections.get(&collection_id))
                .ok_or(CollectionMissing(collection_id))?,
        };

        Ok(collection.frontiers())
    }

    /// List compute collections that depend on the given collection.
    pub fn collection_reverse_dependencies(
        &self,
        instance_id: ComputeInstanceId,
        id: GlobalId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let collections = instance.collections.iter();
        let ids = collections
            .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
        Ok(ids)
    }

    /// Returns `true` iff the given collection has been hydrated.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated but it's
    /// clearly intentional that they have no replicas.
    pub async fn collection_hydrated(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<bool, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        let res = instance
            .call_sync(move |i| i.collection_hydrated(collection_id))
            .await?;

        Ok(res)
    }

    /// Returns `true` if all non-transient, non-excluded collections are hydrated on any of the
    /// provided replicas.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated but it's
    /// clearly intentional that they have no replicas.
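    ///
    /// # Example (sketch; `instance_id`, `replica_id`, and the async context are assumed)
    /// ```ignore
    /// let rx = controller.collections_hydrated_for_replicas(
    ///     instance_id,
    ///     vec![replica_id],
    ///     BTreeSet::new(),
    /// )?;
    /// let hydrated: bool = rx.await?;
    /// ```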
    pub fn collections_hydrated_for_replicas(
        &self,
        instance_id: ComputeInstanceId,
        replicas: Vec<ReplicaId>,
        exclude_collections: BTreeSet<GlobalId>,
    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        // Validation
        if !instance.replicas.is_empty()
            && !replicas.iter().any(|id| instance.replicas.contains(id))
        {
            return Err(HydrationCheckBadTarget(replicas).into());
        }

        let (tx, rx) = oneshot::channel();
        instance.call(move |i| {
            let result = i
                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
                .expect("validated");
            let _ = tx.send(result);
        });

        Ok(rx)
    }

    /// Returns the state of the [`ComputeController`] formatted as JSON.
    ///
    /// The returned value is not guaranteed to be stable and may change at any point in time.
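    ///
    /// # Example (sketch; error handling is assumed)
    /// ```ignore
    /// let state = controller.dump().await?;
    /// println!("{}", serde_json::to_string_pretty(&state)?);
    /// ```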
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
        // returned object as a tradeoff between usability and stability. `serde_json` will fail
        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
        // prevents a future unrelated change from silently breaking this method.

        // Destructure `self` here so we don't forget to consider dumping newly added fields.
        let Self {
            instances,
            instance_workload_classes,
            build_info: _,
            storage_collections: _,
            initialized,
            read_only,
            config: _,
            peek_stash_persist_location: _,
            stashed_response,
            metrics: _,
            now: _,
            wallclock_lag: _,
            dyncfg: _,
            response_rx: _,
            response_tx: _,
            introspection_rx: _,
            introspection_tx: _,
            maintenance_ticker: _,
            maintenance_scheduled,
        } = self;

        let mut instances_dump = BTreeMap::new();
        for (id, instance) in instances {
            let dump = instance.dump().await?;
            instances_dump.insert(id.to_string(), dump);
        }

        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .iter()
            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
            .collect();

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let map = serde_json::Map::from_iter([
            field("instances", instances_dump)?,
            field("instance_workload_classes", instance_workload_classes)?,
            field("initialized", initialized)?,
            field("read_only", read_only)?,
            field("stashed_response", format!("{stashed_response:?}"))?,
            field("maintenance_scheduled", maintenance_scheduled)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

impl<T> ComputeController<T>
where
    T: ComputeControllerTimestamp,
{
    /// Create a compute instance.
    pub fn create_instance(
        &mut self,
        id: ComputeInstanceId,
        arranged_logs: BTreeMap<LogVariant, GlobalId>,
        workload_class: Option<String>,
    ) -> Result<(), InstanceExists> {
        if self.instances.contains_key(&id) {
            return Err(InstanceExists(id));
        }

        let mut collections = BTreeMap::new();
        let mut logs = Vec::with_capacity(arranged_logs.len());
        for (&log, &id) in &arranged_logs {
            let collection = Collection::new_log();
            let shared = collection.shared.clone();
            collections.insert(id, collection);
            logs.push((log, id, shared));
        }

        let client = instance::Client::spawn(
            id,
            self.build_info,
            Arc::clone(&self.storage_collections),
            self.peek_stash_persist_location.clone(),
            logs,
            self.metrics.for_instance(id),
            self.now.clone(),
            self.wallclock_lag.clone(),
            Arc::clone(&self.dyncfg),
            self.response_tx.clone(),
            self.introspection_tx.clone(),
        );

        let instance = InstanceState::new(client, collections);
        self.instances.insert(id, instance);

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class.clone());

        let instance = self.instances.get_mut(&id).expect("instance just added");
        if self.initialized {
            instance.call(Instance::initialization_complete);
        }

        if !self.read_only {
            instance.call(Instance::allow_writes);
        }

        let mut config_params = self.config.clone();
        config_params.workload_class = Some(workload_class);
        instance.call(|i| i.update_configuration(config_params));

        Ok(())
    }

    /// Updates a compute instance's workload class.
    pub fn update_instance_workload_class(
        &mut self,
        id: ComputeInstanceId,
        workload_class: Option<String>,
    ) -> Result<(), InstanceMissing> {
        // Ensure that the instance exists first.
        let _ = self.instance(id)?;

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class);

        // Cause a config update to notify the instance about its new workload class.
        self.update_configuration(Default::default());

        Ok(())
    }

    /// Remove a compute instance.
    ///
    /// # Panics
    ///
    /// Panics if the identified `instance` still has active replicas.
    pub fn drop_instance(&mut self, id: ComputeInstanceId) {
        if let Some(instance) = self.instances.remove(&id) {
            instance.call(|i| i.shutdown());
        }

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .remove(&id);
    }

    /// Returns the compute controller's config set.
    pub fn dyncfg(&self) -> &Arc<ConfigSet> {
        &self.dyncfg
    }

    /// Update compute configuration.
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        // Apply dyncfg updates.
        config_params.dyncfg_updates.apply(&self.dyncfg);

        let instance_workload_classes = self
            .instance_workload_classes
            .lock()
            .expect("lock poisoned");

        // Forward updates to existing clusters.
        // Workload classes are cluster-specific, so we need to overwrite them here.
        for (id, instance) in self.instances.iter_mut() {
            let mut params = config_params.clone();
            params.workload_class = Some(instance_workload_classes[id].clone());
            instance.call(|i| i.update_configuration(params));
        }

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                tracing::error!(
                    err,
                    overflowing_behavior,
                    "Invalid value for ore_overflowing_behavior"
                );
            }
        }

        // Remember updates for future clusters.
        self.config.update(config_params);
    }

    /// Mark the end of any initialization commands.
    ///
    /// The implementor may wait for this method to be called before acting on prior commands,
    /// and so it is important for a user to invoke this method as soon as it is comfortable.
    /// This method can be invoked immediately, at the potential expense of performance.
    pub fn initialization_complete(&mut self) {
        self.initialized = true;
        for instance in self.instances.values_mut() {
            instance.call(Instance::initialization_complete);
        }
    }

    /// Wait until the controller is ready to do some processing.
    ///
    /// This method may block for an arbitrarily long time.
    ///
    /// When the method returns, the caller should call [`ComputeController::process`].
    ///
    /// This method is cancellation safe.
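    ///
    /// # Example (sketch of the intended driver loop; response handling is assumed)
    /// ```ignore
    /// loop {
    ///     controller.ready().await;
    ///     if let Some(response) = controller.process() {
    ///         handle(response);
    ///     }
    /// }
    /// ```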
    pub async fn ready(&mut self) {
        if self.stashed_response.is_some() {
            // We still have a response stashed, which we are immediately ready to process.
            return;
        }
        if self.maintenance_scheduled {
            // Maintenance work has been scheduled.
            return;
        }

        tokio::select! {
            resp = self.response_rx.recv() => {
                let resp = resp.expect("`self.response_tx` not dropped");
                self.stashed_response = Some(resp);
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        }
    }

    /// Adds a replica to an instance.
    pub fn add_replica_to_instance(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
        config: ComputeReplicaConfig,
    ) -> Result<(), ReplicaCreationError> {
        use ReplicaCreationError::*;

        let instance = self.instance(instance_id)?;

        // Validation
        if instance.replicas.contains(&replica_id) {
            return Err(ReplicaExists(replica_id));
        }

        let (enable_logging, interval) = match config.logging.interval {
            Some(interval) => (true, interval),
            None => (false, Duration::from_secs(1)),
        };

        let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);

        let replica_config = ReplicaConfig {
            location,
            logging: LoggingConfig {
                interval,
                enable_logging,
                log_logging: config.logging.log_logging,
                index_logs: Default::default(),
            },
            grpc_client: self.config.grpc_client.clone(),
            expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
        };

        let instance = self.instance_mut(instance_id).expect("validated");
        instance.replicas.insert(replica_id);

        instance.call(move |i| {
            i.add_replica(replica_id, replica_config, None)
                .expect("validated")
        });

        Ok(())
    }

    /// Removes a replica from an instance, including its service in the orchestrator.
    pub fn drop_replica(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
    ) -> Result<(), ReplicaDropError> {
        use ReplicaDropError::*;

        let instance = self.instance_mut(instance_id)?;

        // Validation
        if !instance.replicas.contains(&replica_id) {
            return Err(ReplicaMissing(replica_id));
        }

        instance.replicas.remove(&replica_id);

        instance.call(move |i| i.remove_replica(replica_id).expect("validated"));

        Ok(())
    }

    /// Creates the described dataflow and initializes state for its output.
    ///
    /// If a `subscribe_target_replica` is given, any subscribes exported by the dataflow are
    /// configured to target that replica, i.e., only subscribe responses sent by that replica are
    /// considered.
    pub fn create_dataflow(
        &mut self,
        instance_id: ComputeInstanceId,
        mut dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        subscribe_target_replica: Option<ReplicaId>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        let instance = self.instance(instance_id)?;

        // Validation: target replica
        if let Some(replica_id) = subscribe_target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // Validation: as_of
        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        // Validation: input collections
        let storage_ids = dataflow.imported_source_ids().collect();
        let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
        for id in dataflow.imported_index_ids() {
            let read_hold = instance.acquire_read_hold(id)?;
            import_read_holds.push(read_hold);
        }
        for hold in &import_read_holds {
            if PartialOrder::less_than(as_of, hold.since()) {
                return Err(SinceViolation(hold.id()));
            }
        }

        // Validation: storage sink collections
        for id in dataflow.persist_sink_ids() {
            if self.storage_collections.check_exists(id).is_err() {
                return Err(CollectionMissing(id));
            }
        }

        let time_dependence = self
            .determine_time_dependence(instance_id, &dataflow)
            .expect("must exist");

        let instance = self.instance_mut(instance_id).expect("validated");

        let mut shared_collection_state = BTreeMap::new();
        for id in dataflow.export_ids() {
            let shared = SharedCollectionState::new(as_of.clone());
            let collection = Collection {
                write_only: dataflow.sink_exports.contains_key(&id),
                compute_dependencies: dataflow.imported_index_ids().collect(),
                shared: shared.clone(),
                time_dependence: time_dependence.clone(),
            };
            instance.collections.insert(id, collection);
            shared_collection_state.insert(id, shared);
        }

        dataflow.time_dependence = time_dependence;

        instance.call(move |i| {
            i.create_dataflow(
                dataflow,
                import_read_holds,
                subscribe_target_replica,
                shared_collection_state,
            )
            .expect("validated")
        });

        Ok(())
    }

    /// Drop the read capability for the given collections and allow their resources to be
    /// reclaimed.
    pub fn drop_collections(
        &mut self,
        instance_id: ComputeInstanceId,
        collection_ids: Vec<GlobalId>,
    ) -> Result<(), CollectionUpdateError> {
        let instance = self.instance_mut(instance_id)?;

        // Validation
        for id in &collection_ids {
            instance.collection(*id)?;
        }

        for id in &collection_ids {
            instance.collections.remove(id);
        }

        instance.call(|i| i.drop_collections(collection_ids).expect("validated"));

        Ok(())
    }

    /// Initiate a peek request for the contents of the given collection at `timestamp`.
    pub fn peek(
        &self,
        instance_id: ComputeInstanceId,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let instance = self.instance(instance_id)?;

        // Validation: target replica
        if let Some(replica_id) = target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // Validation: peek target
        let read_hold = match &peek_target {
            PeekTarget::Index { id } => instance.acquire_read_hold(*id)?,
            PeekTarget::Persist { id, .. } => self
                .storage_collections
                .acquire_read_holds(vec![*id])?
                .into_element(),
        };
        if !read_hold.since().less_equal(&timestamp) {
            return Err(SinceViolation(peek_target.id()));
        }

        instance.call(move |i| {
            i.peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing,
                map_filter_project,
                read_hold,
                target_replica,
                peek_response_tx,
            )
            .expect("validated")
        });

        Ok(())
    }

    /// Cancel an existing peek request.
    ///
    /// Canceling a peek is best effort. The caller may see any of the following
    /// after canceling a peek request:
    ///
    ///   * A `PeekResponse::Rows` indicating that the cancellation request did
    ///     not take effect in time and the query succeeded.
    ///   * A `PeekResponse::Canceled` affirming that the peek was canceled.
    ///   * No `PeekResponse` at all.
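    ///
    /// # Example (sketch; `instance_id` and `peek_uuid` are assumed)
    /// ```ignore
    /// controller.cancel_peek(instance_id, peek_uuid, PeekResponse::Canceled)?;
    /// ```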
    pub fn cancel_peek(
        &self,
        instance_id: ComputeInstanceId,
        uuid: Uuid,
        reason: PeekResponse,
    ) -> Result<(), InstanceMissing> {
        self.instance(instance_id)?
            .call(move |i| i.cancel_peek(uuid, reason));
        Ok(())
    }

    /// Assign a read policy to specific identifiers.
    ///
    /// The policies are assigned in the order presented, and repeated identifiers take the last
    /// policy presented. Changing a policy will immediately downgrade the read capability if
    /// appropriate, but it will not "recover" the read capability if the prior capability is
    /// already ahead of it.
    ///
    /// Identifiers not present in `policies` retain their existing read policies.
    ///
    /// It is an error to attempt to set a read policy for a collection that is not readable in the
    /// context of compute. At this time, only indexes are readable compute collections.
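    ///
    /// # Example (sketch; `instance_id`, `index_id`, and `as_of` are assumed)
    /// ```ignore
    /// // Pin the index's read frontier to `as_of`, independent of its write frontier.
    /// let policy = ReadPolicy::ValidFrom(Antichain::from_elem(as_of));
    /// controller.set_read_policy(instance_id, vec![(index_id, policy)])?;
    /// ```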
    pub fn set_read_policy(
        &self,
        instance_id: ComputeInstanceId,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) -> Result<(), ReadPolicyError> {
        use ReadPolicyError::*;

        let instance = self.instance(instance_id)?;

        // Validation
        for (id, _) in &policies {
            let collection = instance.collection(*id)?;
            if collection.write_only {
                return Err(WriteOnlyCollection(*id));
            }
        }

        self.instance(instance_id)?
            .call(|i| i.set_read_policy(policies).expect("validated"));

        Ok(())
    }

    /// Acquires a [`ReadHold`] for the identified compute collection.
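    ///
    /// # Example (sketch; identifiers are assumed)
    /// ```ignore
    /// let hold = controller.acquire_read_hold(instance_id, index_id)?;
    /// // The hold pins the collection's since; dropping the hold releases it.
    /// assert!(hold.since().less_equal(&timestamp));
    /// drop(hold);
    /// ```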
    pub fn acquire_read_hold(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<ReadHold<T>, CollectionUpdateError> {
        let read_hold = self
            .instance(instance_id)?
            .acquire_read_hold(collection_id)?;
        Ok(read_hold)
    }

    /// Determine the time dependence for a dataflow.
    fn determine_time_dependence(
        &self,
        instance_id: ComputeInstanceId,
        dataflow: &DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        // TODO(ct3): Continual tasks don't support replica expiration
        let is_continual_task = dataflow.continual_task_ids().next().is_some();
        if is_continual_task {
            return Ok(None);
        }

        let instance = self
            .instance(instance_id)
            .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
        let mut time_dependencies = Vec::new();

        for id in dataflow.imported_index_ids() {
            let dependence = instance
                .get_time_dependence(id)
                .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
            time_dependencies.push(dependence);
        }

        'source: for id in dataflow.imported_source_ids() {
            // We first check whether the id is backed by a compute object, in which case we use
            // the time dependence we know. This is true for materialized views, continual tasks,
            // etc.
            for instance in self.instances.values() {
                if let Ok(dependence) = instance.get_time_dependence(id) {
                    time_dependencies.push(dependence);
                    continue 'source;
                }
            }

            // Not a compute object: Consult the storage collections controller.
            time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
        }

        Ok(TimeDependence::merge(
            time_dependencies,
            dataflow.refresh_schedule.as_ref(),
        ))
    }

    /// Processes the work queued by [`ComputeController::ready`].
    #[mz_ore::instrument(level = "debug")]
    pub fn process(&mut self) -> Option<ComputeControllerResponse<T>> {
        // Perform periodic maintenance work.
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        // Return a ready response, if any.
        self.stashed_response.take()
    }

    #[mz_ore::instrument(level = "debug")]
    fn maintain(&mut self) {
        // Perform instance maintenance work.
        for instance in self.instances.values_mut() {
            instance.call(Instance::maintain);
        }
    }
}

#[derive(Debug)]
struct InstanceState<T: ComputeControllerTimestamp> {
    client: instance::Client<T>,
    replicas: BTreeSet<ReplicaId>,
    collections: BTreeMap<GlobalId, Collection<T>>,
}

impl<T: ComputeControllerTimestamp> InstanceState<T> {
    fn new(client: instance::Client<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
        Self {
            client,
            replicas: Default::default(),
            collections,
        }
    }

    fn collection(&self, id: GlobalId) -> Result<&Collection<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    fn call<F>(&self, f: F)
    where
        F: FnOnce(&mut Instance<T>) + Send + 'static,
    {
        let otel_ctx = OpenTelemetryContext::obtain();
        self.client
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call").entered();
                otel_ctx.attach_as_parent();

                f(instance)
            }))
            .expect("instance not dropped");
    }

    async fn call_sync<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let otel_ctx = OpenTelemetryContext::obtain();
        self.client
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call_sync").entered();
                otel_ctx.attach_as_parent();

                let result = f(instance);
                let _ = tx.send(result);
            }))
            .expect("instance not dropped");

        rx.await.expect("instance not dropped")
    }

    /// Acquires a [`ReadHold`] for the identified compute collection.
    pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
        // We acquire read holds at the earliest possible time rather than returning a copy
        // of the implied read hold. This is so that in `create_dataflow` we can acquire read holds
        // on compute dependencies at frontiers that are held back by other read holds the caller
        // has previously taken.
        //
        // If/when we change the compute API to expect callers to pass in the `ReadHold`s rather
        // than acquiring them ourselves, we might tighten this up and instead acquire read holds
        // at the implied capability.

        let collection = self.collection(id)?;
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });

        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
        Ok(hold)
    }

    /// Return the stored time dependence for a collection.
    fn get_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, CollectionMissing> {
        Ok(self.collection(id)?.time_dependence.clone())
    }

    /// Returns the [`InstanceState`] formatted as JSON.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` here so we don't forget to consider dumping newly added fields.
        let Self {
            client: _,
            replicas,
            collections,
        } = self;

        let instance = self.call_sync(|i| i.dump()).await?;
        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let map = serde_json::Map::from_iter([
            field("instance", instance)?,
            field("replicas", replicas)?,
            field("collections", collections)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

#[derive(Debug)]
struct Collection<T> {
    write_only: bool,
    compute_dependencies: BTreeSet<GlobalId>,
    shared: SharedCollectionState<T>,
    /// The computed time dependence for this collection. `None` indicates no specific
    /// information; a value describes how the collection relates to wall-clock time.
    time_dependence: Option<TimeDependence>,
}

impl<T: Timestamp> Collection<T> {
    fn new_log() -> Self {
        let as_of = Antichain::from_elem(T::minimum());
        Self {
            write_only: false,
            compute_dependencies: Default::default(),
            shared: SharedCollectionState::new(as_of),
            time_dependence: Some(TimeDependence::default()),
        }
    }

    fn frontiers(&self) -> CollectionFrontiers<T> {
        let read_frontier = self
            .shared
            .lock_read_capabilities(|c| c.frontier().to_owned());
        let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
        CollectionFrontiers {
            read_frontier,
            write_frontier,
        }
    }
}

/// The frontiers of a compute collection.
#[derive(Clone, Debug)]
pub struct CollectionFrontiers<T> {
    /// The read frontier.
    pub read_frontier: Antichain<T>,
    /// The write frontier.
    pub write_frontier: Antichain<T>,
}

impl<T: Timestamp> Default for CollectionFrontiers<T> {
    fn default() -> Self {
        Self {
            read_frontier: Antichain::from_elem(T::minimum()),
            write_frontier: Antichain::from_elem(T::minimum()),
        }
    }
}