// mz_compute_client/controller.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A controller that provides an interface to the compute layer, and the storage layer below it.
//!
//! The compute controller manages the creation, maintenance, and removal of compute instances.
//! This involves ensuring the intended service state with the orchestrator, as well as maintaining
//! a dedicated compute instance controller for each active compute instance.
//!
//! For each compute instance, the compute controller curates the creation of indexes and sinks
//! installed on the instance, the progress of readers through these collections, and their
//! eventual dropping and resource reclamation.
//!
//! The state maintained for a compute instance can be viewed as a partial map from `GlobalId` to
//! collection. It is an error to use an identifier before it has been "created" with
//! `create_dataflow()`. Once created, the controller holds a read capability for each output
//! collection of a dataflow, which is manipulated with `set_read_policy()`. Eventually, a
//! collection is dropped with `drop_collections()`.
//!
//! A dataflow can be in read-only or read-write mode. In read-only mode, the dataflow does not
//! modify any persistent state. Sending an `allow_write` message to the compute instance will
//! transition the dataflow to read-write mode, allowing it to write to persistent sinks.
//!
//! Created dataflows will prevent the compaction of their inputs, including other compute
//! collections but also collections managed by the storage layer. Each dataflow input is prevented
//! from compacting beyond the allowed compaction of each of its outputs, ensuring that we can
//! recover each dataflow to its current state in case of failure or other reconfiguration.
35use std::collections::{BTreeMap, BTreeSet};
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use mz_build_info::BuildInfo;
40use mz_cluster_client::client::ClusterReplicaLocation;
41use mz_cluster_client::metrics::ControllerMetrics;
42use mz_cluster_client::{ReplicaId, WallclockLagFn};
43use mz_compute_types::ComputeInstanceId;
44use mz_compute_types::config::ComputeReplicaConfig;
45use mz_compute_types::dataflows::DataflowDescription;
46use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
47use mz_dyncfg::ConfigSet;
48use mz_expr::RowSetFinishing;
49use mz_expr::row::RowCollection;
50use mz_ore::cast::CastFrom;
51use mz_ore::metrics::MetricsRegistry;
52use mz_ore::now::NowFn;
53use mz_ore::tracing::OpenTelemetryContext;
54use mz_persist_types::PersistLocation;
55use mz_repr::{GlobalId, RelationDesc, Row, Timestamp};
56use mz_storage_client::controller::StorageController;
57use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
58use mz_storage_types::read_holds::ReadHold;
59use mz_storage_types::read_policy::ReadPolicy;
60use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
61use prometheus::proto::LabelPair;
62use serde::{Deserialize, Serialize};
63use timely::PartialOrder;
64use timely::progress::Antichain;
65use tokio::sync::{mpsc, oneshot};
66use tokio::time::{self, MissedTickBehavior};
67use uuid::Uuid;
68
69use crate::controller::error::{
70    CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
71    HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
72    ReplicaCreationError, ReplicaDropError,
73};
74use crate::controller::instance::{Instance, SharedCollectionState};
75use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
76use crate::controller::replica::ReplicaConfig;
77use crate::logging::{LogVariant, LoggingConfig};
78use crate::metrics::ComputeControllerMetrics;
79use crate::protocol::command::{ComputeParameters, PeekTarget};
80use crate::protocol::response::{PeekResponse, SubscribeBatch};
81
82mod instance;
83mod introspection;
84mod replica;
85mod sequential_hydration;
86
87pub mod error;
88pub mod instance_client;
89pub use instance_client::InstanceClient;
90
/// Shared, thread-safe handle to the storage layer's collections frontend, used by the compute
/// controller and its instances to look up and acquire read holds on storage collections.
pub(crate) type StorageCollections =
    Arc<dyn mz_storage_client::storage_collections::StorageCollections + Send + Sync>;
93
/// Responses from the compute controller.
#[derive(Debug)]
pub enum ComputeControllerResponse {
    /// See [`PeekNotification`].
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// See [`crate::protocol::response::ComputeResponse::SubscribeResponse`].
    SubscribeResponse(GlobalId, SubscribeBatch),
    /// The response from a dataflow containing a `CopyToS3Oneshot` sink.
    ///
    /// The `GlobalId` identifies the sink. The `Result` is the response from
    /// the sink, where an `Ok(n)` indicates that `n` rows were successfully
    /// copied to S3 and an `Err` indicates that an error was encountered
    /// during the copy operation.
    ///
    /// For a given `CopyToS3Oneshot` sink, there will be at most one `CopyToResponse`
    /// produced. (The sink may produce no responses if its dataflow is dropped
    /// before completion.)
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// A response reporting advancement of a collection's upper frontier.
    ///
    /// Once a collection's upper (aka "write frontier") has advanced to beyond a given time, the
    /// contents of the collection as of that time have been sealed and cannot change anymore.
    FrontierUpper {
        /// The ID of a compute collection.
        id: GlobalId,
        /// The new upper frontier of the identified compute collection.
        upper: Antichain<Timestamp>,
    },
}
123
/// Notification and summary of a received and forwarded [`crate::protocol::response::ComputeResponse::PeekResponse`].
///
/// Unlike the full `PeekResponse`, this carries only row/byte counts (or the error), not the
/// result data itself.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    /// Returned rows of a successful peek.
    Success {
        /// Number of rows in the returned peek result.
        rows: u64,
        /// Size of the returned peek result in bytes.
        result_size: u64,
    },
    /// Error of an unsuccessful peek, including the reason for the error.
    Error(String),
    /// The peek was canceled.
    Canceled,
}
139
140impl PeekNotification {
141    /// Construct a new [`PeekNotification`] from a [`PeekResponse`]. The `offset` and `limit`
142    /// parameters are used to calculate the number of rows in the peek result.
143    fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
144        match peek_response {
145            PeekResponse::Rows(rows) => {
146                let num_rows = u64::cast_from(RowCollection::offset_limit(
147                    rows.iter().map(|r| r.count()).sum(),
148                    offset,
149                    limit,
150                ));
151                let result_size = u64::cast_from(rows.iter().map(|r| r.byte_len()).sum::<usize>());
152
153                tracing::trace!(?num_rows, ?result_size, "inline peek result");
154
155                Self::Success {
156                    rows: num_rows,
157                    result_size,
158                }
159            }
160            PeekResponse::Stashed(stashed_response) => {
161                let rows = stashed_response.num_rows(offset, limit);
162                let result_size = stashed_response.size_bytes();
163
164                tracing::trace!(?rows, ?result_size, "stashed peek result");
165
166                Self::Success {
167                    rows: u64::cast_from(rows),
168                    result_size: u64::cast_from(result_size),
169                }
170            }
171            PeekResponse::Error(err) => Self::Error(err.clone()),
172            PeekResponse::Canceled => Self::Canceled,
173        }
174    }
175}
176
/// A controller for the compute layer.
pub struct ComputeController {
    /// The state maintained for each active compute instance, keyed by instance ID.
    instances: BTreeMap<ComputeInstanceId, InstanceState>,
    /// A map from an instance ID to an arbitrary string that describes the
    /// class of the workload that compute instance is running (e.g.,
    /// `production` or `staging`).
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    /// Static build information, forwarded to newly spawned instances.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections,
    /// Set to `true` once `initialization_complete` has been called.
    initialized: bool,
    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to affect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// Compute configuration to apply to new instances.
    config: ComputeParameters,
    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,
    /// A controller response to be returned on the next call to [`ComputeController::process`].
    stashed_response: Option<ComputeControllerResponse>,
    /// The compute controller metrics.
    metrics: ComputeControllerMetrics,
    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<Timestamp>,
    /// Dynamic system configuration.
    ///
    /// Updated through `ComputeController::update_configuration` calls and shared with all
    /// subcomponents of the compute controller.
    dyncfg: Arc<ConfigSet>,

    /// Receiver for responses produced by `Instance`s.
    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse>,
    /// Response sender that's passed to new `Instance`s.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
    /// Receiver for introspection updates produced by `Instance`s.
    ///
    /// When [`ComputeController::start_introspection_sink`] is first called, this receiver is
    /// passed to the introspection sink task.
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    /// Introspection updates sender that's passed to new `Instance`s.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,
}
230
impl ComputeController {
    /// Construct a new [`ComputeController`].
    pub fn new(
        build_info: &'static BuildInfo,
        storage_collections: StorageCollections,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        peek_stash_persist_location: PersistLocation,
        controller_metrics: ControllerMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<Timestamp>,
    ) -> Self {
        // Channels connecting the controller with the per-instance tasks it spawns later.
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();

        // Maintenance is scheduled at most once a second; ticks missed while busy are skipped
        // rather than bursted.
        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
            ComputeInstanceId,
            Option<String>,
        >::new()));

        // Apply a `workload_class` label to all metrics in the registry that
        // have an `instance_id` label for an instance whose workload class is
        // known.
        metrics_registry.register_postprocessor({
            let instance_workload_classes = Arc::clone(&instance_workload_classes);
            move |metrics| {
                // Snapshot the workload classes (keyed by stringified instance ID) up front, so
                // the lock is not held while rewriting metric labels.
                let instance_workload_classes = instance_workload_classes
                    .lock()
                    .expect("lock poisoned")
                    .iter()
                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
                    .collect::<BTreeMap<String, Option<String>>>();
                for metric in metrics {
                    'metric: for metric in metric.mut_metric() {
                        for label in metric.get_label() {
                            if label.name() == "instance_id" {
                                if let Some(workload_class) = instance_workload_classes
                                    .get(label.value())
                                    .cloned()
                                    .flatten()
                                {
                                    let mut label = LabelPair::default();
                                    label.set_name("workload_class".into());
                                    label.set_value(workload_class.clone());

                                    let mut labels = metric.take_label();
                                    labels.push(label);
                                    metric.set_label(labels);
                                }
                                // Only the first `instance_id` label is considered; move on to
                                // the next metric.
                                continue 'metric;
                            }
                        }
                    }
                }
            }
        });

        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

        Self {
            instances: BTreeMap::new(),
            instance_workload_classes,
            build_info,
            storage_collections,
            initialized: false,
            read_only,
            config: Default::default(),
            peek_stash_persist_location,
            stashed_response: None,
            metrics,
            now,
            wallclock_lag,
            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
            response_rx,
            response_tx,
            // Held here until `start_introspection_sink` hands it to the sink task.
            introspection_rx: Some(introspection_rx),
            introspection_tx,
            maintenance_ticker,
            maintenance_scheduled: false,
        }
    }

    /// Start sinking the compute controller's introspection data into storage.
    ///
    /// This method should be called once the introspection collections have been registered with
    /// the storage controller. It will panic if invoked earlier than that.
    pub fn start_introspection_sink(&mut self, storage_controller: &dyn StorageController) {
        // `take` ensures the sink task is spawned at most once; subsequent calls are no-ops.
        if let Some(rx) = self.introspection_rx.take() {
            spawn_introspection_sink(rx, storage_controller);
        }
    }

    /// Reports whether an instance with the given ID exists.
    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
        self.instances.contains_key(&id)
    }

    /// Return a reference to the indicated compute instance.
    fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState, InstanceMissing> {
        self.instances.get(&id).ok_or(InstanceMissing(id))
    }

    /// Return an `InstanceClient` for the indicated compute instance.
    pub fn instance_client(
        &self,
        id: ComputeInstanceId,
    ) -> Result<InstanceClient, InstanceMissing> {
        self.instance(id).map(|instance| instance.client.clone())
    }

    /// Return a mutable reference to the indicated compute instance.
    fn instance_mut(
        &mut self,
        id: ComputeInstanceId,
    ) -> Result<&mut InstanceState, InstanceMissing> {
        self.instances.get_mut(&id).ok_or(InstanceMissing(id))
    }

    /// List the IDs of all collections in the identified compute instance.
    pub fn collection_ids(
        &self,
        instance_id: ComputeInstanceId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let ids = instance.collections.keys().copied();
        Ok(ids)
    }

    /// Return the frontiers of the indicated collection.
    ///
    /// If an `instance_id` is provided, the collection is assumed to be installed on that
    /// instance. Otherwise all available instances are searched.
    pub fn collection_frontiers(
        &self,
        collection_id: GlobalId,
        instance_id: Option<ComputeInstanceId>,
    ) -> Result<CollectionFrontiers, CollectionLookupError> {
        let collection = match instance_id {
            Some(id) => self.instance(id)?.collection(collection_id)?,
            // No instance given: take the first instance that knows the collection.
            None => self
                .instances
                .values()
                .find_map(|i| i.collections.get(&collection_id))
                .ok_or(CollectionMissing(collection_id))?,
        };

        Ok(collection.frontiers())
    }

    /// List compute collections that depend on the given collection.
    pub fn collection_reverse_dependencies(
        &self,
        instance_id: ComputeInstanceId,
        id: GlobalId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let collections = instance.collections.iter();
        let ids = collections
            .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
        Ok(ids)
    }

    /// Returns `true` iff the given collection has been hydrated.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated but it's
    /// clearly intentional that they have no replicas.
    pub async fn collection_hydrated(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<bool, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        // Ask the instance task and wait for its answer.
        let res = instance
            .call_sync(move |i| i.collection_hydrated(collection_id))
            .await?;

        Ok(res)
    }

    /// Returns `true` if all non-transient, non-excluded collections are hydrated on any of the
    /// provided replicas.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated but it's
    /// clearly intentional that they have no replicas.
    pub fn collections_hydrated_for_replicas(
        &self,
        instance_id: ComputeInstanceId,
        replicas: Vec<ReplicaId>,
        exclude_collections: BTreeSet<GlobalId>,
    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        // Validation: if the instance has replicas, at least one of the requested replicas must
        // be among them.
        if !instance.replicas.is_empty()
            && !replicas.iter().any(|id| instance.replicas.contains(id))
        {
            return Err(HydrationCheckBadTarget(replicas).into());
        }

        // The answer is delivered asynchronously through the returned receiver.
        let (tx, rx) = oneshot::channel();
        instance.call(move |i| {
            let result = i
                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
                .expect("validated");
            // The caller may have dropped the receiver; ignore send errors.
            let _ = tx.send(result);
        });

        Ok(rx)
    }

    /// Returns the state of the [`ComputeController`] formatted as JSON.
    ///
    /// The returned value is not guaranteed to be stable and may change at any point in time.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
        // returned object as a tradeoff between usability and stability. `serde_json` will fail
        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
        // prevents a future unrelated change from silently breaking this method.

        // Destructure `self` here so we don't forget to consider dumping newly added fields.
        let Self {
            instances,
            instance_workload_classes,
            build_info: _,
            storage_collections: _,
            initialized,
            read_only,
            config: _,
            peek_stash_persist_location: _,
            stashed_response,
            metrics: _,
            now: _,
            wallclock_lag: _,
            dyncfg: _,
            response_rx: _,
            response_tx: _,
            introspection_rx: _,
            introspection_tx: _,
            maintenance_ticker: _,
            maintenance_scheduled,
        } = self;

        let mut instances_dump = BTreeMap::new();
        for (id, instance) in instances {
            let dump = instance.dump().await?;
            instances_dump.insert(id.to_string(), dump);
        }

        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .iter()
            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
            .collect();

        Ok(serde_json::json!({
            "instances": instances_dump,
            "instance_workload_classes": instance_workload_classes,
            "initialized": initialized,
            "read_only": read_only,
            "stashed_response": format!("{stashed_response:?}"),
            "maintenance_scheduled": maintenance_scheduled,
        }))
    }
}
502
503impl ComputeController {
    /// Create a compute instance.
    ///
    /// Installs state for the new instance, spawns its instance task, records its workload
    /// class, and seeds it with the current compute configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if an instance with the given ID already exists.
    pub fn create_instance(
        &mut self,
        id: ComputeInstanceId,
        arranged_logs: BTreeMap<LogVariant, GlobalId>,
        workload_class: Option<String>,
    ) -> Result<(), InstanceExists> {
        if self.instances.contains_key(&id) {
            return Err(InstanceExists(id));
        }

        // Pre-register a collection for each arranged log, sharing its state with the instance
        // task so both sides observe the same collection state.
        let mut collections = BTreeMap::new();
        let mut logs = Vec::with_capacity(arranged_logs.len());
        for (&log, &id) in &arranged_logs {
            let collection = Collection::new_log();
            let shared = collection.shared.clone();
            collections.insert(id, collection);
            logs.push((log, id, shared));
        }

        let client = InstanceClient::spawn(
            id,
            self.build_info,
            Arc::clone(&self.storage_collections),
            self.peek_stash_persist_location.clone(),
            logs,
            self.metrics.for_instance(id),
            self.now.clone(),
            self.wallclock_lag.clone(),
            Arc::clone(&self.dyncfg),
            self.response_tx.clone(),
            self.introspection_tx.clone(),
            self.read_only,
        );

        let instance = InstanceState::new(client, collections);
        self.instances.insert(id, instance);

        // Record the workload class so the metrics postprocessor can label this instance's
        // metrics.
        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class.clone());

        let instance = self.instances.get_mut(&id).expect("instance just added");
        // If initialization already completed, let the new instance know immediately.
        if self.initialized {
            instance.call(Instance::initialization_complete);
        }

        // Push the current configuration, including the workload class, down to the new
        // instance.
        let mut config_params = self.config.clone();
        config_params.workload_class = Some(workload_class);
        instance.call(|i| i.update_configuration(config_params));

        Ok(())
    }
558
    /// Updates a compute instance's workload class.
    ///
    /// The new class is propagated to the instance through an (otherwise empty) configuration
    /// update, so it also reaches the instance task itself.
    ///
    /// # Errors
    ///
    /// Returns an error if the identified instance does not exist.
    pub fn update_instance_workload_class(
        &mut self,
        id: ComputeInstanceId,
        workload_class: Option<String>,
    ) -> Result<(), InstanceMissing> {
        // Ensure that the instance exists first.
        let _ = self.instance(id)?;

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class);

        // Cause a config update to notify the instance about its new workload class.
        self.update_configuration(Default::default());

        Ok(())
    }
578
579    /// Remove a compute instance.
580    ///
581    /// # Panics
582    ///
583    /// Panics if the identified `instance` still has active replicas.
584    pub fn drop_instance(&mut self, id: ComputeInstanceId) {
585        if let Some(instance) = self.instances.remove(&id) {
586            instance.call(|i| i.shutdown());
587        }
588
589        self.instance_workload_classes
590            .lock()
591            .expect("lock poisoned")
592            .remove(&id);
593    }
594
    /// Returns the compute controller's config set.
    ///
    /// The returned [`ConfigSet`] is shared with all subcomponents of the compute controller and
    /// is updated through [`ComputeController::update_configuration`].
    pub fn dyncfg(&self) -> &Arc<ConfigSet> {
        &self.dyncfg
    }
599
    /// Update compute configuration.
    ///
    /// Applies the contained dyncfg updates, forwards the parameters to all existing instances
    /// (with each instance's own workload class filled in), and remembers the parameters for
    /// instances created later.
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        // Apply dyncfg updates.
        config_params.dyncfg_updates.apply(&self.dyncfg);

        let instance_workload_classes = self
            .instance_workload_classes
            .lock()
            .expect("lock poisoned");

        // Forward updates to existing clusters.
        // Workload classes are cluster-specific, so we need to overwrite them here.
        for (id, instance) in self.instances.iter_mut() {
            let mut params = config_params.clone();
            // Indexing is safe: every instance gets a workload-class entry when it is created.
            params.workload_class = Some(instance_workload_classes[id].clone());
            instance.call(|i| i.update_configuration(params));
        }

        // Propagate the configured overflow behavior to the `mz_ore` overflow handling.
        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                tracing::error!(
                    err,
                    overflowing_behavior,
                    "Invalid value for ore_overflowing_behavior"
                );
            }
        }

        // Remember updates for future clusters.
        self.config.update(config_params);
    }
633
634    /// Mark the end of any initialization commands.
635    ///
636    /// The implementor may wait for this method to be called before implementing prior commands,
637    /// and so it is important for a user to invoke this method as soon as it is comfortable.
638    /// This method can be invoked immediately, at the potential expense of performance.
639    pub fn initialization_complete(&mut self) {
640        self.initialized = true;
641        for instance in self.instances.values_mut() {
642            instance.call(Instance::initialization_complete);
643        }
644    }
645
    /// Wait until the controller is ready to do some processing.
    ///
    /// This method may block for an arbitrarily long time.
    ///
    /// When the method returns, the caller should call [`ComputeController::process`].
    ///
    /// This method is cancellation safe.
    pub async fn ready(&mut self) {
        if self.stashed_response.is_some() {
            // We still have a response stashed, which we are immediately ready to process.
            return;
        }
        if self.maintenance_scheduled {
            // Maintenance work has been scheduled.
            return;
        }

        tokio::select! {
            resp = self.response_rx.recv() => {
                // `self` also holds `response_tx`, so the channel cannot be closed here.
                let resp = resp.expect("`self.response_tx` not dropped");
                self.stashed_response = Some(resp);
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        }
    }
673
    /// Adds replicas of an instance.
    ///
    /// # Errors
    ///
    /// Returns an error if the instance is unknown or if a replica with the given ID already
    /// exists on it.
    pub fn add_replica_to_instance(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
        config: ComputeReplicaConfig,
    ) -> Result<(), ReplicaCreationError> {
        use ReplicaCreationError::*;

        let instance = self.instance(instance_id)?;

        // Validation
        if instance.replicas.contains(&replica_id) {
            return Err(ReplicaExists(replica_id));
        }

        // An absent logging interval disables logging; a default one-second interval is used in
        // that case.
        let (enable_logging, interval) = match config.logging.interval {
            Some(interval) => (true, interval),
            None => (false, Duration::from_secs(1)),
        };

        let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);

        let replica_config = ReplicaConfig {
            location,
            logging: LoggingConfig {
                interval,
                enable_logging,
                log_logging: config.logging.log_logging,
                index_logs: Default::default(),
            },
            grpc_client: self.config.grpc_client.clone(),
            // A zero offset disables replica expiration.
            expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
        };

        let instance = self.instance_mut(instance_id).expect("validated");
        instance.replicas.insert(replica_id);

        instance.call(move |i| {
            i.add_replica(replica_id, replica_config, None)
                .expect("validated")
        });

        Ok(())
    }
720
721    /// Removes a replica from an instance, including its service in the orchestrator.
722    pub fn drop_replica(
723        &mut self,
724        instance_id: ComputeInstanceId,
725        replica_id: ReplicaId,
726    ) -> Result<(), ReplicaDropError> {
727        use ReplicaDropError::*;
728
729        let instance = self.instance_mut(instance_id)?;
730
731        // Validation
732        if !instance.replicas.contains(&replica_id) {
733            return Err(ReplicaMissing(replica_id));
734        }
735
736        instance.replicas.remove(&replica_id);
737
738        instance.call(move |i| i.remove_replica(replica_id).expect("validated"));
739
740        Ok(())
741    }
742
/// Creates the described dataflow and initializes state for its output.
///
/// Only materialized views and subscribes are allowed to have a `target_replica`.
///
/// Panics if called with a dataflow description that has index exports
/// when `target_replica` is set.
pub fn create_dataflow(
    &mut self,
    instance_id: ComputeInstanceId,
    mut dataflow: DataflowDescription<mz_compute_types::plan::Plan, ()>,
    target_replica: Option<ReplicaId>,
) -> Result<(), DataflowCreationError> {
    use DataflowCreationError::*;

    let instance = self.instance(instance_id)?;

    // Validation: target replica
    if let Some(replica_id) = target_replica {
        if !instance.replicas.contains(&replica_id) {
            return Err(ReplicaMissing(replica_id));
        }
        // Per the documented contract above, replica-targeted dataflows must not
        // export indexes; violating this is a caller bug, hence the panic.
        assert!(
            dataflow.exported_index_ids().next().is_none(),
            "Replica-targeted indexes are not supported"
        );
    }

    // Validation: as_of
    // An `as_of` is required, and subscribe/copy-to exports reject an empty one.
    let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
    if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
        return Err(EmptyAsOfForSubscribe);
    }
    if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
        return Err(EmptyAsOfForCopyTo);
    }

    // Validation: input collections
    //
    // Acquire read holds on all storage and compute inputs. Beyond validation, the
    // holds are handed to the instance task below, keeping the inputs readable while
    // the dataflow is created.
    let storage_ids = dataflow.imported_source_ids().collect();
    let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
    for id in dataflow.imported_index_ids() {
        let read_hold = instance.acquire_read_hold(id)?;
        import_read_holds.push(read_hold);
    }
    // The dataflow's `as_of` must not lag behind the `since` of any input.
    for hold in &import_read_holds {
        if PartialOrder::less_than(as_of, hold.since()) {
            return Err(SinceViolation(hold.id()));
        }
    }

    // Validation: storage sink collections
    for id in dataflow.persist_sink_ids() {
        if self.storage_collections.check_exists(id).is_err() {
            return Err(CollectionMissing(id));
        }
    }
    // Determined while `self` is still only borrowed immutably; `expect` is justified
    // because the instance and all imports were validated above.
    let time_dependence = self
        .determine_time_dependence(instance_id, &dataflow)
        .expect("must exist");

    // All validation passed; re-fetch the instance mutably to install collection state.
    let instance = self.instance_mut(instance_id).expect("validated");

    // Initialize controller-side state for each exported collection, sharing frontier
    // state with the instance task via `SharedCollectionState`.
    let mut shared_collection_state = BTreeMap::new();
    for id in dataflow.export_ids() {
        let shared = SharedCollectionState::new(as_of.clone());
        let collection = Collection {
            // Sink exports are write-only; other exports can be read.
            write_only: dataflow.sink_exports.contains_key(&id),
            compute_dependencies: dataflow.imported_index_ids().collect(),
            shared: shared.clone(),
            time_dependence: time_dependence.clone(),
        };
        instance.collections.insert(id, collection);
        shared_collection_state.insert(id, shared);
    }

    dataflow.time_dependence = time_dependence;

    instance.call(move |i| {
        i.create_dataflow(
            dataflow,
            import_read_holds,
            shared_collection_state,
            target_replica,
        )
        .expect("validated")
    });

    Ok(())
}
831
832    /// Drop the read capability for the given collections and allow their resources to be
833    /// reclaimed.
834    pub fn drop_collections(
835        &mut self,
836        instance_id: ComputeInstanceId,
837        collection_ids: Vec<GlobalId>,
838    ) -> Result<(), CollectionUpdateError> {
839        let instance = self.instance_mut(instance_id)?;
840
841        // Validation
842        for id in &collection_ids {
843            instance.collection(*id)?;
844        }
845
846        for id in &collection_ids {
847            instance.collections.remove(id);
848        }
849
850        instance.call(|i| i.drop_collections(collection_ids).expect("validated"));
851
852        Ok(())
853    }
854
855    /// Initiate a peek request for the contents of the given collection at `timestamp`.
856    ///
857    /// The caller supplies a `read_hold` for the peek target — via
858    /// [`ComputeController::acquire_read_hold`] for `PeekTarget::Index`, or via the storage
859    /// collections for `PeekTarget::Persist`. The hold keeps the collection's `since` at
860    /// `<= timestamp` until the peek completes.
861    pub fn peek(
862        &self,
863        instance_id: ComputeInstanceId,
864        peek_target: PeekTarget,
865        literal_constraints: Option<Vec<Row>>,
866        uuid: Uuid,
867        timestamp: Timestamp,
868        result_desc: RelationDesc,
869        finishing: RowSetFinishing,
870        map_filter_project: mz_expr::SafeMfpPlan,
871        read_hold: ReadHold,
872        target_replica: Option<ReplicaId>,
873        peek_response_tx: oneshot::Sender<PeekResponse>,
874    ) -> Result<(), PeekError> {
875        use PeekError::*;
876
877        let instance = self.instance(instance_id)?;
878
879        // Validation: target replica
880        if let Some(replica_id) = target_replica {
881            if !instance.replicas.contains(&replica_id) {
882                return Err(ReplicaMissing(replica_id));
883            }
884        }
885
886        // Validation: the read hold must target this collection and must hold its `since`
887        // at `<= timestamp`.
888        if read_hold.id() != peek_target.id() {
889            return Err(ReadHoldIdMismatch(read_hold.id()));
890        }
891        if !read_hold.since().less_equal(&timestamp) {
892            return Err(SinceViolation(peek_target.id()));
893        }
894
895        instance.call(move |i| {
896            i.peek(
897                peek_target,
898                literal_constraints,
899                uuid,
900                timestamp,
901                result_desc,
902                finishing,
903                map_filter_project,
904                read_hold,
905                target_replica,
906                peek_response_tx,
907            )
908            .expect("validated")
909        });
910
911        Ok(())
912    }
913
914    /// Cancel an existing peek request.
915    ///
916    /// Canceling a peek is best effort. The caller may see any of the following
917    /// after canceling a peek request:
918    ///
919    ///   * A `PeekResponse::Rows` indicating that the cancellation request did
920    ///     not take effect in time and the query succeeded.
921    ///   * A `PeekResponse::Canceled` affirming that the peek was canceled.
922    ///   * No `PeekResponse` at all.
923    pub fn cancel_peek(
924        &self,
925        instance_id: ComputeInstanceId,
926        uuid: Uuid,
927        reason: PeekResponse,
928    ) -> Result<(), InstanceMissing> {
929        self.instance(instance_id)?
930            .call(move |i| i.cancel_peek(uuid, reason));
931        Ok(())
932    }
933
934    /// Assign a read policy to specific identifiers.
935    ///
936    /// The policies are assigned in the order presented, and repeated identifiers should
937    /// conclude with the last policy. Changing a policy will immediately downgrade the read
938    /// capability if appropriate, but it will not "recover" the read capability if the prior
939    /// capability is already ahead of it.
940    ///
941    /// Identifiers not present in `policies` retain their existing read policies.
942    ///
943    /// It is an error to attempt to set a read policy for a collection that is not readable in the
944    /// context of compute. At this time, only indexes are readable compute collections.
945    pub fn set_read_policy(
946        &self,
947        instance_id: ComputeInstanceId,
948        policies: Vec<(GlobalId, ReadPolicy)>,
949    ) -> Result<(), ReadPolicyError> {
950        use ReadPolicyError::*;
951
952        let instance = self.instance(instance_id)?;
953
954        // Validation
955        for (id, _) in &policies {
956            let collection = instance.collection(*id)?;
957            if collection.write_only {
958                return Err(WriteOnlyCollection(*id));
959            }
960        }
961
962        self.instance(instance_id)?
963            .call(|i| i.set_read_policy(policies).expect("validated"));
964
965        Ok(())
966    }
967
968    /// Acquires a [`ReadHold`] for the identified compute collection.
969    pub fn acquire_read_hold(
970        &self,
971        instance_id: ComputeInstanceId,
972        collection_id: GlobalId,
973    ) -> Result<ReadHold, CollectionUpdateError> {
974        let read_hold = self
975            .instance(instance_id)?
976            .acquire_read_hold(collection_id)?;
977        Ok(read_hold)
978    }
979
/// Determine the time dependence for a dataflow.
///
/// Collects the time dependences of all imported indexes (looked up on the target
/// instance) and imported sources, then merges them together with the dataflow's
/// refresh schedule.
///
/// # Errors
///
/// Returns an error if the instance is missing, an imported index is missing on it, or
/// the storage controller cannot determine a source's time dependence.
fn determine_time_dependence(
    &self,
    instance_id: ComputeInstanceId,
    dataflow: &DataflowDescription<mz_compute_types::plan::Plan, ()>,
) -> Result<Option<TimeDependence>, TimeDependenceError> {
    let instance = self
        .instance(instance_id)
        .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
    let mut time_dependencies = Vec::new();

    // Imported indexes must live on the target instance; a miss is an error.
    for id in dataflow.imported_index_ids() {
        let dependence = instance
            .get_time_dependence(id)
            .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
        time_dependencies.push(dependence);
    }

    'source: for id in dataflow.imported_source_ids() {
        // We first check whether the id is backed by a compute object, in which case we use
        // the time dependence we know. This is true for storage sinks.
        // NOTE: this deliberately scans _all_ instances, not only the target one.
        for instance in self.instances.values() {
            if let Ok(dependence) = instance.get_time_dependence(id) {
                time_dependencies.push(dependence);
                continue 'source;
            }
        }

        // Not a compute object: Consult the storage collections controller.
        time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
    }

    Ok(TimeDependence::merge(
        time_dependencies,
        dataflow.refresh_schedule.as_ref(),
    ))
}
1017
1018    /// Processes the work queued by [`ComputeController::ready`].
1019    #[mz_ore::instrument(level = "debug")]
1020    pub fn process(&mut self) -> Option<ComputeControllerResponse> {
1021        // Perform periodic maintenance work.
1022        if self.maintenance_scheduled {
1023            self.maintain();
1024            self.maintenance_scheduled = false;
1025        }
1026
1027        // Return a ready response, if any.
1028        self.stashed_response.take()
1029    }
1030
1031    #[mz_ore::instrument(level = "debug")]
1032    fn maintain(&mut self) {
1033        // Perform instance maintenance work.
1034        for instance in self.instances.values_mut() {
1035            instance.call(Instance::maintain);
1036        }
1037    }
1038
1039    /// Allow writes for the specified collections on `instance_id`.
1040    ///
1041    /// If this controller is in read-only mode, this is a no-op.
1042    pub fn allow_writes(
1043        &mut self,
1044        instance_id: ComputeInstanceId,
1045        collection_id: GlobalId,
1046    ) -> Result<(), CollectionUpdateError> {
1047        if self.read_only {
1048            tracing::debug!("Skipping allow_writes in read-only mode");
1049            return Ok(());
1050        }
1051
1052        let instance = self.instance_mut(instance_id)?;
1053
1054        // Validation
1055        instance.collection(collection_id)?;
1056
1057        instance.call(move |i| i.allow_writes(collection_id).expect("validated"));
1058
1059        Ok(())
1060    }
1061}
1062
/// Controller-side state maintained for a single compute instance.
#[derive(Debug)]
struct InstanceState {
    /// A client used to send calls to the instance task.
    client: InstanceClient,
    /// The IDs of the replicas added to this instance.
    replicas: BTreeSet<ReplicaId>,
    /// The compute collections installed on this instance, keyed by ID.
    collections: BTreeMap<GlobalId, Collection>,
}
1069
impl InstanceState {
    /// Creates a new `InstanceState` with the given client and initial collections, and
    /// no replicas.
    fn new(client: InstanceClient, collections: BTreeMap<GlobalId, Collection>) -> Self {
        Self {
            client,
            replicas: Default::default(),
            collections,
        }
    }

    /// Returns the identified collection, or a `CollectionMissing` error if it does not
    /// exist on this instance.
    fn collection(&self, id: GlobalId) -> Result<&Collection, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    /// Calls the given function on the instance task. Does not await the result.
    ///
    /// # Panics
    ///
    /// Panics if the instance corresponding to `self` does not exist.
    fn call<F>(&self, f: F)
    where
        F: FnOnce(&mut Instance) + Send + 'static,
    {
        self.client.call(f).expect("instance not dropped")
    }

    /// Calls the given function on the instance task, and awaits the result.
    ///
    /// # Panics
    ///
    /// Panics if the instance corresponding to `self` does not exist.
    async fn call_sync<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Instance) -> R + Send + 'static,
        R: Send + 'static,
    {
        self.client
            .call_sync(f)
            .await
            .expect("instance not dropped")
    }

    /// Acquires a [`ReadHold`] for the identified compute collection.
    pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold, CollectionMissing> {
        // We acquire read holds at the earliest possible time rather than returning a copy
        // of the implied read hold. This is so that in `create_dataflow` we can acquire read holds
        // on compute dependencies at frontiers that are held back by other read holds the caller
        // has previously taken.
        //
        // If/when we change the compute API to expect callers to pass in the `ReadHold`s rather
        // than acquiring them ourselves, we might tighten this up and instead acquire read holds
        // at the implied capability.

        let collection = self.collection(id)?;
        // Add one capability at each element of the current frontier, and remember that
        // frontier as the hold's `since`.
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });

        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
        Ok(hold)
    }

    /// Return the stored time dependence for a collection.
    fn get_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, CollectionMissing> {
        Ok(self.collection(id)?.time_dependence.clone())
    }

    /// Returns the [`InstanceState`] formatted as JSON.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` here so we don't forget to consider dumping newly added fields.
        let Self {
            client: _,
            replicas,
            collections,
        } = self;

        let instance = self.call_sync(|i| i.dump()).await?;
        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();

        Ok(serde_json::json!({
            "instance": instance,
            "replicas": replicas,
            "collections": collections,
        }))
    }
}
1164
/// Controller-side state maintained for a single compute collection.
#[derive(Debug)]
struct Collection {
    /// Whether a collection is write-only, i.e., we cannot read it directly like an index.
    write_only: bool,
    /// The IDs of the imported indexes the collection's dataflow depends on.
    compute_dependencies: BTreeSet<GlobalId>,
    /// State shared with the instance task, providing access to the collection's read
    /// capabilities and write frontier.
    shared: SharedCollectionState,
    /// The computed time dependence for this collection. None indicates no specific information,
    /// a value describes how the collection relates to wall-clock time.
    time_dependence: Option<TimeDependence>,
}
1175
1176impl Collection {
1177    fn new_log() -> Self {
1178        let as_of = Antichain::from_elem(Timestamp::MIN);
1179        Self {
1180            write_only: false,
1181            compute_dependencies: Default::default(),
1182            shared: SharedCollectionState::new(as_of),
1183            time_dependence: Some(TimeDependence::default()),
1184        }
1185    }
1186
1187    fn frontiers(&self) -> CollectionFrontiers {
1188        let read_frontier = self
1189            .shared
1190            .lock_read_capabilities(|c| c.frontier().to_owned());
1191        let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
1192        CollectionFrontiers {
1193            read_frontier,
1194            write_frontier,
1195        }
1196    }
1197}
1198
/// The frontiers of a compute collection.
#[derive(Clone, Debug)]
pub struct CollectionFrontiers {
    /// The read frontier, derived from the collection's read capabilities.
    pub read_frontier: Antichain<Timestamp>,
    /// The write frontier.
    pub write_frontier: Antichain<Timestamp>,
}
1207
1208impl Default for CollectionFrontiers {
1209    fn default() -> Self {
1210        Self {
1211            read_frontier: Antichain::from_elem(Timestamp::MIN),
1212            write_frontier: Antichain::from_elem(Timestamp::MIN),
1213        }
1214    }
1215}