mz_compute_client/
controller.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller that provides an interface to the compute layer, and the storage layer below it.
11//!
12//! The compute controller manages the creation, maintenance, and removal of compute instances.
13//! This involves ensuring the intended service state with the orchestrator, as well as maintaining
14//! a dedicated compute instance controller for each active compute instance.
15//!
16//! For each compute instance, the compute controller curates the creation of indexes and sinks
17//! installed on the instance, the progress of readers through these collections, and their
18//! eventual dropping and resource reclamation.
19//!
20//! The state maintained for a compute instance can be viewed as a partial map from `GlobalId` to
21//! collection. It is an error to use an identifier before it has been "created" with
22//! `create_dataflow()`. Once created, the controller holds a read capability for each output
23//! collection of a dataflow, which is manipulated with `set_read_policy()`. Eventually, a
24//! collection is dropped with `drop_collections()`.
25//!
26//! Created dataflows will prevent the compaction of their inputs, including other compute
27//! collections but also collections managed by the storage layer. Each dataflow input is prevented
28//! from compacting beyond the allowed compaction of each of its outputs, ensuring that we can
29//! recover each dataflow to its current state in case of failure or other reconfiguration.
30
31use std::collections::{BTreeMap, BTreeSet};
32use std::num::NonZeroI64;
33use std::sync::{Arc, Mutex};
34use std::time::Duration;
35
36use futures::stream::FuturesUnordered;
37use futures::{FutureExt, StreamExt};
38use mz_build_info::BuildInfo;
39use mz_cluster_client::client::ClusterReplicaLocation;
40use mz_cluster_client::metrics::ControllerMetrics;
41use mz_cluster_client::{ReplicaId, WallclockLagFn};
42use mz_compute_types::ComputeInstanceId;
43use mz_compute_types::config::ComputeReplicaConfig;
44use mz_compute_types::dataflows::DataflowDescription;
45use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
46use mz_controller_types::dyncfgs::{
47    ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC, TIMELY_ZERO_COPY_LIMIT,
48};
49use mz_dyncfg::ConfigSet;
50use mz_expr::RowSetFinishing;
51use mz_ore::cast::CastFrom;
52use mz_ore::collections::CollectionExt;
53use mz_ore::metrics::MetricsRegistry;
54use mz_ore::now::NowFn;
55use mz_ore::tracing::OpenTelemetryContext;
56use mz_repr::{Datum, GlobalId, Row, TimestampManipulation};
57use mz_storage_client::controller::StorageController;
58use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
59use mz_storage_types::read_holds::ReadHold;
60use mz_storage_types::read_policy::ReadPolicy;
61use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
62use prometheus::proto::LabelPair;
63use serde::{Deserialize, Serialize};
64use timely::PartialOrder;
65use timely::progress::{Antichain, Timestamp};
66use tokio::sync::{mpsc, oneshot};
67use tokio::time::{self, MissedTickBehavior};
68use tracing::debug_span;
69use uuid::Uuid;
70
71use crate::controller::error::{
72    CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
73    HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
74    ReplicaCreationError, ReplicaDropError,
75};
76use crate::controller::instance::{Instance, SharedCollectionState};
77use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
78use crate::controller::replica::ReplicaConfig;
79use crate::logging::{LogVariant, LoggingConfig};
80use crate::metrics::ComputeControllerMetrics;
81use crate::protocol::command::{ComputeParameters, InitialComputeParameters, PeekTarget};
82use crate::protocol::response::{PeekResponse, SubscribeBatch};
83use crate::service::{ComputeClient, ComputeGrpcClient};
84
85mod instance;
86mod introspection;
87mod replica;
88mod sequential_hydration;
89
90pub mod error;
91
/// A shared, thread-safe handle to the storage layer's collection state, with its timestamp
/// type fixed to `T`.
pub(crate) type StorageCollections<T> = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T> + Send + Sync,
>;
95
/// A composite trait for types that serve as timestamps in the Compute Controller.
/// `Into<Datum<'static>>` is needed for writing timestamps to introspection collections.
pub trait ComputeControllerTimestamp: TimestampManipulation + Into<Datum<'static>> + Sync {}

/// [`mz_repr::Timestamp`] can be used as a compute controller timestamp.
impl ComputeControllerTimestamp for mz_repr::Timestamp {}
101
/// Responses from the compute controller.
///
/// `T` is the timestamp type used by the controller.
#[derive(Debug)]
pub enum ComputeControllerResponse<T> {
    /// See [`PeekNotification`].
    ///
    /// NOTE(review): presumably the `Uuid` identifies the peek and the
    /// `OpenTelemetryContext` carries the tracing context of the peek request — confirm
    /// against the producing code.
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// See [`crate::protocol::response::ComputeResponse::SubscribeResponse`].
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// The response from a dataflow containing a `CopyToS3Oneshot` sink.
    ///
    /// The `GlobalId` identifies the sink. The `Result` is the response from
    /// the sink, where an `Ok(n)` indicates that `n` rows were successfully
    /// copied to S3 and an `Err` indicates that an error was encountered
    /// during the copy operation.
    ///
    /// For a given `CopyToS3Oneshot` sink, there will be at most one `CopyToResponse`
    /// produced. (The sink may produce no responses if its dataflow is dropped
    /// before completion.)
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// A response reporting advancement of a collection's upper frontier.
    ///
    /// Once a collection's upper (aka "write frontier") has advanced to beyond a given time, the
    /// contents of the collection as of that time have been sealed and cannot change anymore.
    FrontierUpper {
        /// The ID of a compute collection.
        id: GlobalId,
        /// The new upper frontier of the identified compute collection.
        upper: Antichain<T>,
    },
}
131
/// Notification and summary of a received and forwarded [`crate::protocol::response::ComputeResponse::PeekResponse`].
///
/// Unlike the full peek response, this carries only summary information (row count and
/// size for successful peeks), not the result rows themselves.
///
/// NOTE(review): this type derives `Serialize`/`Deserialize`; reordering or renaming
/// variants or fields may change its serialized representation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    /// Returned rows of a successful peek.
    Success {
        /// Number of rows in the returned peek result.
        rows: u64,
        /// Size of the returned peek result in bytes.
        result_size: u64,
    },
    /// Error of an unsuccessful peek, including the reason for the error.
    Error(String),
    /// The peek was canceled.
    Canceled,
}
147
148impl PeekNotification {
149    /// Construct a new [`PeekNotification`] from a [`PeekResponse`]. The `offset` and `limit`
150    /// parameters are used to calculate the number of rows in the peek result.
151    fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
152        match peek_response {
153            PeekResponse::Rows(rows) => Self::Success {
154                rows: u64::cast_from(rows.count(offset, limit)),
155                result_size: u64::cast_from(rows.byte_len()),
156            },
157            PeekResponse::Error(err) => Self::Error(err.clone()),
158            PeekResponse::Canceled => Self::Canceled,
159        }
160    }
161}
162
/// A controller for the compute layer.
///
/// Holds the state of every compute instance it manages, plus the configuration applied to
/// newly created instances and replicas. See the module-level documentation for an overview
/// of its responsibilities.
pub struct ComputeController<T: ComputeControllerTimestamp> {
    /// The state of each compute instance, keyed by instance ID.
    instances: BTreeMap<ComputeInstanceId, InstanceState<T>>,
    /// A map from an instance ID to an arbitrary string that describes the
    /// class of the workload that compute instance is running (e.g.,
    /// `production` or `staging`).
    ///
    /// Shared with the metrics postprocessor registered in [`ComputeController::new`], which
    /// uses it to attach `workload_class` labels to per-instance metrics.
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    /// Build information handed to new instances.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections<T>,
    /// Set to `true` once `initialization_complete` has been called.
    initialized: bool,
    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to effect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// Compute configuration to apply to new instances.
    config: ComputeParameters,
    /// Compute configuration to apply to new instances as part of the Timely initialization.
    initial_config: InitialComputeParameters,
    /// A controller response to be returned on the next call to [`ComputeController::process`].
    stashed_response: Option<ComputeControllerResponse<T>>,
    /// A number that increases on every `environmentd` restart.
    envd_epoch: NonZeroI64,
    /// The compute controller metrics.
    metrics: ComputeControllerMetrics,
    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<T>,
    /// Dynamic system configuration.
    ///
    /// Updated through `ComputeController::update_configuration` calls and shared with all
    /// subcomponents of the compute controller.
    dyncfg: Arc<ConfigSet>,

    /// Receiver for responses produced by `Instance`s.
    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse<T>>,
    /// Response sender that's passed to new `Instance`s.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Receiver for introspection updates produced by `Instance`s.
    ///
    /// When [`ComputeController::start_introspection_sink`] is first called, this receiver is
    /// passed to the introspection sink task.
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    /// Introspection updates sender that's passed to new `Instance`s.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,
}
218
219impl<T: ComputeControllerTimestamp> ComputeController<T> {
220    /// Construct a new [`ComputeController`].
221    pub fn new(
222        build_info: &'static BuildInfo,
223        storage_collections: StorageCollections<T>,
224        envd_epoch: NonZeroI64,
225        read_only: bool,
226        metrics_registry: &MetricsRegistry,
227        controller_metrics: ControllerMetrics,
228        now: NowFn,
229        wallclock_lag: WallclockLagFn<T>,
230    ) -> Self {
231        let (response_tx, response_rx) = mpsc::unbounded_channel();
232        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();
233
234        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
235        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
236
237        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
238            ComputeInstanceId,
239            Option<String>,
240        >::new()));
241
242        // Apply a `workload_class` label to all metrics in the registry that
243        // have an `instance_id` label for an instance whose workload class is
244        // known.
245        metrics_registry.register_postprocessor({
246            let instance_workload_classes = Arc::clone(&instance_workload_classes);
247            move |metrics| {
248                let instance_workload_classes = instance_workload_classes
249                    .lock()
250                    .expect("lock poisoned")
251                    .iter()
252                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
253                    .collect::<BTreeMap<String, Option<String>>>();
254                for metric in metrics {
255                    'metric: for metric in metric.mut_metric() {
256                        for label in metric.get_label() {
257                            if label.get_name() == "instance_id" {
258                                if let Some(workload_class) = instance_workload_classes
259                                    .get(label.get_value())
260                                    .cloned()
261                                    .flatten()
262                                {
263                                    let mut label = LabelPair::default();
264                                    label.set_name("workload_class".into());
265                                    label.set_value(workload_class.clone());
266
267                                    let mut labels = metric.take_label();
268                                    labels.push(label);
269                                    metric.set_label(labels);
270                                }
271                                continue 'metric;
272                            }
273                        }
274                    }
275                }
276            }
277        });
278
279        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);
280
281        let initial_config = InitialComputeParameters {
282            arrangement_exert_proportionality: 16,
283            enable_zero_copy: false,
284            enable_zero_copy_lgalloc: false,
285            zero_copy_limit: None,
286        };
287
288        Self {
289            instances: BTreeMap::new(),
290            instance_workload_classes,
291            build_info,
292            storage_collections,
293            initialized: false,
294            read_only,
295            config: Default::default(),
296            initial_config,
297            stashed_response: None,
298            envd_epoch,
299            metrics,
300            now,
301            wallclock_lag,
302            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
303            response_rx,
304            response_tx,
305            introspection_rx: Some(introspection_rx),
306            introspection_tx,
307            maintenance_ticker,
308            maintenance_scheduled: false,
309        }
310    }
311
312    /// Start sinking the compute controller's introspection data into storage.
313    ///
314    /// This method should be called once the introspection collections have been registered with
315    /// the storage controller. It will panic if invoked earlier than that.
316    pub fn start_introspection_sink(
317        &mut self,
318        storage_controller: &dyn StorageController<Timestamp = T>,
319    ) {
320        if let Some(rx) = self.introspection_rx.take() {
321            spawn_introspection_sink(rx, storage_controller);
322        }
323    }
324
325    /// TODO(database-issues#7533): Add documentation.
326    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
327        self.instances.contains_key(&id)
328    }
329
330    /// Return a reference to the indicated compute instance.
331    fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState<T>, InstanceMissing> {
332        self.instances.get(&id).ok_or(InstanceMissing(id))
333    }
334
335    /// Return a mutable reference to the indicated compute instance.
336    fn instance_mut(
337        &mut self,
338        id: ComputeInstanceId,
339    ) -> Result<&mut InstanceState<T>, InstanceMissing> {
340        self.instances.get_mut(&id).ok_or(InstanceMissing(id))
341    }
342
343    /// List the IDs of all collections in the identified compute instance.
344    pub fn collection_ids(
345        &self,
346        instance_id: ComputeInstanceId,
347    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
348        let instance = self.instance(instance_id)?;
349        let ids = instance.collections.keys().copied();
350        Ok(ids)
351    }
352
353    /// Return the frontiers of the indicated collection.
354    ///
355    /// If an `instance_id` is provided, the collection is assumed to be installed on that
356    /// instance. Otherwise all available instances are searched.
357    pub fn collection_frontiers(
358        &self,
359        collection_id: GlobalId,
360        instance_id: Option<ComputeInstanceId>,
361    ) -> Result<CollectionFrontiers<T>, CollectionLookupError> {
362        let collection = match instance_id {
363            Some(id) => self.instance(id)?.collection(collection_id)?,
364            None => self
365                .instances
366                .values()
367                .find_map(|i| i.collections.get(&collection_id))
368                .ok_or(CollectionMissing(collection_id))?,
369        };
370
371        Ok(collection.frontiers())
372    }
373
374    /// List compute collections that depend on the given collection.
375    pub fn collection_reverse_dependencies(
376        &self,
377        instance_id: ComputeInstanceId,
378        id: GlobalId,
379    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
380        let instance = self.instance(instance_id)?;
381        let collections = instance.collections.iter();
382        let ids = collections
383            .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
384        Ok(ids)
385    }
386
387    /// Set the `arrangement_exert_proportionality` value to be passed to new replicas.
388    pub fn set_arrangement_exert_proportionality(&mut self, value: u32) {
389        self.initial_config.arrangement_exert_proportionality = value;
390    }
391
392    /// Set the `enable_zero_copy` value to be passed to new replicas.
393    pub fn set_enable_zero_copy(&mut self, value: bool) {
394        self.initial_config.enable_zero_copy = value;
395    }
396
397    /// Set the `enable_zero_copy_lgalloc` value to be passed to new replicas.
398    pub fn set_enable_zero_copy_lgalloc(&mut self, value: bool) {
399        self.initial_config.enable_zero_copy_lgalloc = value;
400    }
401
402    /// Set the `zero_copy_limit` value to be passed to new replicas.
403    pub fn set_zero_copy_limit(&mut self, value: Option<usize>) {
404        self.initial_config.zero_copy_limit = value;
405    }
406
407    /// Returns `true` if all non-transient, non-excluded collections on all clusters have been
408    /// hydrated.
409    ///
410    /// For this check, zero-replica clusters are always considered hydrated.
411    /// Their collections would never normally be considered hydrated but it's
412    /// clearly intentional that they have no replicas.
413    pub async fn clusters_hydrated(&self, exclude_collections: &BTreeSet<GlobalId>) -> bool {
414        let instances = self.instances.iter();
415        let mut pending: FuturesUnordered<_> = instances
416            .map(|(id, instance)| {
417                let exclude_collections = exclude_collections.clone();
418                instance
419                    .call_sync(move |i| i.collections_hydrated(&exclude_collections))
420                    .map(move |x| (id, x))
421            })
422            .collect();
423
424        let mut result = true;
425        while let Some((id, hydrated)) = pending.next().await {
426            if !hydrated {
427                result = false;
428
429                // We continue with our loop instead of breaking out early, so
430                // that we log all non-hydrated clusters.
431                tracing::info!("cluster {id} is not hydrated");
432            }
433        }
434
435        result
436    }
437
438    /// Returns `true` iff the given collection has been hydrated.
439    ///
440    /// For this check, zero-replica clusters are always considered hydrated.
441    /// Their collections would never normally be considered hydrated but it's
442    /// clearly intentional that they have no replicas.
443    pub async fn collection_hydrated(
444        &self,
445        instance_id: ComputeInstanceId,
446        collection_id: GlobalId,
447    ) -> Result<bool, anyhow::Error> {
448        let instance = self.instance(instance_id)?;
449
450        let res = instance
451            .call_sync(move |i| i.collection_hydrated(collection_id))
452            .await?;
453
454        Ok(res)
455    }
456
457    /// Returns `true` if all non-transient, non-excluded collections are hydrated on any of the
458    /// provided replicas.
459    ///
460    /// For this check, zero-replica clusters are always considered hydrated.
461    /// Their collections would never normally be considered hydrated but it's
462    /// clearly intentional that they have no replicas.
463    pub fn collections_hydrated_for_replicas(
464        &self,
465        instance_id: ComputeInstanceId,
466        replicas: Vec<ReplicaId>,
467        exclude_collections: BTreeSet<GlobalId>,
468    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
469        let instance = self.instance(instance_id)?;
470
471        // Validation
472        if instance.replicas.is_empty() && !replicas.iter().any(|id| instance.replicas.contains(id))
473        {
474            return Err(HydrationCheckBadTarget(replicas).into());
475        }
476
477        let (tx, rx) = oneshot::channel();
478        instance.call(move |i| {
479            let result = i
480                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
481                .expect("validated");
482            let _ = tx.send(result);
483        });
484
485        Ok(rx)
486    }
487
488    /// Returns the state of the [`ComputeController`] formatted as JSON.
489    ///
490    /// The returned value is not guaranteed to be stable and may change at any point in time.
491    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
492        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
493        // returned object as a tradeoff between usability and stability. `serde_json` will fail
494        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
495        // prevents a future unrelated change from silently breaking this method.
496
497        // Destructure `self` here so we don't forget to consider dumping newly added fields.
498        let Self {
499            instances,
500            instance_workload_classes,
501            build_info: _,
502            storage_collections: _,
503            initialized,
504            read_only,
505            config: _,
506            initial_config,
507            stashed_response,
508            envd_epoch,
509            metrics: _,
510            now: _,
511            wallclock_lag: _,
512            dyncfg: _,
513            response_rx: _,
514            response_tx: _,
515            introspection_rx: _,
516            introspection_tx: _,
517            maintenance_ticker: _,
518            maintenance_scheduled,
519        } = self;
520
521        let mut instances_dump = BTreeMap::new();
522        for (id, instance) in instances {
523            let dump = instance.dump().await?;
524            instances_dump.insert(id.to_string(), dump);
525        }
526
527        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
528            .lock()
529            .expect("lock poisoned")
530            .iter()
531            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
532            .collect();
533
534        fn field(
535            key: &str,
536            value: impl Serialize,
537        ) -> Result<(String, serde_json::Value), anyhow::Error> {
538            let value = serde_json::to_value(value)?;
539            Ok((key.to_string(), value))
540        }
541
542        let map = serde_json::Map::from_iter([
543            field("instances", instances_dump)?,
544            field("instance_workload_classes", instance_workload_classes)?,
545            field("initialized", initialized)?,
546            field("read_only", read_only)?,
547            field("initial_config", initial_config)?,
548            field("stashed_response", format!("{stashed_response:?}"))?,
549            field("envd_epoch", envd_epoch)?,
550            field("maintenance_scheduled", maintenance_scheduled)?,
551        ]);
552        Ok(serde_json::Value::Object(map))
553    }
554}
555
556impl<T> ComputeController<T>
557where
558    T: ComputeControllerTimestamp,
559    ComputeGrpcClient: ComputeClient<T>,
560{
561    /// Create a compute instance.
562    pub fn create_instance(
563        &mut self,
564        id: ComputeInstanceId,
565        arranged_logs: BTreeMap<LogVariant, GlobalId>,
566        workload_class: Option<String>,
567    ) -> Result<(), InstanceExists> {
568        if self.instances.contains_key(&id) {
569            return Err(InstanceExists(id));
570        }
571
572        let mut collections = BTreeMap::new();
573        let mut logs = Vec::with_capacity(arranged_logs.len());
574        for (&log, &id) in &arranged_logs {
575            let collection = Collection::new_log();
576            let shared = collection.shared.clone();
577            collections.insert(id, collection);
578            logs.push((log, id, shared));
579        }
580
581        let client = instance::Client::spawn(
582            id,
583            self.build_info,
584            Arc::clone(&self.storage_collections),
585            logs,
586            self.envd_epoch,
587            self.metrics.for_instance(id),
588            self.now.clone(),
589            self.wallclock_lag.clone(),
590            Arc::clone(&self.dyncfg),
591            self.response_tx.clone(),
592            self.introspection_tx.clone(),
593        );
594
595        let instance = InstanceState::new(client, collections);
596        self.instances.insert(id, instance);
597
598        self.instance_workload_classes
599            .lock()
600            .expect("lock poisoned")
601            .insert(id, workload_class.clone());
602
603        let instance = self.instances.get_mut(&id).expect("instance just added");
604        if self.initialized {
605            instance.call(Instance::initialization_complete);
606        }
607
608        if !self.read_only {
609            instance.call(Instance::allow_writes);
610        }
611
612        let mut config_params = self.config.clone();
613        config_params.workload_class = Some(workload_class);
614        instance.call(|i| i.update_configuration(config_params));
615
616        Ok(())
617    }
618
619    /// Updates a compute instance's workload class.
620    pub fn update_instance_workload_class(
621        &mut self,
622        id: ComputeInstanceId,
623        workload_class: Option<String>,
624    ) -> Result<(), InstanceMissing> {
625        // Ensure that the instance exists first.
626        let _ = self.instance(id)?;
627
628        self.instance_workload_classes
629            .lock()
630            .expect("lock poisoned")
631            .insert(id, workload_class);
632
633        // Cause a config update to notify the instance about its new workload class.
634        self.update_configuration(Default::default());
635
636        Ok(())
637    }
638
639    /// Remove a compute instance.
640    ///
641    /// # Panics
642    ///
643    /// Panics if the identified `instance` still has active replicas.
644    pub fn drop_instance(&mut self, id: ComputeInstanceId) {
645        if let Some(instance) = self.instances.remove(&id) {
646            instance.call(|i| i.shutdown());
647        }
648
649        self.instance_workload_classes
650            .lock()
651            .expect("lock poisoned")
652            .remove(&id);
653    }
654
655    /// Returns the compute controller's config set.
656    pub fn dyncfg(&self) -> &Arc<ConfigSet> {
657        &self.dyncfg
658    }
659
660    /// Update compute configuration.
661    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
662        // Apply dyncfg updates.
663        config_params.dyncfg_updates.apply(&self.dyncfg);
664
665        // Update zero-copy settings.
666        self.set_enable_zero_copy(ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg));
667        self.set_enable_zero_copy_lgalloc(ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg));
668        self.set_zero_copy_limit(TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg));
669
670        let instance_workload_classes = self
671            .instance_workload_classes
672            .lock()
673            .expect("lock poisoned");
674
675        // Forward updates to existing clusters.
676        // Workload classes are cluster-specific, so we need to overwrite them here.
677        for (id, instance) in self.instances.iter_mut() {
678            let mut params = config_params.clone();
679            params.workload_class = Some(instance_workload_classes[id].clone());
680            instance.call(|i| i.update_configuration(params));
681        }
682
683        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
684        match overflowing_behavior.parse() {
685            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
686            Err(err) => {
687                tracing::error!(
688                    err,
689                    overflowing_behavior,
690                    "Invalid value for ore_overflowing_behavior"
691                );
692            }
693        }
694
695        // Remember updates for future clusters.
696        self.config.update(config_params);
697    }
698
699    /// Mark the end of any initialization commands.
700    ///
701    /// The implementor may wait for this method to be called before implementing prior commands,
702    /// and so it is important for a user to invoke this method as soon as it is comfortable.
703    /// This method can be invoked immediately, at the potential expense of performance.
704    pub fn initialization_complete(&mut self) {
705        self.initialized = true;
706        for instance in self.instances.values_mut() {
707            instance.call(Instance::initialization_complete);
708        }
709    }
710
711    /// Wait until the controller is ready to do some processing.
712    ///
713    /// This method may block for an arbitrarily long time.
714    ///
715    /// When the method returns, the caller should call [`ComputeController::process`].
716    ///
717    /// This method is cancellation safe.
718    pub async fn ready(&mut self) {
719        if self.stashed_response.is_some() {
720            // We still have a response stashed, which we are immediately ready to process.
721            return;
722        }
723        if self.maintenance_scheduled {
724            // Maintenance work has been scheduled.
725            return;
726        }
727
728        tokio::select! {
729            resp = self.response_rx.recv() => {
730                let resp = resp.expect("`self.response_tx` not dropped");
731                self.stashed_response = Some(resp);
732            }
733            _ = self.maintenance_ticker.tick() => {
734                self.maintenance_scheduled = true;
735            },
736        }
737    }
738
739    /// Adds replicas of an instance.
740    pub fn add_replica_to_instance(
741        &mut self,
742        instance_id: ComputeInstanceId,
743        replica_id: ReplicaId,
744        location: ClusterReplicaLocation,
745        config: ComputeReplicaConfig,
746    ) -> Result<(), ReplicaCreationError> {
747        use ReplicaCreationError::*;
748
749        let instance = self.instance(instance_id)?;
750
751        // Validation
752        if instance.replicas.contains(&replica_id) {
753            return Err(ReplicaExists(replica_id));
754        }
755
756        let (enable_logging, interval) = match config.logging.interval {
757            Some(interval) => (true, interval),
758            None => (false, Duration::from_secs(1)),
759        };
760
761        let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);
762
763        let replica_config = ReplicaConfig {
764            location,
765            logging: LoggingConfig {
766                interval,
767                enable_logging,
768                log_logging: config.logging.log_logging,
769                index_logs: Default::default(),
770            },
771            grpc_client: self.config.grpc_client.clone(),
772            expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
773            initial_config: self.initial_config.clone(),
774        };
775
776        let instance = self.instance_mut(instance_id).expect("validated");
777        instance.replicas.insert(replica_id);
778
779        instance.call(move |i| {
780            i.add_replica(replica_id, replica_config)
781                .expect("validated")
782        });
783
784        Ok(())
785    }
786
787    /// Removes a replica from an instance, including its service in the orchestrator.
788    pub fn drop_replica(
789        &mut self,
790        instance_id: ComputeInstanceId,
791        replica_id: ReplicaId,
792    ) -> Result<(), ReplicaDropError> {
793        use ReplicaDropError::*;
794
795        let instance = self.instance_mut(instance_id)?;
796
797        // Validation
798        if !instance.replicas.contains(&replica_id) {
799            return Err(ReplicaMissing(replica_id));
800        }
801
802        instance.replicas.remove(&replica_id);
803
804        instance.call(move |i| i.remove_replica(replica_id).expect("validated"));
805
806        Ok(())
807    }
808
809    /// Creates the described dataflow and initializes state for its output.
810    ///
811    /// If a `subscribe_target_replica` is given, any subscribes exported by the dataflow are
812    /// configured to target that replica, i.e., only subscribe responses sent by that replica are
813    /// considered.
814    pub fn create_dataflow(
815        &mut self,
816        instance_id: ComputeInstanceId,
817        mut dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
818        subscribe_target_replica: Option<ReplicaId>,
819    ) -> Result<(), DataflowCreationError> {
820        use DataflowCreationError::*;
821
822        let instance = self.instance(instance_id)?;
823
824        // Validation: target replica
825        if let Some(replica_id) = subscribe_target_replica {
826            if !instance.replicas.contains(&replica_id) {
827                return Err(ReplicaMissing(replica_id));
828            }
829        }
830
831        // Validation: as_of
832        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
833        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
834            return Err(EmptyAsOfForSubscribe);
835        }
836        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
837            return Err(EmptyAsOfForCopyTo);
838        }
839
840        // Validation: input collections
841        let storage_ids = dataflow.imported_source_ids().collect();
842        let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
843        for id in dataflow.imported_index_ids() {
844            let read_hold = instance.acquire_read_hold(id)?;
845            import_read_holds.push(read_hold);
846        }
847        for hold in &import_read_holds {
848            if PartialOrder::less_than(as_of, hold.since()) {
849                return Err(SinceViolation(hold.id()));
850            }
851        }
852
853        // Validation: storage sink collections
854        for id in dataflow.persist_sink_ids() {
855            if self.storage_collections.check_exists(id).is_err() {
856                return Err(CollectionMissing(id));
857            }
858        }
859        let time_dependence = self
860            .determine_time_dependence(instance_id, &dataflow)
861            .expect("must exist");
862
863        let instance = self.instance_mut(instance_id).expect("validated");
864
865        let mut shared_collection_state = BTreeMap::new();
866        for id in dataflow.export_ids() {
867            let shared = SharedCollectionState::new(as_of.clone());
868            let collection = Collection {
869                write_only: dataflow.sink_exports.contains_key(&id),
870                compute_dependencies: dataflow.imported_index_ids().collect(),
871                shared: shared.clone(),
872                time_dependence: time_dependence.clone(),
873            };
874            instance.collections.insert(id, collection);
875            shared_collection_state.insert(id, shared);
876        }
877
878        dataflow.time_dependence = time_dependence;
879
880        instance.call(move |i| {
881            i.create_dataflow(
882                dataflow,
883                import_read_holds,
884                subscribe_target_replica,
885                shared_collection_state,
886            )
887            .expect("validated")
888        });
889
890        Ok(())
891    }
892
893    /// Drop the read capability for the given collections and allow their resources to be
894    /// reclaimed.
895    pub fn drop_collections(
896        &mut self,
897        instance_id: ComputeInstanceId,
898        collection_ids: Vec<GlobalId>,
899    ) -> Result<(), CollectionUpdateError> {
900        let instance = self.instance_mut(instance_id)?;
901
902        // Validation
903        for id in &collection_ids {
904            instance.collection(*id)?;
905        }
906
907        for id in &collection_ids {
908            instance.collections.remove(id);
909        }
910
911        instance.call(|i| i.drop_collections(collection_ids).expect("validated"));
912
913        Ok(())
914    }
915
916    /// Initiate a peek request for the contents of the given collection at `timestamp`.
917    pub fn peek(
918        &self,
919        instance_id: ComputeInstanceId,
920        peek_target: PeekTarget,
921        literal_constraints: Option<Vec<Row>>,
922        uuid: Uuid,
923        timestamp: T,
924        finishing: RowSetFinishing,
925        map_filter_project: mz_expr::SafeMfpPlan,
926        target_replica: Option<ReplicaId>,
927        peek_response_tx: oneshot::Sender<PeekResponse>,
928    ) -> Result<(), PeekError> {
929        use PeekError::*;
930
931        let instance = self.instance(instance_id)?;
932
933        // Validation: target replica
934        if let Some(replica_id) = target_replica {
935            if !instance.replicas.contains(&replica_id) {
936                return Err(ReplicaMissing(replica_id));
937            }
938        }
939
940        // Validation: peek target
941        let read_hold = match &peek_target {
942            PeekTarget::Index { id } => instance.acquire_read_hold(*id)?,
943            PeekTarget::Persist { id, .. } => self
944                .storage_collections
945                .acquire_read_holds(vec![*id])?
946                .into_element(),
947        };
948        if !read_hold.since().less_equal(&timestamp) {
949            return Err(SinceViolation(peek_target.id()));
950        }
951
952        instance.call(move |i| {
953            i.peek(
954                peek_target,
955                literal_constraints,
956                uuid,
957                timestamp,
958                finishing,
959                map_filter_project,
960                read_hold,
961                target_replica,
962                peek_response_tx,
963            )
964            .expect("validated")
965        });
966
967        Ok(())
968    }
969
970    /// Cancel an existing peek request.
971    ///
972    /// Canceling a peek is best effort. The caller may see any of the following
973    /// after canceling a peek request:
974    ///
975    ///   * A `PeekResponse::Rows` indicating that the cancellation request did
976    ///     not take effect in time and the query succeeded.
977    ///   * A `PeekResponse::Canceled` affirming that the peek was canceled.
978    ///   * No `PeekResponse` at all.
979    pub fn cancel_peek(
980        &self,
981        instance_id: ComputeInstanceId,
982        uuid: Uuid,
983        reason: PeekResponse,
984    ) -> Result<(), InstanceMissing> {
985        self.instance(instance_id)?
986            .call(move |i| i.cancel_peek(uuid, reason));
987        Ok(())
988    }
989
990    /// Assign a read policy to specific identifiers.
991    ///
992    /// The policies are assigned in the order presented, and repeated identifiers should
993    /// conclude with the last policy. Changing a policy will immediately downgrade the read
994    /// capability if appropriate, but it will not "recover" the read capability if the prior
995    /// capability is already ahead of it.
996    ///
997    /// Identifiers not present in `policies` retain their existing read policies.
998    ///
999    /// It is an error to attempt to set a read policy for a collection that is not readable in the
1000    /// context of compute. At this time, only indexes are readable compute collections.
1001    pub fn set_read_policy(
1002        &self,
1003        instance_id: ComputeInstanceId,
1004        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1005    ) -> Result<(), ReadPolicyError> {
1006        use ReadPolicyError::*;
1007
1008        let instance = self.instance(instance_id)?;
1009
1010        // Validation
1011        for (id, _) in &policies {
1012            let collection = instance.collection(*id)?;
1013            if collection.write_only {
1014                return Err(WriteOnlyCollection(*id));
1015            }
1016        }
1017
1018        self.instance(instance_id)?
1019            .call(|i| i.set_read_policy(policies).expect("validated"));
1020
1021        Ok(())
1022    }
1023
1024    /// Acquires a [`ReadHold`] for the identified compute collection.
1025    pub fn acquire_read_hold(
1026        &self,
1027        instance_id: ComputeInstanceId,
1028        collection_id: GlobalId,
1029    ) -> Result<ReadHold<T>, CollectionUpdateError> {
1030        let read_hold = self
1031            .instance(instance_id)?
1032            .acquire_read_hold(collection_id)?;
1033        Ok(read_hold)
1034    }
1035
1036    /// Determine the time dependence for a dataflow.
1037    fn determine_time_dependence(
1038        &self,
1039        instance_id: ComputeInstanceId,
1040        dataflow: &DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
1041    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
1042        // TODO(ct3): Continual tasks don't support replica expiration
1043        let is_continual_task = dataflow.continual_task_ids().next().is_some();
1044        if is_continual_task {
1045            return Ok(None);
1046        }
1047
1048        let instance = self
1049            .instance(instance_id)
1050            .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
1051        let mut time_dependencies = Vec::new();
1052
1053        for id in dataflow.imported_index_ids() {
1054            let dependence = instance
1055                .get_time_dependence(id)
1056                .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
1057            time_dependencies.push(dependence);
1058        }
1059
1060        'source: for id in dataflow.imported_source_ids() {
1061            // We first check whether the id is backed by a compute object, in which case we use
1062            // the time dependence we know. This is true for materialized views, continual tasks,
1063            // etc.
1064            for instance in self.instances.values() {
1065                if let Ok(dependence) = instance.get_time_dependence(id) {
1066                    time_dependencies.push(dependence);
1067                    continue 'source;
1068                }
1069            }
1070
1071            // Not a compute object: Consult the storage collections controller.
1072            time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
1073        }
1074
1075        Ok(TimeDependence::merge(
1076            time_dependencies,
1077            dataflow.refresh_schedule.as_ref(),
1078        ))
1079    }
1080
1081    /// Processes the work queued by [`ComputeController::ready`].
1082    #[mz_ore::instrument(level = "debug")]
1083    pub fn process(&mut self) -> Option<ComputeControllerResponse<T>> {
1084        // Perform periodic maintenance work.
1085        if self.maintenance_scheduled {
1086            self.maintain();
1087            self.maintenance_scheduled = false;
1088        }
1089
1090        // Return a ready response, if any.
1091        self.stashed_response.take()
1092    }
1093
1094    #[mz_ore::instrument(level = "debug")]
1095    fn maintain(&mut self) {
1096        // Perform instance maintenance work.
1097        for instance in self.instances.values_mut() {
1098            instance.call(Instance::maintain);
1099        }
1100    }
1101}
1102
1103#[derive(Debug)]
1104struct InstanceState<T: ComputeControllerTimestamp> {
1105    client: instance::Client<T>,
1106    replicas: BTreeSet<ReplicaId>,
1107    collections: BTreeMap<GlobalId, Collection<T>>,
1108}
1109
1110impl<T: ComputeControllerTimestamp> InstanceState<T> {
1111    fn new(client: instance::Client<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
1112        Self {
1113            client,
1114            replicas: Default::default(),
1115            collections,
1116        }
1117    }
1118
1119    fn collection(&self, id: GlobalId) -> Result<&Collection<T>, CollectionMissing> {
1120        self.collections.get(&id).ok_or(CollectionMissing(id))
1121    }
1122
1123    fn call<F>(&self, f: F)
1124    where
1125        F: FnOnce(&mut Instance<T>) + Send + 'static,
1126    {
1127        let otel_ctx = OpenTelemetryContext::obtain();
1128        self.client
1129            .send(Box::new(move |instance| {
1130                let _span = debug_span!("instance::call").entered();
1131                otel_ctx.attach_as_parent();
1132
1133                f(instance)
1134            }))
1135            .expect("instance not dropped");
1136    }
1137
1138    async fn call_sync<F, R>(&self, f: F) -> R
1139    where
1140        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
1141        R: Send + 'static,
1142    {
1143        let (tx, rx) = oneshot::channel();
1144        let otel_ctx = OpenTelemetryContext::obtain();
1145        self.client
1146            .send(Box::new(move |instance| {
1147                let _span = debug_span!("instance::call_sync").entered();
1148                otel_ctx.attach_as_parent();
1149
1150                let result = f(instance);
1151                let _ = tx.send(result);
1152            }))
1153            .expect("instance not dropped");
1154
1155        rx.await.expect("instance not dropped")
1156    }
1157
1158    /// Acquires a [`ReadHold`] for the identified compute collection.
1159    pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
1160        // We acquire read holds at the earliest possible time rather than returning a copy
1161        // of the implied read hold. This is so that in `create_dataflow` we can acquire read holds
1162        // on compute dependencies at frontiers that are held back by other read holds the caller
1163        // has previously taken.
1164        //
1165        // If/when we change the compute API to expect callers to pass in the `ReadHold`s rather
1166        // than acquiring them ourselves, we might tighten this up and instead acquire read holds
1167        // at the implied capability.
1168
1169        let collection = self.collection(id)?;
1170        let since = collection.shared.lock_read_capabilities(|caps| {
1171            let since = caps.frontier().to_owned();
1172            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
1173            since
1174        });
1175
1176        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
1177        Ok(hold)
1178    }
1179
1180    /// Return the stored time dependence for a collection.
1181    fn get_time_dependence(
1182        &self,
1183        id: GlobalId,
1184    ) -> Result<Option<TimeDependence>, CollectionMissing> {
1185        Ok(self.collection(id)?.time_dependence.clone())
1186    }
1187
1188    /// Returns the [`InstanceState`] formatted as JSON.
1189    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
1190        // Destructure `self` here so we don't forget to consider dumping newly added fields.
1191        let Self {
1192            client: _,
1193            replicas,
1194            collections,
1195        } = self;
1196
1197        let instance = self.call_sync(|i| i.dump()).await?;
1198        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
1199        let collections: BTreeMap<_, _> = collections
1200            .iter()
1201            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
1202            .collect();
1203
1204        fn field(
1205            key: &str,
1206            value: impl Serialize,
1207        ) -> Result<(String, serde_json::Value), anyhow::Error> {
1208            let value = serde_json::to_value(value)?;
1209            Ok((key.to_string(), value))
1210        }
1211
1212        let map = serde_json::Map::from_iter([
1213            field("instance", instance)?,
1214            field("replicas", replicas)?,
1215            field("collections", collections)?,
1216        ]);
1217        Ok(serde_json::Value::Object(map))
1218    }
1219}
1220
1221#[derive(Debug)]
1222struct Collection<T> {
1223    write_only: bool,
1224    compute_dependencies: BTreeSet<GlobalId>,
1225    shared: SharedCollectionState<T>,
1226    /// The computed time dependence for this collection. None indicates no specific information,
1227    /// a value describes how the collection relates to wall-clock time.
1228    time_dependence: Option<TimeDependence>,
1229}
1230
1231impl<T: Timestamp> Collection<T> {
1232    fn new_log() -> Self {
1233        let as_of = Antichain::from_elem(T::minimum());
1234        Self {
1235            write_only: false,
1236            compute_dependencies: Default::default(),
1237            shared: SharedCollectionState::new(as_of),
1238            time_dependence: Some(TimeDependence::default()),
1239        }
1240    }
1241
1242    fn frontiers(&self) -> CollectionFrontiers<T> {
1243        let read_frontier = self
1244            .shared
1245            .lock_read_capabilities(|c| c.frontier().to_owned());
1246        let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
1247        CollectionFrontiers {
1248            read_frontier,
1249            write_frontier,
1250        }
1251    }
1252}
1253
1254/// The frontiers of a compute collection.
1255#[derive(Clone, Debug)]
1256pub struct CollectionFrontiers<T> {
1257    /// The read frontier.
1258    pub read_frontier: Antichain<T>,
1259    /// The write frontier.
1260    pub write_frontier: Antichain<T>,
1261}
1262
1263impl<T: Timestamp> Default for CollectionFrontiers<T> {
1264    fn default() -> Self {
1265        Self {
1266            read_frontier: Antichain::from_elem(T::minimum()),
1267            write_frontier: Antichain::from_elem(T::minimum()),
1268        }
1269    }
1270}