Skip to main content

mz_compute_client/
controller.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller that provides an interface to the compute layer, and the storage layer below it.
11//!
12//! The compute controller manages the creation, maintenance, and removal of compute instances.
13//! This involves ensuring the intended service state with the orchestrator, as well as maintaining
14//! a dedicated compute instance controller for each active compute instance.
15//!
16//! For each compute instance, the compute controller curates the creation of indexes and sinks
17//! installed on the instance, the progress of readers through these collections, and their
18//! eventual dropping and resource reclamation.
19//!
20//! The state maintained for a compute instance can be viewed as a partial map from `GlobalId` to
21//! collection. It is an error to use an identifier before it has been "created" with
22//! `create_dataflow()`. Once created, the controller holds a read capability for each output
23//! collection of a dataflow, which is manipulated with `set_read_policy()`. Eventually, a
24//! collection is dropped with `drop_collections()`.
25//!
26//! A dataflow can be in read-only or read-write mode. In read-only mode, the dataflow does not
//! modify any persistent state. Sending an `allow_write` message to the compute instance will
28//! transition the dataflow to read-write mode, allowing it to write to persistent sinks.
29//!
30//! Created dataflows will prevent the compaction of their inputs, including other compute
31//! collections but also collections managed by the storage layer. Each dataflow input is prevented
32//! from compacting beyond the allowed compaction of each of its outputs, ensuring that we can
33//! recover each dataflow to its current state in case of failure or other reconfiguration.
34
35use std::collections::{BTreeMap, BTreeSet};
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use mz_build_info::BuildInfo;
40use mz_cluster_client::client::ClusterReplicaLocation;
41use mz_cluster_client::metrics::ControllerMetrics;
42use mz_cluster_client::{ReplicaId, WallclockLagFn};
43use mz_compute_types::ComputeInstanceId;
44use mz_compute_types::config::ComputeReplicaConfig;
45use mz_compute_types::dataflows::DataflowDescription;
46use mz_compute_types::dyncfgs::{
47    COMPUTE_REPLICA_EXPIRATION_OFFSET, ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA,
48};
49use mz_dyncfg::{ConfigSet, ConfigUpdates};
50use mz_expr::RowSetFinishing;
51use mz_expr::row::RowCollection;
52use mz_ore::cast::CastFrom;
53use mz_ore::metrics::MetricsRegistry;
54use mz_ore::now::NowFn;
55use mz_ore::tracing::OpenTelemetryContext;
56use mz_persist_types::PersistLocation;
57use mz_repr::{GlobalId, RelationDesc, Row, Timestamp};
58use mz_storage_client::controller::StorageController;
59use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
60use mz_storage_types::read_holds::ReadHold;
61use mz_storage_types::read_policy::ReadPolicy;
62use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
63use prometheus::proto::LabelPair;
64use serde::{Deserialize, Serialize};
65use timely::PartialOrder;
66use timely::progress::Antichain;
67use tokio::sync::{mpsc, oneshot};
68use tokio::time::{self, MissedTickBehavior};
69use uuid::Uuid;
70
71use crate::controller::error::{
72    CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
73    HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
74    ReplicaCreationError, ReplicaDropError,
75};
76use crate::controller::instance::{Instance, SharedCollectionState};
77use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
78use crate::controller::replica::ReplicaConfig;
79use crate::logging::{LogVariant, LoggingConfig};
80use crate::metrics::ComputeControllerMetrics;
81use crate::protocol::command::{ComputeParameters, PeekTarget};
82use crate::protocol::response::{PeekResponse, SubscribeBatch};
83
84mod instance;
85mod introspection;
86mod replica;
87mod sequential_hydration;
88
89pub mod error;
90pub mod instance_client;
91pub use instance_client::InstanceClient;
92
/// A shared handle to the storage layer's collections interface.
///
/// The handle is a reference-counted trait object that is `Send + Sync`; the controller clones
/// it (via `Arc::clone`) for every compute instance it spawns.
pub(crate) type StorageCollections =
    Arc<dyn mz_storage_client::storage_collections::StorageCollections + Send + Sync>;
95
/// Responses from the compute controller.
#[derive(Debug)]
pub enum ComputeControllerResponse {
    /// See [`PeekNotification`].
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// See [`crate::protocol::response::ComputeResponse::SubscribeResponse`].
    SubscribeResponse(GlobalId, SubscribeBatch),
    /// The response from a dataflow containing a `CopyToS3Oneshot` sink.
    ///
    /// The `GlobalId` identifies the sink. The `Result` is the response from
    /// the sink, where an `Ok(n)` indicates that `n` rows were successfully
    /// copied to S3 and an `Err` indicates that an error was encountered
    /// during the copy operation.
    ///
    /// For a given `CopyToS3Oneshot` sink, there will be at most one `CopyToResponse`
    /// produced. (The sink may produce no responses if its dataflow is dropped
    /// before completion.)
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// A response reporting advancement of a collection's upper frontier.
    ///
    /// Once a collection's upper (aka "write frontier") has advanced beyond a given time, the
    /// contents of the collection as of that time have been sealed and cannot change anymore.
    FrontierUpper {
        /// The ID of a compute collection.
        id: GlobalId,
        /// The new upper frontier of the identified compute collection.
        upper: Antichain<Timestamp>,
    },
}
125
/// Notification and summary of a received and forwarded [`crate::protocol::response::ComputeResponse::PeekResponse`].
///
/// The notification carries only summary information (row count and size, or the error
/// text), not the peek result rows themselves.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    /// Summary of a successful peek.
    Success {
        /// Number of rows in the returned peek result, after the peek's offset and limit
        /// have been applied.
        rows: u64,
        /// Size of the returned peek result in bytes.
        result_size: u64,
    },
    /// Error of an unsuccessful peek, including the reason for the error.
    Error(String),
    /// The peek was canceled.
    Canceled,
}
141
142impl PeekNotification {
143    /// Construct a new [`PeekNotification`] from a [`PeekResponse`]. The `offset` and `limit`
144    /// parameters are used to calculate the number of rows in the peek result.
145    fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
146        match peek_response {
147            PeekResponse::Rows(rows) => {
148                let num_rows = u64::cast_from(RowCollection::offset_limit(
149                    rows.iter().map(|r| r.count()).sum(),
150                    offset,
151                    limit,
152                ));
153                let result_size = u64::cast_from(rows.iter().map(|r| r.byte_len()).sum::<usize>());
154
155                tracing::trace!(?num_rows, ?result_size, "inline peek result");
156
157                Self::Success {
158                    rows: num_rows,
159                    result_size,
160                }
161            }
162            PeekResponse::Stashed(stashed_response) => {
163                let rows = stashed_response.num_rows(offset, limit);
164                let result_size = stashed_response.size_bytes();
165
166                tracing::trace!(?rows, ?result_size, "stashed peek result");
167
168                Self::Success {
169                    rows: u64::cast_from(rows),
170                    result_size: u64::cast_from(result_size),
171                }
172            }
173            PeekResponse::Error(err) => Self::Error(err.clone()),
174            PeekResponse::Canceled => Self::Canceled,
175        }
176    }
177}
178
/// A controller for the compute layer.
pub struct ComputeController {
    /// The controller-side state of each compute instance, keyed by instance ID.
    instances: BTreeMap<ComputeInstanceId, InstanceState>,
    /// A map from an instance ID to an arbitrary string that describes the
    /// class of the workload that compute instance is running (e.g.,
    /// `production` or `staging`).
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    /// Build information, handed to every instance client this controller spawns.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections,
    /// Set to `true` once `initialization_complete` has been called.
    initialized: bool,
    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to affect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// Compute configuration to apply to new instances.
    config: ComputeParameters,
    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,
    /// A controller response to be returned on the next call to [`ComputeController::process`].
    stashed_response: Option<ComputeControllerResponse>,
    /// The compute controller metrics.
    metrics: ComputeControllerMetrics,
    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<Timestamp>,
    /// Dynamic system configuration.
    ///
    /// Updated through `ComputeController::update_configuration` calls and shared with all
    /// subcomponents of the compute controller.
    dyncfg: Arc<ConfigSet>,

    /// Receiver for responses produced by `Instance`s.
    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse>,
    /// Response sender that's passed to new `Instance`s.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
    /// Receiver for introspection updates produced by `Instance`s.
    ///
    /// When [`ComputeController::start_introspection_sink`] is first called, this receiver is
    /// passed to the introspection sink task.
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    /// Introspection updates sender that's passed to new `Instance`s.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,
}
232
233impl ComputeController {
234    /// Construct a new [`ComputeController`].
235    pub fn new(
236        build_info: &'static BuildInfo,
237        storage_collections: StorageCollections,
238        read_only: bool,
239        metrics_registry: &MetricsRegistry,
240        peek_stash_persist_location: PersistLocation,
241        controller_metrics: ControllerMetrics,
242        now: NowFn,
243        wallclock_lag: WallclockLagFn<Timestamp>,
244    ) -> Self {
245        let (response_tx, response_rx) = mpsc::unbounded_channel();
246        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();
247
248        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
249        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
250
251        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
252            ComputeInstanceId,
253            Option<String>,
254        >::new()));
255
256        // Apply a `workload_class` label to all metrics in the registry that
257        // have an `instance_id` label for an instance whose workload class is
258        // known.
259        metrics_registry.register_postprocessor({
260            let instance_workload_classes = Arc::clone(&instance_workload_classes);
261            move |metrics| {
262                let instance_workload_classes = instance_workload_classes
263                    .lock()
264                    .expect("lock poisoned")
265                    .iter()
266                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
267                    .collect::<BTreeMap<String, Option<String>>>();
268                for metric in metrics {
269                    'metric: for metric in metric.mut_metric() {
270                        for label in metric.get_label() {
271                            if label.name() == "instance_id" {
272                                if let Some(workload_class) = instance_workload_classes
273                                    .get(label.value())
274                                    .cloned()
275                                    .flatten()
276                                {
277                                    let mut label = LabelPair::default();
278                                    label.set_name("workload_class".into());
279                                    label.set_value(workload_class.clone());
280
281                                    let mut labels = metric.take_label();
282                                    labels.push(label);
283                                    metric.set_label(labels);
284                                }
285                                continue 'metric;
286                            }
287                        }
288                    }
289                }
290            }
291        });
292
293        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);
294
295        Self {
296            instances: BTreeMap::new(),
297            instance_workload_classes,
298            build_info,
299            storage_collections,
300            initialized: false,
301            read_only,
302            config: Default::default(),
303            peek_stash_persist_location,
304            stashed_response: None,
305            metrics,
306            now,
307            wallclock_lag,
308            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
309            response_rx,
310            response_tx,
311            introspection_rx: Some(introspection_rx),
312            introspection_tx,
313            maintenance_ticker,
314            maintenance_scheduled: false,
315        }
316    }
317
318    /// Start sinking the compute controller's introspection data into storage.
319    ///
320    /// This method should be called once the introspection collections have been registered with
321    /// the storage controller. It will panic if invoked earlier than that.
322    pub fn start_introspection_sink(&mut self, storage_controller: &dyn StorageController) {
323        if let Some(rx) = self.introspection_rx.take() {
324            spawn_introspection_sink(rx, storage_controller);
325        }
326    }
327
328    /// TODO(database-issues#7533): Add documentation.
329    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
330        self.instances.contains_key(&id)
331    }
332
333    /// Return a reference to the indicated compute instance.
334    fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState, InstanceMissing> {
335        self.instances.get(&id).ok_or(InstanceMissing(id))
336    }
337
338    /// Return an `InstanceClient` for the indicated compute instance.
339    pub fn instance_client(
340        &self,
341        id: ComputeInstanceId,
342    ) -> Result<InstanceClient, InstanceMissing> {
343        self.instance(id).map(|instance| instance.client.clone())
344    }
345
346    /// Return a mutable reference to the indicated compute instance.
347    fn instance_mut(
348        &mut self,
349        id: ComputeInstanceId,
350    ) -> Result<&mut InstanceState, InstanceMissing> {
351        self.instances.get_mut(&id).ok_or(InstanceMissing(id))
352    }
353
354    /// List the IDs of all collections in the identified compute instance.
355    pub fn collection_ids(
356        &self,
357        instance_id: ComputeInstanceId,
358    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
359        let instance = self.instance(instance_id)?;
360        let ids = instance.collections.keys().copied();
361        Ok(ids)
362    }
363
364    /// Return the frontiers of the indicated collection.
365    ///
366    /// If an `instance_id` is provided, the collection is assumed to be installed on that
367    /// instance. Otherwise all available instances are searched.
368    pub fn collection_frontiers(
369        &self,
370        collection_id: GlobalId,
371        instance_id: Option<ComputeInstanceId>,
372    ) -> Result<CollectionFrontiers, CollectionLookupError> {
373        let collection = match instance_id {
374            Some(id) => self.instance(id)?.collection(collection_id)?,
375            None => self
376                .instances
377                .values()
378                .find_map(|i| i.collections.get(&collection_id))
379                .ok_or(CollectionMissing(collection_id))?,
380        };
381
382        Ok(collection.frontiers())
383    }
384
385    /// List compute collections that depend on the given collection.
386    pub fn collection_reverse_dependencies(
387        &self,
388        instance_id: ComputeInstanceId,
389        id: GlobalId,
390    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
391        let instance = self.instance(instance_id)?;
392        let collections = instance.collections.iter();
393        let ids = collections
394            .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
395        Ok(ids)
396    }
397
398    /// Returns `true` iff the given collection has been hydrated.
399    ///
400    /// For this check, zero-replica clusters are always considered hydrated.
401    /// Their collections would never normally be considered hydrated but it's
402    /// clearly intentional that they have no replicas.
403    pub async fn collection_hydrated(
404        &self,
405        instance_id: ComputeInstanceId,
406        collection_id: GlobalId,
407    ) -> Result<bool, anyhow::Error> {
408        let instance = self.instance(instance_id)?;
409
410        let res = instance
411            .call_sync(move |i| i.collection_hydrated(collection_id))
412            .await?;
413
414        Ok(res)
415    }
416
417    /// Returns `true` if all non-transient, non-excluded collections are hydrated on any of the
418    /// provided replicas.
419    ///
420    /// For this check, zero-replica clusters are always considered hydrated.
421    /// Their collections would never normally be considered hydrated but it's
422    /// clearly intentional that they have no replicas.
423    pub fn collections_hydrated_for_replicas(
424        &self,
425        instance_id: ComputeInstanceId,
426        replicas: Vec<ReplicaId>,
427        exclude_collections: BTreeSet<GlobalId>,
428    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
429        let instance = self.instance(instance_id)?;
430
431        // Validation
432        if !instance.replicas.is_empty()
433            && !replicas.iter().any(|id| instance.replicas.contains(id))
434        {
435            return Err(HydrationCheckBadTarget(replicas).into());
436        }
437
438        let (tx, rx) = oneshot::channel();
439        instance.call(move |i| {
440            let result = i
441                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
442                .expect("validated");
443            let _ = tx.send(result);
444        });
445
446        Ok(rx)
447    }
448
449    /// Returns the state of the [`ComputeController`] formatted as JSON.
450    ///
451    /// The returned value is not guaranteed to be stable and may change at any point in time.
452    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
453        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
454        // returned object as a tradeoff between usability and stability. `serde_json` will fail
455        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
456        // prevents a future unrelated change from silently breaking this method.
457
458        // Destructure `self` here so we don't forget to consider dumping newly added fields.
459        let Self {
460            instances,
461            instance_workload_classes,
462            build_info: _,
463            storage_collections: _,
464            initialized,
465            read_only,
466            config: _,
467            peek_stash_persist_location: _,
468            stashed_response,
469            metrics: _,
470            now: _,
471            wallclock_lag: _,
472            dyncfg: _,
473            response_rx: _,
474            response_tx: _,
475            introspection_rx: _,
476            introspection_tx: _,
477            maintenance_ticker: _,
478            maintenance_scheduled,
479        } = self;
480
481        let mut instances_dump = BTreeMap::new();
482        for (id, instance) in instances {
483            let dump = instance.dump().await?;
484            instances_dump.insert(id.to_string(), dump);
485        }
486
487        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
488            .lock()
489            .expect("lock poisoned")
490            .iter()
491            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
492            .collect();
493
494        Ok(serde_json::json!({
495            "instances": instances_dump,
496            "instance_workload_classes": instance_workload_classes,
497            "initialized": initialized,
498            "read_only": read_only,
499            "stashed_response": format!("{stashed_response:?}"),
500            "maintenance_scheduled": maintenance_scheduled,
501        }))
502    }
503}
504
505impl ComputeController {
506    /// Create a compute instance.
507    pub fn create_instance(
508        &mut self,
509        id: ComputeInstanceId,
510        arranged_logs: BTreeMap<LogVariant, GlobalId>,
511        workload_class: Option<String>,
512    ) -> Result<(), InstanceExists> {
513        if self.instances.contains_key(&id) {
514            return Err(InstanceExists(id));
515        }
516
517        let mut collections = BTreeMap::new();
518        let mut logs = Vec::with_capacity(arranged_logs.len());
519        for (&log, &id) in &arranged_logs {
520            let collection = Collection::new_log();
521            let shared = collection.shared.clone();
522            collections.insert(id, collection);
523            logs.push((log, id, shared));
524        }
525
526        let client = InstanceClient::spawn(
527            id,
528            self.build_info,
529            Arc::clone(&self.storage_collections),
530            self.peek_stash_persist_location.clone(),
531            logs,
532            self.metrics.for_instance(id),
533            self.now.clone(),
534            self.wallclock_lag.clone(),
535            Arc::clone(&self.dyncfg),
536            self.response_tx.clone(),
537            self.introspection_tx.clone(),
538            self.read_only,
539        );
540
541        let instance = InstanceState::new(client, collections);
542        self.instances.insert(id, instance);
543
544        self.instance_workload_classes
545            .lock()
546            .expect("lock poisoned")
547            .insert(id, workload_class.clone());
548
549        let instance = self.instances.get_mut(&id).expect("instance just added");
550        if self.initialized {
551            instance.call(Instance::initialization_complete);
552        }
553
554        let mut config_params = self.config.clone();
555        config_params.workload_class = Some(workload_class);
556        instance.call(|i| i.update_configuration(config_params));
557
558        Ok(())
559    }
560
561    /// Updates a compute instance's workload class.
562    pub fn update_instance_workload_class(
563        &mut self,
564        id: ComputeInstanceId,
565        workload_class: Option<String>,
566    ) -> Result<(), InstanceMissing> {
567        // Ensure that the instance exists first.
568        let _ = self.instance(id)?;
569
570        self.instance_workload_classes
571            .lock()
572            .expect("lock poisoned")
573            .insert(id, workload_class);
574
575        // Cause a config update to notify the instance about its new workload class.
576        self.update_configuration(Default::default());
577
578        Ok(())
579    }
580
581    /// Remove a compute instance.
582    ///
583    /// # Panics
584    ///
585    /// Panics if the identified `instance` still has active replicas.
586    pub fn drop_instance(&mut self, id: ComputeInstanceId) {
587        if let Some(instance) = self.instances.remove(&id) {
588            instance.call(|i| i.shutdown());
589        }
590
591        self.instance_workload_classes
592            .lock()
593            .expect("lock poisoned")
594            .remove(&id);
595    }
596
597    /// Returns the compute controller's config set.
598    pub fn dyncfg(&self) -> &Arc<ConfigSet> {
599        &self.dyncfg
600    }
601
602    /// Update compute configuration.
603    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
604        // Apply dyncfg updates.
605        config_params.dyncfg_updates.apply(&self.dyncfg);
606
607        let instance_workload_classes = self
608            .instance_workload_classes
609            .lock()
610            .expect("lock poisoned");
611
612        // Forward updates to existing clusters.
613        // Workload classes are cluster-specific, so we need to overwrite them here.
614        for (id, instance) in self.instances.iter_mut() {
615            let mut params = config_params.clone();
616            params.workload_class = Some(instance_workload_classes[id].clone());
617            instance.call(|i| i.update_configuration(params));
618        }
619
620        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
621        match overflowing_behavior.parse() {
622            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
623            Err(err) => {
624                tracing::error!(
625                    err,
626                    overflowing_behavior,
627                    "Invalid value for ore_overflowing_behavior"
628                );
629            }
630        }
631
632        // Remember updates for future clusters.
633        self.config.update(config_params);
634    }
635
636    /// Replaces the per-replica dyncfg overrides for the given instances.
637    ///
638    /// This only stores the overrides; callers should follow with a
639    /// configuration push (e.g. [`Self::update_configuration`]) so existing
640    /// replicas observe the new values. Instances absent from `overrides` have
641    /// their overrides cleared, so a replica that no longer has an override
642    /// reverts to the environment-wide configuration. Used by the scoped
643    /// feature flags (replica-local) layer.
644    pub fn update_replica_dyncfg_overrides(
645        &mut self,
646        mut overrides: BTreeMap<ComputeInstanceId, BTreeMap<ReplicaId, ConfigUpdates>>,
647    ) {
648        for (id, instance) in self.instances.iter_mut() {
649            let instance_overrides = overrides.remove(id).unwrap_or_default();
650            instance.call(move |i| i.update_replica_dyncfg_overrides(instance_overrides));
651        }
652    }
653
654    /// Mark the end of any initialization commands.
655    ///
656    /// The implementor may wait for this method to be called before implementing prior commands,
657    /// and so it is important for a user to invoke this method as soon as it is comfortable.
658    /// This method can be invoked immediately, at the potential expense of performance.
659    pub fn initialization_complete(&mut self) {
660        self.initialized = true;
661        for instance in self.instances.values_mut() {
662            instance.call(Instance::initialization_complete);
663        }
664    }
665
666    /// Wait until the controller is ready to do some processing.
667    ///
668    /// This method may block for an arbitrarily long time.
669    ///
670    /// When the method returns, the caller should call [`ComputeController::process`].
671    ///
672    /// This method is cancellation safe.
673    pub async fn ready(&mut self) {
674        if self.stashed_response.is_some() {
675            // We still have a response stashed, which we are immediately ready to process.
676            return;
677        }
678        if self.maintenance_scheduled {
679            // Maintenance work has been scheduled.
680            return;
681        }
682
683        tokio::select! {
684            resp = self.response_rx.recv() => {
685                let resp = resp.expect("`self.response_tx` not dropped");
686                self.stashed_response = Some(resp);
687            }
688            _ = self.maintenance_ticker.tick() => {
689                self.maintenance_scheduled = true;
690            },
691        }
692    }
693
694    /// Adds replicas of an instance.
695    pub fn add_replica_to_instance(
696        &mut self,
697        instance_id: ComputeInstanceId,
698        replica_id: ReplicaId,
699        location: ClusterReplicaLocation,
700        config: ComputeReplicaConfig,
701    ) -> Result<(), ReplicaCreationError> {
702        use ReplicaCreationError::*;
703
704        let instance = self.instance(instance_id)?;
705
706        // Validation
707        if instance.replicas.contains(&replica_id) {
708            return Err(ReplicaExists(replica_id));
709        }
710
711        let (enable_logging, interval) = match config.logging.interval {
712            Some(interval) => (true, interval),
713            None => (false, Duration::from_secs(1)),
714        };
715
716        let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);
717
718        // Capture dictionary compression once, at replica creation, and hold it fixed for the
719        // replica's lifetime (see `InstanceConfig::arrangement_dictionary_compression`). This is
720        // why a later flip of the flag only affects replicas created afterwards.
721        let arrangement_dictionary_compression =
722            ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA.get(&self.dyncfg);
723
724        let replica_config = ReplicaConfig {
725            location,
726            logging: LoggingConfig {
727                interval,
728                enable_logging,
729                log_logging: config.logging.log_logging,
730                index_logs: Default::default(),
731            },
732            grpc_client: self.config.grpc_client.clone(),
733            expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
734            arrangement_dictionary_compression,
735        };
736
737        let instance = self.instance_mut(instance_id).expect("validated");
738        instance.replicas.insert(replica_id);
739
740        instance.call(move |i| {
741            i.add_replica(replica_id, replica_config, None)
742                .expect("validated")
743        });
744
745        Ok(())
746    }
747
748    /// Removes a replica from an instance, including its service in the orchestrator.
749    pub fn drop_replica(
750        &mut self,
751        instance_id: ComputeInstanceId,
752        replica_id: ReplicaId,
753    ) -> Result<(), ReplicaDropError> {
754        use ReplicaDropError::*;
755
756        let instance = self.instance_mut(instance_id)?;
757
758        // Validation
759        if !instance.replicas.contains(&replica_id) {
760            return Err(ReplicaMissing(replica_id));
761        }
762
763        instance.replicas.remove(&replica_id);
764
765        instance.call(move |i| i.remove_replica(replica_id).expect("validated"));
766
767        Ok(())
768    }
769
770    /// Creates the described dataflow and initializes state for its output.
771    ///
772    /// Only materialized views and subscribes are allowed to have a `target_replica`.
773    ///
774    /// Panics if called with a dataflow description that has index exports
775    /// when `target_replica` is set.
776    pub fn create_dataflow(
777        &mut self,
778        instance_id: ComputeInstanceId,
779        mut dataflow: DataflowDescription<mz_compute_types::plan::Plan, ()>,
780        target_replica: Option<ReplicaId>,
781    ) -> Result<(), DataflowCreationError> {
782        use DataflowCreationError::*;
783
784        let instance = self.instance(instance_id)?;
785
786        // Validation: target replica
787        if let Some(replica_id) = target_replica {
788            if !instance.replicas.contains(&replica_id) {
789                return Err(ReplicaMissing(replica_id));
790            }
791            assert!(
792                dataflow.exported_index_ids().next().is_none(),
793                "Replica-targeted indexes are not supported"
794            );
795        }
796
797        // Validation: as_of
798        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
799        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
800            return Err(EmptyAsOfForSubscribe);
801        }
802        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
803            return Err(EmptyAsOfForCopyTo);
804        }
805
806        // Validation: input collections
807        let storage_ids = dataflow.imported_source_ids().collect();
808        let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
809        for id in dataflow.imported_index_ids() {
810            let read_hold = instance.acquire_read_hold(id)?;
811            import_read_holds.push(read_hold);
812        }
813        for hold in &import_read_holds {
814            if PartialOrder::less_than(as_of, hold.since()) {
815                return Err(SinceViolation(hold.id()));
816            }
817        }
818
819        // Validation: storage sink collections
820        for id in dataflow.persist_sink_ids() {
821            if self.storage_collections.check_exists(id).is_err() {
822                return Err(CollectionMissing(id));
823            }
824        }
825        let time_dependence = self
826            .determine_time_dependence(instance_id, &dataflow)
827            .expect("must exist");
828
829        let instance = self.instance_mut(instance_id).expect("validated");
830
831        let mut shared_collection_state = BTreeMap::new();
832        for id in dataflow.export_ids() {
833            let shared = SharedCollectionState::new(as_of.clone());
834            let collection = Collection {
835                write_only: dataflow.sink_exports.contains_key(&id),
836                compute_dependencies: dataflow.imported_index_ids().collect(),
837                shared: shared.clone(),
838                time_dependence: time_dependence.clone(),
839            };
840            instance.collections.insert(id, collection);
841            shared_collection_state.insert(id, shared);
842        }
843
844        dataflow.time_dependence = time_dependence;
845
846        instance.call(move |i| {
847            i.create_dataflow(
848                dataflow,
849                import_read_holds,
850                shared_collection_state,
851                target_replica,
852            )
853            .expect("validated")
854        });
855
856        Ok(())
857    }
858
859    /// Drop the read capability for the given collections and allow their resources to be
860    /// reclaimed.
861    pub fn drop_collections(
862        &mut self,
863        instance_id: ComputeInstanceId,
864        collection_ids: Vec<GlobalId>,
865    ) -> Result<(), CollectionUpdateError> {
866        let instance = self.instance_mut(instance_id)?;
867
868        // Validation
869        for id in &collection_ids {
870            instance.collection(*id)?;
871        }
872
873        for id in &collection_ids {
874            instance.collections.remove(id);
875        }
876
877        instance.call(|i| i.drop_collections(collection_ids).expect("validated"));
878
879        Ok(())
880    }
881
882    /// Initiate a peek request for the contents of the given collection at `timestamp`.
883    ///
884    /// The caller supplies a `read_hold` for the peek target — via
885    /// [`ComputeController::acquire_read_hold`] for `PeekTarget::Index`, or via the storage
886    /// collections for `PeekTarget::Persist`. The hold keeps the collection's `since` at
887    /// `<= timestamp` until the peek completes.
888    pub fn peek(
889        &self,
890        instance_id: ComputeInstanceId,
891        peek_target: PeekTarget,
892        literal_constraints: Option<Vec<Row>>,
893        uuid: Uuid,
894        timestamp: Timestamp,
895        result_desc: RelationDesc,
896        finishing: RowSetFinishing,
897        map_filter_project: mz_expr::SafeMfpPlan,
898        read_hold: ReadHold,
899        target_replica: Option<ReplicaId>,
900        peek_response_tx: oneshot::Sender<PeekResponse>,
901    ) -> Result<(), PeekError> {
902        use PeekError::*;
903
904        let instance = self.instance(instance_id)?;
905
906        // Validation: target replica
907        if let Some(replica_id) = target_replica {
908            if !instance.replicas.contains(&replica_id) {
909                return Err(ReplicaMissing(replica_id));
910            }
911        }
912
913        // Validation: the read hold must target this collection and must hold its `since`
914        // at `<= timestamp`.
915        if read_hold.id() != peek_target.id() {
916            return Err(ReadHoldIdMismatch(read_hold.id()));
917        }
918        if !read_hold.since().less_equal(&timestamp) {
919            return Err(SinceViolation(peek_target.id()));
920        }
921
922        instance.call(move |i| {
923            i.peek(
924                peek_target,
925                literal_constraints,
926                uuid,
927                timestamp,
928                result_desc,
929                finishing,
930                map_filter_project,
931                read_hold,
932                target_replica,
933                peek_response_tx,
934            )
935            .expect("validated")
936        });
937
938        Ok(())
939    }
940
941    /// Cancel an existing peek request.
942    ///
943    /// Canceling a peek is best effort. The caller may see any of the following
944    /// after canceling a peek request:
945    ///
946    ///   * A `PeekResponse::Rows` indicating that the cancellation request did
947    ///     not take effect in time and the query succeeded.
948    ///   * A `PeekResponse::Canceled` affirming that the peek was canceled.
949    ///   * No `PeekResponse` at all.
950    pub fn cancel_peek(
951        &self,
952        instance_id: ComputeInstanceId,
953        uuid: Uuid,
954        reason: PeekResponse,
955    ) -> Result<(), InstanceMissing> {
956        self.instance(instance_id)?
957            .call(move |i| i.cancel_peek(uuid, reason));
958        Ok(())
959    }
960
961    /// Assign a read policy to specific identifiers.
962    ///
963    /// The policies are assigned in the order presented, and repeated identifiers should
964    /// conclude with the last policy. Changing a policy will immediately downgrade the read
965    /// capability if appropriate, but it will not "recover" the read capability if the prior
966    /// capability is already ahead of it.
967    ///
968    /// Identifiers not present in `policies` retain their existing read policies.
969    ///
970    /// It is an error to attempt to set a read policy for a collection that is not readable in the
971    /// context of compute. At this time, only indexes are readable compute collections.
972    pub fn set_read_policy(
973        &self,
974        instance_id: ComputeInstanceId,
975        policies: Vec<(GlobalId, ReadPolicy)>,
976    ) -> Result<(), ReadPolicyError> {
977        use ReadPolicyError::*;
978
979        let instance = self.instance(instance_id)?;
980
981        // Validation
982        for (id, _) in &policies {
983            let collection = instance.collection(*id)?;
984            if collection.write_only {
985                return Err(WriteOnlyCollection(*id));
986            }
987        }
988
989        self.instance(instance_id)?
990            .call(|i| i.set_read_policy(policies).expect("validated"));
991
992        Ok(())
993    }
994
995    /// Acquires a [`ReadHold`] for the identified compute collection.
996    pub fn acquire_read_hold(
997        &self,
998        instance_id: ComputeInstanceId,
999        collection_id: GlobalId,
1000    ) -> Result<ReadHold, CollectionUpdateError> {
1001        let read_hold = self
1002            .instance(instance_id)?
1003            .acquire_read_hold(collection_id)?;
1004        Ok(read_hold)
1005    }
1006
1007    /// Determine the time dependence for a dataflow.
1008    fn determine_time_dependence(
1009        &self,
1010        instance_id: ComputeInstanceId,
1011        dataflow: &DataflowDescription<mz_compute_types::plan::Plan, ()>,
1012    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
1013        let instance = self
1014            .instance(instance_id)
1015            .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
1016        let mut time_dependencies = Vec::new();
1017
1018        for id in dataflow.imported_index_ids() {
1019            let dependence = instance
1020                .get_time_dependence(id)
1021                .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
1022            time_dependencies.push(dependence);
1023        }
1024
1025        'source: for id in dataflow.imported_source_ids() {
1026            // We first check whether the id is backed by a compute object, in which case we use
1027            // the time dependence we know. This is true for storage sinks.
1028            for instance in self.instances.values() {
1029                if let Ok(dependence) = instance.get_time_dependence(id) {
1030                    time_dependencies.push(dependence);
1031                    continue 'source;
1032                }
1033            }
1034
1035            // Not a compute object: Consult the storage collections controller.
1036            time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
1037        }
1038
1039        Ok(TimeDependence::merge(
1040            time_dependencies,
1041            dataflow.refresh_schedule.as_ref(),
1042        ))
1043    }
1044
1045    /// Processes the work queued by [`ComputeController::ready`].
1046    #[mz_ore::instrument(level = "debug")]
1047    pub fn process(&mut self) -> Option<ComputeControllerResponse> {
1048        // Perform periodic maintenance work.
1049        if self.maintenance_scheduled {
1050            self.maintain();
1051            self.maintenance_scheduled = false;
1052        }
1053
1054        // Return a ready response, if any.
1055        self.stashed_response.take()
1056    }
1057
1058    #[mz_ore::instrument(level = "debug")]
1059    fn maintain(&mut self) {
1060        // Perform instance maintenance work.
1061        for instance in self.instances.values_mut() {
1062            instance.call(Instance::maintain);
1063        }
1064    }
1065
1066    /// Allow writes for the specified collections on `instance_id`.
1067    ///
1068    /// If this controller is in read-only mode, this is a no-op.
1069    pub fn allow_writes(
1070        &mut self,
1071        instance_id: ComputeInstanceId,
1072        collection_id: GlobalId,
1073    ) -> Result<(), CollectionUpdateError> {
1074        if self.read_only {
1075            tracing::debug!("Skipping allow_writes in read-only mode");
1076            return Ok(());
1077        }
1078
1079        let instance = self.instance_mut(instance_id)?;
1080
1081        // Validation
1082        instance.collection(collection_id)?;
1083
1084        instance.call(move |i| i.allow_writes(collection_id).expect("validated"));
1085
1086        Ok(())
1087    }
1088}
1089
1090#[derive(Debug)]
1091struct InstanceState {
1092    client: InstanceClient,
1093    replicas: BTreeSet<ReplicaId>,
1094    collections: BTreeMap<GlobalId, Collection>,
1095}
1096
1097impl InstanceState {
1098    fn new(client: InstanceClient, collections: BTreeMap<GlobalId, Collection>) -> Self {
1099        Self {
1100            client,
1101            replicas: Default::default(),
1102            collections,
1103        }
1104    }
1105
1106    fn collection(&self, id: GlobalId) -> Result<&Collection, CollectionMissing> {
1107        self.collections.get(&id).ok_or(CollectionMissing(id))
1108    }
1109
1110    /// Calls the given function on the instance task. Does not await the result.
1111    ///
1112    /// # Panics
1113    ///
1114    /// Panics if the instance corresponding to `self` does not exist.
1115    fn call<F>(&self, f: F)
1116    where
1117        F: FnOnce(&mut Instance) + Send + 'static,
1118    {
1119        self.client.call(f).expect("instance not dropped")
1120    }
1121
1122    /// Calls the given function on the instance task, and awaits the result.
1123    ///
1124    /// # Panics
1125    ///
1126    /// Panics if the instance corresponding to `self` does not exist.
1127    async fn call_sync<F, R>(&self, f: F) -> R
1128    where
1129        F: FnOnce(&mut Instance) -> R + Send + 'static,
1130        R: Send + 'static,
1131    {
1132        self.client
1133            .call_sync(f)
1134            .await
1135            .expect("instance not dropped")
1136    }
1137
1138    /// Acquires a [`ReadHold`] for the identified compute collection.
1139    pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold, CollectionMissing> {
1140        // We acquire read holds at the earliest possible time rather than returning a copy
1141        // of the implied read hold. This is so that in `create_dataflow` we can acquire read holds
1142        // on compute dependencies at frontiers that are held back by other read holds the caller
1143        // has previously taken.
1144        //
1145        // If/when we change the compute API to expect callers to pass in the `ReadHold`s rather
1146        // than acquiring them ourselves, we might tighten this up and instead acquire read holds
1147        // at the implied capability.
1148
1149        let collection = self.collection(id)?;
1150        let since = collection.shared.lock_read_capabilities(|caps| {
1151            let since = caps.frontier().to_owned();
1152            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
1153            since
1154        });
1155
1156        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
1157        Ok(hold)
1158    }
1159
1160    /// Return the stored time dependence for a collection.
1161    fn get_time_dependence(
1162        &self,
1163        id: GlobalId,
1164    ) -> Result<Option<TimeDependence>, CollectionMissing> {
1165        Ok(self.collection(id)?.time_dependence.clone())
1166    }
1167
1168    /// Returns the [`InstanceState`] formatted as JSON.
1169    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
1170        // Destructure `self` here so we don't forget to consider dumping newly added fields.
1171        let Self {
1172            client: _,
1173            replicas,
1174            collections,
1175        } = self;
1176
1177        let instance = self.call_sync(|i| i.dump()).await?;
1178        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
1179        let collections: BTreeMap<_, _> = collections
1180            .iter()
1181            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
1182            .collect();
1183
1184        Ok(serde_json::json!({
1185            "instance": instance,
1186            "replicas": replicas,
1187            "collections": collections,
1188        }))
1189    }
1190}
1191
1192#[derive(Debug)]
1193struct Collection {
1194    /// Whether a collection is write-only, i.e., we cannot read it directly like an index.
1195    write_only: bool,
1196    compute_dependencies: BTreeSet<GlobalId>,
1197    shared: SharedCollectionState,
1198    /// The computed time dependence for this collection. None indicates no specific information,
1199    /// a value describes how the collection relates to wall-clock time.
1200    time_dependence: Option<TimeDependence>,
1201}
1202
1203impl Collection {
1204    fn new_log() -> Self {
1205        let as_of = Antichain::from_elem(Timestamp::MIN);
1206        Self {
1207            write_only: false,
1208            compute_dependencies: Default::default(),
1209            shared: SharedCollectionState::new(as_of),
1210            time_dependence: Some(TimeDependence::default()),
1211        }
1212    }
1213
1214    fn frontiers(&self) -> CollectionFrontiers {
1215        let read_frontier = self
1216            .shared
1217            .lock_read_capabilities(|c| c.frontier().to_owned());
1218        let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
1219        CollectionFrontiers {
1220            read_frontier,
1221            write_frontier,
1222        }
1223    }
1224}
1225
1226/// The frontiers of a compute collection.
1227#[derive(Clone, Debug)]
1228pub struct CollectionFrontiers {
1229    /// The read frontier.
1230    pub read_frontier: Antichain<Timestamp>,
1231    /// The write frontier.
1232    pub write_frontier: Antichain<Timestamp>,
1233}
1234
1235impl Default for CollectionFrontiers {
1236    fn default() -> Self {
1237        Self {
1238            read_frontier: Antichain::from_elem(Timestamp::MIN),
1239            write_frontier: Antichain::from_elem(Timestamp::MIN),
1240        }
1241    }
1242}