Skip to main content

mz_compute_client/
controller.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller that provides an interface to the compute layer, and the storage layer below it.
11//!
12//! The compute controller manages the creation, maintenance, and removal of compute instances.
13//! This involves ensuring the intended service state with the orchestrator, as well as maintaining
14//! a dedicated compute instance controller for each active compute instance.
15//!
16//! For each compute instance, the compute controller curates the creation of indexes and sinks
17//! installed on the instance, the progress of readers through these collections, and their
18//! eventual dropping and resource reclamation.
19//!
20//! The state maintained for a compute instance can be viewed as a partial map from `GlobalId` to
21//! collection. It is an error to use an identifier before it has been "created" with
22//! `create_dataflow()`. Once created, the controller holds a read capability for each output
23//! collection of a dataflow, which is manipulated with `set_read_policy()`. Eventually, a
24//! collection is dropped with `drop_collections()`.
25//!
26//! A dataflow can be in read-only or read-write mode. In read-only mode, the dataflow does not
//! modify any persistent state. Sending an `allow_write` message to the compute instance will
28//! transition the dataflow to read-write mode, allowing it to write to persistent sinks.
29//!
30//! Created dataflows will prevent the compaction of their inputs, including other compute
31//! collections but also collections managed by the storage layer. Each dataflow input is prevented
32//! from compacting beyond the allowed compaction of each of its outputs, ensuring that we can
33//! recover each dataflow to its current state in case of failure or other reconfiguration.
34
35use std::collections::{BTreeMap, BTreeSet};
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use mz_build_info::BuildInfo;
40use mz_cluster_client::client::ClusterReplicaLocation;
41use mz_cluster_client::metrics::ControllerMetrics;
42use mz_cluster_client::{ReplicaId, WallclockLagFn};
43use mz_compute_types::ComputeInstanceId;
44use mz_compute_types::config::ComputeReplicaConfig;
45use mz_compute_types::dataflows::DataflowDescription;
46use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
47use mz_dyncfg::ConfigSet;
48use mz_expr::RowSetFinishing;
49use mz_expr::row::RowCollection;
50use mz_ore::cast::CastFrom;
51use mz_ore::collections::CollectionExt;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::NowFn;
54use mz_ore::tracing::OpenTelemetryContext;
55use mz_persist_types::PersistLocation;
56use mz_repr::{GlobalId, RelationDesc, Row, Timestamp};
57use mz_storage_client::controller::StorageController;
58use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
59use mz_storage_types::read_holds::ReadHold;
60use mz_storage_types::read_policy::ReadPolicy;
61use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
62use prometheus::proto::LabelPair;
63use serde::{Deserialize, Serialize};
64use timely::PartialOrder;
65use timely::progress::Antichain;
66use tokio::sync::{mpsc, oneshot};
67use tokio::time::{self, MissedTickBehavior};
68use uuid::Uuid;
69
70use crate::controller::error::{
71    CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
72    HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
73    ReplicaCreationError, ReplicaDropError,
74};
75use crate::controller::instance::{Instance, SharedCollectionState};
76use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
77use crate::controller::replica::ReplicaConfig;
78use crate::logging::{LogVariant, LoggingConfig};
79use crate::metrics::ComputeControllerMetrics;
80use crate::protocol::command::{ComputeParameters, PeekTarget};
81use crate::protocol::response::{PeekResponse, SubscribeBatch};
82
83mod instance;
84mod introspection;
85mod replica;
86mod sequential_hydration;
87
88pub mod error;
89pub mod instance_client;
90pub use instance_client::InstanceClient;
91
92pub(crate) type StorageCollections =
93    Arc<dyn mz_storage_client::storage_collections::StorageCollections + Send + Sync>;
94
/// Responses from the compute controller.
///
/// Values of this type are sent by `Instance`s over the controller's response channel and are
/// stashed by [`ComputeController::ready`] until they are processed.
#[derive(Debug)]
pub enum ComputeControllerResponse {
    /// See [`PeekNotification`].
    ///
    /// NOTE(review): the `Uuid` presumably identifies the peek and the `OpenTelemetryContext`
    /// presumably carries the tracing context of the peek request -- confirm against the sender.
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// See [`crate::protocol::response::ComputeResponse::SubscribeResponse`].
    SubscribeResponse(GlobalId, SubscribeBatch),
    /// The response from a dataflow containing a `CopyToS3Oneshot` sink.
    ///
    /// The `GlobalId` identifies the sink. The `Result` is the response from
    /// the sink, where an `Ok(n)` indicates that `n` rows were successfully
    /// copied to S3 and an `Err` indicates that an error was encountered
    /// during the copy operation.
    ///
    /// For a given `CopyToS3Oneshot` sink, there will be at most one `CopyToResponse`
    /// produced. (The sink may produce no responses if its dataflow is dropped
    /// before completion.)
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// A response reporting advancement of a collection's upper frontier.
    ///
    /// Once a collection's upper (aka "write frontier") has advanced to beyond a given time, the
    /// contents of the collection as of that time have been sealed and cannot change anymore.
    FrontierUpper {
        /// The ID of a compute collection.
        id: GlobalId,
        /// The new upper frontier of the identified compute collection.
        upper: Antichain<Timestamp>,
    },
}
124
/// Notification and summary of a received and forwarded [`crate::protocol::response::ComputeResponse::PeekResponse`].
///
/// The notification only summarizes the peek outcome (row count, size, error text); it does not
/// carry the result rows themselves.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    /// Returned rows of a successful peek.
    Success {
        /// Number of rows in the returned peek result, after the peek's offset and limit have
        /// been applied.
        rows: u64,
        /// Size of the returned peek result in bytes.
        result_size: u64,
    },
    /// Error of an unsuccessful peek, including the reason for the error.
    Error(String),
    /// The peek was canceled.
    Canceled,
}
140
141impl PeekNotification {
142    /// Construct a new [`PeekNotification`] from a [`PeekResponse`]. The `offset` and `limit`
143    /// parameters are used to calculate the number of rows in the peek result.
144    fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
145        match peek_response {
146            PeekResponse::Rows(rows) => {
147                let num_rows = u64::cast_from(RowCollection::offset_limit(
148                    rows.iter().map(|r| r.count()).sum(),
149                    offset,
150                    limit,
151                ));
152                let result_size = u64::cast_from(rows.iter().map(|r| r.byte_len()).sum::<usize>());
153
154                tracing::trace!(?num_rows, ?result_size, "inline peek result");
155
156                Self::Success {
157                    rows: num_rows,
158                    result_size,
159                }
160            }
161            PeekResponse::Stashed(stashed_response) => {
162                let rows = stashed_response.num_rows(offset, limit);
163                let result_size = stashed_response.size_bytes();
164
165                tracing::trace!(?rows, ?result_size, "stashed peek result");
166
167                Self::Success {
168                    rows: u64::cast_from(rows),
169                    result_size: u64::cast_from(result_size),
170                }
171            }
172            PeekResponse::Error(err) => Self::Error(err.clone()),
173            PeekResponse::Canceled => Self::Canceled,
174        }
175    }
176}
177
/// A controller for the compute layer.
pub struct ComputeController {
    /// The state of each compute instance managed by this controller, keyed by instance ID.
    instances: BTreeMap<ComputeInstanceId, InstanceState>,
    /// A map from an instance ID to an arbitrary string that describes the
    /// class of the workload that compute instance is running (e.g.,
    /// `production` or `staging`).
    ///
    /// Shared (behind a mutex) with the metrics postprocessor registered in
    /// [`ComputeController::new`], which uses it to attach `workload_class` labels.
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    /// Build information handed to each instance spawned by this controller.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections,
    /// Set to `true` once `initialization_complete` has been called.
    initialized: bool,
    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to effect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// Compute configuration to apply to new instances.
    config: ComputeParameters,
    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,
    /// A controller response to be returned on the next call to [`ComputeController::process`].
    stashed_response: Option<ComputeControllerResponse>,
    /// The compute controller metrics.
    metrics: ComputeControllerMetrics,
    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<Timestamp>,
    /// Dynamic system configuration.
    ///
    /// Updated through `ComputeController::update_configuration` calls and shared with all
    /// subcomponents of the compute controller.
    dyncfg: Arc<ConfigSet>,

    /// Receiver for responses produced by `Instance`s.
    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse>,
    /// Response sender that's passed to new `Instance`s.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
    /// Receiver for introspection updates produced by `Instance`s.
    ///
    /// When [`ComputeController::start_introspection_sink`] is first called, this receiver is
    /// passed to the introspection sink task.
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    /// Introspection updates sender that's passed to new `Instance`s.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    ///
    /// Set by [`ComputeController::ready`] when the maintenance ticker fires.
    maintenance_scheduled: bool,
}
231
232impl ComputeController {
233    /// Construct a new [`ComputeController`].
234    pub fn new(
235        build_info: &'static BuildInfo,
236        storage_collections: StorageCollections,
237        read_only: bool,
238        metrics_registry: &MetricsRegistry,
239        peek_stash_persist_location: PersistLocation,
240        controller_metrics: ControllerMetrics,
241        now: NowFn,
242        wallclock_lag: WallclockLagFn<Timestamp>,
243    ) -> Self {
244        let (response_tx, response_rx) = mpsc::unbounded_channel();
245        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();
246
247        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
248        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
249
250        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
251            ComputeInstanceId,
252            Option<String>,
253        >::new()));
254
255        // Apply a `workload_class` label to all metrics in the registry that
256        // have an `instance_id` label for an instance whose workload class is
257        // known.
258        metrics_registry.register_postprocessor({
259            let instance_workload_classes = Arc::clone(&instance_workload_classes);
260            move |metrics| {
261                let instance_workload_classes = instance_workload_classes
262                    .lock()
263                    .expect("lock poisoned")
264                    .iter()
265                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
266                    .collect::<BTreeMap<String, Option<String>>>();
267                for metric in metrics {
268                    'metric: for metric in metric.mut_metric() {
269                        for label in metric.get_label() {
270                            if label.name() == "instance_id" {
271                                if let Some(workload_class) = instance_workload_classes
272                                    .get(label.value())
273                                    .cloned()
274                                    .flatten()
275                                {
276                                    let mut label = LabelPair::default();
277                                    label.set_name("workload_class".into());
278                                    label.set_value(workload_class.clone());
279
280                                    let mut labels = metric.take_label();
281                                    labels.push(label);
282                                    metric.set_label(labels);
283                                }
284                                continue 'metric;
285                            }
286                        }
287                    }
288                }
289            }
290        });
291
292        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);
293
294        Self {
295            instances: BTreeMap::new(),
296            instance_workload_classes,
297            build_info,
298            storage_collections,
299            initialized: false,
300            read_only,
301            config: Default::default(),
302            peek_stash_persist_location,
303            stashed_response: None,
304            metrics,
305            now,
306            wallclock_lag,
307            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
308            response_rx,
309            response_tx,
310            introspection_rx: Some(introspection_rx),
311            introspection_tx,
312            maintenance_ticker,
313            maintenance_scheduled: false,
314        }
315    }
316
317    /// Start sinking the compute controller's introspection data into storage.
318    ///
319    /// This method should be called once the introspection collections have been registered with
320    /// the storage controller. It will panic if invoked earlier than that.
321    pub fn start_introspection_sink(&mut self, storage_controller: &dyn StorageController) {
322        if let Some(rx) = self.introspection_rx.take() {
323            spawn_introspection_sink(rx, storage_controller);
324        }
325    }
326
327    /// TODO(database-issues#7533): Add documentation.
328    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
329        self.instances.contains_key(&id)
330    }
331
332    /// Return a reference to the indicated compute instance.
333    fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState, InstanceMissing> {
334        self.instances.get(&id).ok_or(InstanceMissing(id))
335    }
336
337    /// Return an `InstanceClient` for the indicated compute instance.
338    pub fn instance_client(
339        &self,
340        id: ComputeInstanceId,
341    ) -> Result<InstanceClient, InstanceMissing> {
342        self.instance(id).map(|instance| instance.client.clone())
343    }
344
345    /// Return a mutable reference to the indicated compute instance.
346    fn instance_mut(
347        &mut self,
348        id: ComputeInstanceId,
349    ) -> Result<&mut InstanceState, InstanceMissing> {
350        self.instances.get_mut(&id).ok_or(InstanceMissing(id))
351    }
352
353    /// List the IDs of all collections in the identified compute instance.
354    pub fn collection_ids(
355        &self,
356        instance_id: ComputeInstanceId,
357    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
358        let instance = self.instance(instance_id)?;
359        let ids = instance.collections.keys().copied();
360        Ok(ids)
361    }
362
363    /// Return the frontiers of the indicated collection.
364    ///
365    /// If an `instance_id` is provided, the collection is assumed to be installed on that
366    /// instance. Otherwise all available instances are searched.
367    pub fn collection_frontiers(
368        &self,
369        collection_id: GlobalId,
370        instance_id: Option<ComputeInstanceId>,
371    ) -> Result<CollectionFrontiers, CollectionLookupError> {
372        let collection = match instance_id {
373            Some(id) => self.instance(id)?.collection(collection_id)?,
374            None => self
375                .instances
376                .values()
377                .find_map(|i| i.collections.get(&collection_id))
378                .ok_or(CollectionMissing(collection_id))?,
379        };
380
381        Ok(collection.frontiers())
382    }
383
384    /// List compute collections that depend on the given collection.
385    pub fn collection_reverse_dependencies(
386        &self,
387        instance_id: ComputeInstanceId,
388        id: GlobalId,
389    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
390        let instance = self.instance(instance_id)?;
391        let collections = instance.collections.iter();
392        let ids = collections
393            .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
394        Ok(ids)
395    }
396
397    /// Returns `true` iff the given collection has been hydrated.
398    ///
399    /// For this check, zero-replica clusters are always considered hydrated.
400    /// Their collections would never normally be considered hydrated but it's
401    /// clearly intentional that they have no replicas.
402    pub async fn collection_hydrated(
403        &self,
404        instance_id: ComputeInstanceId,
405        collection_id: GlobalId,
406    ) -> Result<bool, anyhow::Error> {
407        let instance = self.instance(instance_id)?;
408
409        let res = instance
410            .call_sync(move |i| i.collection_hydrated(collection_id))
411            .await?;
412
413        Ok(res)
414    }
415
416    /// Returns `true` if all non-transient, non-excluded collections are hydrated on any of the
417    /// provided replicas.
418    ///
419    /// For this check, zero-replica clusters are always considered hydrated.
420    /// Their collections would never normally be considered hydrated but it's
421    /// clearly intentional that they have no replicas.
422    pub fn collections_hydrated_for_replicas(
423        &self,
424        instance_id: ComputeInstanceId,
425        replicas: Vec<ReplicaId>,
426        exclude_collections: BTreeSet<GlobalId>,
427    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
428        let instance = self.instance(instance_id)?;
429
430        // Validation
431        if !instance.replicas.is_empty()
432            && !replicas.iter().any(|id| instance.replicas.contains(id))
433        {
434            return Err(HydrationCheckBadTarget(replicas).into());
435        }
436
437        let (tx, rx) = oneshot::channel();
438        instance.call(move |i| {
439            let result = i
440                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
441                .expect("validated");
442            let _ = tx.send(result);
443        });
444
445        Ok(rx)
446    }
447
448    /// Returns the state of the [`ComputeController`] formatted as JSON.
449    ///
450    /// The returned value is not guaranteed to be stable and may change at any point in time.
451    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
452        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
453        // returned object as a tradeoff between usability and stability. `serde_json` will fail
454        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
455        // prevents a future unrelated change from silently breaking this method.
456
457        // Destructure `self` here so we don't forget to consider dumping newly added fields.
458        let Self {
459            instances,
460            instance_workload_classes,
461            build_info: _,
462            storage_collections: _,
463            initialized,
464            read_only,
465            config: _,
466            peek_stash_persist_location: _,
467            stashed_response,
468            metrics: _,
469            now: _,
470            wallclock_lag: _,
471            dyncfg: _,
472            response_rx: _,
473            response_tx: _,
474            introspection_rx: _,
475            introspection_tx: _,
476            maintenance_ticker: _,
477            maintenance_scheduled,
478        } = self;
479
480        let mut instances_dump = BTreeMap::new();
481        for (id, instance) in instances {
482            let dump = instance.dump().await?;
483            instances_dump.insert(id.to_string(), dump);
484        }
485
486        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
487            .lock()
488            .expect("lock poisoned")
489            .iter()
490            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
491            .collect();
492
493        Ok(serde_json::json!({
494            "instances": instances_dump,
495            "instance_workload_classes": instance_workload_classes,
496            "initialized": initialized,
497            "read_only": read_only,
498            "stashed_response": format!("{stashed_response:?}"),
499            "maintenance_scheduled": maintenance_scheduled,
500        }))
501    }
502}
503
504impl ComputeController {
505    /// Create a compute instance.
506    pub fn create_instance(
507        &mut self,
508        id: ComputeInstanceId,
509        arranged_logs: BTreeMap<LogVariant, GlobalId>,
510        workload_class: Option<String>,
511    ) -> Result<(), InstanceExists> {
512        if self.instances.contains_key(&id) {
513            return Err(InstanceExists(id));
514        }
515
516        let mut collections = BTreeMap::new();
517        let mut logs = Vec::with_capacity(arranged_logs.len());
518        for (&log, &id) in &arranged_logs {
519            let collection = Collection::new_log();
520            let shared = collection.shared.clone();
521            collections.insert(id, collection);
522            logs.push((log, id, shared));
523        }
524
525        let client = InstanceClient::spawn(
526            id,
527            self.build_info,
528            Arc::clone(&self.storage_collections),
529            self.peek_stash_persist_location.clone(),
530            logs,
531            self.metrics.for_instance(id),
532            self.now.clone(),
533            self.wallclock_lag.clone(),
534            Arc::clone(&self.dyncfg),
535            self.response_tx.clone(),
536            self.introspection_tx.clone(),
537            self.read_only,
538        );
539
540        let instance = InstanceState::new(client, collections);
541        self.instances.insert(id, instance);
542
543        self.instance_workload_classes
544            .lock()
545            .expect("lock poisoned")
546            .insert(id, workload_class.clone());
547
548        let instance = self.instances.get_mut(&id).expect("instance just added");
549        if self.initialized {
550            instance.call(Instance::initialization_complete);
551        }
552
553        let mut config_params = self.config.clone();
554        config_params.workload_class = Some(workload_class);
555        instance.call(|i| i.update_configuration(config_params));
556
557        Ok(())
558    }
559
560    /// Updates a compute instance's workload class.
561    pub fn update_instance_workload_class(
562        &mut self,
563        id: ComputeInstanceId,
564        workload_class: Option<String>,
565    ) -> Result<(), InstanceMissing> {
566        // Ensure that the instance exists first.
567        let _ = self.instance(id)?;
568
569        self.instance_workload_classes
570            .lock()
571            .expect("lock poisoned")
572            .insert(id, workload_class);
573
574        // Cause a config update to notify the instance about its new workload class.
575        self.update_configuration(Default::default());
576
577        Ok(())
578    }
579
580    /// Remove a compute instance.
581    ///
582    /// # Panics
583    ///
584    /// Panics if the identified `instance` still has active replicas.
585    pub fn drop_instance(&mut self, id: ComputeInstanceId) {
586        if let Some(instance) = self.instances.remove(&id) {
587            instance.call(|i| i.shutdown());
588        }
589
590        self.instance_workload_classes
591            .lock()
592            .expect("lock poisoned")
593            .remove(&id);
594    }
595
    /// Returns the compute controller's config set.
    ///
    /// This is the same shared [`ConfigSet`] that is handed to newly created instances and
    /// that `ComputeController::update_configuration` applies dyncfg updates to.
    pub fn dyncfg(&self) -> &Arc<ConfigSet> {
        &self.dyncfg
    }
600
601    /// Update compute configuration.
602    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
603        // Apply dyncfg updates.
604        config_params.dyncfg_updates.apply(&self.dyncfg);
605
606        let instance_workload_classes = self
607            .instance_workload_classes
608            .lock()
609            .expect("lock poisoned");
610
611        // Forward updates to existing clusters.
612        // Workload classes are cluster-specific, so we need to overwrite them here.
613        for (id, instance) in self.instances.iter_mut() {
614            let mut params = config_params.clone();
615            params.workload_class = Some(instance_workload_classes[id].clone());
616            instance.call(|i| i.update_configuration(params));
617        }
618
619        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
620        match overflowing_behavior.parse() {
621            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
622            Err(err) => {
623                tracing::error!(
624                    err,
625                    overflowing_behavior,
626                    "Invalid value for ore_overflowing_behavior"
627                );
628            }
629        }
630
631        // Remember updates for future clusters.
632        self.config.update(config_params);
633    }
634
635    /// Mark the end of any initialization commands.
636    ///
637    /// The implementor may wait for this method to be called before implementing prior commands,
638    /// and so it is important for a user to invoke this method as soon as it is comfortable.
639    /// This method can be invoked immediately, at the potential expense of performance.
640    pub fn initialization_complete(&mut self) {
641        self.initialized = true;
642        for instance in self.instances.values_mut() {
643            instance.call(Instance::initialization_complete);
644        }
645    }
646
647    /// Wait until the controller is ready to do some processing.
648    ///
649    /// This method may block for an arbitrarily long time.
650    ///
651    /// When the method returns, the caller should call [`ComputeController::process`].
652    ///
653    /// This method is cancellation safe.
654    pub async fn ready(&mut self) {
655        if self.stashed_response.is_some() {
656            // We still have a response stashed, which we are immediately ready to process.
657            return;
658        }
659        if self.maintenance_scheduled {
660            // Maintenance work has been scheduled.
661            return;
662        }
663
664        tokio::select! {
665            resp = self.response_rx.recv() => {
666                let resp = resp.expect("`self.response_tx` not dropped");
667                self.stashed_response = Some(resp);
668            }
669            _ = self.maintenance_ticker.tick() => {
670                self.maintenance_scheduled = true;
671            },
672        }
673    }
674
675    /// Adds replicas of an instance.
676    pub fn add_replica_to_instance(
677        &mut self,
678        instance_id: ComputeInstanceId,
679        replica_id: ReplicaId,
680        location: ClusterReplicaLocation,
681        config: ComputeReplicaConfig,
682    ) -> Result<(), ReplicaCreationError> {
683        use ReplicaCreationError::*;
684
685        let instance = self.instance(instance_id)?;
686
687        // Validation
688        if instance.replicas.contains(&replica_id) {
689            return Err(ReplicaExists(replica_id));
690        }
691
692        let (enable_logging, interval) = match config.logging.interval {
693            Some(interval) => (true, interval),
694            None => (false, Duration::from_secs(1)),
695        };
696
697        let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);
698
699        let replica_config = ReplicaConfig {
700            location,
701            logging: LoggingConfig {
702                interval,
703                enable_logging,
704                log_logging: config.logging.log_logging,
705                index_logs: Default::default(),
706            },
707            grpc_client: self.config.grpc_client.clone(),
708            expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
709        };
710
711        let instance = self.instance_mut(instance_id).expect("validated");
712        instance.replicas.insert(replica_id);
713
714        instance.call(move |i| {
715            i.add_replica(replica_id, replica_config, None)
716                .expect("validated")
717        });
718
719        Ok(())
720    }
721
722    /// Removes a replica from an instance, including its service in the orchestrator.
723    pub fn drop_replica(
724        &mut self,
725        instance_id: ComputeInstanceId,
726        replica_id: ReplicaId,
727    ) -> Result<(), ReplicaDropError> {
728        use ReplicaDropError::*;
729
730        let instance = self.instance_mut(instance_id)?;
731
732        // Validation
733        if !instance.replicas.contains(&replica_id) {
734            return Err(ReplicaMissing(replica_id));
735        }
736
737        instance.replicas.remove(&replica_id);
738
739        instance.call(move |i| i.remove_replica(replica_id).expect("validated"));
740
741        Ok(())
742    }
743
744    /// Creates the described dataflow and initializes state for its output.
745    ///
746    /// Only materialized views and subscribes are allowed to have a `target_replica`.
747    ///
748    /// Panics if called with a dataflow description that has index exports
749    /// when `target_replica` is set.
750    pub fn create_dataflow(
751        &mut self,
752        instance_id: ComputeInstanceId,
753        mut dataflow: DataflowDescription<mz_compute_types::plan::Plan, ()>,
754        target_replica: Option<ReplicaId>,
755    ) -> Result<(), DataflowCreationError> {
756        use DataflowCreationError::*;
757
758        let instance = self.instance(instance_id)?;
759
760        // Validation: target replica
761        if let Some(replica_id) = target_replica {
762            if !instance.replicas.contains(&replica_id) {
763                return Err(ReplicaMissing(replica_id));
764            }
765            assert!(
766                dataflow.exported_index_ids().next().is_none(),
767                "Replica-targeted indexes are not supported"
768            );
769        }
770
771        // Validation: as_of
772        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
773        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
774            return Err(EmptyAsOfForSubscribe);
775        }
776        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
777            return Err(EmptyAsOfForCopyTo);
778        }
779
780        // Validation: input collections
781        let storage_ids = dataflow.imported_source_ids().collect();
782        let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
783        for id in dataflow.imported_index_ids() {
784            let read_hold = instance.acquire_read_hold(id)?;
785            import_read_holds.push(read_hold);
786        }
787        for hold in &import_read_holds {
788            if PartialOrder::less_than(as_of, hold.since()) {
789                return Err(SinceViolation(hold.id()));
790            }
791        }
792
793        // Validation: storage sink collections
794        for id in dataflow.persist_sink_ids() {
795            if self.storage_collections.check_exists(id).is_err() {
796                return Err(CollectionMissing(id));
797            }
798        }
799        let time_dependence = self
800            .determine_time_dependence(instance_id, &dataflow)
801            .expect("must exist");
802
803        let instance = self.instance_mut(instance_id).expect("validated");
804
805        let mut shared_collection_state = BTreeMap::new();
806        for id in dataflow.export_ids() {
807            let shared = SharedCollectionState::new(as_of.clone());
808            let collection = Collection {
809                write_only: dataflow.sink_exports.contains_key(&id),
810                compute_dependencies: dataflow.imported_index_ids().collect(),
811                shared: shared.clone(),
812                time_dependence: time_dependence.clone(),
813            };
814            instance.collections.insert(id, collection);
815            shared_collection_state.insert(id, shared);
816        }
817
818        dataflow.time_dependence = time_dependence;
819
820        instance.call(move |i| {
821            i.create_dataflow(
822                dataflow,
823                import_read_holds,
824                shared_collection_state,
825                target_replica,
826            )
827            .expect("validated")
828        });
829
830        Ok(())
831    }
832
833    /// Drop the read capability for the given collections and allow their resources to be
834    /// reclaimed.
835    pub fn drop_collections(
836        &mut self,
837        instance_id: ComputeInstanceId,
838        collection_ids: Vec<GlobalId>,
839    ) -> Result<(), CollectionUpdateError> {
840        let instance = self.instance_mut(instance_id)?;
841
842        // Validation
843        for id in &collection_ids {
844            instance.collection(*id)?;
845        }
846
847        for id in &collection_ids {
848            instance.collections.remove(id);
849        }
850
851        instance.call(|i| i.drop_collections(collection_ids).expect("validated"));
852
853        Ok(())
854    }
855
856    /// Initiate a peek request for the contents of the given collection at `timestamp`.
857    pub fn peek(
858        &self,
859        instance_id: ComputeInstanceId,
860        peek_target: PeekTarget,
861        literal_constraints: Option<Vec<Row>>,
862        uuid: Uuid,
863        timestamp: Timestamp,
864        result_desc: RelationDesc,
865        finishing: RowSetFinishing,
866        map_filter_project: mz_expr::SafeMfpPlan,
867        target_replica: Option<ReplicaId>,
868        peek_response_tx: oneshot::Sender<PeekResponse>,
869    ) -> Result<(), PeekError> {
870        use PeekError::*;
871
872        let instance = self.instance(instance_id)?;
873
874        // Validation: target replica
875        if let Some(replica_id) = target_replica {
876            if !instance.replicas.contains(&replica_id) {
877                return Err(ReplicaMissing(replica_id));
878            }
879        }
880
881        // Validation: peek target
882        let read_hold = match &peek_target {
883            PeekTarget::Index { id } => instance.acquire_read_hold(*id)?,
884            PeekTarget::Persist { id, .. } => self
885                .storage_collections
886                .acquire_read_holds(vec![*id])?
887                .into_element(),
888        };
889        if !read_hold.since().less_equal(&timestamp) {
890            return Err(SinceViolation(peek_target.id()));
891        }
892
893        instance.call(move |i| {
894            i.peek(
895                peek_target,
896                literal_constraints,
897                uuid,
898                timestamp,
899                result_desc,
900                finishing,
901                map_filter_project,
902                read_hold,
903                target_replica,
904                peek_response_tx,
905            )
906            .expect("validated")
907        });
908
909        Ok(())
910    }
911
912    /// Cancel an existing peek request.
913    ///
914    /// Canceling a peek is best effort. The caller may see any of the following
915    /// after canceling a peek request:
916    ///
917    ///   * A `PeekResponse::Rows` indicating that the cancellation request did
918    ///     not take effect in time and the query succeeded.
919    ///   * A `PeekResponse::Canceled` affirming that the peek was canceled.
920    ///   * No `PeekResponse` at all.
921    pub fn cancel_peek(
922        &self,
923        instance_id: ComputeInstanceId,
924        uuid: Uuid,
925        reason: PeekResponse,
926    ) -> Result<(), InstanceMissing> {
927        self.instance(instance_id)?
928            .call(move |i| i.cancel_peek(uuid, reason));
929        Ok(())
930    }
931
932    /// Assign a read policy to specific identifiers.
933    ///
934    /// The policies are assigned in the order presented, and repeated identifiers should
935    /// conclude with the last policy. Changing a policy will immediately downgrade the read
936    /// capability if appropriate, but it will not "recover" the read capability if the prior
937    /// capability is already ahead of it.
938    ///
939    /// Identifiers not present in `policies` retain their existing read policies.
940    ///
941    /// It is an error to attempt to set a read policy for a collection that is not readable in the
942    /// context of compute. At this time, only indexes are readable compute collections.
943    pub fn set_read_policy(
944        &self,
945        instance_id: ComputeInstanceId,
946        policies: Vec<(GlobalId, ReadPolicy)>,
947    ) -> Result<(), ReadPolicyError> {
948        use ReadPolicyError::*;
949
950        let instance = self.instance(instance_id)?;
951
952        // Validation
953        for (id, _) in &policies {
954            let collection = instance.collection(*id)?;
955            if collection.write_only {
956                return Err(WriteOnlyCollection(*id));
957            }
958        }
959
960        self.instance(instance_id)?
961            .call(|i| i.set_read_policy(policies).expect("validated"));
962
963        Ok(())
964    }
965
966    /// Acquires a [`ReadHold`] for the identified compute collection.
967    pub fn acquire_read_hold(
968        &self,
969        instance_id: ComputeInstanceId,
970        collection_id: GlobalId,
971    ) -> Result<ReadHold, CollectionUpdateError> {
972        let read_hold = self
973            .instance(instance_id)?
974            .acquire_read_hold(collection_id)?;
975        Ok(read_hold)
976    }
977
978    /// Determine the time dependence for a dataflow.
979    fn determine_time_dependence(
980        &self,
981        instance_id: ComputeInstanceId,
982        dataflow: &DataflowDescription<mz_compute_types::plan::Plan, ()>,
983    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
984        // TODO(ct3): Continual tasks don't support replica expiration
985        let is_continual_task = dataflow.continual_task_ids().next().is_some();
986        if is_continual_task {
987            return Ok(None);
988        }
989
990        let instance = self
991            .instance(instance_id)
992            .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
993        let mut time_dependencies = Vec::new();
994
995        for id in dataflow.imported_index_ids() {
996            let dependence = instance
997                .get_time_dependence(id)
998                .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
999            time_dependencies.push(dependence);
1000        }
1001
1002        'source: for id in dataflow.imported_source_ids() {
1003            // We first check whether the id is backed by a compute object, in which case we use
1004            // the time dependence we know. This is true for materialized views, continual tasks,
1005            // etc.
1006            for instance in self.instances.values() {
1007                if let Ok(dependence) = instance.get_time_dependence(id) {
1008                    time_dependencies.push(dependence);
1009                    continue 'source;
1010                }
1011            }
1012
1013            // Not a compute object: Consult the storage collections controller.
1014            time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
1015        }
1016
1017        Ok(TimeDependence::merge(
1018            time_dependencies,
1019            dataflow.refresh_schedule.as_ref(),
1020        ))
1021    }
1022
1023    /// Processes the work queued by [`ComputeController::ready`].
1024    #[mz_ore::instrument(level = "debug")]
1025    pub fn process(&mut self) -> Option<ComputeControllerResponse> {
1026        // Perform periodic maintenance work.
1027        if self.maintenance_scheduled {
1028            self.maintain();
1029            self.maintenance_scheduled = false;
1030        }
1031
1032        // Return a ready response, if any.
1033        self.stashed_response.take()
1034    }
1035
1036    #[mz_ore::instrument(level = "debug")]
1037    fn maintain(&mut self) {
1038        // Perform instance maintenance work.
1039        for instance in self.instances.values_mut() {
1040            instance.call(Instance::maintain);
1041        }
1042    }
1043
1044    /// Allow writes for the specified collections on `instance_id`.
1045    ///
1046    /// If this controller is in read-only mode, this is a no-op.
1047    pub fn allow_writes(
1048        &mut self,
1049        instance_id: ComputeInstanceId,
1050        collection_id: GlobalId,
1051    ) -> Result<(), CollectionUpdateError> {
1052        if self.read_only {
1053            tracing::debug!("Skipping allow_writes in read-only mode");
1054            return Ok(());
1055        }
1056
1057        let instance = self.instance_mut(instance_id)?;
1058
1059        // Validation
1060        instance.collection(collection_id)?;
1061
1062        instance.call(move |i| i.allow_writes(collection_id).expect("validated"));
1063
1064        Ok(())
1065    }
1066}
1067
1068#[derive(Debug)]
1069struct InstanceState {
1070    client: InstanceClient,
1071    replicas: BTreeSet<ReplicaId>,
1072    collections: BTreeMap<GlobalId, Collection>,
1073}
1074
1075impl InstanceState {
1076    fn new(client: InstanceClient, collections: BTreeMap<GlobalId, Collection>) -> Self {
1077        Self {
1078            client,
1079            replicas: Default::default(),
1080            collections,
1081        }
1082    }
1083
1084    fn collection(&self, id: GlobalId) -> Result<&Collection, CollectionMissing> {
1085        self.collections.get(&id).ok_or(CollectionMissing(id))
1086    }
1087
1088    /// Calls the given function on the instance task. Does not await the result.
1089    ///
1090    /// # Panics
1091    ///
1092    /// Panics if the instance corresponding to `self` does not exist.
1093    fn call<F>(&self, f: F)
1094    where
1095        F: FnOnce(&mut Instance) + Send + 'static,
1096    {
1097        self.client.call(f).expect("instance not dropped")
1098    }
1099
1100    /// Calls the given function on the instance task, and awaits the result.
1101    ///
1102    /// # Panics
1103    ///
1104    /// Panics if the instance corresponding to `self` does not exist.
1105    async fn call_sync<F, R>(&self, f: F) -> R
1106    where
1107        F: FnOnce(&mut Instance) -> R + Send + 'static,
1108        R: Send + 'static,
1109    {
1110        self.client
1111            .call_sync(f)
1112            .await
1113            .expect("instance not dropped")
1114    }
1115
1116    /// Acquires a [`ReadHold`] for the identified compute collection.
1117    pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold, CollectionMissing> {
1118        // We acquire read holds at the earliest possible time rather than returning a copy
1119        // of the implied read hold. This is so that in `create_dataflow` we can acquire read holds
1120        // on compute dependencies at frontiers that are held back by other read holds the caller
1121        // has previously taken.
1122        //
1123        // If/when we change the compute API to expect callers to pass in the `ReadHold`s rather
1124        // than acquiring them ourselves, we might tighten this up and instead acquire read holds
1125        // at the implied capability.
1126
1127        let collection = self.collection(id)?;
1128        let since = collection.shared.lock_read_capabilities(|caps| {
1129            let since = caps.frontier().to_owned();
1130            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
1131            since
1132        });
1133
1134        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
1135        Ok(hold)
1136    }
1137
1138    /// Return the stored time dependence for a collection.
1139    fn get_time_dependence(
1140        &self,
1141        id: GlobalId,
1142    ) -> Result<Option<TimeDependence>, CollectionMissing> {
1143        Ok(self.collection(id)?.time_dependence.clone())
1144    }
1145
1146    /// Returns the [`InstanceState`] formatted as JSON.
1147    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
1148        // Destructure `self` here so we don't forget to consider dumping newly added fields.
1149        let Self {
1150            client: _,
1151            replicas,
1152            collections,
1153        } = self;
1154
1155        let instance = self.call_sync(|i| i.dump()).await?;
1156        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
1157        let collections: BTreeMap<_, _> = collections
1158            .iter()
1159            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
1160            .collect();
1161
1162        Ok(serde_json::json!({
1163            "instance": instance,
1164            "replicas": replicas,
1165            "collections": collections,
1166        }))
1167    }
1168}
1169
1170#[derive(Debug)]
1171struct Collection {
1172    /// Whether a collection is write-only, i.e., we cannot read it directly like an index.
1173    write_only: bool,
1174    compute_dependencies: BTreeSet<GlobalId>,
1175    shared: SharedCollectionState,
1176    /// The computed time dependence for this collection. None indicates no specific information,
1177    /// a value describes how the collection relates to wall-clock time.
1178    time_dependence: Option<TimeDependence>,
1179}
1180
1181impl Collection {
1182    fn new_log() -> Self {
1183        let as_of = Antichain::from_elem(Timestamp::MIN);
1184        Self {
1185            write_only: false,
1186            compute_dependencies: Default::default(),
1187            shared: SharedCollectionState::new(as_of),
1188            time_dependence: Some(TimeDependence::default()),
1189        }
1190    }
1191
1192    fn frontiers(&self) -> CollectionFrontiers {
1193        let read_frontier = self
1194            .shared
1195            .lock_read_capabilities(|c| c.frontier().to_owned());
1196        let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
1197        CollectionFrontiers {
1198            read_frontier,
1199            write_frontier,
1200        }
1201    }
1202}
1203
1204/// The frontiers of a compute collection.
1205#[derive(Clone, Debug)]
1206pub struct CollectionFrontiers {
1207    /// The read frontier.
1208    pub read_frontier: Antichain<Timestamp>,
1209    /// The write frontier.
1210    pub write_frontier: Antichain<Timestamp>,
1211}
1212
1213impl Default for CollectionFrontiers {
1214    fn default() -> Self {
1215        Self {
1216            read_frontier: Antichain::from_elem(Timestamp::MIN),
1217            write_frontier: Antichain::from_elem(Timestamp::MIN),
1218        }
1219    }
1220}