// mz_compute_client/controller.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller that provides an interface to the compute layer, and the storage layer below it.
11//!
12//! The compute controller manages the creation, maintenance, and removal of compute instances.
13//! This involves ensuring the intended service state with the orchestrator, as well as maintaining
14//! a dedicated compute instance controller for each active compute instance.
15//!
16//! For each compute instance, the compute controller curates the creation of indexes and sinks
17//! installed on the instance, the progress of readers through these collections, and their
18//! eventual dropping and resource reclamation.
19//!
20//! The state maintained for a compute instance can be viewed as a partial map from `GlobalId` to
21//! collection. It is an error to use an identifier before it has been "created" with
22//! `create_dataflow()`. Once created, the controller holds a read capability for each output
23//! collection of a dataflow, which is manipulated with `set_read_policy()`. Eventually, a
24//! collection is dropped with `drop_collections()`.
25//!
26//! A dataflow can be in read-only or read-write mode. In read-only mode, the dataflow does not
//! modify any persistent state. Sending an `allow_write` message to the compute instance will
28//! transition the dataflow to read-write mode, allowing it to write to persistent sinks.
29//!
30//! Created dataflows will prevent the compaction of their inputs, including other compute
31//! collections but also collections managed by the storage layer. Each dataflow input is prevented
32//! from compacting beyond the allowed compaction of each of its outputs, ensuring that we can
33//! recover each dataflow to its current state in case of failure or other reconfiguration.
34
35use std::collections::{BTreeMap, BTreeSet};
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use mz_build_info::BuildInfo;
40use mz_cluster_client::client::ClusterReplicaLocation;
41use mz_cluster_client::metrics::ControllerMetrics;
42use mz_cluster_client::{ReplicaId, WallclockLagFn};
43use mz_compute_types::ComputeInstanceId;
44use mz_compute_types::config::ComputeReplicaConfig;
45use mz_compute_types::dataflows::DataflowDescription;
46use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
47use mz_dyncfg::ConfigSet;
48use mz_expr::RowSetFinishing;
49use mz_expr::row::RowCollection;
50use mz_ore::cast::CastFrom;
51use mz_ore::collections::CollectionExt;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::NowFn;
54use mz_ore::tracing::OpenTelemetryContext;
55use mz_persist_types::PersistLocation;
56use mz_repr::{Datum, GlobalId, RelationDesc, Row, TimestampManipulation};
57use mz_storage_client::controller::StorageController;
58use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
59use mz_storage_types::read_holds::ReadHold;
60use mz_storage_types::read_policy::ReadPolicy;
61use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
62use prometheus::proto::LabelPair;
63use serde::{Deserialize, Serialize};
64use timely::PartialOrder;
65use timely::progress::{Antichain, Timestamp};
66use tokio::sync::{mpsc, oneshot};
67use tokio::time::{self, MissedTickBehavior};
68use uuid::Uuid;
69
70use crate::controller::error::{
71    CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
72    HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
73    ReplicaCreationError, ReplicaDropError,
74};
75use crate::controller::instance::{Instance, SharedCollectionState};
76use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
77use crate::controller::replica::ReplicaConfig;
78use crate::logging::{LogVariant, LoggingConfig};
79use crate::metrics::ComputeControllerMetrics;
80use crate::protocol::command::{ComputeParameters, PeekTarget};
81use crate::protocol::response::{PeekResponse, SubscribeBatch};
82
83mod instance;
84mod introspection;
85mod replica;
86mod sequential_hydration;
87
88pub mod error;
89pub mod instance_client;
90pub use instance_client::InstanceClient;
91
/// Handle to the storage layer's collection frontiers/read-holds facility, shared with the
/// storage controller.
///
/// Boxed as a trait object so the compute controller can work against any
/// `StorageCollections` implementation with timestamp type `T`.
pub(crate) type StorageCollections<T> = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T> + Send + Sync,
>;
95
/// A composite trait for types that serve as timestamps in the Compute Controller.
/// `Into<Datum<'a>>` is needed for writing timestamps to introspection collections.
pub trait ComputeControllerTimestamp: TimestampManipulation + Into<Datum<'static>> + Sync {}

// The production compute controller runs with `mz_repr::Timestamp` timestamps.
impl ComputeControllerTimestamp for mz_repr::Timestamp {}
101
/// Responses from the compute controller.
///
/// `T` is the timestamp type used by the controller (see [`ComputeControllerTimestamp`]).
#[derive(Debug)]
pub enum ComputeControllerResponse<T> {
    /// See [`PeekNotification`].
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// See [`crate::protocol::response::ComputeResponse::SubscribeResponse`].
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// The response from a dataflow containing a `CopyToS3Oneshot` sink.
    ///
    /// The `GlobalId` identifies the sink. The `Result` is the response from
    /// the sink, where an `Ok(n)` indicates that `n` rows were successfully
    /// copied to S3 and an `Err` indicates that an error was encountered
    /// during the copy operation.
    ///
    /// For a given `CopyToS3Oneshot` sink, there will be at most one `CopyToResponse`
    /// produced. (The sink may produce no responses if its dataflow is dropped
    /// before completion.)
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// A response reporting advancement of a collection's upper frontier.
    ///
    /// Once a collection's upper (aka "write frontier") has advanced to beyond a given time, the
    /// contents of the collection as of that time have been sealed and cannot change anymore.
    FrontierUpper {
        /// The ID of a compute collection.
        id: GlobalId,
        /// The new upper frontier of the identified compute collection.
        upper: Antichain<T>,
    },
}
131
/// Notification and summary of a received and forwarded [`crate::protocol::response::ComputeResponse::PeekResponse`].
///
/// Unlike a full `PeekResponse`, this carries only row/byte counts (or the error), not the
/// result rows themselves.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    /// Returned rows of a successful peek.
    Success {
        /// Number of rows in the returned peek result.
        rows: u64,
        /// Size of the returned peek result in bytes.
        result_size: u64,
    },
    /// Error of an unsuccessful peek, including the reason for the error.
    Error(String),
    /// The peek was canceled.
    Canceled,
}
147
148impl PeekNotification {
149    /// Construct a new [`PeekNotification`] from a [`PeekResponse`]. The `offset` and `limit`
150    /// parameters are used to calculate the number of rows in the peek result.
151    fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
152        match peek_response {
153            PeekResponse::Rows(rows) => {
154                let num_rows = u64::cast_from(RowCollection::offset_limit(
155                    rows.iter().map(|r| r.count()).sum(),
156                    offset,
157                    limit,
158                ));
159                let result_size = u64::cast_from(rows.iter().map(|r| r.byte_len()).sum::<usize>());
160
161                tracing::trace!(?num_rows, ?result_size, "inline peek result");
162
163                Self::Success {
164                    rows: num_rows,
165                    result_size,
166                }
167            }
168            PeekResponse::Stashed(stashed_response) => {
169                let rows = stashed_response.num_rows(offset, limit);
170                let result_size = stashed_response.size_bytes();
171
172                tracing::trace!(?rows, ?result_size, "stashed peek result");
173
174                Self::Success {
175                    rows: u64::cast_from(rows),
176                    result_size: u64::cast_from(result_size),
177                }
178            }
179            PeekResponse::Error(err) => Self::Error(err.clone()),
180            PeekResponse::Canceled => Self::Canceled,
181        }
182    }
183}
184
/// A controller for the compute layer.
pub struct ComputeController<T: ComputeControllerTimestamp> {
    /// The active compute instances, by ID.
    instances: BTreeMap<ComputeInstanceId, InstanceState<T>>,
    /// A map from an instance ID to an arbitrary string that describes the
    /// class of the workload that compute instance is running (e.g.,
    /// `production` or `staging`).
    ///
    /// Kept in sync with `instances`; also read by the metrics postprocessor
    /// registered in [`ComputeController::new`].
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    /// Build information, passed to newly spawned instances.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections<T>,
    /// Set to `true` once `initialization_complete` has been called.
    initialized: bool,
    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to affect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// Compute configuration to apply to new instances.
    config: ComputeParameters,
    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,
    /// A controller response to be returned on the next call to [`ComputeController::process`].
    stashed_response: Option<ComputeControllerResponse<T>>,
    /// The compute controller metrics.
    metrics: ComputeControllerMetrics,
    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<T>,
    /// Dynamic system configuration.
    ///
    /// Updated through `ComputeController::update_configuration` calls and shared with all
    /// subcomponents of the compute controller.
    dyncfg: Arc<ConfigSet>,

    /// Receiver for responses produced by `Instance`s.
    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse<T>>,
    /// Response sender that's passed to new `Instance`s.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Receiver for introspection updates produced by `Instance`s.
    ///
    /// When [`ComputeController::start_introspection_sink`] is first called, this receiver is
    /// passed to the introspection sink task.
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    /// Introspection updates sender that's passed to new `Instance`s.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,
}
238
239impl<T: ComputeControllerTimestamp> ComputeController<T> {
    /// Construct a new [`ComputeController`].
    ///
    /// The controller starts with no instances; those are added later via `create_instance`.
    /// Also registers a metrics postprocessor on `metrics_registry` that annotates
    /// instance-scoped metrics with their workload class.
    pub fn new(
        build_info: &'static BuildInfo,
        storage_collections: StorageCollections<T>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        peek_stash_persist_location: PersistLocation,
        controller_metrics: ControllerMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
    ) -> Self {
        // Channels over which `Instance`s report responses and introspection updates.
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();

        // Maintenance runs at most once per second; missed ticks are skipped
        // rather than bursted.
        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
            ComputeInstanceId,
            Option<String>,
        >::new()));

        // Apply a `workload_class` label to all metrics in the registry that
        // have an `instance_id` label for an instance whose workload class is
        // known.
        metrics_registry.register_postprocessor({
            let instance_workload_classes = Arc::clone(&instance_workload_classes);
            move |metrics| {
                // Snapshot the workload classes (keyed by stringified instance ID)
                // up front, so the lock is not held while rewriting metrics.
                let instance_workload_classes = instance_workload_classes
                    .lock()
                    .expect("lock poisoned")
                    .iter()
                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
                    .collect::<BTreeMap<String, Option<String>>>();
                for metric in metrics {
                    'metric: for metric in metric.mut_metric() {
                        for label in metric.get_label() {
                            if label.name() == "instance_id" {
                                if let Some(workload_class) = instance_workload_classes
                                    .get(label.value())
                                    .cloned()
                                    .flatten()
                                {
                                    let mut label = LabelPair::default();
                                    label.set_name("workload_class".into());
                                    label.set_value(workload_class.clone());

                                    let mut labels = metric.take_label();
                                    labels.push(label);
                                    metric.set_label(labels);
                                }
                                // Only the first `instance_id` label is considered;
                                // move on to the next metric once it is found.
                                continue 'metric;
                            }
                        }
                    }
                }
            }
        });

        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

        Self {
            instances: BTreeMap::new(),
            instance_workload_classes,
            build_info,
            storage_collections,
            initialized: false,
            read_only,
            config: Default::default(),
            peek_stash_persist_location,
            stashed_response: None,
            metrics,
            now,
            wallclock_lag,
            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
            response_rx,
            response_tx,
            introspection_rx: Some(introspection_rx),
            introspection_tx,
            maintenance_ticker,
            maintenance_scheduled: false,
        }
    }
323
324    /// Start sinking the compute controller's introspection data into storage.
325    ///
326    /// This method should be called once the introspection collections have been registered with
327    /// the storage controller. It will panic if invoked earlier than that.
328    pub fn start_introspection_sink(
329        &mut self,
330        storage_controller: &dyn StorageController<Timestamp = T>,
331    ) {
332        if let Some(rx) = self.introspection_rx.take() {
333            spawn_introspection_sink(rx, storage_controller);
334        }
335    }
336
    /// Returns whether an instance with the given ID is known to this controller.
    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
        self.instances.contains_key(&id)
    }
341
342    /// Return a reference to the indicated compute instance.
343    fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState<T>, InstanceMissing> {
344        self.instances.get(&id).ok_or(InstanceMissing(id))
345    }
346
347    /// Return an `InstanceClient` for the indicated compute instance.
348    pub fn instance_client(
349        &self,
350        id: ComputeInstanceId,
351    ) -> Result<InstanceClient<T>, InstanceMissing> {
352        self.instance(id).map(|instance| instance.client.clone())
353    }
354
355    /// Return a mutable reference to the indicated compute instance.
356    fn instance_mut(
357        &mut self,
358        id: ComputeInstanceId,
359    ) -> Result<&mut InstanceState<T>, InstanceMissing> {
360        self.instances.get_mut(&id).ok_or(InstanceMissing(id))
361    }
362
363    /// List the IDs of all collections in the identified compute instance.
364    pub fn collection_ids(
365        &self,
366        instance_id: ComputeInstanceId,
367    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
368        let instance = self.instance(instance_id)?;
369        let ids = instance.collections.keys().copied();
370        Ok(ids)
371    }
372
373    /// Return the frontiers of the indicated collection.
374    ///
375    /// If an `instance_id` is provided, the collection is assumed to be installed on that
376    /// instance. Otherwise all available instances are searched.
377    pub fn collection_frontiers(
378        &self,
379        collection_id: GlobalId,
380        instance_id: Option<ComputeInstanceId>,
381    ) -> Result<CollectionFrontiers<T>, CollectionLookupError> {
382        let collection = match instance_id {
383            Some(id) => self.instance(id)?.collection(collection_id)?,
384            None => self
385                .instances
386                .values()
387                .find_map(|i| i.collections.get(&collection_id))
388                .ok_or(CollectionMissing(collection_id))?,
389        };
390
391        Ok(collection.frontiers())
392    }
393
394    /// List compute collections that depend on the given collection.
395    pub fn collection_reverse_dependencies(
396        &self,
397        instance_id: ComputeInstanceId,
398        id: GlobalId,
399    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
400        let instance = self.instance(instance_id)?;
401        let collections = instance.collections.iter();
402        let ids = collections
403            .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
404        Ok(ids)
405    }
406
407    /// Returns `true` iff the given collection has been hydrated.
408    ///
409    /// For this check, zero-replica clusters are always considered hydrated.
410    /// Their collections would never normally be considered hydrated but it's
411    /// clearly intentional that they have no replicas.
412    pub async fn collection_hydrated(
413        &self,
414        instance_id: ComputeInstanceId,
415        collection_id: GlobalId,
416    ) -> Result<bool, anyhow::Error> {
417        let instance = self.instance(instance_id)?;
418
419        let res = instance
420            .call_sync(move |i| i.collection_hydrated(collection_id))
421            .await?;
422
423        Ok(res)
424    }
425
426    /// Returns `true` if all non-transient, non-excluded collections are hydrated on any of the
427    /// provided replicas.
428    ///
429    /// For this check, zero-replica clusters are always considered hydrated.
430    /// Their collections would never normally be considered hydrated but it's
431    /// clearly intentional that they have no replicas.
432    pub fn collections_hydrated_for_replicas(
433        &self,
434        instance_id: ComputeInstanceId,
435        replicas: Vec<ReplicaId>,
436        exclude_collections: BTreeSet<GlobalId>,
437    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
438        let instance = self.instance(instance_id)?;
439
440        // Validation
441        if instance.replicas.is_empty() && !replicas.iter().any(|id| instance.replicas.contains(id))
442        {
443            return Err(HydrationCheckBadTarget(replicas).into());
444        }
445
446        let (tx, rx) = oneshot::channel();
447        instance.call(move |i| {
448            let result = i
449                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
450                .expect("validated");
451            let _ = tx.send(result);
452        });
453
454        Ok(rx)
455    }
456
    /// Returns the state of the [`ComputeController`] formatted as JSON.
    ///
    /// The returned value is not guaranteed to be stable and may change at any point in time.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
        // returned object as a tradeoff between usability and stability. `serde_json` will fail
        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
        // prevents a future unrelated change from silently breaking this method.

        // Destructure `self` here so we don't forget to consider dumping newly added fields.
        let Self {
            instances,
            instance_workload_classes,
            build_info: _,
            storage_collections: _,
            initialized,
            read_only,
            config: _,
            peek_stash_persist_location: _,
            stashed_response,
            metrics: _,
            now: _,
            wallclock_lag: _,
            dyncfg: _,
            response_rx: _,
            response_tx: _,
            introspection_rx: _,
            introspection_tx: _,
            maintenance_ticker: _,
            maintenance_scheduled,
        } = self;

        // Dump each instance's state, keyed by its stringified ID.
        let mut instances_dump = BTreeMap::new();
        for (id, instance) in instances {
            let dump = instance.dump().await?;
            instances_dump.insert(id.to_string(), dump);
        }

        // Debug-format the workload classes so the values serialize as strings.
        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .iter()
            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
            .collect();

        Ok(serde_json::json!({
            "instances": instances_dump,
            "instance_workload_classes": instance_workload_classes,
            "initialized": initialized,
            "read_only": read_only,
            "stashed_response": format!("{stashed_response:?}"),
            "maintenance_scheduled": maintenance_scheduled,
        }))
    }
511}
512
513impl<T> ComputeController<T>
514where
515    T: ComputeControllerTimestamp,
516{
    /// Create a compute instance.
    ///
    /// Pre-populates the instance with collections for the given arranged log sources, spawns
    /// the instance task, and forwards the current compute configuration (including the
    /// workload class) to it.
    ///
    /// # Errors
    ///
    /// Returns an error if an instance with the given ID already exists.
    pub fn create_instance(
        &mut self,
        id: ComputeInstanceId,
        arranged_logs: BTreeMap<LogVariant, GlobalId>,
        workload_class: Option<String>,
    ) -> Result<(), InstanceExists> {
        if self.instances.contains_key(&id) {
            return Err(InstanceExists(id));
        }

        // Create a collection for each arranged log, handing the shared collection
        // state to the instance task alongside the log metadata.
        let mut collections = BTreeMap::new();
        let mut logs = Vec::with_capacity(arranged_logs.len());
        for (&log, &id) in &arranged_logs {
            let collection = Collection::new_log();
            let shared = collection.shared.clone();
            collections.insert(id, collection);
            logs.push((log, id, shared));
        }

        // Spawn the instance task; it communicates back through the shared
        // response/introspection channels.
        let client = InstanceClient::spawn(
            id,
            self.build_info,
            Arc::clone(&self.storage_collections),
            self.peek_stash_persist_location.clone(),
            logs,
            self.metrics.for_instance(id),
            self.now.clone(),
            self.wallclock_lag.clone(),
            Arc::clone(&self.dyncfg),
            self.response_tx.clone(),
            self.introspection_tx.clone(),
            self.read_only,
        );

        let instance = InstanceState::new(client, collections);
        self.instances.insert(id, instance);

        // Keep the workload-class map in sync with `self.instances`.
        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class.clone());

        let instance = self.instances.get_mut(&id).expect("instance just added");
        if self.initialized {
            instance.call(Instance::initialization_complete);
        }

        // Push the current configuration, with this instance's workload class, to
        // the new instance.
        let mut config_params = self.config.clone();
        config_params.workload_class = Some(workload_class);
        instance.call(|i| i.update_configuration(config_params));

        Ok(())
    }
571
572    /// Updates a compute instance's workload class.
573    pub fn update_instance_workload_class(
574        &mut self,
575        id: ComputeInstanceId,
576        workload_class: Option<String>,
577    ) -> Result<(), InstanceMissing> {
578        // Ensure that the instance exists first.
579        let _ = self.instance(id)?;
580
581        self.instance_workload_classes
582            .lock()
583            .expect("lock poisoned")
584            .insert(id, workload_class);
585
586        // Cause a config update to notify the instance about its new workload class.
587        self.update_configuration(Default::default());
588
589        Ok(())
590    }
591
592    /// Remove a compute instance.
593    ///
594    /// # Panics
595    ///
596    /// Panics if the identified `instance` still has active replicas.
597    pub fn drop_instance(&mut self, id: ComputeInstanceId) {
598        if let Some(instance) = self.instances.remove(&id) {
599            instance.call(|i| i.shutdown());
600        }
601
602        self.instance_workload_classes
603            .lock()
604            .expect("lock poisoned")
605            .remove(&id);
606    }
607
    /// Returns the compute controller's config set.
    ///
    /// The config set is shared with all subcomponents of the compute controller and updated
    /// through `ComputeController::update_configuration` calls.
    pub fn dyncfg(&self) -> &Arc<ConfigSet> {
        &self.dyncfg
    }
612
    /// Update compute configuration.
    ///
    /// Applies dyncfg updates, forwards the parameters (with each instance's workload class
    /// substituted in) to all existing instances, and remembers the parameters for instances
    /// created later.
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        // Apply dyncfg updates.
        config_params.dyncfg_updates.apply(&self.dyncfg);

        // The guard is held across the loop below. `instances` and
        // `instance_workload_classes` are kept in sync by
        // `create_instance`/`drop_instance`, so the indexing below cannot panic.
        let instance_workload_classes = self
            .instance_workload_classes
            .lock()
            .expect("lock poisoned");

        // Forward updates to existing clusters.
        // Workload classes are cluster-specific, so we need to overwrite them here.
        for (id, instance) in self.instances.iter_mut() {
            let mut params = config_params.clone();
            params.workload_class = Some(instance_workload_classes[id].clone());
            instance.call(|i| i.update_configuration(params));
        }

        // Propagate the overflow-behavior dyncfg to the process-global setting.
        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                tracing::error!(
                    err,
                    overflowing_behavior,
                    "Invalid value for ore_overflowing_behavior"
                );
            }
        }

        // Remember updates for future clusters.
        self.config.update(config_params);
    }
646
647    /// Mark the end of any initialization commands.
648    ///
649    /// The implementor may wait for this method to be called before implementing prior commands,
650    /// and so it is important for a user to invoke this method as soon as it is comfortable.
651    /// This method can be invoked immediately, at the potential expense of performance.
652    pub fn initialization_complete(&mut self) {
653        self.initialized = true;
654        for instance in self.instances.values_mut() {
655            instance.call(Instance::initialization_complete);
656        }
657    }
658
    /// Wait until the controller is ready to do some processing.
    ///
    /// This method may block for an arbitrarily long time.
    ///
    /// When the method returns, the caller should call [`ComputeController::process`].
    ///
    /// This method is cancellation safe.
    pub async fn ready(&mut self) {
        if self.stashed_response.is_some() {
            // We still have a response stashed, which we are immediately ready to process.
            return;
        }
        if self.maintenance_scheduled {
            // Maintenance work has been scheduled.
            return;
        }

        // Wait for either an instance response or the next maintenance tick.
        // NOTE(review): cancellation safety relies on `UnboundedReceiver::recv` and
        // `Interval::tick` both being cancellation safe, per their tokio documentation.
        tokio::select! {
            resp = self.response_rx.recv() => {
                // The sender half lives in `self`, so the channel can never be closed.
                let resp = resp.expect("`self.response_tx` not dropped");
                self.stashed_response = Some(resp);
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        }
    }
686
    /// Adds replicas of an instance.
    ///
    /// # Errors
    ///
    /// Returns an error if the instance is unknown or if a replica with the given ID already
    /// exists on it.
    pub fn add_replica_to_instance(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
        config: ComputeReplicaConfig,
    ) -> Result<(), ReplicaCreationError> {
        use ReplicaCreationError::*;

        let instance = self.instance(instance_id)?;

        // Validation
        if instance.replicas.contains(&replica_id) {
            return Err(ReplicaExists(replica_id));
        }

        // Logging is enabled iff an interval was configured; the 1s interval below
        // is only a placeholder used when logging is disabled.
        let (enable_logging, interval) = match config.logging.interval {
            Some(interval) => (true, interval),
            None => (false, Duration::from_secs(1)),
        };

        let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);

        let replica_config = ReplicaConfig {
            location,
            logging: LoggingConfig {
                interval,
                enable_logging,
                log_logging: config.logging.log_logging,
                index_logs: Default::default(),
            },
            grpc_client: self.config.grpc_client.clone(),
            // A zero offset disables replica expiration.
            expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
        };

        let instance = self.instance_mut(instance_id).expect("validated");
        instance.replicas.insert(replica_id);

        instance.call(move |i| {
            i.add_replica(replica_id, replica_config, None)
                .expect("validated")
        });

        Ok(())
    }
733
734    /// Removes a replica from an instance, including its service in the orchestrator.
735    pub fn drop_replica(
736        &mut self,
737        instance_id: ComputeInstanceId,
738        replica_id: ReplicaId,
739    ) -> Result<(), ReplicaDropError> {
740        use ReplicaDropError::*;
741
742        let instance = self.instance_mut(instance_id)?;
743
744        // Validation
745        if !instance.replicas.contains(&replica_id) {
746            return Err(ReplicaMissing(replica_id));
747        }
748
749        instance.replicas.remove(&replica_id);
750
751        instance.call(move |i| i.remove_replica(replica_id).expect("validated"));
752
753        Ok(())
754    }
755
    /// Creates the described dataflow and initializes state for its output.
    ///
    /// Only materialized views and subscribes are allowed to have a `target_replica`.
    ///
    /// # Panics
    ///
    /// Panics if called with a dataflow description that has index exports
    /// when `target_replica` is set.
    pub fn create_dataflow(
        &mut self,
        instance_id: ComputeInstanceId,
        mut dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        target_replica: Option<ReplicaId>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        let instance = self.instance(instance_id)?;

        // Validation: target replica
        if let Some(replica_id) = target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
            assert!(
                dataflow.exported_index_ids().next().is_none(),
                "Replica-targeted indexes are not supported"
            );
        }

        // Validation: as_of
        // An empty as_of means the output is known to be complete; subscribes and
        // copy-tos need to observe actual times, so they reject an empty as_of.
        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        // Validation: input collections
        // Acquire read holds on all imports first, then verify that the requested
        // as_of has not been compacted past on any of them.
        let storage_ids = dataflow.imported_source_ids().collect();
        let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
        for id in dataflow.imported_index_ids() {
            let read_hold = instance.acquire_read_hold(id)?;
            import_read_holds.push(read_hold);
        }
        for hold in &import_read_holds {
            if PartialOrder::less_than(as_of, hold.since()) {
                return Err(SinceViolation(hold.id()));
            }
        }

        // Validation: storage sink collections
        // Every persist sink must write to a collection the storage layer knows about.
        for id in dataflow.persist_sink_ids() {
            if self.storage_collections.check_exists(id).is_err() {
                return Err(CollectionMissing(id));
            }
        }
        let time_dependence = self
            .determine_time_dependence(instance_id, &dataflow)
            .expect("must exist");

        // Re-borrow mutably; all fallible validation is complete at this point.
        let instance = self.instance_mut(instance_id).expect("validated");

        // Install controller-side state for each dataflow export, sharing the
        // collection state with the instance task via `SharedCollectionState`.
        let mut shared_collection_state = BTreeMap::new();
        for id in dataflow.export_ids() {
            let shared = SharedCollectionState::new(as_of.clone());
            let collection = Collection {
                write_only: dataflow.sink_exports.contains_key(&id),
                compute_dependencies: dataflow.imported_index_ids().collect(),
                shared: shared.clone(),
                time_dependence: time_dependence.clone(),
            };
            instance.collections.insert(id, collection);
            shared_collection_state.insert(id, shared);
        }

        dataflow.time_dependence = time_dependence;

        // Hand the validated dataflow, import holds, and shared state to the instance task.
        instance.call(move |i| {
            i.create_dataflow(
                dataflow,
                import_read_holds,
                shared_collection_state,
                target_replica,
            )
            .expect("validated")
        });

        Ok(())
    }
844
845    /// Drop the read capability for the given collections and allow their resources to be
846    /// reclaimed.
847    pub fn drop_collections(
848        &mut self,
849        instance_id: ComputeInstanceId,
850        collection_ids: Vec<GlobalId>,
851    ) -> Result<(), CollectionUpdateError> {
852        let instance = self.instance_mut(instance_id)?;
853
854        // Validation
855        for id in &collection_ids {
856            instance.collection(*id)?;
857        }
858
859        for id in &collection_ids {
860            instance.collections.remove(id);
861        }
862
863        instance.call(|i| i.drop_collections(collection_ids).expect("validated"));
864
865        Ok(())
866    }
867
868    /// Initiate a peek request for the contents of the given collection at `timestamp`.
869    pub fn peek(
870        &self,
871        instance_id: ComputeInstanceId,
872        peek_target: PeekTarget,
873        literal_constraints: Option<Vec<Row>>,
874        uuid: Uuid,
875        timestamp: T,
876        result_desc: RelationDesc,
877        finishing: RowSetFinishing,
878        map_filter_project: mz_expr::SafeMfpPlan,
879        target_replica: Option<ReplicaId>,
880        peek_response_tx: oneshot::Sender<PeekResponse>,
881    ) -> Result<(), PeekError> {
882        use PeekError::*;
883
884        let instance = self.instance(instance_id)?;
885
886        // Validation: target replica
887        if let Some(replica_id) = target_replica {
888            if !instance.replicas.contains(&replica_id) {
889                return Err(ReplicaMissing(replica_id));
890            }
891        }
892
893        // Validation: peek target
894        let read_hold = match &peek_target {
895            PeekTarget::Index { id } => instance.acquire_read_hold(*id)?,
896            PeekTarget::Persist { id, .. } => self
897                .storage_collections
898                .acquire_read_holds(vec![*id])?
899                .into_element(),
900        };
901        if !read_hold.since().less_equal(&timestamp) {
902            return Err(SinceViolation(peek_target.id()));
903        }
904
905        instance.call(move |i| {
906            i.peek(
907                peek_target,
908                literal_constraints,
909                uuid,
910                timestamp,
911                result_desc,
912                finishing,
913                map_filter_project,
914                read_hold,
915                target_replica,
916                peek_response_tx,
917            )
918            .expect("validated")
919        });
920
921        Ok(())
922    }
923
924    /// Cancel an existing peek request.
925    ///
926    /// Canceling a peek is best effort. The caller may see any of the following
927    /// after canceling a peek request:
928    ///
929    ///   * A `PeekResponse::Rows` indicating that the cancellation request did
930    ///     not take effect in time and the query succeeded.
931    ///   * A `PeekResponse::Canceled` affirming that the peek was canceled.
932    ///   * No `PeekResponse` at all.
933    pub fn cancel_peek(
934        &self,
935        instance_id: ComputeInstanceId,
936        uuid: Uuid,
937        reason: PeekResponse,
938    ) -> Result<(), InstanceMissing> {
939        self.instance(instance_id)?
940            .call(move |i| i.cancel_peek(uuid, reason));
941        Ok(())
942    }
943
944    /// Assign a read policy to specific identifiers.
945    ///
946    /// The policies are assigned in the order presented, and repeated identifiers should
947    /// conclude with the last policy. Changing a policy will immediately downgrade the read
948    /// capability if appropriate, but it will not "recover" the read capability if the prior
949    /// capability is already ahead of it.
950    ///
951    /// Identifiers not present in `policies` retain their existing read policies.
952    ///
953    /// It is an error to attempt to set a read policy for a collection that is not readable in the
954    /// context of compute. At this time, only indexes are readable compute collections.
955    pub fn set_read_policy(
956        &self,
957        instance_id: ComputeInstanceId,
958        policies: Vec<(GlobalId, ReadPolicy<T>)>,
959    ) -> Result<(), ReadPolicyError> {
960        use ReadPolicyError::*;
961
962        let instance = self.instance(instance_id)?;
963
964        // Validation
965        for (id, _) in &policies {
966            let collection = instance.collection(*id)?;
967            if collection.write_only {
968                return Err(WriteOnlyCollection(*id));
969            }
970        }
971
972        self.instance(instance_id)?
973            .call(|i| i.set_read_policy(policies).expect("validated"));
974
975        Ok(())
976    }
977
978    /// Acquires a [`ReadHold`] for the identified compute collection.
979    pub fn acquire_read_hold(
980        &self,
981        instance_id: ComputeInstanceId,
982        collection_id: GlobalId,
983    ) -> Result<ReadHold<T>, CollectionUpdateError> {
984        let read_hold = self
985            .instance(instance_id)?
986            .acquire_read_hold(collection_id)?;
987        Ok(read_hold)
988    }
989
990    /// Determine the time dependence for a dataflow.
991    fn determine_time_dependence(
992        &self,
993        instance_id: ComputeInstanceId,
994        dataflow: &DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
995    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
996        // TODO(ct3): Continual tasks don't support replica expiration
997        let is_continual_task = dataflow.continual_task_ids().next().is_some();
998        if is_continual_task {
999            return Ok(None);
1000        }
1001
1002        let instance = self
1003            .instance(instance_id)
1004            .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
1005        let mut time_dependencies = Vec::new();
1006
1007        for id in dataflow.imported_index_ids() {
1008            let dependence = instance
1009                .get_time_dependence(id)
1010                .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
1011            time_dependencies.push(dependence);
1012        }
1013
1014        'source: for id in dataflow.imported_source_ids() {
1015            // We first check whether the id is backed by a compute object, in which case we use
1016            // the time dependence we know. This is true for materialized views, continual tasks,
1017            // etc.
1018            for instance in self.instances.values() {
1019                if let Ok(dependence) = instance.get_time_dependence(id) {
1020                    time_dependencies.push(dependence);
1021                    continue 'source;
1022                }
1023            }
1024
1025            // Not a compute object: Consult the storage collections controller.
1026            time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
1027        }
1028
1029        Ok(TimeDependence::merge(
1030            time_dependencies,
1031            dataflow.refresh_schedule.as_ref(),
1032        ))
1033    }
1034
1035    /// Processes the work queued by [`ComputeController::ready`].
1036    #[mz_ore::instrument(level = "debug")]
1037    pub fn process(&mut self) -> Option<ComputeControllerResponse<T>> {
1038        // Perform periodic maintenance work.
1039        if self.maintenance_scheduled {
1040            self.maintain();
1041            self.maintenance_scheduled = false;
1042        }
1043
1044        // Return a ready response, if any.
1045        self.stashed_response.take()
1046    }
1047
1048    #[mz_ore::instrument(level = "debug")]
1049    fn maintain(&mut self) {
1050        // Perform instance maintenance work.
1051        for instance in self.instances.values_mut() {
1052            instance.call(Instance::maintain);
1053        }
1054    }
1055
1056    /// Allow writes for the specified collections on `instance_id`.
1057    ///
1058    /// If this controller is in read-only mode, this is a no-op.
1059    pub fn allow_writes(
1060        &mut self,
1061        instance_id: ComputeInstanceId,
1062        collection_id: GlobalId,
1063    ) -> Result<(), CollectionUpdateError> {
1064        if self.read_only {
1065            tracing::debug!("Skipping allow_writes in read-only mode");
1066            return Ok(());
1067        }
1068
1069        let instance = self.instance_mut(instance_id)?;
1070
1071        // Validation
1072        instance.collection(collection_id)?;
1073
1074        instance.call(move |i| i.allow_writes(collection_id).expect("validated"));
1075
1076        Ok(())
1077    }
1078}
1079
#[derive(Debug)]
struct InstanceState<T: ComputeControllerTimestamp> {
    /// Client used to send calls to the instance task.
    client: InstanceClient<T>,
    /// IDs of the replicas attached to this instance.
    replicas: BTreeSet<ReplicaId>,
    /// Controller-side state for each collection installed on this instance.
    collections: BTreeMap<GlobalId, Collection<T>>,
}
1086
impl<T: ComputeControllerTimestamp> InstanceState<T> {
    /// Creates a new `InstanceState` with no replicas and the given initial collections.
    fn new(client: InstanceClient<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
        Self {
            client,
            replicas: Default::default(),
            collections,
        }
    }

    /// Returns the state of the identified collection, or `CollectionMissing` if unknown.
    fn collection(&self, id: GlobalId) -> Result<&Collection<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    /// Calls the given function on the instance task. Does not await the result.
    ///
    /// # Panics
    ///
    /// Panics if the instance corresponding to `self` does not exist.
    fn call<F>(&self, f: F)
    where
        F: FnOnce(&mut Instance<T>) + Send + 'static,
    {
        self.client.call(f).expect("instance not dropped")
    }

    /// Calls the given function on the instance task, and awaits the result.
    ///
    /// # Panics
    ///
    /// Panics if the instance corresponding to `self` does not exist.
    async fn call_sync<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
        R: Send + 'static,
    {
        self.client
            .call_sync(f)
            .await
            .expect("instance not dropped")
    }

    /// Acquires a [`ReadHold`] for the identified compute collection.
    pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
        // We acquire read holds at the earliest possible time rather than returning a copy
        // of the implied read hold. This is so that in `create_dataflow` we can acquire read holds
        // on compute dependencies at frontiers that are held back by other read holds the caller
        // has previously taken.
        //
        // If/when we change the compute API to expect callers to pass in the `ReadHold`s rather
        // than acquiring them ourselves, we might tighten this up and instead acquire read holds
        // at the implied capability.

        let collection = self.collection(id)?;
        // Register the new hold under the capability lock: read the current since
        // frontier and increment the capability count by one at each of its elements.
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });

        // The returned hold releases itself through the client's read-hold channel.
        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
        Ok(hold)
    }

    /// Return the stored time dependence for a collection.
    fn get_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, CollectionMissing> {
        Ok(self.collection(id)?.time_dependence.clone())
    }

    /// Returns the [`InstanceState`] formatted as JSON.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` here so we don't forget to consider dumping newly added fields.
        let Self {
            client: _,
            replicas,
            collections,
        } = self;

        // The instance task produces its own dump; render the local fields as strings.
        let instance = self.call_sync(|i| i.dump()).await?;
        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();

        Ok(serde_json::json!({
            "instance": instance,
            "replicas": replicas,
            "collections": collections,
        }))
    }
}
1181
#[derive(Debug)]
struct Collection<T> {
    /// Whether a collection is write-only, i.e., we cannot read it directly like an index.
    write_only: bool,
    /// IDs of the compute collections (imported indexes) this collection depends on.
    compute_dependencies: BTreeSet<GlobalId>,
    /// Collection state shared with the instance task.
    shared: SharedCollectionState<T>,
    /// The computed time dependence for this collection. None indicates no specific information,
    /// a value describes how the collection relates to wall-clock time.
    time_dependence: Option<TimeDependence>,
}
1192
1193impl<T: Timestamp> Collection<T> {
1194    fn new_log() -> Self {
1195        let as_of = Antichain::from_elem(T::minimum());
1196        Self {
1197            write_only: false,
1198            compute_dependencies: Default::default(),
1199            shared: SharedCollectionState::new(as_of),
1200            time_dependence: Some(TimeDependence::default()),
1201        }
1202    }
1203
1204    fn frontiers(&self) -> CollectionFrontiers<T> {
1205        let read_frontier = self
1206            .shared
1207            .lock_read_capabilities(|c| c.frontier().to_owned());
1208        let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
1209        CollectionFrontiers {
1210            read_frontier,
1211            write_frontier,
1212        }
1213    }
1214}
1215
/// The frontiers of a compute collection.
#[derive(Clone, Debug)]
pub struct CollectionFrontiers<T> {
    /// The read frontier, i.e., the times up to which the collection may be compacted.
    pub read_frontier: Antichain<T>,
    /// The write frontier, i.e., the times for which updates may still arrive.
    pub write_frontier: Antichain<T>,
}
1224
1225impl<T: Timestamp> Default for CollectionFrontiers<T> {
1226    fn default() -> Self {
1227        Self {
1228            read_frontier: Antichain::from_elem(T::minimum()),
1229            write_frontier: Antichain::from_elem(T::minimum()),
1230        }
1231    }
1232}