// mz_compute_client/controller.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller that provides an interface to the compute layer, and the storage layer below it.
11//!
12//! The compute controller manages the creation, maintenance, and removal of compute instances.
13//! This involves ensuring the intended service state with the orchestrator, as well as maintaining
14//! a dedicated compute instance controller for each active compute instance.
15//!
16//! For each compute instance, the compute controller curates the creation of indexes and sinks
17//! installed on the instance, the progress of readers through these collections, and their
18//! eventual dropping and resource reclamation.
19//!
20//! The state maintained for a compute instance can be viewed as a partial map from `GlobalId` to
21//! collection. It is an error to use an identifier before it has been "created" with
22//! `create_dataflow()`. Once created, the controller holds a read capability for each output
23//! collection of a dataflow, which is manipulated with `set_read_policy()`. Eventually, a
24//! collection is dropped with `drop_collections()`.
25//!
26//! A dataflow can be in read-only or read-write mode. In read-only mode, the dataflow does not
27//! modify any persistent state. Sending a `allow_write` message to the compute instance will
28//! transition the dataflow to read-write mode, allowing it to write to persistent sinks.
29//!
30//! Created dataflows will prevent the compaction of their inputs, including other compute
31//! collections but also collections managed by the storage layer. Each dataflow input is prevented
32//! from compacting beyond the allowed compaction of each of its outputs, ensuring that we can
33//! recover each dataflow to its current state in case of failure or other reconfiguration.
34
35use std::collections::{BTreeMap, BTreeSet};
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use mz_build_info::BuildInfo;
40use mz_cluster_client::client::ClusterReplicaLocation;
41use mz_cluster_client::metrics::ControllerMetrics;
42use mz_cluster_client::{ReplicaId, WallclockLagFn};
43use mz_compute_types::ComputeInstanceId;
44use mz_compute_types::config::ComputeReplicaConfig;
45use mz_compute_types::dataflows::DataflowDescription;
46use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
47use mz_dyncfg::ConfigSet;
48use mz_expr::RowSetFinishing;
49use mz_ore::cast::CastFrom;
50use mz_ore::collections::CollectionExt;
51use mz_ore::metrics::MetricsRegistry;
52use mz_ore::now::NowFn;
53use mz_ore::tracing::OpenTelemetryContext;
54use mz_persist_types::PersistLocation;
55use mz_repr::{Datum, GlobalId, RelationDesc, Row, TimestampManipulation};
56use mz_storage_client::controller::StorageController;
57use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
58use mz_storage_types::read_holds::ReadHold;
59use mz_storage_types::read_policy::ReadPolicy;
60use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
61use prometheus::proto::LabelPair;
62use serde::{Deserialize, Serialize};
63use timely::PartialOrder;
64use timely::progress::{Antichain, Timestamp};
65use tokio::sync::{mpsc, oneshot};
66use tokio::time::{self, MissedTickBehavior};
67use uuid::Uuid;
68
69use crate::controller::error::{
70    CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
71    HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
72    ReplicaCreationError, ReplicaDropError,
73};
74use crate::controller::instance::{Instance, SharedCollectionState};
75use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
76use crate::controller::replica::ReplicaConfig;
77use crate::logging::{LogVariant, LoggingConfig};
78use crate::metrics::ComputeControllerMetrics;
79use crate::protocol::command::{ComputeParameters, PeekTarget};
80use crate::protocol::response::{PeekResponse, SubscribeBatch};
81
82mod instance;
83mod introspection;
84mod replica;
85mod sequential_hydration;
86
87pub mod error;
88pub mod instance_client;
89pub use instance_client::InstanceClient;
90
/// A shared, thread-safe handle to the storage layer's collection interface.
///
/// Cloned (via `Arc::clone`) into every spawned instance so that compute instances can acquire
/// read holds on storage collections.
pub(crate) type StorageCollections<T> = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T> + Send + Sync,
>;
94
/// A composite trait for types that serve as timestamps in the Compute Controller.
///
/// `Into<Datum<'a>>` is needed for writing timestamps to introspection collections.
pub trait ComputeControllerTimestamp: TimestampManipulation + Into<Datum<'static>> + Sync {}

// The only timestamp type currently used by the compute controller.
impl ComputeControllerTimestamp for mz_repr::Timestamp {}
100
/// Responses from the compute controller.
#[derive(Debug)]
pub enum ComputeControllerResponse<T> {
    /// See [`PeekNotification`].
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// See [`crate::protocol::response::ComputeResponse::SubscribeResponse`].
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// The response from a dataflow containing a `CopyToS3Oneshot` sink.
    ///
    /// The `GlobalId` identifies the sink. The `Result` is the response from
    /// the sink, where an `Ok(n)` indicates that `n` rows were successfully
    /// copied to S3 and an `Err` indicates that an error was encountered
    /// during the copy operation.
    ///
    /// For a given `CopyToS3Oneshot` sink, there will be at most one `CopyToResponse`
    /// produced. (The sink may produce no responses if its dataflow is dropped
    /// before completion.)
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// A response reporting advancement of a collection's upper frontier.
    ///
    /// Once a collection's upper (aka "write frontier") has advanced to beyond a given time, the
    /// contents of the collection as of that time have been sealed and cannot change anymore.
    FrontierUpper {
        /// The ID of a compute collection.
        id: GlobalId,
        /// The new upper frontier of the identified compute collection.
        upper: Antichain<T>,
    },
}
130
/// Notification and summary of a received and forwarded [`crate::protocol::response::ComputeResponse::PeekResponse`].
///
/// Delivered to clients via [`ComputeControllerResponse::PeekNotification`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    /// Returned rows of a successful peek.
    Success {
        /// Number of rows in the returned peek result.
        rows: u64,
        /// Size of the returned peek result in bytes.
        result_size: u64,
    },
    /// Error of an unsuccessful peek, including the reason for the error.
    Error(String),
    /// The peek was canceled.
    Canceled,
}
146
147impl PeekNotification {
148    /// Construct a new [`PeekNotification`] from a [`PeekResponse`]. The `offset` and `limit`
149    /// parameters are used to calculate the number of rows in the peek result.
150    fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
151        match peek_response {
152            PeekResponse::Rows(rows) => {
153                let num_rows = u64::cast_from(rows.count(offset, limit));
154                let result_size = u64::cast_from(rows.byte_len());
155
156                tracing::trace!(?num_rows, ?result_size, "inline peek result");
157
158                Self::Success {
159                    rows: num_rows,
160                    result_size,
161                }
162            }
163            PeekResponse::Stashed(stashed_response) => {
164                let rows = stashed_response.num_rows(offset, limit);
165                let result_size = stashed_response.size_bytes();
166
167                tracing::trace!(?rows, ?result_size, "stashed peek result");
168
169                Self::Success {
170                    rows: u64::cast_from(rows),
171                    result_size: u64::cast_from(result_size),
172                }
173            }
174            PeekResponse::Error(err) => Self::Error(err.clone()),
175            PeekResponse::Canceled => Self::Canceled,
176        }
177    }
178}
179
/// A controller for the compute layer.
pub struct ComputeController<T: ComputeControllerTimestamp> {
    /// The active compute instances, by ID.
    instances: BTreeMap<ComputeInstanceId, InstanceState<T>>,
    /// A map from an instance ID to an arbitrary string that describes the
    /// class of the workload that compute instance is running (e.g.,
    /// `production` or `staging`).
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    /// Build information, passed to newly spawned instances.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections<T>,
    /// Set to `true` once `initialization_complete` has been called.
    initialized: bool,
    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to affect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// Compute configuration to apply to new instances.
    config: ComputeParameters,
    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,
    /// A controller response to be returned on the next call to [`ComputeController::process`].
    stashed_response: Option<ComputeControllerResponse<T>>,
    /// The compute controller metrics.
    metrics: ComputeControllerMetrics,
    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<T>,
    /// Dynamic system configuration.
    ///
    /// Updated through `ComputeController::update_configuration` calls and shared with all
    /// subcomponents of the compute controller.
    dyncfg: Arc<ConfigSet>,

    /// Receiver for responses produced by `Instance`s.
    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse<T>>,
    /// Response sender that's passed to new `Instance`s.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Receiver for introspection updates produced by `Instance`s.
    ///
    /// When [`ComputeController::start_introspection_sink`] is first called, this receiver is
    /// passed to the introspection sink task.
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    /// Introspection updates sender that's passed to new `Instance`s.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,
}
233
234impl<T: ComputeControllerTimestamp> ComputeController<T> {
    /// Construct a new [`ComputeController`].
    ///
    /// The controller starts out with no instances; use `create_instance` to add them. As a side
    /// effect, this registers a metrics postprocessor with `metrics_registry` that annotates
    /// per-instance metrics with their workload class.
    pub fn new(
        build_info: &'static BuildInfo,
        storage_collections: StorageCollections<T>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        peek_stash_persist_location: PersistLocation,
        controller_metrics: ControllerMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
    ) -> Self {
        // Channels over which `Instance`s report responses and introspection updates back to
        // the controller.
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();

        // Schedule maintenance once per second; skip (rather than burst) any missed ticks.
        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
            ComputeInstanceId,
            Option<String>,
        >::new()));

        // Apply a `workload_class` label to all metrics in the registry that
        // have an `instance_id` label for an instance whose workload class is
        // known.
        metrics_registry.register_postprocessor({
            let instance_workload_classes = Arc::clone(&instance_workload_classes);
            move |metrics| {
                // Snapshot the workload classes under the lock, keyed by the stringified
                // instance ID so keys compare against metric label values.
                let instance_workload_classes = instance_workload_classes
                    .lock()
                    .expect("lock poisoned")
                    .iter()
                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
                    .collect::<BTreeMap<String, Option<String>>>();
                for metric in metrics {
                    // Note: the inner `metric` shadows the outer metric family.
                    'metric: for metric in metric.mut_metric() {
                        for label in metric.get_label() {
                            if label.name() == "instance_id" {
                                if let Some(workload_class) = instance_workload_classes
                                    .get(label.value())
                                    .cloned()
                                    .flatten()
                                {
                                    let mut label = LabelPair::default();
                                    label.set_name("workload_class".into());
                                    label.set_value(workload_class.clone());

                                    let mut labels = metric.take_label();
                                    labels.push(label);
                                    metric.set_label(labels);
                                }
                                // Done with this metric once an `instance_id` label was seen.
                                continue 'metric;
                            }
                        }
                    }
                }
            }
        });

        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

        Self {
            instances: BTreeMap::new(),
            instance_workload_classes,
            build_info,
            storage_collections,
            initialized: false,
            read_only,
            config: Default::default(),
            peek_stash_persist_location,
            stashed_response: None,
            metrics,
            now,
            wallclock_lag,
            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
            response_rx,
            response_tx,
            // Held here until `start_introspection_sink` hands it to the sink task.
            introspection_rx: Some(introspection_rx),
            introspection_tx,
            maintenance_ticker,
            maintenance_scheduled: false,
        }
    }
318
319    /// Start sinking the compute controller's introspection data into storage.
320    ///
321    /// This method should be called once the introspection collections have been registered with
322    /// the storage controller. It will panic if invoked earlier than that.
323    pub fn start_introspection_sink(
324        &mut self,
325        storage_controller: &dyn StorageController<Timestamp = T>,
326    ) {
327        if let Some(rx) = self.introspection_rx.take() {
328            spawn_introspection_sink(rx, storage_controller);
329        }
330    }
331
    /// Reports whether an instance with the given ID exists on this controller.
    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
        self.instances.contains_key(&id)
    }
336
337    /// Return a reference to the indicated compute instance.
338    fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState<T>, InstanceMissing> {
339        self.instances.get(&id).ok_or(InstanceMissing(id))
340    }
341
342    /// Return an `InstanceClient` for the indicated compute instance.
343    pub fn instance_client(
344        &self,
345        id: ComputeInstanceId,
346    ) -> Result<InstanceClient<T>, InstanceMissing> {
347        self.instance(id).map(|instance| instance.client.clone())
348    }
349
350    /// Return a mutable reference to the indicated compute instance.
351    fn instance_mut(
352        &mut self,
353        id: ComputeInstanceId,
354    ) -> Result<&mut InstanceState<T>, InstanceMissing> {
355        self.instances.get_mut(&id).ok_or(InstanceMissing(id))
356    }
357
358    /// List the IDs of all collections in the identified compute instance.
359    pub fn collection_ids(
360        &self,
361        instance_id: ComputeInstanceId,
362    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
363        let instance = self.instance(instance_id)?;
364        let ids = instance.collections.keys().copied();
365        Ok(ids)
366    }
367
368    /// Return the frontiers of the indicated collection.
369    ///
370    /// If an `instance_id` is provided, the collection is assumed to be installed on that
371    /// instance. Otherwise all available instances are searched.
372    pub fn collection_frontiers(
373        &self,
374        collection_id: GlobalId,
375        instance_id: Option<ComputeInstanceId>,
376    ) -> Result<CollectionFrontiers<T>, CollectionLookupError> {
377        let collection = match instance_id {
378            Some(id) => self.instance(id)?.collection(collection_id)?,
379            None => self
380                .instances
381                .values()
382                .find_map(|i| i.collections.get(&collection_id))
383                .ok_or(CollectionMissing(collection_id))?,
384        };
385
386        Ok(collection.frontiers())
387    }
388
389    /// List compute collections that depend on the given collection.
390    pub fn collection_reverse_dependencies(
391        &self,
392        instance_id: ComputeInstanceId,
393        id: GlobalId,
394    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
395        let instance = self.instance(instance_id)?;
396        let collections = instance.collections.iter();
397        let ids = collections
398            .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
399        Ok(ids)
400    }
401
402    /// Returns `true` iff the given collection has been hydrated.
403    ///
404    /// For this check, zero-replica clusters are always considered hydrated.
405    /// Their collections would never normally be considered hydrated but it's
406    /// clearly intentional that they have no replicas.
407    pub async fn collection_hydrated(
408        &self,
409        instance_id: ComputeInstanceId,
410        collection_id: GlobalId,
411    ) -> Result<bool, anyhow::Error> {
412        let instance = self.instance(instance_id)?;
413
414        let res = instance
415            .call_sync(move |i| i.collection_hydrated(collection_id))
416            .await?;
417
418        Ok(res)
419    }
420
    /// Returns `true` if all non-transient, non-excluded collections are hydrated on any of the
    /// provided replicas.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated but it's
    /// clearly intentional that they have no replicas.
    pub fn collections_hydrated_for_replicas(
        &self,
        instance_id: ComputeInstanceId,
        replicas: Vec<ReplicaId>,
        exclude_collections: BTreeSet<GlobalId>,
    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        // Validation
        // NOTE(review): when `instance.replicas` is empty, the `any` below is always false, so
        // this branch errors for every zero-replica cluster — which appears to contradict the
        // doc comment above ("zero-replica clusters are always considered hydrated"). Confirm
        // whether the first conjunct should be `!instance.replicas.is_empty()`.
        if instance.replicas.is_empty() && !replicas.iter().any(|id| instance.replicas.contains(id))
        {
            return Err(HydrationCheckBadTarget(replicas).into());
        }

        // Run the check inside the instance task; the result is delivered through the returned
        // oneshot receiver. A dropped receiver is fine, hence the ignored send result.
        let (tx, rx) = oneshot::channel();
        instance.call(move |i| {
            let result = i
                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
                .expect("validated");
            let _ = tx.send(result);
        });

        Ok(rx)
    }
451
    /// Returns the state of the [`ComputeController`] formatted as JSON.
    ///
    /// The returned value is not guaranteed to be stable and may change at any point in time.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
        // returned object as a tradeoff between usability and stability. `serde_json` will fail
        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
        // prevents a future unrelated change from silently breaking this method.

        // Destructure `self` here so we don't forget to consider dumping newly added fields.
        let Self {
            instances,
            instance_workload_classes,
            build_info: _,
            storage_collections: _,
            initialized,
            read_only,
            config: _,
            peek_stash_persist_location: _,
            stashed_response,
            metrics: _,
            now: _,
            wallclock_lag: _,
            dyncfg: _,
            response_rx: _,
            response_tx: _,
            introspection_rx: _,
            introspection_tx: _,
            maintenance_ticker: _,
            maintenance_scheduled,
        } = self;

        // Collect each instance's dump, awaiting them one at a time.
        let mut instances_dump = BTreeMap::new();
        for (id, instance) in instances {
            let dump = instance.dump().await?;
            instances_dump.insert(id.to_string(), dump);
        }

        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .iter()
            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
            .collect();

        Ok(serde_json::json!({
            "instances": instances_dump,
            "instance_workload_classes": instance_workload_classes,
            "initialized": initialized,
            "read_only": read_only,
            "stashed_response": format!("{stashed_response:?}"),
            "maintenance_scheduled": maintenance_scheduled,
        }))
    }
506}
507
508impl<T> ComputeController<T>
509where
510    T: ComputeControllerTimestamp,
511{
    /// Create a compute instance.
    ///
    /// # Errors
    ///
    /// Returns an error if an instance with the given ID already exists.
    pub fn create_instance(
        &mut self,
        id: ComputeInstanceId,
        arranged_logs: BTreeMap<LogVariant, GlobalId>,
        workload_class: Option<String>,
    ) -> Result<(), InstanceExists> {
        if self.instances.contains_key(&id) {
            return Err(InstanceExists(id));
        }

        // Pre-create collection state for the arranged log collections. The shared state is
        // also handed to the new instance task below.
        let mut collections = BTreeMap::new();
        let mut logs = Vec::with_capacity(arranged_logs.len());
        for (&log, &id) in &arranged_logs {
            let collection = Collection::new_log();
            let shared = collection.shared.clone();
            collections.insert(id, collection);
            logs.push((log, id, shared));
        }

        // Spawn the instance task, wiring it up to the controller's shared channels and state.
        let client = InstanceClient::spawn(
            id,
            self.build_info,
            Arc::clone(&self.storage_collections),
            self.peek_stash_persist_location.clone(),
            logs,
            self.metrics.for_instance(id),
            self.now.clone(),
            self.wallclock_lag.clone(),
            Arc::clone(&self.dyncfg),
            self.response_tx.clone(),
            self.introspection_tx.clone(),
            self.read_only,
        );

        let instance = InstanceState::new(client, collections);
        self.instances.insert(id, instance);

        // Record the workload class so the metrics postprocessor can label this instance's
        // metrics.
        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class.clone());

        let instance = self.instances.get_mut(&id).expect("instance just added");
        if self.initialized {
            instance.call(Instance::initialization_complete);
        }

        // Push the current configuration, with this instance's workload class spliced in.
        let mut config_params = self.config.clone();
        config_params.workload_class = Some(workload_class);
        instance.call(|i| i.update_configuration(config_params));

        Ok(())
    }
566
567    /// Updates a compute instance's workload class.
568    pub fn update_instance_workload_class(
569        &mut self,
570        id: ComputeInstanceId,
571        workload_class: Option<String>,
572    ) -> Result<(), InstanceMissing> {
573        // Ensure that the instance exists first.
574        let _ = self.instance(id)?;
575
576        self.instance_workload_classes
577            .lock()
578            .expect("lock poisoned")
579            .insert(id, workload_class);
580
581        // Cause a config update to notify the instance about its new workload class.
582        self.update_configuration(Default::default());
583
584        Ok(())
585    }
586
587    /// Remove a compute instance.
588    ///
589    /// # Panics
590    ///
591    /// Panics if the identified `instance` still has active replicas.
592    pub fn drop_instance(&mut self, id: ComputeInstanceId) {
593        if let Some(instance) = self.instances.remove(&id) {
594            instance.call(|i| i.shutdown());
595        }
596
597        self.instance_workload_classes
598            .lock()
599            .expect("lock poisoned")
600            .remove(&id);
601    }
602
    /// Returns the compute controller's config set.
    ///
    /// The config set is shared (via `Arc`) with all instances spawned by this controller.
    pub fn dyncfg(&self) -> &Arc<ConfigSet> {
        &self.dyncfg
    }
607
    /// Update compute configuration.
    ///
    /// Applies dyncfg updates, forwards the parameters to all existing instances (with each
    /// instance's workload class spliced in), and remembers them for instances created later.
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        // Apply dyncfg updates.
        config_params.dyncfg_updates.apply(&self.dyncfg);

        let instance_workload_classes = self
            .instance_workload_classes
            .lock()
            .expect("lock poisoned");

        // Forward updates to existing clusters.
        // Workload classes are cluster-specific, so we need to overwrite them here.
        for (id, instance) in self.instances.iter_mut() {
            let mut params = config_params.clone();
            // Indexing panics on a missing key; `create_instance` inserts an entry for every
            // instance and `drop_instance` removes it together with the instance, so every
            // live instance has one.
            params.workload_class = Some(instance_workload_classes[id].clone());
            instance.call(|i| i.update_configuration(params));
        }

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                // An invalid config value is logged and ignored, leaving the previous
                // behavior in place.
                tracing::error!(
                    err,
                    overflowing_behavior,
                    "Invalid value for ore_overflowing_behavior"
                );
            }
        }

        // Remember updates for future clusters.
        self.config.update(config_params);
    }
641
642    /// Mark the end of any initialization commands.
643    ///
644    /// The implementor may wait for this method to be called before implementing prior commands,
645    /// and so it is important for a user to invoke this method as soon as it is comfortable.
646    /// This method can be invoked immediately, at the potential expense of performance.
647    pub fn initialization_complete(&mut self) {
648        self.initialized = true;
649        for instance in self.instances.values_mut() {
650            instance.call(Instance::initialization_complete);
651        }
652    }
653
    /// Wait until the controller is ready to do some processing.
    ///
    /// This method may block for an arbitrarily long time.
    ///
    /// When the method returns, the caller should call [`ComputeController::process`].
    ///
    /// This method is cancellation safe.
    pub async fn ready(&mut self) {
        if self.stashed_response.is_some() {
            // We still have a response stashed, which we are immediately ready to process.
            return;
        }
        if self.maintenance_scheduled {
            // Maintenance work has been scheduled.
            return;
        }

        // Wait for whichever comes first: an instance response or the next maintenance tick.
        // Both `recv` and `tick` are documented as cancellation safe in tokio, which is what
        // makes this method cancellation safe overall.
        tokio::select! {
            resp = self.response_rx.recv() => {
                let resp = resp.expect("`self.response_tx` not dropped");
                self.stashed_response = Some(resp);
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        }
    }
681
    /// Adds replicas of an instance.
    ///
    /// # Errors
    ///
    /// Returns an error if the instance is unknown or if a replica with the same ID already
    /// exists on it.
    pub fn add_replica_to_instance(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
        config: ComputeReplicaConfig,
    ) -> Result<(), ReplicaCreationError> {
        use ReplicaCreationError::*;

        let instance = self.instance(instance_id)?;

        // Validation
        if instance.replicas.contains(&replica_id) {
            return Err(ReplicaExists(replica_id));
        }

        // Logging is enabled iff an interval was configured; the fallback interval is unused
        // when logging is disabled.
        let (enable_logging, interval) = match config.logging.interval {
            Some(interval) => (true, interval),
            None => (false, Duration::from_secs(1)),
        };

        let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);

        let replica_config = ReplicaConfig {
            location,
            logging: LoggingConfig {
                interval,
                enable_logging,
                log_logging: config.logging.log_logging,
                index_logs: Default::default(),
            },
            grpc_client: self.config.grpc_client.clone(),
            // A zero offset disables replica expiration.
            expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
        };

        // Re-borrow mutably; the instance was confirmed to exist above.
        let instance = self.instance_mut(instance_id).expect("validated");
        instance.replicas.insert(replica_id);

        instance.call(move |i| {
            i.add_replica(replica_id, replica_config, None)
                .expect("validated")
        });

        Ok(())
    }
728
729    /// Removes a replica from an instance, including its service in the orchestrator.
730    pub fn drop_replica(
731        &mut self,
732        instance_id: ComputeInstanceId,
733        replica_id: ReplicaId,
734    ) -> Result<(), ReplicaDropError> {
735        use ReplicaDropError::*;
736
737        let instance = self.instance_mut(instance_id)?;
738
739        // Validation
740        if !instance.replicas.contains(&replica_id) {
741            return Err(ReplicaMissing(replica_id));
742        }
743
744        instance.replicas.remove(&replica_id);
745
746        instance.call(move |i| i.remove_replica(replica_id).expect("validated"));
747
748        Ok(())
749    }
750
    /// Creates the described dataflow and initializes state for its output.
    ///
    /// If a `subscribe_target_replica` is given, any subscribes exported by the dataflow are
    /// configured to target that replica, i.e., only subscribe responses sent by that replica are
    /// considered.
    ///
    /// All validation is performed before any controller state is modified, so an `Err` return
    /// implies that no collection state was installed.
    pub fn create_dataflow(
        &mut self,
        instance_id: ComputeInstanceId,
        mut dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        subscribe_target_replica: Option<ReplicaId>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        let instance = self.instance(instance_id)?;

        // Validation: target replica
        // A subscribe target replica, if given, must be a replica of this instance.
        if let Some(replica_id) = subscribe_target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // Validation: as_of
        // Dataflows exporting subscribes or copy-tos must not have an empty `as_of`.
        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        // Validation: input collections
        // Acquire read holds on all storage and compute imports. The holds are later handed to
        // the instance task along with the dataflow. The `as_of` must not be earlier than any
        // input's since, or the dataflow could not be computed.
        let storage_ids = dataflow.imported_source_ids().collect();
        let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
        for id in dataflow.imported_index_ids() {
            let read_hold = instance.acquire_read_hold(id)?;
            import_read_holds.push(read_hold);
        }
        for hold in &import_read_holds {
            if PartialOrder::less_than(as_of, hold.since()) {
                return Err(SinceViolation(hold.id()));
            }
        }

        // Validation: storage sink collections
        // Collections targeted by persist sinks must exist in the storage layer.
        for id in dataflow.persist_sink_ids() {
            if self.storage_collections.check_exists(id).is_err() {
                return Err(CollectionMissing(id));
            }
        }
        // NOTE(review): the `expect` assumes the validations above rule out all
        // `TimeDependenceError` variants this call can produce — confirm against
        // `determine_time_dependence`.
        let time_dependence = self
            .determine_time_dependence(instance_id, &dataflow)
            .expect("must exist");

        let instance = self.instance_mut(instance_id).expect("validated");

        // Install controller-side state for each exported collection, and collect the shared
        // state handles to pass to the instance task.
        let mut shared_collection_state = BTreeMap::new();
        for id in dataflow.export_ids() {
            let shared = SharedCollectionState::new(as_of.clone());
            let collection = Collection {
                // Exports that are sink exports are not readable through the controller.
                write_only: dataflow.sink_exports.contains_key(&id),
                compute_dependencies: dataflow.imported_index_ids().collect(),
                shared: shared.clone(),
                time_dependence: time_dependence.clone(),
            };
            instance.collections.insert(id, collection);
            shared_collection_state.insert(id, shared);
        }

        dataflow.time_dependence = time_dependence;

        // Hand the dataflow, the input read holds, and the shared collection state to the
        // instance task. All inputs were validated above, so failure would be a bug.
        instance.call(move |i| {
            i.create_dataflow(
                dataflow,
                import_read_holds,
                subscribe_target_replica,
                shared_collection_state,
            )
            .expect("validated")
        });

        Ok(())
    }
834
835    /// Drop the read capability for the given collections and allow their resources to be
836    /// reclaimed.
837    pub fn drop_collections(
838        &mut self,
839        instance_id: ComputeInstanceId,
840        collection_ids: Vec<GlobalId>,
841    ) -> Result<(), CollectionUpdateError> {
842        let instance = self.instance_mut(instance_id)?;
843
844        // Validation
845        for id in &collection_ids {
846            instance.collection(*id)?;
847        }
848
849        for id in &collection_ids {
850            instance.collections.remove(id);
851        }
852
853        instance.call(|i| i.drop_collections(collection_ids).expect("validated"));
854
855        Ok(())
856    }
857
858    /// Initiate a peek request for the contents of the given collection at `timestamp`.
859    pub fn peek(
860        &self,
861        instance_id: ComputeInstanceId,
862        peek_target: PeekTarget,
863        literal_constraints: Option<Vec<Row>>,
864        uuid: Uuid,
865        timestamp: T,
866        result_desc: RelationDesc,
867        finishing: RowSetFinishing,
868        map_filter_project: mz_expr::SafeMfpPlan,
869        target_replica: Option<ReplicaId>,
870        peek_response_tx: oneshot::Sender<PeekResponse>,
871    ) -> Result<(), PeekError> {
872        use PeekError::*;
873
874        let instance = self.instance(instance_id)?;
875
876        // Validation: target replica
877        if let Some(replica_id) = target_replica {
878            if !instance.replicas.contains(&replica_id) {
879                return Err(ReplicaMissing(replica_id));
880            }
881        }
882
883        // Validation: peek target
884        let read_hold = match &peek_target {
885            PeekTarget::Index { id } => instance.acquire_read_hold(*id)?,
886            PeekTarget::Persist { id, .. } => self
887                .storage_collections
888                .acquire_read_holds(vec![*id])?
889                .into_element(),
890        };
891        if !read_hold.since().less_equal(&timestamp) {
892            return Err(SinceViolation(peek_target.id()));
893        }
894
895        instance.call(move |i| {
896            i.peek(
897                peek_target,
898                literal_constraints,
899                uuid,
900                timestamp,
901                result_desc,
902                finishing,
903                map_filter_project,
904                read_hold,
905                target_replica,
906                peek_response_tx,
907            )
908            .expect("validated")
909        });
910
911        Ok(())
912    }
913
914    /// Cancel an existing peek request.
915    ///
916    /// Canceling a peek is best effort. The caller may see any of the following
917    /// after canceling a peek request:
918    ///
919    ///   * A `PeekResponse::Rows` indicating that the cancellation request did
920    ///     not take effect in time and the query succeeded.
921    ///   * A `PeekResponse::Canceled` affirming that the peek was canceled.
922    ///   * No `PeekResponse` at all.
923    pub fn cancel_peek(
924        &self,
925        instance_id: ComputeInstanceId,
926        uuid: Uuid,
927        reason: PeekResponse,
928    ) -> Result<(), InstanceMissing> {
929        self.instance(instance_id)?
930            .call(move |i| i.cancel_peek(uuid, reason));
931        Ok(())
932    }
933
934    /// Assign a read policy to specific identifiers.
935    ///
936    /// The policies are assigned in the order presented, and repeated identifiers should
937    /// conclude with the last policy. Changing a policy will immediately downgrade the read
938    /// capability if appropriate, but it will not "recover" the read capability if the prior
939    /// capability is already ahead of it.
940    ///
941    /// Identifiers not present in `policies` retain their existing read policies.
942    ///
943    /// It is an error to attempt to set a read policy for a collection that is not readable in the
944    /// context of compute. At this time, only indexes are readable compute collections.
945    pub fn set_read_policy(
946        &self,
947        instance_id: ComputeInstanceId,
948        policies: Vec<(GlobalId, ReadPolicy<T>)>,
949    ) -> Result<(), ReadPolicyError> {
950        use ReadPolicyError::*;
951
952        let instance = self.instance(instance_id)?;
953
954        // Validation
955        for (id, _) in &policies {
956            let collection = instance.collection(*id)?;
957            if collection.write_only {
958                return Err(WriteOnlyCollection(*id));
959            }
960        }
961
962        self.instance(instance_id)?
963            .call(|i| i.set_read_policy(policies).expect("validated"));
964
965        Ok(())
966    }
967
968    /// Acquires a [`ReadHold`] for the identified compute collection.
969    pub fn acquire_read_hold(
970        &self,
971        instance_id: ComputeInstanceId,
972        collection_id: GlobalId,
973    ) -> Result<ReadHold<T>, CollectionUpdateError> {
974        let read_hold = self
975            .instance(instance_id)?
976            .acquire_read_hold(collection_id)?;
977        Ok(read_hold)
978    }
979
    /// Determine the time dependence for a dataflow.
    ///
    /// Returns `None` for dataflows exporting continual tasks. Otherwise, merges the time
    /// dependences of all imported indexes and sources, taking the dataflow's refresh schedule
    /// into account.
    ///
    /// # Errors
    ///
    /// Returns an error if the instance is unknown, if an imported index is not a known
    /// collection, or if the storage layer fails to determine a source's time dependence.
    fn determine_time_dependence(
        &self,
        instance_id: ComputeInstanceId,
        dataflow: &DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        // TODO(ct3): Continual tasks don't support replica expiration
        let is_continual_task = dataflow.continual_task_ids().next().is_some();
        if is_continual_task {
            return Ok(None);
        }

        let instance = self
            .instance(instance_id)
            .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
        let mut time_dependencies = Vec::new();

        // Imported indexes must live on the target instance.
        for id in dataflow.imported_index_ids() {
            let dependence = instance
                .get_time_dependence(id)
                .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
            time_dependencies.push(dependence);
        }

        'source: for id in dataflow.imported_source_ids() {
            // We first check whether the id is backed by a compute object, in which case we use
            // the time dependence we know. This is true for materialized views, continual tasks,
            // etc.
            for instance in self.instances.values() {
                if let Ok(dependence) = instance.get_time_dependence(id) {
                    time_dependencies.push(dependence);
                    continue 'source;
                }
            }

            // Not a compute object: Consult the storage collections controller.
            time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
        }

        Ok(TimeDependence::merge(
            time_dependencies,
            dataflow.refresh_schedule.as_ref(),
        ))
    }
1024
1025    /// Processes the work queued by [`ComputeController::ready`].
1026    #[mz_ore::instrument(level = "debug")]
1027    pub fn process(&mut self) -> Option<ComputeControllerResponse<T>> {
1028        // Perform periodic maintenance work.
1029        if self.maintenance_scheduled {
1030            self.maintain();
1031            self.maintenance_scheduled = false;
1032        }
1033
1034        // Return a ready response, if any.
1035        self.stashed_response.take()
1036    }
1037
1038    #[mz_ore::instrument(level = "debug")]
1039    fn maintain(&mut self) {
1040        // Perform instance maintenance work.
1041        for instance in self.instances.values_mut() {
1042            instance.call(Instance::maintain);
1043        }
1044    }
1045
    /// Allow writes for the specified collection on `instance_id`.
    ///
    /// If this controller is in read-only mode, this is a no-op (it still returns `Ok`).
    ///
    /// # Errors
    ///
    /// Returns an error if the instance or the collection is unknown.
    pub fn allow_writes(
        &mut self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<(), CollectionUpdateError> {
        // In read-only mode no dataflow may write to persistent state, so the request
        // is deliberately dropped rather than treated as an error.
        if self.read_only {
            tracing::debug!("Skipping allow_writes in read-only mode");
            return Ok(());
        }

        let instance = self.instance_mut(instance_id)?;

        // Validation: the collection must be known to the instance.
        instance.collection(collection_id)?;

        // Forward the request to the instance task; validated above.
        instance.call(move |i| i.allow_writes(collection_id).expect("validated"));

        Ok(())
    }
1068}
1069
/// Controller-side state for a single compute instance.
#[derive(Debug)]
struct InstanceState<T: ComputeControllerTimestamp> {
    /// Client used to send calls to the instance task.
    client: InstanceClient<T>,
    /// IDs of the replicas currently attached to this instance.
    replicas: BTreeSet<ReplicaId>,
    /// Controller-side state of the collections installed on this instance.
    collections: BTreeMap<GlobalId, Collection<T>>,
}
1076
impl<T: ComputeControllerTimestamp> InstanceState<T> {
    /// Creates a new `InstanceState` with the given client and initial collections, and no
    /// replicas.
    fn new(client: InstanceClient<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
        Self {
            client,
            replicas: Default::default(),
            collections,
        }
    }

    /// Returns the state of the identified collection, or `CollectionMissing` if it is unknown.
    fn collection(&self, id: GlobalId) -> Result<&Collection<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    /// Calls the given function on the instance task. Does not await the result.
    ///
    /// # Panics
    ///
    /// Panics if the instance corresponding to `self` does not exist.
    fn call<F>(&self, f: F)
    where
        F: FnOnce(&mut Instance<T>) + Send + 'static,
    {
        self.client.call(f).expect("instance not dropped")
    }

    /// Calls the given function on the instance task, and awaits the result.
    ///
    /// # Panics
    ///
    /// Panics if the instance corresponding to `self` does not exist.
    async fn call_sync<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
        R: Send + 'static,
    {
        self.client
            .call_sync(f)
            .await
            .expect("instance not dropped")
    }

    /// Acquires a [`ReadHold`] for the identified compute collection.
    pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
        // We acquire read holds at the earliest possible time rather than returning a copy
        // of the implied read hold. This is so that in `create_dataflow` we can acquire read holds
        // on compute dependencies at frontiers that are held back by other read holds the caller
        // has previously taken.
        //
        // If/when we change the compute API to expect callers to pass in the `ReadHold`s rather
        // than acquiring them ourselves, we might tighten this up and instead acquire read holds
        // at the implied capability.

        let collection = self.collection(id)?;
        // Add one capability at each element of the current since, so the hold pins the
        // collection's since at its current frontier.
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });

        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
        Ok(hold)
    }

    /// Return the stored time dependence for a collection.
    fn get_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, CollectionMissing> {
        Ok(self.collection(id)?.time_dependence.clone())
    }

    /// Returns the [`InstanceState`] formatted as JSON.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` here so we don't forget to consider dumping newly added fields.
        let Self {
            client: _,
            replicas,
            collections,
        } = self;

        let instance = self.call_sync(|i| i.dump()).await?;
        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();

        Ok(serde_json::json!({
            "instance": instance,
            "replicas": replicas,
            "collections": collections,
        }))
    }
}
1171
/// Controller-side state of a compute collection.
#[derive(Debug)]
struct Collection<T> {
    /// Whether this collection is write-only (a sink export) and thus not readable.
    write_only: bool,
    /// The compute collections (imported indexes) this collection depends on.
    compute_dependencies: BTreeSet<GlobalId>,
    /// Collection state (read capabilities, write frontier) shared with the instance task.
    shared: SharedCollectionState<T>,
    /// The computed time dependence for this collection. None indicates no specific information,
    /// a value describes how the collection relates to wall-clock time.
    time_dependence: Option<TimeDependence>,
}
1181
1182impl<T: Timestamp> Collection<T> {
1183    fn new_log() -> Self {
1184        let as_of = Antichain::from_elem(T::minimum());
1185        Self {
1186            write_only: false,
1187            compute_dependencies: Default::default(),
1188            shared: SharedCollectionState::new(as_of),
1189            time_dependence: Some(TimeDependence::default()),
1190        }
1191    }
1192
1193    fn frontiers(&self) -> CollectionFrontiers<T> {
1194        let read_frontier = self
1195            .shared
1196            .lock_read_capabilities(|c| c.frontier().to_owned());
1197        let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
1198        CollectionFrontiers {
1199            read_frontier,
1200            write_frontier,
1201        }
1202    }
1203}
1204
/// The frontiers of a compute collection.
#[derive(Clone, Debug)]
pub struct CollectionFrontiers<T> {
    /// The read frontier, i.e., the frontier of the collection's read capabilities.
    pub read_frontier: Antichain<T>,
    /// The write frontier.
    pub write_frontier: Antichain<T>,
}
1213
1214impl<T: Timestamp> Default for CollectionFrontiers<T> {
1215    fn default() -> Self {
1216        Self {
1217            read_frontier: Antichain::from_elem(T::minimum()),
1218            write_frontier: Antichain::from_elem(T::minimum()),
1219        }
1220    }
1221}