mz_compute_client/controller/instance.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A controller for a compute instance.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use mz_build_info::BuildInfo;
use mz_cluster_client::WallclockLagFn;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_compute_types::sinks::{
    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
};
use mz_compute_types::sources::SourceInstanceDesc;
use mz_controller_types::dyncfgs::{
    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
use mz_ore::channel::instrumented_unbounded_channel;
use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, soft_panic_or_log};
use mz_persist_types::PersistLocation;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
use mz_storage_types::read_holds::{self, ReadHold};
use mz_storage_types::read_policy::ReadPolicy;
use serde::Serialize;
use thiserror::Error;
use timely::PartialOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::{mpsc, oneshot};
use tracing::debug_span;
use uuid::Uuid;

use crate::controller::error::{
    CollectionLookupError, CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
};
use crate::controller::replica::{ReplicaClient, ReplicaConfig};
use crate::controller::{
    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
    ReplicaId, StorageCollections,
};
use crate::logging::LogVariant;
use crate::metrics::IntCounter;
use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
use crate::protocol::command::{
    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
};
use crate::protocol::history::ComputeCommandHistory;
use crate::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
    SubscribeBatch, SubscribeResponse,
};

#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);

#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);

#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}

impl From<CollectionMissing> for DataflowCreationError {
    fn from(error: CollectionMissing) -> Self {
        Self::CollectionMissing(error.0)
    }
}

#[derive(Error, Debug)]
#[error("the instance has shut down")]
pub(super) struct InstanceShutDown;

/// Errors arising during peek processing.
#[derive(Error, Debug)]
pub enum PeekError {
    /// The replica that the peek was issued against does not exist.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The read hold that was passed in is against the wrong collection.
    #[error("read hold ID does not match peeked collection: {0}")]
    ReadHoldIdMismatch(GlobalId),
    /// The read hold that was passed in is for a later time than the peek's timestamp.
    #[error("insufficient read hold provided: {0}")]
    ReadHoldInsufficient(GlobalId),
    /// The peek's target instance has shut down.
    #[error("the instance has shut down")]
    InstanceShutDown,
}

impl From<InstanceShutDown> for PeekError {
    fn from(_error: InstanceShutDown) -> Self {
        Self::InstanceShutDown
    }
}

#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}

impl From<CollectionMissing> for ReadPolicyError {
    fn from(error: CollectionMissing) -> Self {
        Self::CollectionMissing(error.0)
    }
}

/// A command sent to an [`Instance`] task.
type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;

/// A client for an `Instance` task.
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub struct Client<T: ComputeControllerTimestamp> {
    /// A sender for commands for the instance.
    command_tx: mpsc::UnboundedSender<Command<T>>,
    /// A sender for read hold changes for collections installed on the instance.
    #[derivative(Debug = "ignore")]
    read_hold_tx: read_holds::ChangeTx<T>,
}

impl<T: ComputeControllerTimestamp> Client<T> {
    pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx<T> {
        Arc::clone(&self.read_hold_tx)
    }

    /// Call a method to be run on the instance task, by sending a message to the instance.
    /// Does not wait for a response message.
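    ///
    /// A minimal usage sketch (hypothetical caller and `params` value, not compiled as a doctest):
    ///
    /// ```ignore
    /// // Fire-and-forget: apply new configuration parameters on the instance task.
    /// // Returns `Err(InstanceShutDown)` if the instance task has already stopped.
    /// client.call(|instance| instance.update_configuration(params))?;
    /// ```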
    pub(super) fn call<F>(&self, f: F) -> Result<(), InstanceShutDown>
    where
        F: FnOnce(&mut Instance<T>) + Send + 'static,
    {
        let otel_ctx = OpenTelemetryContext::obtain();
        self.command_tx
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call").entered();
                otel_ctx.attach_as_parent();

                f(instance)
            }))
            .map_err(|_send_error| InstanceShutDown)
    }

    /// Call a method to be run on the instance task, by sending a message to the instance and
    /// waiting for a response message.
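    ///
    /// A minimal usage sketch (hypothetical caller, not compiled as a doctest):
    ///
    /// ```ignore
    /// // Run `dump` on the instance task and wait for the result; the outer `Result`
    /// // is `Err(InstanceShutDown)` if the instance task has already stopped.
    /// let dump = client.call_sync(|instance| instance.dump()).await;
    /// ```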
    pub(super) async fn call_sync<F, R>(&self, f: F) -> Result<R, InstanceShutDown>
    where
        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let otel_ctx = OpenTelemetryContext::obtain();
        self.command_tx
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call_sync").entered();
                otel_ctx.attach_as_parent();
                let result = f(instance);
                let _ = tx.send(result);
            }))
            .map_err(|_send_error| InstanceShutDown)?;

        rx.await.map_err(|_| InstanceShutDown)
    }
}

impl<T> Client<T>
where
    T: ComputeControllerTimestamp,
{
    pub(super) fn spawn(
        id: ComputeInstanceId,
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        read_only: bool,
    ) -> Self {
        let (command_tx, command_rx) = mpsc::unbounded_channel();

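        // Read hold changes are routed to the instance task like any other command: each change
        // batch is boxed into a closure that applies it via `Instance::apply_read_hold_change`.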
        let read_hold_tx: read_holds::ChangeTx<_> = {
            let command_tx = command_tx.clone();
            Arc::new(move |id, change: ChangeBatch<_>| {
                let cmd: Command<_> = {
                    let change = change.clone();
                    Box::new(move |i| i.apply_read_hold_change(id, change.clone()))
                };
                command_tx.send(cmd).map_err(|_| SendError((id, change)))
            })
        };

        mz_ore::task::spawn(
            || format!("compute-instance-{id}"),
            Instance::new(
                build_info,
                storage,
                peek_stash_persist_location,
                arranged_logs,
                metrics,
                now,
                wallclock_lag,
                dyncfg,
                command_rx,
                response_tx,
                Arc::clone(&read_hold_tx),
                introspection_tx,
                read_only,
            )
            .run(),
        );

        Self {
            command_tx,
            read_hold_tx,
        }
    }

    /// Acquires a `ReadHold` and collection write frontier for each of the identified compute
    /// collections.
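    ///
    /// A minimal usage sketch (hypothetical caller and `index_id`, not compiled as a doctest):
    ///
    /// ```ignore
    /// // Acquire a read hold and the current write frontier for one collection.
    /// let holds = client
    ///     .acquire_read_holds_and_collection_write_frontiers(vec![index_id])
    ///     .await?;
    /// let (id, read_hold, write_frontier) = holds.into_iter().next().unwrap();
    /// ```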
    pub async fn acquire_read_holds_and_collection_write_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, ReadHold<T>, Antichain<T>)>, CollectionLookupError> {
        self.call_sync(move |i| {
            let mut result = Vec::new();
            for id in ids.into_iter() {
                result.push((
                    id,
                    i.acquire_read_hold(id)?,
                    i.collection_write_frontier(id)?,
                ));
            }
            Ok(result)
        })
        .await?
    }

    /// Issue a peek by calling into the instance task.
    pub async fn peek(
        &self,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        target_read_hold: ReadHold<T>,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        self.call_sync(move |i| {
            i.peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing,
                map_filter_project,
                target_read_hold,
                target_replica,
                peek_response_tx,
            )
        })
        .await?
    }
}

/// A response from a replica, composed of a replica ID, the replica's current epoch, and the
/// compute response itself.
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);

/// The state we keep for a compute instance.
pub(super) struct Instance<T: ComputeControllerTimestamp> {
    /// Build info for spawning replicas
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections<T>,
    /// Whether instance initialization has been completed.
    initialized: bool,
    /// Whether this instance is in read-only mode.
    ///
    /// When in read-only mode, this instance will not update persistent state, such as
    /// wallclock lag introspection.
    read_only: bool,
    /// The workload class of this instance.
    ///
    /// This is currently only used to annotate metrics.
    workload_class: Option<String>,
    /// The replicas of this compute instance.
    replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
    /// Currently installed compute collections.
    ///
    /// New entries are added for all collections exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// Entries are removed by [`Instance::cleanup_collections`]. See that method's documentation
    /// about the conditions for removing collection state.
    collections: BTreeMap<GlobalId, CollectionState<T>>,
    /// IDs of log sources maintained by this compute instance.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Currently outstanding peeks.
    ///
    /// New entries are added for all peeks initiated through [`Instance::peek`].
    ///
    /// The entry for a peek is only removed once all replicas have responded to the peek. This is
    /// currently required to ensure all replicas have stopped reading from the peeked collection's
    /// inputs before we allow them to compact. database-issues#4822 tracks changing this so we only have to wait
    /// for the first peek response.
    peeks: BTreeMap<Uuid, PendingPeek<T>>,
    /// Currently in-progress subscribes.
    ///
    /// New entries are added for all subscribes exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// The entry for a subscribe is removed once at least one replica has reported the subscribe
    /// to have advanced to the empty frontier or to have been dropped, implying that no further
    /// updates will be emitted for this subscribe.
    ///
    /// Note that subscribes are tracked both in `collections` and `subscribes`. `collections`
    /// keeps track of the subscribe's upper and since frontiers and ensures appropriate read holds
    /// on the subscribe's input. `subscribes` is only used to track which updates have been
    /// emitted, to decide if new ones should be emitted or suppressed.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
    /// Tracks all in-progress COPY TOs.
    ///
    /// New entries are added for all s3 oneshot sinks (corresponding to a COPY TO) exported from
    /// dataflows created through [`Instance::create_dataflow`].
    ///
    /// The entry for a copy to is removed once at least one replica has finished
    /// or the exporting collection is dropped.
    copy_tos: BTreeSet<GlobalId>,
    /// The command history, used when introducing new replicas or restarting existing replicas.
    history: ComputeCommandHistory<UIntGauge, T>,
    /// Receiver for commands to be executed.
    command_rx: mpsc::UnboundedReceiver<Command<T>>,
    /// Sender for responses to be delivered.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Sender for introspection updates to be recorded.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// The registry the controller uses to report metrics.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,

    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for updates to collection read holds.
    ///
    /// Copies of this sender are given to [`ReadHold`]s that are created in
    /// [`CollectionState::new`].
    read_hold_tx: read_holds::ChangeTx<T>,
    /// A sender for responses from replicas.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// A receiver for responses from replicas.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
}

impl<T: ComputeControllerTimestamp> Instance<T> {
    /// Acquire a handle to the collection state associated with `id`.
    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    /// Acquire a mutable handle to the collection state associated with `id`.
    fn collection_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut CollectionState<T>, CollectionMissing> {
        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
    }

    /// Acquire a handle to the collection state associated with `id`.
    ///
    /// # Panics
    ///
    /// Panics if the identified collection does not exist.
    fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
        self.collections.get(&id).expect("collection must exist")
    }

    /// Acquire a mutable handle to the collection state associated with `id`.
    ///
    /// # Panics
    ///
    /// Panics if the identified collection does not exist.
    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
        self.collections.iter().map(|(id, coll)| (*id, coll))
    }

    /// Add a collection to the instance state.
    ///
    /// # Panics
    ///
    /// Panics if a collection with the same ID exists already.
    fn add_collection(
        &mut self,
        id: GlobalId,
        as_of: Antichain<T>,
        shared: SharedCollectionState<T>,
        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        replica_input_read_holds: Vec<ReadHold<T>>,
        write_only: bool,
        storage_sink: bool,
        initial_as_of: Option<Antichain<T>>,
        refresh_schedule: Option<RefreshSchedule>,
    ) {
        // Add global collection state.
        let introspection = CollectionIntrospection::new(
            id,
            self.introspection_tx.clone(),
            as_of.clone(),
            storage_sink,
            initial_as_of,
            refresh_schedule,
        );
        let mut state = CollectionState::new(
            id,
            as_of.clone(),
            shared,
            storage_dependencies,
            compute_dependencies,
            Arc::clone(&self.read_hold_tx),
            introspection,
        );
        // If the collection is write-only, clear its read policy to reflect that.
        if write_only {
            state.read_policy = None;
        }

        if let Some(previous) = self.collections.insert(id, state) {
            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
        }

        // Add per-replica collection state.
        for replica in self.replicas.values_mut() {
            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
        }

        // Update introspection.
        self.report_dependency_updates(id, Diff::ONE);
    }

    fn remove_collection(&mut self, id: GlobalId) {
        // Update introspection.
        self.report_dependency_updates(id, Diff::MINUS_ONE);

        // Remove per-replica collection state.
        for replica in self.replicas.values_mut() {
            replica.remove_collection(id);
        }

        // Remove global collection state.
        self.collections.remove(&id);
    }

    fn add_replica_state(
        &mut self,
        id: ReplicaId,
        client: ReplicaClient<T>,
        config: ReplicaConfig,
        epoch: u64,
    ) {
        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();

        let metrics = self.metrics.for_replica(id);
        let mut replica = ReplicaState::new(
            id,
            client,
            config,
            metrics,
            self.introspection_tx.clone(),
            epoch,
        );

        // Add per-replica collection state.
        for (collection_id, collection) in &self.collections {
            // Skip log collections not maintained by this replica.
            if collection.log_collection && !log_ids.contains(collection_id) {
                continue;
            }

            let as_of = if collection.log_collection {
                // For log collections, we don't send a `CreateDataflow` command to the replica, so
                // it doesn't know which as-of the controller chose and defaults to the minimum
                // frontier instead. We need to initialize the controller-side tracking with the
                // same frontier, to avoid observing regressions in the reported frontiers.
                Antichain::from_elem(T::minimum())
            } else {
                collection.read_frontier().to_owned()
            };

            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
            replica.add_collection(*collection_id, as_of, input_read_holds);
        }

        self.replicas.insert(id, replica);
    }

    /// Enqueue the given response for delivery to the controller clients.
    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
        // Failure to send means the `ComputeController` has been dropped and doesn't care about
        // responses anymore.
        let _ = self.response_tx.send(response);
    }

    /// Enqueue the given introspection updates for recording.
    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
        // Failure to send means the `ComputeController` has been dropped and doesn't care about
        // introspection updates anymore.
        let _ = self.introspection_tx.send((type_, updates));
    }

    /// Returns whether the identified replica exists.
    fn replica_exists(&self, id: ReplicaId) -> bool {
        self.replicas.contains_key(&id)
    }

    /// Return the IDs of pending peeks targeting the specified replica.
    fn peeks_targeting(
        &self,
        replica_id: ReplicaId,
    ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
        self.peeks.iter().filter_map(move |(uuid, peek)| {
            if peek.target_replica == Some(replica_id) {
                Some((*uuid, peek))
            } else {
                None
            }
        })
    }

    /// Return the IDs of in-progress subscribes targeting the specified replica.
    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
        self.subscribes.iter().filter_map(move |(id, subscribe)| {
            let targeting = subscribe.target_replica == Some(replica_id);
            targeting.then_some(*id)
        })
    }

    /// Update introspection with the current collection frontiers.
    ///
    /// We could also do this directly in response to frontier changes, but doing it periodically
    /// lets us avoid emitting some introspection updates that can be consolidated (e.g. a write
    /// frontier update immediately followed by a read frontier update).
    ///
    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
    /// per second during normal operation.
    fn update_frontier_introspection(&mut self) {
        for collection in self.collections.values_mut() {
            collection
                .introspection
                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
        }

        for replica in self.replicas.values_mut() {
            for collection in replica.collections.values_mut() {
                collection
                    .introspection
                    .observe_frontier(&collection.write_frontier);
            }
        }
    }

    /// Refresh the controller state metrics for this instance.
    ///
    /// We could also do state metric updates directly in response to state changes, but that would
    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
    /// a single method is less noisy.
    ///
    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
    /// per second during normal operation.
    fn refresh_state_metrics(&self) {
        let unscheduled_collections_count =
            self.collections.values().filter(|c| !c.scheduled).count();
        let connected_replica_count = self
            .replicas
            .values()
            .filter(|r| r.client.is_connected())
            .count();

        self.metrics
            .replica_count
            .set(u64::cast_from(self.replicas.len()));
        self.metrics
            .collection_count
            .set(u64::cast_from(self.collections.len()));
        self.metrics
            .collection_unscheduled_count
            .set(u64::cast_from(unscheduled_collections_count));
        self.metrics
            .peek_count
            .set(u64::cast_from(self.peeks.len()));
        self.metrics
            .subscribe_count
            .set(u64::cast_from(self.subscribes.len()));
        self.metrics
            .copy_to_count
            .set(u64::cast_from(self.copy_tos.len()));
        self.metrics
            .connected_replica_count
            .set(u64::cast_from(connected_replica_count));
    }

    /// Refresh the wallclock lag introspection and metrics with the current lag values.
    ///
    /// This method produces wallclock lag metrics of two different shapes:
    ///
    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
    ///   over the last minute, together with the current time.
    /// * Histograms: For each collection, we measure the lag of the write frontier behind
    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
    ///   together with the current histogram period.
    ///
    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
    /// Prometheus.
    ///
    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
    /// per second during normal operation.
    fn refresh_wallclock_lag(&mut self) {
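        // A collection with an empty write frontier has advanced to the end of time and can no
        // longer fall behind the wallclock, so we report a lag of zero for it.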
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        // First, iterate over all collections and collect histogram measurements.
        // We keep a record of unreadable collections, so we can emit undefined lags for those here
        // and below when we collect history measurements.
        let mut unreadable_collections = BTreeSet::new();
        for (id, collection) in &mut self.collections {
            // We need to ask the storage controller for the read frontiers of storage collections.
            let read_frontier = match self.storage_collections.collection_frontiers(*id) {
                Ok(f) => f.read_capabilities,
                Err(_) => collection.read_frontier(),
            };
            let write_frontier = collection.write_frontier();
            let collection_unreadable = PartialOrder::less_equal(&write_frontier, &read_frontier);
            if collection_unreadable {
                unreadable_collections.insert(id);
            }

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                let bucket = if collection_unreadable {
                    WallclockLag::Undefined
                } else {
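                    // Round the lag up to the next power of two (e.g. a 90s lag lands in the
                    // 128s bucket), which keeps the number of histogram buckets small.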
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        // Second, iterate over all per-replica collections and collect history measurements.
        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                let lag = if unreadable_collections.contains(&id) {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                };

                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    // No way to specify values as undefined in Prometheus metrics, so we use the
                    // maximum value instead.
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                };
            }
        }

        // Record lags to persist, if it's time.
        self.maybe_record_wallclock_lag();
    }

    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
    /// last recording.
    ///
    /// We emit new introspection updates if the system time has passed into a new multiple of the
    /// recording interval (typically 1 minute) since the last refresh. The storage controller uses
    /// the same approach, ensuring that both controllers commit their lags at roughly the same
    /// time, avoiding confusion caused by inconsistencies.
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

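        // For example, with a 1-minute recording interval, 12:34:56 truncates to 12:34:00. We
        // record only when the truncated time advances past the last recording, so updates are
        // emitted once per interval, aligned to interval boundaries.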
        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }

    /// Report updates (inserts or retractions) to the identified collection's dependencies.
    ///
    /// # Panics
    ///
    /// Panics if the identified collection does not exist.
    fn report_dependency_updates(&self, id: GlobalId, diff: Diff) {
        let collection = self.expect_collection(id);
        let dependencies = collection.dependency_ids();

        let updates = dependencies
            .map(|dependency_id| {
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&dependency_id.to_string()),
                ]);
                (row, diff)
            })
            .collect();

        self.deliver_introspection_updates(IntrospectionType::ComputeDependencies, updates);
    }

    /// Returns `true` if the given collection is hydrated on at least one
    /// replica.
    ///
    /// This also returns `true` in case this cluster does not have any
    /// replicas.
    #[mz_ore::instrument(level = "debug")]
    pub fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, CollectionLookupError> {
        if self.replicas.is_empty() {
            return Ok(true);
        }

        for replica_state in self.replicas.values() {
            let collection_state = replica_state
                .collections
                .get(&collection_id)
                .ok_or(CollectionLookupError::CollectionMissing(collection_id))?;

            if collection_state.hydrated() {
                return Ok(true);
            }
        }

        Ok(false)
    }

    /// Returns `true` if each non-transient, non-excluded collection is hydrated on at
    /// least one replica.
    ///
    /// This also returns `true` in case this cluster does not have any
    /// replicas.
    #[mz_ore::instrument(level = "debug")]
    pub fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, HydrationCheckBadTarget> {
        if self.replicas.is_empty() {
            return Ok(true);
        }
        let mut all_hydrated = true;
        let target_replicas: BTreeSet<ReplicaId> = self
            .replicas
            .keys()
            .filter_map(|id| match target_replica_ids {
                None => Some(id.clone()),
                Some(ref ids) if ids.contains(id) => Some(id.clone()),
                Some(_) => None,
            })
            .collect();
        if let Some(targets) = target_replica_ids {
            if target_replicas.is_empty() {
                return Err(HydrationCheckBadTarget(targets));
            }
        }

        for (id, _collection) in self.collections_iter() {
            if id.is_transient() || exclude_collections.contains(&id) {
                continue;
            }

            let mut collection_hydrated = false;
            for replica_state in self.replicas.values() {
                if !target_replicas.contains(&replica_state.id) {
                    continue;
                }
                let collection_state = replica_state
                    .collections
                    .get(&id)
                    .expect("missing collection state");

                if collection_state.hydrated() {
                    collection_hydrated = true;
                    break;
                }
            }

            if !collection_hydrated {
                tracing::info!("collection {id} is not hydrated on any replica");
                all_hydrated = false;
                // We continue with our loop instead of breaking out early, so
                // that we log all non-hydrated replicas.
            }
        }

        Ok(all_hydrated)
    }

    /// Returns `true` if all non-transient, non-excluded collections are hydrated on at least one
    /// replica.
    ///
    /// This also returns `true` in case this cluster does not have any
    /// replicas.
    #[mz_ore::instrument(level = "debug")]
    pub fn collections_hydrated(&self, exclude_collections: &BTreeSet<GlobalId>) -> bool {
        self.collections_hydrated_on_replicas(None, exclude_collections)
            .expect("Cannot error if target_replica_ids is None")
    }

    /// Clean up collection state that is not needed anymore.
    ///
    /// Three conditions need to be true before we can remove state for a collection:
    ///
    ///  1. A client must have explicitly dropped the collection. If that is not the case, clients
    ///     can still reasonably assume that the controller knows about the collection and can
    ///     answer queries about it.
    ///  2. There must be no outstanding read capabilities on the collection. As long as someone
    ///     still holds read capabilities on a collection, we need to keep it around to be able
    ///     to properly handle downgrading of said capabilities.
    ///  3. All replica frontiers for the collection must have advanced to the empty frontier.
    ///     Advancement to the empty frontiers signals that replicas are done computing the
    ///     collection and that they won't send more `ComputeResponse`s for it. As long as we might
    ///     receive responses for a collection we want to keep it around to be able to validate and
    ///     handle these responses.
    fn cleanup_collections(&mut self) {
        let to_remove: Vec<_> = self
            .collections_iter()
            .filter(|(id, collection)| {
                collection.dropped
                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
                    && self
                        .replicas
                        .values()
                        .all(|r| r.collection_frontiers_empty(*id))
            })
            .map(|(id, _collection)| id)
            .collect();

        for id in to_remove {
            self.remove_collection(id);
        }
    }

    /// Returns the state of the [`Instance`] formatted as JSON.
    ///
    /// The returned value is not guaranteed to be stable and may change at any point in time.
    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
        // returned object as a tradeoff between usability and stability. `serde_json` will fail
        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
        // prevents a future unrelated change from silently breaking this method.

        // Destructure `self` here so we don't forget to consider dumping newly added fields.
        let Self {
            build_info: _,
            storage_collections: _,
            peek_stash_persist_location: _,
            initialized,
            read_only,
            workload_class,
            replicas,
            collections,
            log_sources: _,
            peeks,
            subscribes,
            copy_tos,
            history: _,
            command_rx: _,
            response_tx: _,
            introspection_tx: _,
            metrics: _,
            dyncfg: _,
            now: _,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            read_hold_tx: _,
            replica_tx: _,
            replica_rx: _,
        } = self;

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let replicas: BTreeMap<_, _> = replicas
            .iter()
            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
            .collect::<Result<_, anyhow::Error>>()?;
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();
        let peeks: BTreeMap<_, _> = peeks
            .iter()
            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
            .collect();
        let subscribes: BTreeMap<_, _> = subscribes
            .iter()
            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
            .collect();
        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");

        let map = serde_json::Map::from_iter([
            field("initialized", initialized)?,
            field("read_only", read_only)?,
            field("workload_class", workload_class)?,
            field("replicas", replicas)?,
            field("collections", collections)?,
            field("peeks", peeks)?,
            field("subscribes", subscribes)?,
            field("copy_tos", copy_tos)?,
            field("wallclock_lag_last_recorded", wallclock_lag_last_recorded)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }

    /// Reports the current write frontier for the identified compute collection.
    fn collection_write_frontier(&self, id: GlobalId) -> Result<Antichain<T>, CollectionMissing> {
        Ok(self.collection(id)?.write_frontier())
    }
}

impl<T> Instance<T>
where
    T: ComputeControllerTimestamp,
{
    fn new(
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        command_rx: mpsc::UnboundedReceiver<Command<T>>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        read_only: bool,
    ) -> Self {
        let mut collections = BTreeMap::new();
        let mut log_sources = BTreeMap::new();
        for (log, id, shared) in arranged_logs {
            let collection = CollectionState::new_log_collection(
                id,
                shared,
                Arc::clone(&read_hold_tx),
                introspection_tx.clone(),
            );
            collections.insert(id, collection);
            log_sources.insert(log, id);
        }

        let history = ComputeCommandHistory::new(metrics.for_history());

        let send_count = metrics.response_send_count.clone();
        let recv_count = metrics.response_recv_count.clone();
        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            storage_collections: storage,
            peek_stash_persist_location,
            initialized: false,
            read_only,
            workload_class: None,
            replicas: Default::default(),
            collections,
            log_sources,
            peeks: Default::default(),
            subscribes: Default::default(),
            copy_tos: Default::default(),
            history,
            command_rx,
            response_tx,
            introspection_tx,
            metrics,
            dyncfg,
            now,
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            read_hold_tx,
            replica_tx,
            replica_rx,
        }
    }

    async fn run(mut self) {
        self.send(ComputeCommand::Hello {
            // The nonce is protocol iteration-specific and will be set in
            // `ReplicaTask::specialize_command`.
            nonce: Uuid::default(),
        });

        let instance_config = InstanceConfig {
            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
            // The remaining fields are replica-specific and will be set in
            // `ReplicaTask::specialize_command`.
            logging: Default::default(),
            expiration_offset: Default::default(),
        };

        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));

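        // Serve client commands and replica responses until all command senders have been
        // dropped, at which point the instance task shuts down.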
        loop {
            tokio::select! {
                command = self.command_rx.recv() => match command {
                    Some(cmd) => cmd(&mut self),
                    None => break,
                },
                response = self.replica_rx.recv() => match response {
                    Some(response) => self.handle_response(response),
                    None => unreachable!("self owns a sender side of the channel"),
                }
            }
        }
    }

    /// Update instance configuration.
    #[mz_ore::instrument(level = "debug")]
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        if let Some(workload_class) = &config_params.workload_class {
            self.workload_class = workload_class.clone();
        }

        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
        self.send(command);
    }

    /// Marks the end of any initialization commands.
    ///
    /// Intended to be called by `Controller`, rather than by other code.
    /// Calling this method repeatedly has no effect.
    #[mz_ore::instrument(level = "debug")]
    pub fn initialization_complete(&mut self) {
        // The compute protocol requires that `InitializationComplete` is sent only once.
        if !self.initialized {
            self.send(ComputeCommand::InitializationComplete);
            self.initialized = true;
        }
    }

    /// Allows collections to affect writes to external systems (persist).
    ///
    /// Calling this method repeatedly has no effect.
    #[mz_ore::instrument(level = "debug")]
    pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
        let collection = self.collection_mut(collection_id)?;

        // Do not send redundant allow-writes commands.
        if !collection.read_only {
            return Ok(());
        }

        // Don't send allow-writes for collections that are not installed.
        let as_of = collection.read_frontier();

        // If the collection has an empty `as_of`, it was either never installed on the replica or
        // has since been dropped. In either case the replica does not expect any commands for it.
        if as_of.is_empty() {
            return Ok(());
        }

        collection.read_only = false;
        self.send(ComputeCommand::AllowWrites(collection_id));

        Ok(())
    }

    /// Shut down this instance.
    ///
    /// This method runs various assertions ensuring the instance state is empty. It exists to help
    /// us find bugs where the client drops a compute instance that still has replicas or
    /// collections installed, and later assumes that said replicas/collections still exist.
    ///
    /// # Panics
    ///
    /// Panics if the compute instance still has active replicas.
    /// Panics if the compute instance still has collections installed.
    #[mz_ore::instrument(level = "debug")]
    pub fn shutdown(&mut self) {
        // Taking the `command_rx` ensures that the [`Instance::run`] loop terminates.
        let (_tx, rx) = mpsc::unbounded_channel();
        let mut command_rx = std::mem::replace(&mut self.command_rx, rx);

        // Apply all outstanding read hold changes. This might cause read hold downgrades to be
        // added to `command_tx`, so we need to apply those in a loop.
        //
        // TODO(teskje): Make `Command` an enum and assert that all received commands are read
        // hold downgrades.
        while let Ok(cmd) = command_rx.try_recv() {
            cmd(self);
        }

        // Collections might have been dropped but not cleaned up yet.
        self.cleanup_collections();

        let stray_replicas: Vec<_> = self.replicas.keys().collect();
        soft_assert_or_log!(
            stray_replicas.is_empty(),
            "dropped instance still has provisioned replicas: {stray_replicas:?}",
        );

        let collections = self.collections.iter();
        let stray_collections: Vec<_> = collections
            .filter(|(_, c)| !c.log_collection)
            .map(|(id, _)| id)
            .collect();
        soft_assert_or_log!(
            stray_collections.is_empty(),
            "dropped instance still has installed collections: {stray_collections:?}",
        );
    }

    /// Sends a command to all replicas of this instance.
    #[mz_ore::instrument(level = "debug")]
    fn send(&mut self, cmd: ComputeCommand<T>) {
        // Record the command so that new replicas can be brought up to speed.
        self.history.push(cmd.clone());

        // Clone the command for each active replica.
        for replica in self.replicas.values_mut() {
            // Swallow error, we'll notice because the replica task has stopped.
            let _ = replica.client.send(cmd.clone());
        }
    }

    /// Add a new instance replica, by ID.
    #[mz_ore::instrument(level = "debug")]
    pub fn add_replica(
        &mut self,
        id: ReplicaId,
        mut config: ReplicaConfig,
        epoch: Option<u64>,
    ) -> Result<(), ReplicaExists> {
        if self.replica_exists(id) {
            return Err(ReplicaExists(id));
        }

        config.logging.index_logs = self.log_sources.clone();

        let epoch = epoch.unwrap_or(1);
        let metrics = self.metrics.for_replica(id);
        let client = ReplicaClient::spawn(
            id,
            self.build_info,
            config.clone(),
            epoch,
            metrics.clone(),
            Arc::clone(&self.dyncfg),
            self.replica_tx.clone(),
        );

        // Take this opportunity to clean up the history we should present.
        self.history.reduce();

        // Advance the uppers of source imports
        self.history.update_source_uppers(&self.storage_collections);

        // Replay the commands at the client, creating new dataflow identifiers.
        for command in self.history.iter() {
            if client.send(command.clone()).is_err() {
                // We swallow the error here. On the next send, we will fail again, and
                // restart the connection as well as this rehydration.
                tracing::warn!("Replica {:?} connection terminated during hydration", id);
                break;
            }
        }

        // Add replica to tracked state.
        self.add_replica_state(id, client, config, epoch);

        Ok(())
    }

    /// Remove an existing instance replica, by ID.
    #[mz_ore::instrument(level = "debug")]
    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;

        // Subscribes targeting this replica either won't be served anymore (if the replica is
        // dropped) or might produce inconsistent output (if the target collection is an
        // introspection index). We produce an error to inform upstream.
        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
        for subscribe_id in to_drop {
            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
            let response = ComputeControllerResponse::SubscribeResponse(
                subscribe_id,
                SubscribeBatch {
                    lower: subscribe.frontier.clone(),
                    upper: subscribe.frontier,
                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
                },
            );
            self.deliver_response(response);
        }

        // Peeks targeting this replica might not be served anymore (if the replica is dropped).
        // If the replica has failed it might come back and respond to the peek later, but it still
        // seems like a good idea to cancel the peek to inform the caller about the failure. This
        // is consistent with how we handle targeted subscribes above.
        let mut peek_responses = Vec::new();
        let mut to_drop = Vec::new();
        for (uuid, peek) in self.peeks_targeting(id) {
            peek_responses.push(ComputeControllerResponse::PeekNotification(
                uuid,
                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
                peek.otel_ctx.clone(),
            ));
            to_drop.push(uuid);
        }
        for response in peek_responses {
            self.deliver_response(response);
        }
        for uuid in to_drop {
            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
            self.finish_peek(uuid, response);
        }

        // We might have a chance to forward implied capabilities and reduce the cost of bringing
        // up the next replica, if the dropped replica was the only one in the cluster.
        self.forward_implied_capabilities();

        Ok(())
    }

    /// Rehydrate the given instance replica.
    ///
    /// # Panics
    ///
    /// Panics if the specified replica does not exist.
    fn rehydrate_replica(&mut self, id: ReplicaId) {
        let config = self.replicas[&id].config.clone();
        let epoch = self.replicas[&id].epoch + 1;

        self.remove_replica(id).expect("replica must exist");
        let result = self.add_replica(id, config, Some(epoch));

        match result {
            Ok(()) => (),
            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
        }
    }

    /// Rehydrate any failed replicas of this instance.
    fn rehydrate_failed_replicas(&mut self) {
        let replicas = self.replicas.iter();
        let failed_replicas: Vec<_> = replicas
            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
            .collect();

        for replica_id in failed_replicas {
            self.rehydrate_replica(replica_id);
        }
    }

    /// Creates the described dataflow and initializes state for its output.
    ///
1419    /// This method expects a `DataflowDescription` with an `as_of` frontier specified, as well as,
1420    /// for each imported collection, a read hold in `import_read_holds` that is valid at the `as_of`.
1421    ///
1422    /// If a `subscribe_target_replica` is given, any subscribes exported by the dataflow are
1423    /// configured to target that replica, i.e., only subscribe responses sent by that replica are
1424    /// considered.
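    ///
    /// A minimal sketch of the expected call shape, with hypothetical `dataflow` and `read_holds`
    /// values (one read hold per imported collection, each valid at the dataflow's `as_of`):
    ///
    /// ```ignore
    /// instance.create_dataflow(
    ///     dataflow,        // `dataflow.as_of` is set
    ///     read_holds,      // covers every source and index import
    ///     None,            // no replica-targeted subscribes
    ///     BTreeMap::new(), // no pre-existing shared collection state
    /// )?;
    /// ```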
1425    #[mz_ore::instrument(level = "debug")]
1426    pub fn create_dataflow(
1427        &mut self,
1428        dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
1429        import_read_holds: Vec<ReadHold<T>>,
1430        subscribe_target_replica: Option<ReplicaId>,
1431        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
1432    ) -> Result<(), DataflowCreationError> {
1433        use DataflowCreationError::*;
1434
1435        if let Some(replica_id) = subscribe_target_replica {
1436            if !self.replica_exists(replica_id) {
1437                return Err(ReplicaMissing(replica_id));
1438            }
1439        }
1440
1441        // Simple sanity checks around `as_of`
1442        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
1443        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
1444            return Err(EmptyAsOfForSubscribe);
1445        }
1446        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
1447            return Err(EmptyAsOfForCopyTo);
1448        }
1449
1450        // Collect all dependencies of the dataflow, and read holds on them at the `as_of`.
1451        let mut storage_dependencies = BTreeMap::new();
1452        let mut compute_dependencies = BTreeMap::new();
1453
1454        // When we install per-replica input read holds, we cannot use the `as_of` because of
1455        // reconciliation: Existing slow replicas might be reading from the inputs at times before
1456        // the `as_of` and we would rather not crash them by allowing their inputs to compact too
1457        // far. So instead we take read holds at the least time available.
1458        let mut replica_input_read_holds = Vec::new();
1459
1460        let mut import_read_holds: BTreeMap<_, _> =
1461            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();
1462
1463        for &id in dataflow.source_imports.keys() {
1464            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1465            replica_input_read_holds.push(read_hold.clone());
1466
1467            read_hold
1468                .try_downgrade(as_of.clone())
1469                .map_err(|_| ReadHoldInsufficient(id))?;
1470            storage_dependencies.insert(id, read_hold);
1471        }
1472
1473        for &id in dataflow.index_imports.keys() {
1474            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1475            read_hold
1476                .try_downgrade(as_of.clone())
1477                .map_err(|_| ReadHoldInsufficient(id))?;
1478            compute_dependencies.insert(id, read_hold);
1479        }
1480
1481        // If the `as_of` is empty, we are not going to create a dataflow, so replicas won't read
1482        // from the inputs.
1483        if as_of.is_empty() {
1484            replica_input_read_holds = Default::default();
1485        }
1486
1487        // Install collection state for each of the exports.
1488        for export_id in dataflow.export_ids() {
1489            let shared = shared_collection_state
1490                .remove(&export_id)
1491                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
1492            let write_only = dataflow.sink_exports.contains_key(&export_id);
1493            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);
1494
1495            self.add_collection(
1496                export_id,
1497                as_of.clone(),
1498                shared,
1499                storage_dependencies.clone(),
1500                compute_dependencies.clone(),
1501                replica_input_read_holds.clone(),
1502                write_only,
1503                storage_sink,
1504                dataflow.initial_storage_as_of.clone(),
1505                dataflow.refresh_schedule.clone(),
1506            );
1507
1508            // If the export is a storage sink, we can advance its write frontier to the write
1509            // frontier of the target storage collection.
1510            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
1511                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
1512            }
1513        }
1514
1515        // Initialize tracking of subscribes.
1516        for subscribe_id in dataflow.subscribe_ids() {
1517            self.subscribes
1518                .insert(subscribe_id, ActiveSubscribe::new(subscribe_target_replica));
1519        }
1520
1521        // Initialize tracking of copy tos.
1522        for copy_to_id in dataflow.copy_to_ids() {
1523            self.copy_tos.insert(copy_to_id);
1524        }
1525
1526        // Here we augment all imported sources and all exported sinks with the appropriate
1527        // storage metadata needed by the compute instance.
1528        let mut source_imports = BTreeMap::new();
1529        for (id, (si, monotonic, _upper)) in dataflow.source_imports {
1530            let frontiers = self
1531                .storage_collections
1532                .collection_frontiers(id)
1533                .expect("collection exists");
1534
1535            let collection_metadata = self
1536                .storage_collections
1537                .collection_metadata(id)
1538                .expect("we have a read hold on this collection");
1539
1540            let desc = SourceInstanceDesc {
1541                storage_metadata: collection_metadata.clone(),
1542                arguments: si.arguments,
1543                typ: si.typ.clone(),
1544            };
1545            source_imports.insert(id, (desc, monotonic, frontiers.write_frontier));
1546        }
1547
1548        let mut sink_exports = BTreeMap::new();
1549        for (id, se) in dataflow.sink_exports {
1550            let connection = match se.connection {
1551                ComputeSinkConnection::MaterializedView(conn) => {
1552                    let metadata = self
1553                        .storage_collections
1554                        .collection_metadata(id)
1555                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
1556                        .clone();
1557                    let conn = MaterializedViewSinkConnection {
1558                        value_desc: conn.value_desc,
1559                        storage_metadata: metadata,
1560                    };
1561                    ComputeSinkConnection::MaterializedView(conn)
1562                }
1563                ComputeSinkConnection::ContinualTask(conn) => {
1564                    let metadata = self
1565                        .storage_collections
1566                        .collection_metadata(id)
1567                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
1568                        .clone();
1569                    let conn = ContinualTaskConnection {
1570                        input_id: conn.input_id,
1571                        storage_metadata: metadata,
1572                    };
1573                    ComputeSinkConnection::ContinualTask(conn)
1574                }
1575                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
1576                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1577                    ComputeSinkConnection::CopyToS3Oneshot(conn)
1578                }
1579            };
1580            let desc = ComputeSinkDesc {
1581                from: se.from,
1582                from_desc: se.from_desc,
1583                connection,
1584                with_snapshot: se.with_snapshot,
1585                up_to: se.up_to,
1586                non_null_assertions: se.non_null_assertions,
1587                refresh_schedule: se.refresh_schedule,
1588            };
1589            sink_exports.insert(id, desc);
1590        }
1591
1592        // Flatten the dataflow plans into the representation expected by replicas.
1593        let objects_to_build = dataflow
1594            .objects_to_build
1595            .into_iter()
1596            .map(|object| BuildDesc {
1597                id: object.id,
1598                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
1599            })
1600            .collect();
1601
1602        let augmented_dataflow = DataflowDescription {
1603            source_imports,
1604            sink_exports,
1605            objects_to_build,
1606            // The rest of the fields are identical
1607            index_imports: dataflow.index_imports,
1608            index_exports: dataflow.index_exports,
1609            as_of: dataflow.as_of.clone(),
1610            until: dataflow.until,
1611            initial_storage_as_of: dataflow.initial_storage_as_of,
1612            refresh_schedule: dataflow.refresh_schedule,
1613            debug_name: dataflow.debug_name,
1614            time_dependence: dataflow.time_dependence,
1615        };
1616
1617        if augmented_dataflow.is_transient() {
1618            tracing::debug!(
1619                name = %augmented_dataflow.debug_name,
1620                import_ids = %augmented_dataflow.display_import_ids(),
1621                export_ids = %augmented_dataflow.display_export_ids(),
1622                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1623                until = ?augmented_dataflow.until.elements(),
1624                "creating dataflow",
1625            );
1626        } else {
1627            tracing::info!(
1628                name = %augmented_dataflow.debug_name,
1629                import_ids = %augmented_dataflow.display_import_ids(),
1630                export_ids = %augmented_dataflow.display_export_ids(),
1631                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1632                until = ?augmented_dataflow.until.elements(),
1633                "creating dataflow",
1634            );
1635        }
1636
1637        // Skip the actual dataflow creation for an empty `as_of`. (Happens e.g. for the
1638        // bootstrapping of a REFRESH AT mat view that is past its last refresh.)
1639        if as_of.is_empty() {
1640            tracing::info!(
1641                name = %augmented_dataflow.debug_name,
1642                "not sending `CreateDataflow`, because of empty `as_of`",
1643            );
1644        } else {
1645            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
1646            let dataflow = Box::new(augmented_dataflow);
1647            self.send(ComputeCommand::CreateDataflow(dataflow));
1648
1649            for id in collections {
1650                self.maybe_schedule_collection(id);
1651            }
1652        }
1653
1654        Ok(())
1655    }
1656
1657    /// Schedule the identified collection if all its inputs are available.
1658    ///
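    /// A collection is ready when each (non-self) input's write frontier is strictly greater than
    /// the collection's `as_of`, i.e., all input data up to and including the `as_of` is sealed.
    /// A minimal sketch of that comparison, using hypothetical frontiers:
    ///
    /// ```ignore
    /// let as_of = Antichain::from_elem(5u64);
    /// let input_upper = Antichain::from_elem(6u64);
    /// // Input data up to and including time 5 is sealed, so this input is available.
    /// assert!(PartialOrder::less_than(&as_of, &input_upper));
    /// ```
    ///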
1659    /// # Panics
1660    ///
1661    /// Panics if the identified collection does not exist.
1662    fn maybe_schedule_collection(&mut self, id: GlobalId) {
1663        let collection = self.expect_collection(id);
1664
1665        // Don't schedule collections twice.
1666        if collection.scheduled {
1667            return;
1668        }
1669
1670        let as_of = collection.read_frontier();
1671
1672        // If the collection has an empty `as_of`, it was either never installed on the replica or
1673        // has since been dropped. In either case the replica does not expect any commands for it.
1674        if as_of.is_empty() {
1675            return;
1676        }
1677
1678        let ready = if id.is_transient() {
1679            // Always schedule transient collections immediately. The assumption is that those are
1680            // created by interactive user commands and we want to schedule them as quickly as
1681            // possible. Inputs might not yet be available, but when they become available, we
1682            // don't need to wait for the controller to become aware and for the scheduling check
1683            // to run again.
1684            true
1685        } else {
1686            // Ignore self-dependencies. Any self-dependencies do not need to be
1687            // available at the as_of for the dataflow to make progress, so we
1688            // can ignore them here. At the moment, only continual tasks have
1689            // self-dependencies, but this logic is correct for any dataflow, so
1690            // we don't special-case it to continual tasks.
1691            let not_self_dep = |x: &GlobalId| *x != id;
1692
1693            // Check dependency frontiers to determine if all inputs are
1694            // available. An input is available when its frontier is greater
1695            // than the `as_of`, i.e., all input data up to and including the
1696            // `as_of` has been sealed.
1697            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
1698            let compute_frontiers = compute_deps.map(|id| {
1699                let dep = &self.expect_collection(id);
1700                dep.write_frontier()
1701            });
1702
1703            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
1704            let storage_frontiers = self
1705                .storage_collections
1706                .collections_frontiers(storage_deps.collect())
1707                .expect("must exist");
1708            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);
1709
1710            let ready = compute_frontiers
1711                .chain(storage_frontiers)
1712                .all(|frontier| PartialOrder::less_than(&as_of, &frontier));
1713
1714            ready
1715        };
1716
1717        if ready {
1718            self.send(ComputeCommand::Schedule(id));
1719            let collection = self.expect_collection_mut(id);
1720            collection.scheduled = true;
1721        }
1722    }
1723
1724    /// Schedule any unscheduled collections that are ready.
1725    fn schedule_collections(&mut self) {
1726        let ids: Vec<_> = self.collections.keys().copied().collect();
1727        for id in ids {
1728            self.maybe_schedule_collection(id);
1729        }
1730    }
1731
1732    /// Drops the read capability for the given collections and allows their resources to be
1733    /// reclaimed.
1734    #[mz_ore::instrument(level = "debug")]
1735    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1736        for id in &ids {
1737            let collection = self.collection_mut(*id)?;
1738
1739            // Mark the collection as dropped to allow it to be removed from the controller state.
1740            collection.dropped = true;
1741
1742            // Drop the implied and warmup read holds to announce that clients are not
1743            // interested in the collection anymore.
1744            collection.implied_read_hold.release();
1745            collection.warmup_read_hold.release();
1746
1747            // If the collection is a subscribe, stop tracking it. This ensures that the controller
1748            // ceases to produce `SubscribeResponse`s for this subscribe.
1749            self.subscribes.remove(id);
1750            // If the collection is a copy to, stop tracking it. This ensures that the controller
1751            // ceases to produce `CopyToResponse`s for this copy to.
1752            self.copy_tos.remove(id);
1753        }
1754
1755        Ok(())
1756    }
1757
1758    /// Initiate a peek request for the contents of `peek_target` at `timestamp`.
1759    #[mz_ore::instrument(level = "debug")]
1760    pub fn peek(
1761        &mut self,
1762        peek_target: PeekTarget,
1763        literal_constraints: Option<Vec<Row>>,
1764        uuid: Uuid,
1765        timestamp: T,
1766        result_desc: RelationDesc,
1767        finishing: RowSetFinishing,
1768        map_filter_project: mz_expr::SafeMfpPlan,
1769        mut read_hold: ReadHold<T>,
1770        target_replica: Option<ReplicaId>,
1771        peek_response_tx: oneshot::Sender<PeekResponse>,
1772    ) -> Result<(), PeekError> {
1773        use PeekError::*;
1774
1775        let target_id = peek_target.id();
1776
1777        // Downgrade the provided read hold to the peek time.
1778        if read_hold.id() != target_id {
1779            return Err(ReadHoldIdMismatch(read_hold.id()));
1780        }
1781        read_hold
1782            .try_downgrade(Antichain::from_elem(timestamp.clone()))
1783            .map_err(|_| ReadHoldInsufficient(target_id))?;
1784
1785        if let Some(target) = target_replica {
1786            if !self.replica_exists(target) {
1787                return Err(ReplicaMissing(target));
1788            }
1789        }
1790
1791        let otel_ctx = OpenTelemetryContext::obtain();
1792
1793        self.peeks.insert(
1794            uuid,
1795            PendingPeek {
1796                target_replica,
1797                // TODO(guswynn): can we just hold the `tracing::Span` here instead?
1798                otel_ctx: otel_ctx.clone(),
1799                requested_at: Instant::now(),
1800                read_hold,
1801                peek_response_tx,
1802                limit: finishing.limit.map(usize::cast_from),
1803                offset: finishing.offset,
1804            },
1805        );
1806
1807        let peek = Peek {
1808            literal_constraints,
1809            uuid,
1810            timestamp,
1811            finishing,
1812            map_filter_project,
1813            // Obtain an `OpenTelemetryContext` from the thread-local tracing
1814            // tree to forward it on to the compute worker.
1815            otel_ctx,
1816            target: peek_target,
1817            result_desc,
1818        };
1819        self.send(ComputeCommand::Peek(Box::new(peek)));
1820
1821        Ok(())
1822    }
1823
1824    /// Cancels an existing peek request.
1825    #[mz_ore::instrument(level = "debug")]
1826    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1827        let Some(peek) = self.peeks.get_mut(&uuid) else {
1828            tracing::warn!("did not find pending peek for {uuid}");
1829            return;
1830        };
1831
1832        let duration = peek.requested_at.elapsed();
1833        self.metrics
1834            .observe_peek_response(&PeekResponse::Canceled, duration);
1835
1836        // Enqueue a notification for the cancellation.
1837        let otel_ctx = peek.otel_ctx.clone();
1838        otel_ctx.attach_as_parent();
1839
1840        self.deliver_response(ComputeControllerResponse::PeekNotification(
1841            uuid,
1842            PeekNotification::Canceled,
1843            otel_ctx,
1844        ));
1845
1846        // Finish the peek.
1847        // This will also propagate the cancellation to the replicas.
1848        self.finish_peek(uuid, reason);
1849    }
1850
1851    /// Assigns a read policy to specific identifiers.
1852    ///
1853    /// The policies are assigned in the order presented; if an identifier appears multiple times,
1854    /// the last provided policy takes effect. Changing a policy will immediately downgrade the read
1855    /// capability if appropriate, but it will not "recover" the read capability if the prior
1856    /// capability is already ahead of it.
1857    ///
1858    /// Identifiers not present in `policies` retain their existing read policies.
1859    ///
1860    /// It is an error to attempt to set a read policy for a collection that is not readable in the
1861    /// context of compute. At this time, only indexes are readable compute collections.
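    ///
    /// A minimal sketch, assuming two hypothetical index ids and `ValidFrom` policies:
    ///
    /// ```ignore
    /// let policies = vec![
    ///     (index_a, ReadPolicy::ValidFrom(Antichain::from_elem(10))),
    ///     // `index_a` appears again; this later policy is the one that takes effect.
    ///     (index_a, ReadPolicy::ValidFrom(Antichain::from_elem(20))),
    ///     (index_b, ReadPolicy::ValidFrom(Antichain::from_elem(5))),
    /// ];
    /// instance.set_read_policy(policies)?;
    /// ```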
1862    #[mz_ore::instrument(level = "debug")]
1863    pub fn set_read_policy(
1864        &mut self,
1865        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1866    ) -> Result<(), ReadPolicyError> {
1867        // Do error checking upfront, to avoid introducing inconsistencies between a collection's
1868        // `implied_capability` and `read_capabilities`.
1869        for (id, _policy) in &policies {
1870            let collection = self.collection(*id)?;
1871            if collection.read_policy.is_none() {
1872                return Err(ReadPolicyError::WriteOnlyCollection(*id));
1873            }
1874        }
1875
1876        for (id, new_policy) in policies {
1877            let collection = self.expect_collection_mut(id);
1878            let new_since = new_policy.frontier(collection.write_frontier().borrow());
1879            let _ = collection.implied_read_hold.try_downgrade(new_since);
1880            collection.read_policy = Some(new_policy);
1881        }
1882
1883        Ok(())
1884    }
1885
1886    /// Advance the global write frontier of the given collection.
1887    ///
1888    /// Frontier regressions are gracefully ignored.
1889    ///
1890    /// # Panics
1891    ///
1892    /// Panics if the identified collection does not exist.
1893    #[mz_ore::instrument(level = "debug")]
1894    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
1895        let collection = self.expect_collection_mut(id);
1896
1897        let advanced = collection.shared.lock_write_frontier(|f| {
1898            let advanced = PartialOrder::less_than(f, &new_frontier);
1899            if advanced {
1900                f.clone_from(&new_frontier);
1901            }
1902            advanced
1903        });
1904
1905        if !advanced {
1906            return;
1907        }
1908
1909        // Relax the implied read hold according to the read policy.
1910        let new_since = match &collection.read_policy {
1911            Some(read_policy) => {
1912                // For readable collections the read frontier is determined by applying the
1913                // client-provided read policy to the write frontier.
1914                read_policy.frontier(new_frontier.borrow())
1915            }
1916            None => {
1917                // Write-only collections cannot be read within the context of the compute
1918                // controller, so their read frontier only controls the read holds taken on their
1919                // inputs. We can safely downgrade the input read holds to any time less than the
1920                // write frontier.
1921                //
1922                // Note that some write-only collections (continual tasks) need to observe changes
1923                // at their current write frontier during hydration. Thus, we cannot downgrade the
1924                // read frontier to the write frontier and instead step it back by one.
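                // For example, a write frontier of `{10}` yields a read frontier of `{9}`.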
1925                Antichain::from_iter(
1926                    new_frontier
1927                        .iter()
1928                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
1929                )
1930            }
1931        };
1932        let _ = collection.implied_read_hold.try_downgrade(new_since);
1933
1934        // Report the frontier advancement.
1935        self.deliver_response(ComputeControllerResponse::FrontierUpper {
1936            id,
1937            upper: new_frontier,
1938        });
1939    }
1940
1941    /// Apply a collection read hold change.
1942    fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
1943        let Some(collection) = self.collections.get_mut(&id) else {
1944            soft_panic_or_log!(
1945                "read hold change for absent collection (id={id}, changes={update:?})"
1946            );
1947            return;
1948        };
1949
1950        let new_since = collection.shared.lock_read_capabilities(|caps| {
1951            // Sanity check to prevent corrupted `read_capabilities`, which can cause hard-to-debug
1952            // issues (usually stuck read frontiers).
1953            let read_frontier = caps.frontier();
1954            for (time, diff) in update.iter() {
1955                let count = caps.count_for(time) + diff;
1956                assert!(
1957                    count >= 0,
1958                    "invalid read capabilities update: negative capability \
1959             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1960                );
1961                assert!(
1962                    count == 0 || read_frontier.less_equal(time),
1963                    "invalid read capabilities update: frontier regression \
1964             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1965                );
1966            }
1967
1968            // Apply read capability updates and learn about resulting changes to the read
1969            // frontier.
1970            let changes = caps.update_iter(update.drain());
1971
1972            let changed = changes.count() > 0;
1973            changed.then(|| caps.frontier().to_owned())
1974        });
1975
1976        let Some(new_since) = new_since else {
1977            return; // read frontier did not change
1978        };
1979
1980        // Propagate read frontier update to dependencies.
1981        for read_hold in collection.compute_dependencies.values_mut() {
1982            read_hold
1983                .try_downgrade(new_since.clone())
1984                .expect("frontiers don't regress");
1985        }
1986        for read_hold in collection.storage_dependencies.values_mut() {
1987            read_hold
1988                .try_downgrade(new_since.clone())
1989                .expect("frontiers don't regress");
1990        }
1991
1992        // Produce `AllowCompaction` command.
1993        self.send(ComputeCommand::AllowCompaction {
1994            id,
1995            frontier: new_since,
1996        });
1997    }
1998
1999    /// Fulfills a registered peek and cleans up associated state.
2000    ///
2001    /// As part of this we:
2002    ///  * Send a `PeekResponse` through the peek's response channel.
2003    ///  * Emit a `CancelPeek` command to instruct replicas to stop spending resources on this
2004    ///    peek, and to allow the `ComputeCommandHistory` to reduce away the corresponding `Peek`
2005    ///    command.
2006    ///  * Remove the read hold for this peek, unblocking compaction that might have waited on it.
2007    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
2008        let Some(peek) = self.peeks.remove(&uuid) else {
2009            return;
2010        };
2011
2012        // The recipient might not be interested in the peek response anymore, which is fine.
2013        let _ = peek.peek_response_tx.send(response);
2014
2015        // NOTE: We need to send the `CancelPeek` command _before_ we release the peek's read hold
2016        // (by dropping it), to avoid the edge case that caused database-issues#4812.
2017        self.send(ComputeCommand::CancelPeek { uuid });
2018
2019        drop(peek.read_hold);
2020    }
2021
2022    /// Handles a response from a replica. Replica IDs are re-used across replica restarts, so we
2023    /// use the replica epoch to drop stale responses.
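    ///
    /// For example, a response tagged with epoch 3 from a replica that has since been restarted
    /// (and re-added with epoch 4) is discarded.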
2024    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
2025        // Filter responses from non-existing or stale replicas.
2026        if self
2027            .replicas
2028            .get(&replica_id)
2029            .filter(|replica| replica.epoch == epoch)
2030            .is_none()
2031        {
2032            return;
2033        }
2034
2035        // Invariant: the replica exists and has the expected epoch.
2036
2037        match response {
2038            ComputeResponse::Frontiers(id, frontiers) => {
2039                self.handle_frontiers_response(id, frontiers, replica_id);
2040            }
2041            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
2042                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
2043            }
2044            ComputeResponse::CopyToResponse(id, response) => {
2045                self.handle_copy_to_response(id, response, replica_id);
2046            }
2047            ComputeResponse::SubscribeResponse(id, response) => {
2048                self.handle_subscribe_response(id, response, replica_id);
2049            }
2050            ComputeResponse::Status(response) => {
2051                self.handle_status_response(response, replica_id);
2052            }
2053        }
2054    }
2055
2056    /// Handle a `Frontiers` response from a replica, updating the per-replica frontier tracking
2057    /// and, if the write frontier advanced, the collection's global write frontier.
2058    fn handle_frontiers_response(
2059        &mut self,
2060        id: GlobalId,
2061        frontiers: FrontiersResponse<T>,
2062        replica_id: ReplicaId,
2063    ) {
2064        if !self.collections.contains_key(&id) {
2065            soft_panic_or_log!(
2066                "frontiers update for an unknown collection \
2067                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
2068            );
2069            return;
2070        }
2071        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2072            soft_panic_or_log!(
2073                "frontiers update for an unknown replica \
2074                 (replica_id={replica_id}, frontiers={frontiers:?})"
2075            );
2076            return;
2077        };
2078        let Some(replica_collection) = replica.collections.get_mut(&id) else {
2079            soft_panic_or_log!(
2080                "frontiers update for an unknown replica collection \
2081                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
2082            );
2083            return;
2084        };
2085
2086        if let Some(new_frontier) = frontiers.input_frontier {
2087            replica_collection.update_input_frontier(new_frontier.clone());
2088        }
2089        if let Some(new_frontier) = frontiers.output_frontier {
2090            replica_collection.update_output_frontier(new_frontier.clone());
2091        }
2092        if let Some(new_frontier) = frontiers.write_frontier {
2093            replica_collection.update_write_frontier(new_frontier.clone());
2094            self.maybe_update_global_write_frontier(id, new_frontier);
2095        }
2096    }
2097
2098    #[mz_ore::instrument(level = "debug")]
2099    fn handle_peek_response(
2100        &mut self,
2101        uuid: Uuid,
2102        response: PeekResponse,
2103        otel_ctx: OpenTelemetryContext,
2104        replica_id: ReplicaId,
2105    ) {
2106        otel_ctx.attach_as_parent();
2107
2108        // We might not be tracking this peek anymore, because we have served a response already or
2109        // because it was canceled. If this is the case, we ignore the response.
2110        let Some(peek) = self.peeks.get(&uuid) else {
2111            return;
2112        };
2113
2114        // If the peek is targeting a replica, ignore responses from other replicas.
2115        let target_replica = peek.target_replica.unwrap_or(replica_id);
2116        if target_replica != replica_id {
2117            return;
2118        }
2119
2120        let duration = peek.requested_at.elapsed();
2121        self.metrics.observe_peek_response(&response, duration);
2122
2123        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
2124        // NOTE: We use the `otel_ctx` from the response, not the pending peek, because we
2125        // currently want the parent to be whatever the compute worker did with this peek.
2126        self.deliver_response(ComputeControllerResponse::PeekNotification(
2127            uuid,
2128            notification,
2129            otel_ctx,
2130        ));
2131
2132        self.finish_peek(uuid, response)
2133    }
2134
2135    fn handle_copy_to_response(
2136        &mut self,
2137        sink_id: GlobalId,
2138        response: CopyToResponse,
2139        replica_id: ReplicaId,
2140    ) {
2141        if !self.collections.contains_key(&sink_id) {
2142            soft_panic_or_log!(
2143                "received response for an unknown copy-to \
2144                 (sink_id={sink_id}, replica_id={replica_id})",
2145            );
2146            return;
2147        }
2148        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2149            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
2150            return;
2151        };
2152        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
2153            soft_panic_or_log!(
2154                "copy-to response for an unknown replica collection \
2155                 (sink_id={sink_id}, replica_id={replica_id})"
2156            );
2157            return;
2158        };
2159
2160        // Downgrade the replica frontiers, to enable dropping of input read holds and clean up of
2161        // collection state.
2162        // TODO(database-issues#4701): report copy-to frontiers through `Frontiers` responses
2163        replica_collection.update_write_frontier(Antichain::new());
2164        replica_collection.update_input_frontier(Antichain::new());
2165        replica_collection.update_output_frontier(Antichain::new());
2166
2167        // We might not be tracking this COPY TO because we have already returned a response
2168        // from one of the replicas. In that case, we ignore the response.
2169        if !self.copy_tos.remove(&sink_id) {
2170            return;
2171        }
2172
2173        let result = match response {
2174            CopyToResponse::RowCount(count) => Ok(count),
2175            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
2176            // We should never get here: Replicas only drop copy to collections in response
2177            // to the controller allowing them to do so, and when the controller drops a
2178            // copy to it also removes it from the list of tracked copy_tos (see
2179            // [`Instance::drop_collections`]).
2180            CopyToResponse::Dropped => {
2181                tracing::error!(
2182                    %sink_id, %replica_id,
2183                    "received `Dropped` response for a tracked copy to",
2184                );
2185                return;
2186            }
2187        };
2188
2189        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
2190    }
2191
2192    fn handle_subscribe_response(
2193        &mut self,
2194        subscribe_id: GlobalId,
2195        response: SubscribeResponse<T>,
2196        replica_id: ReplicaId,
2197    ) {
2198        if !self.collections.contains_key(&subscribe_id) {
2199            soft_panic_or_log!(
2200                "received response for an unknown subscribe \
2201                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
2202            );
2203            return;
2204        }
2205        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2206            soft_panic_or_log!(
2207                "subscribe response for an unknown replica (replica_id={replica_id})"
2208            );
2209            return;
2210        };
2211        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
2212            soft_panic_or_log!(
2213                "subscribe response for an unknown replica collection \
2214                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
2215            );
2216            return;
2217        };
2218
2219        // Always apply replica write frontier updates. Even if the subscribe is not tracked
2220        // anymore, there might still be replicas reading from its inputs, so we need to track the
2221        // frontiers until all replicas have advanced to the empty one.
2222        let write_frontier = match &response {
2223            SubscribeResponse::Batch(batch) => batch.upper.clone(),
2224            SubscribeResponse::DroppedAt(_) => Antichain::new(),
2225        };
2226
2227        // For subscribes we downgrade all replica frontiers based on write frontiers. This should
2228        // be fine because the input and output frontier of a subscribe track its write frontier.
2229        // TODO(database-issues#4701): report subscribe frontiers through `Frontiers` responses
2230        replica_collection.update_write_frontier(write_frontier.clone());
2231        replica_collection.update_input_frontier(write_frontier.clone());
2232        replica_collection.update_output_frontier(write_frontier.clone());
2233
2234        // If the subscribe is not tracked, or targets a different replica, there is nothing to do.
2235        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
2236            return;
2237        };
2238        let replica_targeted = subscribe.target_replica.unwrap_or(replica_id) == replica_id;
2239        if !replica_targeted {
2240            return;
2241        }
2242
2243        // Apply a global frontier update.
2244        // If this is a replica-targeted subscribe, it is important that we advance the global
2245        // frontier only based on responses from the targeted replica. Otherwise, another replica
2246        // could advance to the empty frontier, making us drop the subscribe on the targeted
2247        // replica prematurely.
2248        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);
2249
2250        match response {
2251            SubscribeResponse::Batch(batch) => {
2252                let upper = batch.upper;
2253                let mut updates = batch.updates;
2254
2255                // If this batch advances the subscribe's frontier, we emit all updates at times
2256                // greater than or equal to the last frontier (to avoid emitting duplicate updates).
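                // For example, with a previous frontier of `{5}`, an update at time 4 was already
                // emitted in an earlier batch and is dropped, while updates at times >= 5 are kept.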
2257                if PartialOrder::less_than(&subscribe.frontier, &upper) {
2258                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());
2259
2260                    if upper.is_empty() {
2261                        // This subscribe cannot produce more data. Stop tracking it.
2262                        self.subscribes.remove(&subscribe_id);
2263                    } else {
2264                        // This subscribe can produce more data. Update our tracking of it.
2265                        self.subscribes.insert(subscribe_id, subscribe);
2266                    }
2267
2268                    if let Ok(updates) = updates.as_mut() {
2269                        updates.retain(|(time, _data, _diff)| lower.less_equal(time));
2270                    }
2271                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
2272                        subscribe_id,
2273                        SubscribeBatch {
2274                            lower,
2275                            upper,
2276                            updates,
2277                        },
2278                    ));
2279                }
2280            }
2281            SubscribeResponse::DroppedAt(frontier) => {
2282                // We should never get here: Replicas only drop subscribe collections in response
2283                // to the controller allowing them to do so, and when the controller drops a
2284                // subscribe it also removes it from the list of tracked subscribes (see
2285                // [`Instance::drop_collections`]).
2286                tracing::error!(
2287                    %subscribe_id,
2288                    %replica_id,
2289                    frontier = ?frontier.elements(),
2290                    "received `DroppedAt` response for a tracked subscribe",
2291                );
2292                self.subscribes.remove(&subscribe_id);
2293            }
2294        }
2295    }
2296
2297    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
2298        match response {
2299            StatusResponse::Placeholder => {}
2300        }
2301    }
2302
2303    /// Return the write frontiers of the dependencies of the given collection.
2304    fn dependency_write_frontiers<'b>(
2305        &'b self,
2306        collection: &'b CollectionState<T>,
2307    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2308        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2309            let collection = self.collections.get(&dep_id);
2310            collection.map(|c| c.write_frontier())
2311        });
2312        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2313            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2314            frontiers.map(|f| f.write_frontier)
2315        });
2316
2317        compute_frontiers.chain(storage_frontiers)
2318    }
2319
2320    /// Return the write frontiers of transitive storage dependencies of the given collection.
2321    fn transitive_storage_dependency_write_frontiers<'b>(
2322        &'b self,
2323        collection: &'b CollectionState<T>,
2324    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2325        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2326        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2327        let mut done = BTreeSet::new();
2328
2329        while let Some(id) = todo.pop() {
2330            if done.contains(&id) {
2331                continue;
2332            }
2333            if let Some(dep) = self.collections.get(&id) {
2334                storage_ids.extend(dep.storage_dependency_ids());
2335                todo.extend(dep.compute_dependency_ids())
2336            }
2337            done.insert(id);
2338        }
2339
2340        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2341            let frontiers = self.storage_collections.collection_frontiers(id).ok();
2342            frontiers.map(|f| f.write_frontier)
2343        });
2344
2345        storage_frontiers
2346    }
2347
2348    /// Downgrade the warmup capabilities of collections as much as possible.
2349    ///
2350    /// The only requirement we have for a collection's warmup capability is that it is for a time
2351    /// that is available in all of the collection's inputs. For each input, the latest such time is
2352    /// `write_frontier - 1`. So the farthest we can downgrade a collection's warmup capability is
2353    /// the minimum of `write_frontier - 1` over all its inputs.
2354    ///
2355    /// This method expects to be periodically called as part of instance maintenance work.
2356    /// We would like to instead update the warmup capabilities synchronously in response to
2357    /// frontier updates of dependency collections, but that is not generally possible because we
2358    /// don't learn about frontier updates of storage collections synchronously. We could do
2359    /// synchronous updates for compute dependencies, but we refrain from doing so for simplicity.
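    ///
    /// For example, if a collection's inputs are at write frontiers `{10}` and `{7}`, its warmup
    /// capability can be downgraded to `{6}`.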
2360    fn downgrade_warmup_capabilities(&mut self) {
2361        let mut new_capabilities = BTreeMap::new();
2362        for (id, collection) in &self.collections {
2363            // For write-only collections that have advanced to the empty frontier, we can drop the
2364            // warmup capability entirely. There is no reason why we would need to hydrate those
2365            // collections again, so being able to warm them up is not useful.
2366            if collection.read_policy.is_none()
2367                && collection.shared.lock_write_frontier(|f| f.is_empty())
2368            {
2369                new_capabilities.insert(*id, Antichain::new());
2370                continue;
2371            }
2372
2373            let mut new_capability = Antichain::new();
2374            for frontier in self.dependency_write_frontiers(collection) {
2375                for time in frontier {
2376                    new_capability.insert(time.step_back().unwrap_or(time));
2377                }
2378            }
2379
2380            new_capabilities.insert(*id, new_capability);
2381        }
2382
2383        for (id, new_capability) in new_capabilities {
2384            let collection = self.expect_collection_mut(id);
2385            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
2386        }
2387    }
2388
2389    /// Forward the implied capabilities of collections, if possible.
2390    ///
2391    /// The implied capability of a collection controls (a) which times are still readable (for
2392    /// indexes) and (b) with which as-of the collection gets installed on a new replica. We are
2393    /// usually not allowed to advance an implied capability beyond the frontier that follows from
2394    /// the collection's read policy applied to its write frontier:
2395    ///
2396    ///  * For sink collections, some external consumer might rely on seeing all distinct times in
2397    ///    the input reflected in the output. If we forwarded the implied capability of a sink,
2398    ///    we would risk skipping times in the output across replica restarts.
2399    ///  * For index collections, we might make the index unreadable by advancing its read frontier
2400    ///    beyond its write frontier.
2401    ///
2402    /// There is one case where forwarding an implied capability is fine though: an index installed
2403    /// on a cluster that has no replicas. Such indexes are not readable anyway until a new replica
2404    /// is added, so advancing its read frontier can't make it unreadable. We can thus advance the
2405    /// implied capability as long as we make sure that when a new replica is added, the expected
2406    /// relationship between write frontier, read policy, and implied capability can be restored
2407    /// immediately (modulo computation time).
2408    ///
2409    /// Forwarding implied capabilities is not necessary for the correct functioning of the
2410    /// controller but an optimization that is beneficial in two ways:
2411    ///
2412    ///  * It relaxes read holds on inputs to forwarded collections, allowing their compaction.
2413    ///  * It reduces the amount of historical detail new replicas need to process when computing
2414    ///    forwarded collections, as forwarding the implied capability also forwards the corresponding
2415    ///    dataflow as-of.
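    ///
    /// For example, consider an index on a paused (zero-replica) cluster whose transitive storage
    /// inputs have advanced to `{100}`: its implied capability can be forwarded to the frontier its
    /// read policy yields at `{100}`, allowing those inputs to compact in the meantime.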
2416    fn forward_implied_capabilities(&mut self) {
2417        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
2418            return;
2419        }
2420        if !self.replicas.is_empty() {
2421            return;
2422        }
2423
2424        let mut new_capabilities = BTreeMap::new();
2425        for (id, collection) in &self.collections {
2426            let Some(read_policy) = &collection.read_policy else {
2427                // Collection is write-only, i.e. a sink.
2428                continue;
2429            };
2430
2431            // When a new replica is started, it will immediately be able to compute all collection
2432            // output up to the write frontier of its transitive storage inputs. So the new implied
2433            // read capability should be the read policy applied to that frontier.
2434            let mut dep_frontier = Antichain::new();
2435            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
2436                dep_frontier.extend(frontier);
2437            }
2438
2439            let new_capability = read_policy.frontier(dep_frontier.borrow());
2440            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
2441                new_capabilities.insert(*id, new_capability);
2442            }
2443        }
2444
2445        for (id, new_capability) in new_capabilities {
2446            let collection = self.expect_collection_mut(id);
2447            let _ = collection.implied_read_hold.try_downgrade(new_capability);
2448        }
2449    }
2450
2451    /// Acquires a `ReadHold` for the identified compute collection.
2452    ///
2453    /// This mirrors the logic used by the controller-side `InstanceState::acquire_read_hold`,
2454    /// but executes on the instance task itself.
2455    fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
2456        // Similarly to InstanceState::acquire_read_hold and StorageCollections::acquire_read_holds,
2457        // we acquire read holds at the earliest possible time rather than returning a copy
2458        // of the implied read hold. This is so that dependents can acquire read holds on
2459        // compute dependencies at frontiers that are held back by other read holds the caller
2460        // has previously taken.
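        //
        // For example, if the implied read hold is at `{10}` but another outstanding hold keeps
        // `read_capabilities` at `{5}`, the returned hold is issued at `{5}`.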
2461        let collection = self.collection(id)?;
2462        let since = collection.shared.lock_read_capabilities(|caps| {
2463            let since = caps.frontier().to_owned();
2464            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
2465            since
2466        });
2467        let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
2468        Ok(hold)
2469    }
2470
2471    /// Process pending maintenance work.
2472    ///
2473    /// This method is invoked periodically by the global controller.
2474    /// It is a good place to perform maintenance work that arises from various controller state
2475    /// changes and that cannot conveniently be handled synchronously with those state changes.
2476    #[mz_ore::instrument(level = "debug")]
2477    pub fn maintain(&mut self) {
2478        self.rehydrate_failed_replicas();
2479        self.downgrade_warmup_capabilities();
2480        self.forward_implied_capabilities();
2481        self.schedule_collections();
2482        self.cleanup_collections();
2483        self.update_frontier_introspection();
2484        self.refresh_state_metrics();
2485        self.refresh_wallclock_lag();
2486    }
2487}
2488
2489/// State maintained about individual compute collections.
2490///
2491/// A compute collection is an index, a storage sink, or a subscribe, exported by a
2492/// compute dataflow.
2493#[derive(Debug)]
2494struct CollectionState<T: ComputeControllerTimestamp> {
2495    /// Whether this collection is a log collection.
2496    ///
2497    /// Log collections are special in that they are only maintained by a subset of all replicas.
2498    log_collection: bool,
2499    /// Whether this collection has been dropped by a controller client.
2500    ///
2501    /// The controller is allowed to remove the `CollectionState` for a collection only when
2502    /// `dropped == true`. Otherwise, clients might still expect to be able to query information
2503    /// about this collection.
2504    dropped: bool,
2505    /// Whether this collection has been scheduled, i.e., the controller has sent a `Schedule`
2506    /// command for it.
2507    scheduled: bool,
2508
2509    /// Whether this collection is in read-only mode.
2510    ///
2511    /// When in read-only mode, the dataflow is not allowed to affect external state (largely persist).
2512    read_only: bool,
2513
2514    /// State shared with the `ComputeController`.
2515    shared: SharedCollectionState<T>,
2516
2517    /// A read hold maintaining the implicit capability of the collection.
2518    ///
2519    /// This capability is kept to ensure that the collection remains readable according to its
2520    /// `read_policy`. It also ensures that read holds on the collection's dependencies are kept at
2521    /// some time not greater than the collection's `write_frontier`, guaranteeing that the
2522    /// collection's next outputs can always be computed without skipping times.
2523    implied_read_hold: ReadHold<T>,
2524    /// A read hold held to enable dataflow warmup.
2525    ///
2526    /// Dataflow warmup is an optimization that allows dataflows to immediately start hydrating
2527    /// even when their next output time (as implied by the `write_frontier`) is in the future.
2528    /// By installing a read capability derived from the write frontiers of the collection's
2529    /// inputs, we ensure that the as-of of new dataflows installed for the collection is at a time
2530    /// that is immediately available, so hydration can begin immediately too.
2531    warmup_read_hold: ReadHold<T>,
2532    /// The policy to use to downgrade `self.implied_read_hold`.
2533    ///
2534    /// If `None`, the collection is a write-only collection (i.e. a sink). For write-only
2535    /// collections, the `implied_read_hold` is only required for maintaining read holds on the
2536    /// inputs, so we can immediately downgrade it to the `write_frontier`.
2537    read_policy: Option<ReadPolicy<T>>,
2538
2539    /// Storage identifiers on which this collection depends, and read holds this collection
2540    /// requires on them.
2541    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2542    /// Compute identifiers on which this collection depends, and read holds this collection
2543    /// requires on them.
2544    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2545
2546    /// Introspection state associated with this collection.
2547    introspection: CollectionIntrospection<T>,
2548
2549    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
2550    /// introspection update.
2551    ///
2552    /// Keys are `(period, lag, labels)` triples, values are counts.
2553    ///
2554    /// If this is `None`, wallclock lag is not tracked for this collection.
2555    wallclock_lag_histogram_stash: Option<
2556        BTreeMap<
2557            (
2558                WallclockLagHistogramPeriod,
2559                WallclockLag,
2560                BTreeMap<&'static str, String>,
2561            ),
2562            Diff,
2563        >,
2564    >,
2565}
2566
2567impl<T: ComputeControllerTimestamp> CollectionState<T> {
2568    /// Creates a new collection state, with an initial read policy valid from `since`.
2569    fn new(
2570        collection_id: GlobalId,
2571        as_of: Antichain<T>,
2572        shared: SharedCollectionState<T>,
2573        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2574        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2575        read_hold_tx: read_holds::ChangeTx<T>,
2576        introspection: CollectionIntrospection<T>,
2577    ) -> Self {
2578        // A collection is not readable before the `as_of`.
2579        let since = as_of.clone();
2580        // A collection won't produce updates for times before the `as_of`.
2581        let upper = as_of;
2582
2583        // Ensure that the provided `shared` is valid for the given `as_of`.
2584        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
2585        assert!(shared.lock_write_frontier(|f| f == &upper));
2586
2587        // Initialize collection read holds.
2588        // Note that the implied read hold was already added to the `read_capabilities` when
2589        // `shared` was created, so we only need to add the warmup read hold here.
2590        let implied_read_hold =
2591            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
2592        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);
2593
2594        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
2595        shared.lock_read_capabilities(|c| {
2596            c.update_iter(updates);
2597        });
2598
2599        // In an effort to keep the produced wallclock lag introspection data small and
2600        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2601        // select indexes and subscribes.
2602        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
2603            true => None,
2604            false => Some(Default::default()),
2605        };
2606
2607        Self {
2608            log_collection: false,
2609            dropped: false,
2610            scheduled: false,
2611            read_only: true,
2612            shared,
2613            implied_read_hold,
2614            warmup_read_hold,
2615            read_policy: Some(ReadPolicy::ValidFrom(since)),
2616            storage_dependencies,
2617            compute_dependencies,
2618            introspection,
2619            wallclock_lag_histogram_stash,
2620        }
2621    }
2622
2623    /// Creates a new collection state for a log collection.
2624    fn new_log_collection(
2625        id: GlobalId,
2626        shared: SharedCollectionState<T>,
2627        read_hold_tx: read_holds::ChangeTx<T>,
2628        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2629    ) -> Self {
2630        let since = Antichain::from_elem(T::minimum());
2631        let introspection =
2632            CollectionIntrospection::new(id, introspection_tx, since.clone(), false, None, None);
2633        let mut state = Self::new(
2634            id,
2635            since,
2636            shared,
2637            Default::default(),
2638            Default::default(),
2639            read_hold_tx,
2640            introspection,
2641        );
2642        state.log_collection = true;
2643        // Log collections are created and scheduled implicitly as part of replica initialization.
2644        state.scheduled = true;
2645        state
2646    }
2647
2648    /// Reports the current read frontier.
2649    fn read_frontier(&self) -> Antichain<T> {
2650        self.shared
2651            .lock_read_capabilities(|c| c.frontier().to_owned())
2652    }
2653
2654    /// Reports the current write frontier.
2655    fn write_frontier(&self) -> Antichain<T> {
2656        self.shared.lock_write_frontier(|f| f.clone())
2657    }
2658
2659    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2660        self.storage_dependencies.keys().copied()
2661    }
2662
2663    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2664        self.compute_dependencies.keys().copied()
2665    }
2666
2667    /// Reports the IDs of the dependencies of this collection.
2668    fn dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2669        self.compute_dependency_ids()
2670            .chain(self.storage_dependency_ids())
2671    }
2672}
2673
2674/// Collection state shared with the `ComputeController`.
2675///
2676/// Having this allows certain controller APIs, such as `ComputeController::collection_frontiers`
2677/// and `ComputeController::acquire_read_hold` to be non-`async`. This comes at the cost of
2678/// complexity (by introducing shared mutable state) and performance (by introducing locking). We
2679/// should aim to reduce the amount of shared state over time, rather than expand it.
2680///
2681/// Note that [`SharedCollectionState`]s are initialized by the `ComputeController` prior to the
2682/// collection's creation in the [`Instance`]. This is to allow compute clients to query frontiers
2683/// and take new read holds immediately, without having to wait for the [`Instance`] to update.
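///
/// A minimal usage sketch of the closure-based accessors (illustrative only, not a doctest;
/// assumes a concrete timestamp type `T` and the imports used elsewhere in this module):
///
/// ```ignore
/// // Create the shared state at some as-of and read its frontiers synchronously,
/// // without going through the `Instance` task.
/// let shared = SharedCollectionState::new(Antichain::from_elem(T::minimum()));
/// let since = shared.lock_read_capabilities(|caps| caps.frontier().to_owned());
/// let upper = shared.lock_write_frontier(|frontier| frontier.clone());
/// assert!(PartialOrder::less_equal(&since, &upper));
/// ```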
2684#[derive(Clone, Debug)]
2685pub(super) struct SharedCollectionState<T> {
2686    /// Accumulation of read capabilities for the collection.
2687    ///
2688    /// This accumulation contains the capabilities held by all [`ReadHold`]s given out for the
2689    /// collection, including `implied_read_hold` and `warmup_read_hold`.
2690    ///
2691    /// NOTE: This field may only be modified by [`Instance::apply_read_hold_change`],
2692    /// [`Instance::acquire_read_hold`], and `ComputeController::acquire_read_hold`.
2693    /// Nobody else should modify read capabilities directly. Instead, collection users should
2694    /// manage read holds through [`ReadHold`] objects acquired through
2695    /// `ComputeController::acquire_read_hold`.
2696    ///
2697    /// TODO(teskje): Restructure the code to enforce the above in the type system.
2698    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
2699    /// The write frontier of this collection.
2700    write_frontier: Arc<Mutex<Antichain<T>>>,
2701}
2702
2703impl<T: Timestamp> SharedCollectionState<T> {
2704    pub fn new(as_of: Antichain<T>) -> Self {
2705        // A collection is not readable before the `as_of`.
2706        let since = as_of.clone();
2707        // A collection won't produce updates for times before the `as_of`.
2708        let upper = as_of;
2709
2710        // Initialize read capabilities to the `since`.
2711        // This is the implied read capability. The corresponding [`ReadHold`] is created in
2712        // [`CollectionState::new`].
2713        let mut read_capabilities = MutableAntichain::new();
2714        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2715
2716        Self {
2717            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2718            write_frontier: Arc::new(Mutex::new(upper)),
2719        }
2720    }
2721
2722    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2723    where
2724        F: FnOnce(&mut MutableAntichain<T>) -> R,
2725    {
2726        let mut caps = self.read_capabilities.lock().expect("poisoned");
2727        f(&mut *caps)
2728    }
2729
2730    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2731    where
2732        F: FnOnce(&mut Antichain<T>) -> R,
2733    {
2734        let mut frontier = self.write_frontier.lock().expect("poisoned");
2735        f(&mut *frontier)
2736    }
2737}
2738
2739/// Manages certain introspection relations associated with a collection. Upon creation, it adds
2740/// rows to these relations. When dropped, it retracts its managed rows.
2741///
2742/// TODO: `ComputeDependencies` could be moved under this.
2743#[derive(Debug)]
2744struct CollectionIntrospection<T: ComputeControllerTimestamp> {
2745    /// The ID of the compute collection.
2746    collection_id: GlobalId,
2747    /// A channel through which introspection updates are delivered.
2748    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2749    /// Introspection state for `IntrospectionType::Frontiers`.
2750    ///
2751    /// `Some` if the collection does _not_ sink into a storage collection (i.e. is not an MV). If
2752    /// the collection sinks into storage, the storage controller reports its frontiers instead.
2753    frontiers: Option<FrontiersIntrospectionState<T>>,
2754    /// Introspection state for `IntrospectionType::ComputeMaterializedViewRefreshes`.
2755    ///
2756    /// `Some` if the collection is a REFRESH MV.
2757    refresh: Option<RefreshIntrospectionState<T>>,
2758}
2759
2760impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
2761    fn new(
2762        collection_id: GlobalId,
2763        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2764        as_of: Antichain<T>,
2765        storage_sink: bool,
2766        initial_as_of: Option<Antichain<T>>,
2767        refresh_schedule: Option<RefreshSchedule>,
2768    ) -> Self {
2769        let refresh =
2770            match (refresh_schedule, initial_as_of) {
2771                (Some(refresh_schedule), Some(initial_as_of)) => Some(
2772                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2773                ),
2774                (refresh_schedule, _) => {
2775                    // If we have a `refresh_schedule`, then the collection is an MV, so we should also have
2776                    // an `initial_as_of`.
2777                    soft_assert_or_log!(
2778                        refresh_schedule.is_none(),
2779                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2780                    );
2781                    None
2782                }
2783            };
2784        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2785
2786        let self_ = Self {
2787            collection_id,
2788            introspection_tx,
2789            frontiers,
2790            refresh,
2791        };
2792
2793        self_.report_initial_state();
2794        self_
2795    }
2796
2797    /// Reports the initial introspection state.
2798    fn report_initial_state(&self) {
2799        if let Some(frontiers) = &self.frontiers {
2800            let row = frontiers.row_for_collection(self.collection_id);
2801            let updates = vec![(row, Diff::ONE)];
2802            self.send(IntrospectionType::Frontiers, updates);
2803        }
2804
2805        if let Some(refresh) = &self.refresh {
2806            let row = refresh.row_for_collection(self.collection_id);
2807            let updates = vec![(row, Diff::ONE)];
2808            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2809        }
2810    }
2811
2812    /// Observe the given current collection frontiers and update the introspection state as
2813    /// necessary.
2814    fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2815        self.update_frontier_introspection(read_frontier, write_frontier);
2816        self.update_refresh_introspection(write_frontier);
2817    }
2818
2819    fn update_frontier_introspection(
2820        &mut self,
2821        read_frontier: &Antichain<T>,
2822        write_frontier: &Antichain<T>,
2823    ) {
2824        let Some(frontiers) = &mut self.frontiers else {
2825            return;
2826        };
2827
2828        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2829        {
2830            return; // no change
2831        };
2832
2833        let retraction = frontiers.row_for_collection(self.collection_id);
2834        frontiers.update(read_frontier, write_frontier);
2835        let insertion = frontiers.row_for_collection(self.collection_id);
2836        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2837        self.send(IntrospectionType::Frontiers, updates);
2838    }
2839
2840    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
2841        let Some(refresh) = &mut self.refresh else {
2842            return;
2843        };
2844
2845        let retraction = refresh.row_for_collection(self.collection_id);
2846        refresh.frontier_update(write_frontier);
2847        let insertion = refresh.row_for_collection(self.collection_id);
2848
2849        if retraction == insertion {
2850            return; // no change
2851        }
2852
2853        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2854        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2855    }
2856
2857    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2858        // Failure to send means the `ComputeController` has been dropped and doesn't care about
2859        // introspection updates anymore.
2860        let _ = self.introspection_tx.send((introspection_type, updates));
2861    }
2862}
2863
2864impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
2865    fn drop(&mut self) {
2866        // Retract collection frontiers.
2867        if let Some(frontiers) = &self.frontiers {
2868            let row = frontiers.row_for_collection(self.collection_id);
2869            let updates = vec![(row, Diff::MINUS_ONE)];
2870            self.send(IntrospectionType::Frontiers, updates);
2871        }
2872
2873        // Retract MV refresh state.
2874        if let Some(refresh) = &self.refresh {
2875            let retraction = refresh.row_for_collection(self.collection_id);
2876            let updates = vec![(retraction, Diff::MINUS_ONE)];
2877            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2878        }
2879    }
2880}
2881
2882#[derive(Debug)]
2883struct FrontiersIntrospectionState<T> {
2884    read_frontier: Antichain<T>,
2885    write_frontier: Antichain<T>,
2886}
2887
2888impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
2889    fn new(as_of: Antichain<T>) -> Self {
2890        Self {
2891            read_frontier: as_of.clone(),
2892            write_frontier: as_of,
2893        }
2894    }
2895
2896    /// Return a `Row` reflecting the current collection frontiers.
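    ///
    /// The packed row has the shape `[collection_id, read_frontier, write_frontier]`, where a
    /// frontier column is `Datum::Null` if the respective frontier is empty.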
2897    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2898        let read_frontier = self
2899            .read_frontier
2900            .as_option()
2901            .map_or(Datum::Null, |ts| ts.clone().into());
2902        let write_frontier = self
2903            .write_frontier
2904            .as_option()
2905            .map_or(Datum::Null, |ts| ts.clone().into());
2906        Row::pack_slice(&[
2907            Datum::String(&collection_id.to_string()),
2908            read_frontier,
2909            write_frontier,
2910        ])
2911    }
2912
2913    /// Update the introspection state with the given new frontiers.
2914    fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2915        if read_frontier != &self.read_frontier {
2916            self.read_frontier.clone_from(read_frontier);
2917        }
2918        if write_frontier != &self.write_frontier {
2919            self.write_frontier.clone_from(write_frontier);
2920        }
2921    }
2922}
2923
2924/// Information needed to compute introspection updates for a REFRESH materialized view when the
2925/// write frontier advances.
2926#[derive(Debug)]
2927struct RefreshIntrospectionState<T> {
2928    // Immutable properties of the MV
2929    refresh_schedule: RefreshSchedule,
2930    initial_as_of: Antichain<T>,
2931    // Refresh state
2932    next_refresh: Datum<'static>,           // Null or an MzTimestamp
2933    last_completed_refresh: Datum<'static>, // Null or an MzTimestamp
2934}
2935
2936impl<T> RefreshIntrospectionState<T> {
2937    /// Return a `Row` reflecting the current refresh introspection state.
2938    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2939        Row::pack_slice(&[
2940            Datum::String(&collection_id.to_string()),
2941            self.last_completed_refresh,
2942            self.next_refresh,
2943        ])
2944    }
2945}
2946
2947impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
2948    /// Construct a new [`RefreshIntrospectionState`], and apply an initial `frontier_update()` at
2949    /// the `upper`.
2950    fn new(
2951        refresh_schedule: RefreshSchedule,
2952        initial_as_of: Antichain<T>,
2953        upper: &Antichain<T>,
2954    ) -> Self {
2955        let mut self_ = Self {
2956            refresh_schedule: refresh_schedule.clone(),
2957            initial_as_of: initial_as_of.clone(),
2958            next_refresh: Datum::Null,
2959            last_completed_refresh: Datum::Null,
2960        };
2961        self_.frontier_update(upper);
2962        self_
2963    }
2964
2965    /// Should be called whenever the write frontier of the collection advances. It updates the
2966    /// state that should be recorded in introspection relations, but doesn't send the updates yet.
2967    fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
2968        if write_frontier.is_empty() {
2969            self.last_completed_refresh =
2970                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2971                    last_refresh.into()
2972                } else {
2973                    // If there is no last refresh, then we have a `REFRESH EVERY`, in which case
2974                    // the saturating roundup puts a refresh at the maximum possible timestamp.
2975                    T::maximum().into()
2976                };
2977            self.next_refresh = Datum::Null;
2978        } else {
2979            if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2980                // We are before the first refresh.
2981                self.last_completed_refresh = Datum::Null;
2982                let initial_as_of = self.initial_as_of.as_option().expect(
2983                    "initial_as_of can't be [], because then there would be no refreshes at all",
2984                );
2985                let first_refresh = initial_as_of
2986                    .round_up(&self.refresh_schedule)
2987                    .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2988                soft_assert_or_log!(
2989                    first_refresh == *initial_as_of,
2990                    "initial_as_of should be set to the first refresh"
2991                );
2992                self.next_refresh = first_refresh.into();
2993            } else {
2994                // The first refresh has already happened.
2995                let write_frontier = write_frontier.as_option().expect("checked above");
2996                self.last_completed_refresh = write_frontier
2997                    .round_down_minus_1(&self.refresh_schedule)
2998                    .map_or_else(
2999                        || {
3000                            soft_panic_or_log!(
3001                                "rounding down should have returned the first refresh or later"
3002                            );
3003                            Datum::Null
3004                        },
3005                        |last_completed_refresh| last_completed_refresh.into(),
3006                    );
3007                self.next_refresh = write_frontier.clone().into();
3008            }
3009        }
3010    }
3011}
3012
3013/// A note of an outstanding peek response.
3014#[derive(Debug)]
3015struct PendingPeek<T: Timestamp> {
3016    /// For replica-targeted peeks, this specifies the replica whose response we should pass on.
3017    ///
3018    /// If this value is `None`, we pass on the first response.
3019    target_replica: Option<ReplicaId>,
3020    /// The OpenTelemetry context for this peek.
3021    otel_ctx: OpenTelemetryContext,
3022    /// The time at which the peek was requested.
3023    ///
3024    /// Used to track peek durations.
3025    requested_at: Instant,
3026    /// The read hold installed to serve this peek.
3027    read_hold: ReadHold<T>,
3028    /// The channel to send peek results.
3029    peek_response_tx: oneshot::Sender<PeekResponse>,
3030    /// An optional limit of the peek's result size.
3031    limit: Option<usize>,
3032    /// The offset into the peek's result.
3033    offset: usize,
3034}
3035
3036#[derive(Debug, Clone)]
3037struct ActiveSubscribe<T> {
3038    /// Current upper frontier of this subscribe.
3039    frontier: Antichain<T>,
3040    /// For replica-targeted subscribes, this specifies the replica whose responses we should pass on.
3041    ///
3042    /// If this value is `None`, we pass on the first response for each time slice.
3043    target_replica: Option<ReplicaId>,
3044}
3045
3046impl<T: ComputeControllerTimestamp> ActiveSubscribe<T> {
3047    fn new(target_replica: Option<ReplicaId>) -> Self {
3048        Self {
3049            frontier: Antichain::from_elem(T::minimum()),
3050            target_replica,
3051        }
3052    }
3053}
3054
3055/// State maintained about individual replicas.
3056#[derive(Debug)]
3057struct ReplicaState<T: ComputeControllerTimestamp> {
3058    /// The ID of the replica.
3059    id: ReplicaId,
3060    /// Client for the running replica task.
3061    client: ReplicaClient<T>,
3062    /// The replica configuration.
3063    config: ReplicaConfig,
3064    /// Replica metrics.
3065    metrics: ReplicaMetrics,
3066    /// A channel through which introspection updates are delivered.
3067    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3068    /// Per-replica collection state.
3069    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
3070    /// The epoch of the replica.
3071    epoch: u64,
3072}
3073
3074impl<T: ComputeControllerTimestamp> ReplicaState<T> {
3075    fn new(
3076        id: ReplicaId,
3077        client: ReplicaClient<T>,
3078        config: ReplicaConfig,
3079        metrics: ReplicaMetrics,
3080        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3081        epoch: u64,
3082    ) -> Self {
3083        Self {
3084            id,
3085            client,
3086            config,
3087            metrics,
3088            introspection_tx,
3089            epoch,
3090            collections: Default::default(),
3091        }
3092    }
3093
3094    /// Add a collection to the replica state.
3095    ///
3096    /// # Panics
3097    ///
3098    /// Panics if a collection with the same ID exists already.
3099    fn add_collection(
3100        &mut self,
3101        id: GlobalId,
3102        as_of: Antichain<T>,
3103        input_read_holds: Vec<ReadHold<T>>,
3104    ) {
3105        let metrics = self.metrics.for_collection(id);
3106        let introspection = ReplicaCollectionIntrospection::new(
3107            self.id,
3108            id,
3109            self.introspection_tx.clone(),
3110            as_of.clone(),
3111        );
3112        let mut state =
3113            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
3114
3115        // In an effort to keep the produced wallclock lag introspection data small and
3116        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
3117        // select indexes and subscribes.
3118        if id.is_transient() {
3119            state.wallclock_lag_max = None;
3120        }
3121
3122        if let Some(previous) = self.collections.insert(id, state) {
3123            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
3124        }
3125    }
3126
3127    /// Remove state for a collection.
3128    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
3129        self.collections.remove(&id)
3130    }
3131
3132    /// Returns whether all replica frontiers of the given collection are empty.
3133    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
3134        self.collections.get(&id).map_or(true, |c| {
3135            c.write_frontier.is_empty()
3136                && c.input_frontier.is_empty()
3137                && c.output_frontier.is_empty()
3138        })
3139    }
3140
3141    /// Returns the state of the [`ReplicaState`] formatted as JSON.
3142    ///
3143    /// The returned value is not guaranteed to be stable and may change at any point in time.
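    ///
    /// The output has roughly the following shape (illustrative; collection values are `Debug`
    /// renderings and the exact contents are unstable):
    ///
    /// ```text
    /// {
    ///   "id": "u1",
    ///   "collections": { "u2": "ReplicaCollectionState { .. }" },
    ///   "epoch": 3
    /// }
    /// ```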
3144    #[mz_ore::instrument(level = "debug")]
3145    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3146        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
3147        // returned object as a tradeoff between usability and stability. `serde_json` will fail
3148        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
3149        // prevents a future unrelated change from silently breaking this method.
3150
3151        // Destructure `self` here so we don't forget to consider dumping newly added fields.
3152        let Self {
3153            id,
3154            client: _,
3155            config: _,
3156            metrics: _,
3157            introspection_tx: _,
3158            epoch,
3159            collections,
3160        } = self;
3161
3162        fn field(
3163            key: &str,
3164            value: impl Serialize,
3165        ) -> Result<(String, serde_json::Value), anyhow::Error> {
3166            let value = serde_json::to_value(value)?;
3167            Ok((key.to_string(), value))
3168        }
3169
3170        let collections: BTreeMap<_, _> = collections
3171            .iter()
3172            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
3173            .collect();
3174
3175        let map = serde_json::Map::from_iter([
3176            field("id", id.to_string())?,
3177            field("collections", collections)?,
3178            field("epoch", epoch)?,
3179        ]);
3180        Ok(serde_json::Value::Object(map))
3181    }
3182}
3183
3184#[derive(Debug)]
3185struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
3186    /// The replica write frontier of this collection.
3187    ///
3188    /// See [`FrontiersResponse::write_frontier`].
3189    write_frontier: Antichain<T>,
3190    /// The replica input frontier of this collection.
3191    ///
3192    /// See [`FrontiersResponse::input_frontier`].
3193    input_frontier: Antichain<T>,
3194    /// The replica output frontier of this collection.
3195    ///
3196    /// See [`FrontiersResponse::output_frontier`].
3197    output_frontier: Antichain<T>,
3198
3199    /// Metrics tracked for this collection.
3200    ///
3201    /// If this is `None`, no metrics are collected.
3202    metrics: Option<ReplicaCollectionMetrics>,
3203    /// As-of frontier with which this collection was installed on the replica.
3204    as_of: Antichain<T>,
3205    /// Tracks introspection state for this collection.
3206    introspection: ReplicaCollectionIntrospection<T>,
3207    /// Read holds on storage inputs to this collection.
3208    ///
3209    /// These read holds are kept to ensure that the replica is able to read from storage inputs at
3210    /// all times it hasn't read yet. We only need to install read holds for storage inputs since
3211    /// compaction of compute inputs is implicitly held back by Timely/DD.
3212    input_read_holds: Vec<ReadHold<T>>,
3213
3214    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
3215    ///
3216    /// If this is `None`, wallclock lag is not tracked for this collection.
3217    wallclock_lag_max: Option<WallclockLag>,
3218}
3219
3220impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
3221    fn new(
3222        metrics: Option<ReplicaCollectionMetrics>,
3223        as_of: Antichain<T>,
3224        introspection: ReplicaCollectionIntrospection<T>,
3225        input_read_holds: Vec<ReadHold<T>>,
3226    ) -> Self {
3227        Self {
3228            write_frontier: as_of.clone(),
3229            input_frontier: as_of.clone(),
3230            output_frontier: as_of.clone(),
3231            metrics,
3232            as_of,
3233            introspection,
3234            input_read_holds,
3235            wallclock_lag_max: Some(WallclockLag::MIN),
3236        }
3237    }
3238
3239    /// Returns whether this collection is hydrated.
3240    fn hydrated(&self) -> bool {
3241        // If the observed frontier is greater than the collection's as-of, the collection has
3242        // produced some output and is therefore hydrated.
3243        //
3244        // We need to consider the edge case where the as-of is the empty frontier. Such an as-of
3245        // is not useful for indexes, because they wouldn't be readable. For write-only
3246        // collections, an empty as-of means that the collection has been fully written and no new
3247        // dataflow needs to be created for it. Consequently, no hydration will happen either.
3248        //
3249        // Based on this, we could respond in two ways:
3250        //  * `false`, as in "the dataflow was never created"
3251        //  * `true`, as in "the dataflow completed immediately"
3252        //
3253        // Since hydration is often used as a measure of dataflow progress and we don't want to
3254        // give the impression that certain dataflows are somehow stuck when they are not, we go
3255        // with the second interpretation here.
3256        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3257    }
3258
3259    /// Updates the replica write frontier of this collection.
3260    fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
3261        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3262            soft_panic_or_log!(
3263                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3264                self.write_frontier,
3265            );
3266            return;
3267        } else if new_frontier == self.write_frontier {
3268            return;
3269        }
3270
3271        self.write_frontier = new_frontier;
3272    }
3273
3274    /// Updates the replica input frontier of this collection.
3275    fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
3276        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3277            soft_panic_or_log!(
3278                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3279                self.input_frontier,
3280            );
3281            return;
3282        } else if new_frontier == self.input_frontier {
3283            return;
3284        }
3285
3286        self.input_frontier = new_frontier;
3287
3288        // Relax our read holds on collection inputs.
3289        for read_hold in &mut self.input_read_holds {
3290            let result = read_hold.try_downgrade(self.input_frontier.clone());
3291            soft_assert_or_log!(
3292                result.is_ok(),
3293                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3294                self.input_frontier,
3295            );
3296        }
3297    }
3298
3299    /// Updates the replica output frontier of this collection.
3300    fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
3301        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3302            soft_panic_or_log!(
3303                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3304                self.output_frontier,
3305            );
3306            return;
3307        } else if new_frontier == self.output_frontier {
3308            return;
3309        }
3310
3311        self.output_frontier = new_frontier;
3312    }
3313}
3314
3315/// Maintains the introspection state for a given replica and collection, and ensures that reported
3316/// introspection data is retracted when the collection is dropped.
3317#[derive(Debug)]
3318struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
3319    /// The ID of the replica.
3320    replica_id: ReplicaId,
3321    /// The ID of the compute collection.
3322    collection_id: GlobalId,
3323    /// The collection's reported replica write frontier.
3324    write_frontier: Antichain<T>,
3325    /// A channel through which introspection updates are delivered.
3326    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3327}
3328
3329impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
3331    /// Create a new `ReplicaCollectionIntrospection` and initialize introspection.
3331    fn new(
3332        replica_id: ReplicaId,
3333        collection_id: GlobalId,
3334        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3335        as_of: Antichain<T>,
3336    ) -> Self {
3337        let self_ = Self {
3338            replica_id,
3339            collection_id,
3340            write_frontier: as_of,
3341            introspection_tx,
3342        };
3343
3344        self_.report_initial_state();
3345        self_
3346    }
3347
3348    /// Reports the initial introspection state.
3349    fn report_initial_state(&self) {
3350        let row = self.write_frontier_row();
3351        let updates = vec![(row, Diff::ONE)];
3352        self.send(IntrospectionType::ReplicaFrontiers, updates);
3353    }
3354
3355    /// Observe the given current write frontier and update the introspection state as necessary.
3356    fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
3357        if self.write_frontier == *write_frontier {
3358            return; // no change
3359        }
3360
3361        let retraction = self.write_frontier_row();
3362        self.write_frontier.clone_from(write_frontier);
3363        let insertion = self.write_frontier_row();
3364
3365        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3366        self.send(IntrospectionType::ReplicaFrontiers, updates);
3367    }
3368
3369    /// Return a `Row` reflecting the current replica write frontier.
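    ///
    /// The packed row has the shape `[collection_id, replica_id, write_frontier]`, with
    /// `Datum::Null` standing in for an empty write frontier.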
3370    fn write_frontier_row(&self) -> Row {
3371        let write_frontier = self
3372            .write_frontier
3373            .as_option()
3374            .map_or(Datum::Null, |ts| ts.clone().into());
3375        Row::pack_slice(&[
3376            Datum::String(&self.collection_id.to_string()),
3377            Datum::String(&self.replica_id.to_string()),
3378            write_frontier,
3379        ])
3380    }
3381
3382    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3383        // Failure to send means the `ComputeController` has been dropped and doesn't care about
3384        // introspection updates anymore.
3385        let _ = self.introspection_tx.send((introspection_type, updates));
3386    }
3387}
3388
3389impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
3390    fn drop(&mut self) {
3391        // Retract the write frontier.
3392        let row = self.write_frontier_row();
3393        let updates = vec![(row, Diff::MINUS_ONE)];
3394        self.send(IntrospectionType::ReplicaFrontiers, updates);
3395    }
3396}