Skip to main content

mz_compute_client/controller/
instance.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller for a compute instance.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
21use mz_compute_types::plan::render_plan::RenderPlan;
22use mz_compute_types::sinks::{
23    ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
24};
25use mz_compute_types::sources::SourceInstanceDesc;
26use mz_controller_types::dyncfgs::{
27    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
28};
29use mz_dyncfg::{ConfigSet, ConfigUpdates};
30use mz_expr::RowSetFinishing;
31use mz_ore::cast::CastFrom;
32use mz_ore::channel::instrumented_unbounded_channel;
33use mz_ore::now::NowFn;
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{soft_assert_or_log, soft_panic_or_log};
36use mz_persist_types::PersistLocation;
37use mz_repr::adt::timestamp::CheckedTimestamp;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row, Timestamp};
40use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
41use mz_storage_types::read_holds::{self, ReadHold};
42use mz_storage_types::read_policy::ReadPolicy;
43use thiserror::Error;
44use timely::PartialOrder;
45use timely::progress::frontier::MutableAntichain;
46use timely::progress::{Antichain, ChangeBatch};
47use tokio::sync::{mpsc, oneshot};
48use uuid::Uuid;
49
50use crate::controller::error::{
51    CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
52};
53use crate::controller::instance_client::PeekError;
54use crate::controller::replica::{ReplicaClient, ReplicaConfig};
55use crate::controller::{
56    ComputeControllerResponse, IntrospectionUpdates, PeekNotification, ReplicaId,
57    StorageCollections,
58};
59use crate::logging::LogVariant;
60use crate::metrics::IntCounter;
61use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
62use crate::protocol::command::{
63    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
64};
65use crate::protocol::history::ComputeCommandHistory;
66use crate::protocol::response::{
67    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
68    SubscribeBatch, SubscribeResponse,
69};
70
71#[derive(Error, Debug)]
72#[error("replica exists already: {0}")]
73pub(super) struct ReplicaExists(pub ReplicaId);
74
75#[derive(Error, Debug)]
76#[error("replica does not exist: {0}")]
77pub(super) struct ReplicaMissing(pub ReplicaId);
78
79#[derive(Error, Debug)]
80pub(super) enum DataflowCreationError {
81    #[error("collection does not exist: {0}")]
82    CollectionMissing(GlobalId),
83    #[error("replica does not exist: {0}")]
84    ReplicaMissing(ReplicaId),
85    #[error("dataflow definition lacks an as_of value")]
86    MissingAsOf,
87    #[error("subscribe dataflow has an empty as_of")]
88    EmptyAsOfForSubscribe,
89    #[error("copy to dataflow has an empty as_of")]
90    EmptyAsOfForCopyTo,
91    #[error("no read hold provided for dataflow import: {0}")]
92    ReadHoldMissing(GlobalId),
93    #[error("insufficient read hold provided for dataflow import: {0}")]
94    ReadHoldInsufficient(GlobalId),
95}
96
97impl From<CollectionMissing> for DataflowCreationError {
98    fn from(error: CollectionMissing) -> Self {
99        Self::CollectionMissing(error.0)
100    }
101}
102
103#[derive(Error, Debug)]
104pub(super) enum ReadPolicyError {
105    #[error("collection does not exist: {0}")]
106    CollectionMissing(GlobalId),
107    #[error("collection is write-only: {0}")]
108    WriteOnlyCollection(GlobalId),
109}
110
111impl From<CollectionMissing> for ReadPolicyError {
112    fn from(error: CollectionMissing) -> Self {
113        Self::CollectionMissing(error.0)
114    }
115}
116
117/// A command sent to an [`Instance`] task.
118pub(super) type Command = Box<dyn FnOnce(&mut Instance) + Send>;
119
120/// A response from a replica, composed of a replica ID, the replica's current epoch, and the
121/// compute response itself.
122pub(super) type ReplicaResponse = (ReplicaId, u64, ComputeResponse);
123
/// The state we keep for a compute instance.
///
/// Note that Rust drops struct fields in declaration order, so reordering the fields below
/// changes the order in which this state is torn down when an `Instance` is dropped.
pub(super) struct Instance {
    /// Build info used when spawning replicas.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections,
    /// Whether instance initialization has been completed.
    initialized: bool,
    /// Whether this instance is in read-only mode.
    ///
    /// When in read-only mode, this instance will not update persistent state, such as
    /// wallclock lag introspection.
    read_only: bool,
    /// The workload class of this instance.
    ///
    /// This is currently only used to annotate metrics and, as the `workload_class` label,
    /// wallclock lag histogram introspection.
    workload_class: Option<String>,
    /// The replicas of this compute instance, keyed by replica ID.
    replicas: BTreeMap<ReplicaId, ReplicaState>,
    /// Per-replica dyncfg overrides, merged into the `UpdateConfiguration`
    /// command sent to each replica (and into the command-history replay used
    /// to hydrate new replicas). Populated from the scoped feature flags
    /// (replica-local) layer; empty by default, in which case every replica
    /// receives the unmodified environment-wide configuration. Stores only the
    /// values that differ from the environment-wide value, so the map is sparse.
    replica_dyncfg_overrides: BTreeMap<ReplicaId, ConfigUpdates>,
    /// Currently installed compute collections.
    ///
    /// New entries are added for all collections exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// Entries are removed by [`Instance::cleanup_collections`]. See that method's documentation
    /// about the conditions for removing collection state.
    collections: BTreeMap<GlobalId, CollectionState>,
    /// IDs of log sources maintained by this compute instance, keyed by log variant.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Currently outstanding peeks.
    ///
    /// New entries are added for all peeks initiated through [`Instance::peek`].
    ///
    /// The entry for a peek is only removed once all replicas have responded to the peek. This is
    /// currently required to ensure all replicas have stopped reading from the peeked collection's
    /// inputs before we allow them to compact. database-issues#4822 tracks changing this so we
    /// only have to wait for the first peek response.
    peeks: BTreeMap<Uuid, PendingPeek>,
    /// Currently in-progress subscribes.
    ///
    /// New entries are added for all subscribes exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// The entry for a subscribe is removed once at least one replica has reported the subscribe
    /// to have advanced to the empty frontier or to have been dropped, implying that no further
    /// updates will be emitted for this subscribe.
    ///
    /// Note that subscribes are tracked both in `collections` and `subscribes`. `collections`
    /// keeps track of the subscribe's upper and since frontiers and ensures appropriate read holds
    /// on the subscribe's input. `subscribes` is only used to track which updates have been
    /// emitted, to decide if new ones should be emitted or suppressed.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe>,
    /// Tracks all in-progress COPY TOs.
    ///
    /// New entries are added for all S3 oneshot sinks (corresponding to a COPY TO) exported from
    /// dataflows created through [`Instance::create_dataflow`].
    ///
    /// The entry for a COPY TO is removed once at least one replica has finished
    /// or the exporting collection is dropped.
    copy_tos: BTreeSet<GlobalId>,
    /// The command history, used when introducing new replicas or restarting existing replicas.
    history: ComputeCommandHistory<UIntGauge>,
    /// Receiver for commands to be executed.
    command_rx: mpsc::UnboundedReceiver<Command>,
    /// Sender for responses to be delivered.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
    /// Sender for introspection updates to be recorded.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// The registry the controller uses to report metrics.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,

    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<Timestamp>,
    /// The last time wallclock lag introspection was recorded (truncated to the recording
    /// interval).
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for updates to collection read holds.
    ///
    /// Copies of this sender are given to [`ReadHold`]s that are created in
    /// [`CollectionState::new`].
    read_hold_tx: read_holds::ChangeTx,
    /// A sender for responses from replicas.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse, IntCounter>,
    /// A receiver for responses from replicas.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse, IntCounter>,
}
224
225impl Instance {
226    /// Acquire a handle to the collection state associated with `id`.
227    fn collection(&self, id: GlobalId) -> Result<&CollectionState, CollectionMissing> {
228        self.collections.get(&id).ok_or(CollectionMissing(id))
229    }
230
231    /// Acquire a mutable handle to the collection state associated with `id`.
232    fn collection_mut(&mut self, id: GlobalId) -> Result<&mut CollectionState, CollectionMissing> {
233        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
234    }
235
236    /// Acquire a handle to the collection state associated with `id`.
237    ///
238    /// # Panics
239    ///
240    /// Panics if the identified collection does not exist.
241    fn expect_collection(&self, id: GlobalId) -> &CollectionState {
242        self.collections.get(&id).expect("collection must exist")
243    }
244
245    /// Acquire a mutable handle to the collection state associated with `id`.
246    ///
247    /// # Panics
248    ///
249    /// Panics if the identified collection does not exist.
250    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
251        self.collections
252            .get_mut(&id)
253            .expect("collection must exist")
254    }
255
256    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState)> {
257        self.collections.iter().map(|(id, coll)| (*id, coll))
258    }
259
260    /// Returns an iterator over replicas that host the given collection.
261    ///
262    /// For replica-targeted collections, this returns only the target replica.
263    /// For non-targeted collections, this returns all replicas.
264    ///
265    /// Returns `Err` if the collection does not exist.
266    fn replicas_hosting(
267        &self,
268        id: GlobalId,
269    ) -> Result<impl Iterator<Item = &ReplicaState>, CollectionMissing> {
270        let target = self.collection(id)?.target_replica;
271        Ok(self
272            .replicas
273            .values()
274            .filter(move |r| target.map_or(true, |t| t == r.id)))
275    }
276
277    /// Add a collection to the instance state.
278    ///
279    /// # Panics
280    ///
281    /// Panics if a collection with the same ID exists already.
282    fn add_collection(
283        &mut self,
284        id: GlobalId,
285        as_of: Antichain<Timestamp>,
286        shared: SharedCollectionState,
287        storage_dependencies: BTreeMap<GlobalId, ReadHold>,
288        compute_dependencies: BTreeMap<GlobalId, ReadHold>,
289        replica_input_read_holds: Vec<ReadHold>,
290        write_only: bool,
291        storage_sink: bool,
292        initial_as_of: Option<Antichain<Timestamp>>,
293        refresh_schedule: Option<RefreshSchedule>,
294        target_replica: Option<ReplicaId>,
295    ) {
296        // Add global collection state.
297        let dependency_ids: Vec<GlobalId> = compute_dependencies
298            .keys()
299            .chain(storage_dependencies.keys())
300            .copied()
301            .collect();
302        let introspection = CollectionIntrospection::new(
303            id,
304            self.introspection_tx.clone(),
305            as_of.clone(),
306            storage_sink,
307            initial_as_of,
308            refresh_schedule,
309            dependency_ids,
310        );
311        let mut state = CollectionState::new(
312            id,
313            as_of.clone(),
314            shared,
315            storage_dependencies,
316            compute_dependencies,
317            Arc::clone(&self.read_hold_tx),
318            introspection,
319        );
320        state.target_replica = target_replica;
321        // If the collection is write-only, clear its read policy to reflect that.
322        if write_only {
323            state.read_policy = None;
324        }
325
326        if let Some(previous) = self.collections.insert(id, state) {
327            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
328        }
329
330        // Add per-replica collection state.
331        for replica in self.replicas.values_mut() {
332            if target_replica.is_some_and(|id| id != replica.id) {
333                continue;
334            }
335            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
336        }
337    }
338
339    fn remove_collection(&mut self, id: GlobalId) {
340        // Remove per-replica collection state.
341        for replica in self.replicas.values_mut() {
342            replica.remove_collection(id);
343        }
344
345        // Remove global collection state.
346        self.collections.remove(&id);
347    }
348
    /// Add state for a new replica to the instance.
    ///
    /// Creates the [`ReplicaState`] for replica `id` and initializes per-replica state for the
    /// existing collections the replica hosts. Log collections are only included if the replica
    /// maintains the corresponding log (`config.logging.index_logs`), and collections targeted
    /// at a different replica are skipped.
    ///
    /// The replica is inserted into `self.replicas` even if initializing its collections stops
    /// part-way.
    ///
    /// # Errors
    ///
    /// Returns [`read_holds::ReadHoldIssuerHungUp`] (carrying the first affected input ID) if a
    /// storage read hold could not be cloned because its issuer hung up. In that case the
    /// replica's per-replica collection state is incomplete.
    fn add_replica_state(
        &mut self,
        id: ReplicaId,
        client: ReplicaClient,
        config: ReplicaConfig,
        epoch: u64,
    ) -> Result<(), read_holds::ReadHoldIssuerHungUp> {
        // IDs of the log collections this replica maintains. Collected before `config` is moved
        // into the replica state below.
        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();

        let metrics = self.metrics.for_replica(id);
        let mut replica = ReplicaState::new(
            id,
            client,
            config,
            metrics,
            self.introspection_tx.clone(),
            epoch,
        );

        // Add per-replica collection state.
        let mut shutdown_input = None;
        for (collection_id, collection) in &self.collections {
            // Skip log collections not maintained by this replica,
            // and collections targeted at a different replica.
            if (collection.log_collection && !log_ids.contains(collection_id))
                || collection.target_replica.is_some_and(|rid| rid != id)
            {
                continue;
            }

            let as_of = if collection.log_collection {
                // For log collections, we don't send a `CreateDataflow` command to the replica, so
                // it doesn't know which as-of the controller chose and defaults to the minimum
                // frontier instead. We need to initialize the controller-side tracking with the
                // same frontier, to avoid observing regressions in the reported frontiers.
                Antichain::from_elem(Timestamp::MIN)
            } else {
                collection.read_frontier().to_owned()
            };

            // Cloning a `ReadHold` fails when its issuer has hung up. For these holds the issuer
            // is the `StorageCollections`, which doesn't hang up as long as the `Instance` exists,
            // except during process shutdown, when the tokio runtime drops tasks in arbitrary
            // order. In that case there is no way of correctly initializing the per-replica
            // collection state, so we give up. We still add the replica itself, to keep the
            // bookkeeping consistent with the controller's, and then signal the unrecoverable
            // error to the caller, which shuts the instance down.
            //
            // All failing inputs are collected (rather than stopping at the first) so the log
            // message below lists every hung-up input of this collection.
            let mut input_read_holds = Vec::with_capacity(collection.storage_dependencies.len());
            let mut hung_up = Vec::new();
            for hold in collection.storage_dependencies.values() {
                match hold.try_clone() {
                    Ok(hold) => input_read_holds.push(hold),
                    Err(read_holds::ReadHoldIssuerHungUp(input_id)) => hung_up.push(input_id),
                }
            }
            if !hung_up.is_empty() {
                tracing::error!(
                    replica_id = %id,
                    %collection_id,
                    ?hung_up,
                    "giving up on adding replica collections: storage read hold issuers hung \
                     up, the process is shutting down",
                );
                // Only the first hung-up input is reported back to the caller.
                shutdown_input = hung_up.into_iter().next();
                break;
            }

            replica.add_collection(*collection_id, as_of, input_read_holds);
        }

        // Insert the replica regardless of whether collection initialization completed.
        self.replicas.insert(id, replica);

        match shutdown_input {
            Some(input_id) => Err(read_holds::ReadHoldIssuerHungUp(input_id)),
            None => Ok(()),
        }
    }
426
427    /// Enqueue the given response for delivery to the controller clients.
428    fn deliver_response(&self, response: ComputeControllerResponse) {
429        // Failure to send means the `ComputeController` has been dropped and doesn't care about
430        // responses anymore.
431        let _ = self.response_tx.send(response);
432    }
433
434    /// Enqueue the given introspection updates for recording.
435    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
436        // Failure to send means the `ComputeController` has been dropped and doesn't care about
437        // introspection updates anymore.
438        let _ = self.introspection_tx.send((type_, updates));
439    }
440
441    /// Returns whether the identified replica exists.
442    fn replica_exists(&self, id: ReplicaId) -> bool {
443        self.replicas.contains_key(&id)
444    }
445
446    /// Return the IDs of pending peeks targeting the specified replica.
447    fn peeks_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = (Uuid, &PendingPeek)> {
448        self.peeks.iter().filter_map(move |(uuid, peek)| {
449            if peek.target_replica == Some(replica_id) {
450                Some((*uuid, peek))
451            } else {
452                None
453            }
454        })
455    }
456
457    /// Return the IDs of in-progress subscribes targeting the specified replica.
458    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
459        self.subscribes.keys().copied().filter(move |id| {
460            let collection = self.expect_collection(*id);
461            collection.target_replica == Some(replica_id)
462        })
463    }
464
465    /// Update introspection with the current collection frontiers.
466    ///
467    /// We could also do this directly in response to frontier changes, but doing it periodically
468    /// lets us avoid emitting some introspection updates that can be consolidated (e.g. a write
469    /// frontier updated immediately followed by a read frontier update).
470    ///
471    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
472    /// per second during normal operation.
473    fn update_frontier_introspection(&mut self) {
474        for collection in self.collections.values_mut() {
475            collection
476                .introspection
477                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
478        }
479
480        for replica in self.replicas.values_mut() {
481            for collection in replica.collections.values_mut() {
482                collection
483                    .introspection
484                    .observe_frontier(&collection.write_frontier);
485            }
486        }
487    }
488
489    /// Refresh the controller state metrics for this instance.
490    ///
491    /// We could also do state metric updates directly in response to state changes, but that would
492    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
493    /// a single method is less noisy.
494    ///
495    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
496    /// per second during normal operation.
497    fn refresh_state_metrics(&self) {
498        let unscheduled_collections_count =
499            self.collections.values().filter(|c| !c.scheduled).count();
500        let connected_replica_count = self
501            .replicas
502            .values()
503            .filter(|r| r.client.is_connected())
504            .count();
505
506        self.metrics
507            .replica_count
508            .set(u64::cast_from(self.replicas.len()));
509        self.metrics
510            .collection_count
511            .set(u64::cast_from(self.collections.len()));
512        self.metrics
513            .collection_unscheduled_count
514            .set(u64::cast_from(unscheduled_collections_count));
515        self.metrics
516            .peek_count
517            .set(u64::cast_from(self.peeks.len()));
518        self.metrics
519            .subscribe_count
520            .set(u64::cast_from(self.subscribes.len()));
521        self.metrics
522            .copy_to_count
523            .set(u64::cast_from(self.copy_tos.len()));
524        self.metrics
525            .connected_replica_count
526            .set(u64::cast_from(connected_replica_count));
527    }
528
    /// Refresh the wallclock lag introspection and metrics with the current lag values.
    ///
    /// This method produces wallclock lag metrics of two different shapes:
    ///
    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
    ///   over the last minute, together with the current time.
    /// * Histograms: For each collection, we measure the lag of the write frontier behind
    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
    ///   together with the current histogram period.
    ///
    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
    /// Prometheus.
    ///
    /// After collecting measurements, this calls [`Instance::maybe_record_wallclock_lag`] to emit
    /// introspection updates if a recording interval boundary has been crossed.
    ///
    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
    /// per second during normal operation.
    fn refresh_wallclock_lag(&mut self) {
        // Lag of `frontier` behind wallclock time. An empty frontier counts as zero lag.
        let frontier_lag = |frontier: &Antichain<Timestamp>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        // Histogram entries are labeled with the instance's workload class, if it has one.
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        // For collections that sink into storage, we need to ask the storage controller to know
        // whether they're currently readable. A storage collection counts as readable if its read
        // capabilities are strictly less than its write frontier.
        let readable_storage_collections: BTreeSet<_> = self
            .collections
            .keys()
            .filter_map(|id| {
                let frontiers = self.storage_collections.collection_frontiers(*id).ok()?;
                PartialOrder::less_than(&frontiers.read_capabilities, &frontiers.write_frontier)
                    .then_some(*id)
            })
            .collect();

        // First, iterate over all collections and collect histogram measurements.
        for (id, collection) in &mut self.collections {
            let write_frontier = collection.write_frontier();
            // Collections known to the storage controller use the storage-side readability
            // computed above; all others are readable if their read frontier is strictly less
            // than their write frontier.
            let readable = if self.storage_collections.check_exists(*id).is_ok() {
                readable_storage_collections.contains(id)
            } else {
                PartialOrder::less_than(&collection.read_frontier(), &write_frontier)
            };

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                // Bucket the lag by rounding its whole seconds up to the next power of two.
                // `0u64.next_power_of_two()` is 1, so sub-second lags land in the 1s bucket.
                let bucket = if readable {
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                } else {
                    WallclockLag::Undefined
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        // Second, iterate over all per-replica collections and collect history measurements.
        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                // A per-replica collection is considered readable in the context of lag
                // measurement if either:
                //  (a) it sinks into a storage collection that is readable
                //  (b) it is hydrated
                let readable = readable_storage_collections.contains(id) || collection.hydrated();

                let lag = if readable {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                } else {
                    WallclockLag::Undefined
                };

                // Track the running maximum; it is emitted and reset when lags are recorded.
                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    // No way to specify values as undefined in Prometheus metrics, so we use the
                    // maximum value instead.
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                };
            }
        }

        // Record lags to persist, if it's time.
        self.maybe_record_wallclock_lag();
    }
627
    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
    /// last recording.
    ///
    /// We emit new introspection updates if the system time has passed into a new multiple of the
    /// recording interval (typically 1 minute) since the last refresh. The storage controller uses
    /// the same approach, ensuring that both controllers commit their lags at roughly the same
    /// time, avoiding confusion caused by inconsistencies.
    ///
    /// Nothing is recorded while the instance is in read-only mode. In that case the per-replica
    /// lag maxima and per-collection histogram stashes are not drained here.
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        // Truncate `datetime` down to a multiple of `interval`. Returns `None` if the interval
        // cannot be converted to a `TimeDelta` or the truncation fails.
        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        // If the configured interval is unusable, report it and fall back to the default.
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        // Record at most once per interval.
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        // History: one row per (replica, collection) with the maximum lag observed since the last
        // recording. The running maximum is reset to `WallclockLag::MIN` for the next interval.
        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        // Histogram: drain each collection's stash, emitting one row per
        // (period, lag bucket, labels) key with the observed count as the diff.
        let mut histogram_updates = Vec::new();
        // A single row buffer is reused across iterations.
        // NOTE(review): this relies on `Row::packer` resetting the buffer's contents — confirm.
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }
712
713    /// Returns `true` if the given collection is hydrated on at least one
714    /// replica.
715    ///
716    /// This also returns `true` in case this cluster does not have any
717    /// replicas that host the given collection.
718    #[mz_ore::instrument(level = "debug")]
719    pub fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, CollectionMissing> {
720        let mut hosting_replicas = self.replicas_hosting(collection_id)?.peekable();
721        if hosting_replicas.peek().is_none() {
722            return Ok(true);
723        }
724        for replica_state in hosting_replicas {
725            let collection_state = replica_state
726                .collections
727                .get(&collection_id)
728                .expect("hosting replica must have per-replica collection state");
729
730            if collection_state.hydrated() {
731                return Ok(true);
732            }
733        }
734
735        Ok(false)
736    }
737
738    /// Returns `true` if each non-transient, non-excluded collection is hydrated on at
739    /// least one replica.
740    ///
741    /// This also returns `true` in case this cluster does not have any
742    /// replicas.
743    #[mz_ore::instrument(level = "debug")]
744    pub fn collections_hydrated_on_replicas(
745        &self,
746        target_replica_ids: Option<Vec<ReplicaId>>,
747        exclude_collections: &BTreeSet<GlobalId>,
748    ) -> Result<bool, HydrationCheckBadTarget> {
749        if self.replicas.is_empty() {
750            return Ok(true);
751        }
752        let mut all_hydrated = true;
753        let target_replicas: BTreeSet<ReplicaId> = self
754            .replicas
755            .keys()
756            .filter_map(|id| match target_replica_ids {
757                None => Some(id.clone()),
758                Some(ref ids) if ids.contains(id) => Some(id.clone()),
759                Some(_) => None,
760            })
761            .collect();
762        if let Some(targets) = target_replica_ids {
763            if target_replicas.is_empty() {
764                return Err(HydrationCheckBadTarget(targets));
765            }
766        }
767
768        for (id, _collection) in self.collections_iter() {
769            if id.is_transient() || exclude_collections.contains(&id) {
770                continue;
771            }
772
773            let mut collection_hydrated = false;
774            // `replicas_hosting` cannot fail here because `collections_iter`
775            // only yields collections that exist.
776            for replica_state in self.replicas_hosting(id).expect("collection must exist") {
777                if !target_replicas.contains(&replica_state.id) {
778                    continue;
779                }
780                let collection_state = replica_state
781                    .collections
782                    .get(&id)
783                    .expect("hosting replica must have per-replica collection state");
784
785                if collection_state.hydrated() {
786                    collection_hydrated = true;
787                    break;
788                }
789            }
790
791            if !collection_hydrated {
792                tracing::info!("collection {id} is not hydrated on any replica");
793                all_hydrated = false;
794                // We continue with our loop instead of breaking out early, so
795                // that we log all non-hydrated replicas.
796            }
797        }
798
799        Ok(all_hydrated)
800    }
801
802    /// Clean up collection state that is not needed anymore.
803    ///
804    /// Three conditions need to be true before we can remove state for a collection:
805    ///
806    ///  1. A client must have explicitly dropped the collection. If that is not the case, clients
807    ///     can still reasonably assume that the controller knows about the collection and can
808    ///     answer queries about it.
809    ///  2. There must be no outstanding read capabilities on the collection. As long as someone
810    ///     still holds read capabilities on a collection, we need to keep it around to be able
811    ///     to properly handle downgrading of said capabilities.
812    ///  3. All replica frontiers for the collection must have advanced to the empty frontier.
813    ///     Advancement to the empty frontiers signals that replicas are done computing the
814    ///     collection and that they won't send more `ComputeResponse`s for it. As long as we might
815    ///     receive responses for a collection we want to keep it around to be able to validate and
816    ///     handle these responses.
817    fn cleanup_collections(&mut self) {
818        let to_remove: Vec<_> = self
819            .collections_iter()
820            .filter(|(id, collection)| {
821                collection.dropped
822                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
823                    && self
824                        .replicas
825                        .values()
826                        .all(|r| r.collection_frontiers_empty(*id))
827            })
828            .map(|(id, _collection)| id)
829            .collect();
830
831        for id in to_remove {
832            self.remove_collection(id);
833        }
834    }
835
836    /// Returns the state of the [`Instance`] formatted as JSON.
837    ///
838    /// The returned value is not guaranteed to be stable and may change at any point in time.
839    #[mz_ore::instrument(level = "debug")]
840    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
841        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
842        // returned object as a tradeoff between usability and stability. `serde_json` will fail
843        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
844        // prevents a future unrelated change from silently breaking this method.
845
846        // Destructure `self` here so we don't forget to consider dumping newly added fields.
847        let Self {
848            build_info: _,
849            storage_collections: _,
850            peek_stash_persist_location: _,
851            initialized,
852            read_only,
853            workload_class,
854            replicas,
855            replica_dyncfg_overrides: _,
856            collections,
857            log_sources: _,
858            peeks,
859            subscribes,
860            copy_tos,
861            history: _,
862            command_rx: _,
863            response_tx: _,
864            introspection_tx: _,
865            metrics: _,
866            dyncfg: _,
867            now: _,
868            wallclock_lag: _,
869            wallclock_lag_last_recorded,
870            read_hold_tx: _,
871            replica_tx: _,
872            replica_rx: _,
873        } = self;
874
875        let replicas: BTreeMap<_, _> = replicas
876            .iter()
877            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
878            .collect::<Result<_, anyhow::Error>>()?;
879        let collections: BTreeMap<_, _> = collections
880            .iter()
881            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
882            .collect();
883        let peeks: BTreeMap<_, _> = peeks
884            .iter()
885            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
886            .collect();
887        let subscribes: BTreeMap<_, _> = subscribes
888            .iter()
889            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
890            .collect();
891        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
892        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");
893
894        Ok(serde_json::json!({
895            "initialized": initialized,
896            "read_only": read_only,
897            "workload_class": workload_class,
898            "replicas": replicas,
899            "collections": collections,
900            "peeks": peeks,
901            "subscribes": subscribes,
902            "copy_tos": copy_tos,
903            "wallclock_lag_last_recorded": wallclock_lag_last_recorded,
904        }))
905    }
906
907    /// Reports the current write frontier for the identified compute collection.
908    pub(super) fn collection_write_frontier(
909        &self,
910        id: GlobalId,
911    ) -> Result<Antichain<Timestamp>, CollectionMissing> {
912        Ok(self.collection(id)?.write_frontier())
913    }
914}
915
916impl Instance {
917    pub(super) fn new(
918        build_info: &'static BuildInfo,
919        storage: StorageCollections,
920        peek_stash_persist_location: PersistLocation,
921        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState)>,
922        metrics: InstanceMetrics,
923        now: NowFn,
924        wallclock_lag: WallclockLagFn<Timestamp>,
925        dyncfg: Arc<ConfigSet>,
926        command_rx: mpsc::UnboundedReceiver<Command>,
927        response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
928        read_hold_tx: read_holds::ChangeTx,
929        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
930        read_only: bool,
931    ) -> Self {
932        let mut collections = BTreeMap::new();
933        let mut log_sources = BTreeMap::new();
934        for (log, id, shared) in arranged_logs {
935            let collection = CollectionState::new_log_collection(
936                id,
937                shared,
938                Arc::clone(&read_hold_tx),
939                introspection_tx.clone(),
940            );
941            collections.insert(id, collection);
942            log_sources.insert(log, id);
943        }
944
945        let history = ComputeCommandHistory::new(metrics.for_history());
946
947        let send_count = metrics.response_send_count.clone();
948        let recv_count = metrics.response_recv_count.clone();
949        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);
950
951        let now_dt = mz_ore::now::to_datetime(now());
952
953        Self {
954            build_info,
955            storage_collections: storage,
956            peek_stash_persist_location,
957            initialized: false,
958            read_only,
959            workload_class: None,
960            replicas: Default::default(),
961            replica_dyncfg_overrides: Default::default(),
962            collections,
963            log_sources,
964            peeks: Default::default(),
965            subscribes: Default::default(),
966            copy_tos: Default::default(),
967            history,
968            command_rx,
969            response_tx,
970            introspection_tx,
971            metrics,
972            dyncfg,
973            now,
974            wallclock_lag,
975            wallclock_lag_last_recorded: now_dt,
976            read_hold_tx,
977            replica_tx,
978            replica_rx,
979        }
980    }
981
982    pub(super) async fn run(mut self) {
983        self.send(ComputeCommand::Hello {
984            // The nonce is protocol iteration-specific and will be set in
985            // `ReplicaTask::specialize_command`.
986            nonce: Uuid::default(),
987        });
988
989        let instance_config = InstanceConfig {
990            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
991            // The remaining fields are replica-specific and will be set in
992            // `ReplicaTask::specialize_command`.
993            logging: Default::default(),
994            expiration_offset: Default::default(),
995            arrangement_dictionary_compression: Default::default(),
996        };
997
998        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));
999
1000        loop {
1001            tokio::select! {
1002                command = self.command_rx.recv() => match command {
1003                    Some(cmd) => cmd(&mut self),
1004                    None => break,
1005                },
1006                response = self.replica_rx.recv() => match response {
1007                    Some(response) => self.handle_response(response),
1008                    None => unreachable!("self owns a sender side of the channel"),
1009                }
1010            }
1011        }
1012    }
1013
1014    /// Update instance configuration.
1015    #[mz_ore::instrument(level = "debug")]
1016    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
1017        if let Some(workload_class) = &config_params.workload_class {
1018            self.workload_class = workload_class.clone();
1019        }
1020
1021        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
1022        self.send(command);
1023    }
1024
1025    /// Marks the end of any initialization commands.
1026    ///
1027    /// Intended to be called by `Controller`, rather than by other code.
1028    /// Calling this method repeatedly has no effect.
1029    #[mz_ore::instrument(level = "debug")]
1030    pub fn initialization_complete(&mut self) {
1031        // The compute protocol requires that `InitializationComplete` is sent only once.
1032        if !self.initialized {
1033            self.send(ComputeCommand::InitializationComplete);
1034            self.initialized = true;
1035        }
1036    }
1037
1038    /// Allows collections to affect writes to external systems (persist).
1039    ///
1040    /// Calling this method repeatedly has no effect.
1041    #[mz_ore::instrument(level = "debug")]
1042    pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
1043        let collection = self.collection_mut(collection_id)?;
1044
1045        // Do not send redundant allow-writes commands.
1046        if !collection.read_only {
1047            return Ok(());
1048        }
1049
1050        // Don't send allow-writes for collections that are not installed.
1051        let as_of = collection.read_frontier();
1052
1053        // If the collection has an empty `as_of`, it was either never installed on the replica or
1054        // has since been dropped. In either case the replica does not expect any commands for it.
1055        if as_of.is_empty() {
1056            return Ok(());
1057        }
1058
1059        collection.read_only = false;
1060        self.send(ComputeCommand::AllowWrites(collection_id));
1061
1062        Ok(())
1063    }
1064
1065    /// Shut down this instance.
1066    ///
1067    /// This method asserts that the instance has no replicas left. It exists to help
1068    /// us find bugs where the client drops a compute instance that still has replicas
1069    /// installed, and later assumes that said replicas still exist.
1070    ///
1071    /// # Panics
1072    ///
1073    /// Soft-panics if the compute instance still has active replicas.
1074    #[mz_ore::instrument(level = "debug")]
1075    pub fn shutdown(&mut self) {
1076        // Taking the `command_rx` ensures that the [`Instance::run`] loop terminates.
1077        let (_tx, rx) = mpsc::unbounded_channel();
1078        self.command_rx = rx;
1079
1080        let stray_replicas: Vec<_> = self.replicas.keys().collect();
1081        soft_assert_or_log!(
1082            stray_replicas.is_empty(),
1083            "dropped instance still has provisioned replicas: {stray_replicas:?}",
1084        );
1085    }
1086
1087    /// Terminate the [`Instance::run`] loop, causing the instance task to shut down.
1088    ///
1089    /// Unlike [`Instance::shutdown`], this does not assert that the instance has no replicas
1090    /// left. We use it to react to unrecoverable errors that can only occur during process
1091    /// shutdown, such as a storage read hold issuer hanging up while we rehydrate a replica.
1092    fn initiate_shutdown(&mut self) {
1093        // Replacing `command_rx` with a fresh, sender-less channel makes the next `recv` in
1094        // [`Instance::run`] return `None`, terminating the loop.
1095        let (_tx, rx) = mpsc::unbounded_channel();
1096        self.command_rx = rx;
1097    }
1098
1099    /// Sends a command to replicas of this instance.
1100    #[mz_ore::instrument(level = "debug")]
1101    fn send(&mut self, cmd: ComputeCommand) {
1102        // Record the command so that new replicas can be brought up to speed.
1103        // We record the *base* (un-specialized) command, so that the per-replica
1104        // dyncfg overrides are re-applied at replay time in `add_replica` rather
1105        // than baked into the shared history.
1106        self.history.push(cmd.clone());
1107
1108        let target_replica = self.target_replica(&cmd);
1109
1110        // Borrow the overrides separately from `self.replicas` so the per-replica
1111        // specialization below does not conflict with the mutable replica borrow.
1112        let overrides = &self.replica_dyncfg_overrides;
1113
1114        if let Some(rid) = target_replica {
1115            if let Some(replica) = self.replicas.get_mut(&rid) {
1116                let cmd = Self::specialize_command_for_replica(cmd, rid, overrides);
1117                let _ = replica.client.send(cmd);
1118            }
1119        } else {
1120            for (rid, replica) in self.replicas.iter_mut() {
1121                let cmd = Self::specialize_command_for_replica(cmd.clone(), *rid, overrides);
1122                let _ = replica.client.send(cmd);
1123            }
1124        }
1125    }
1126
1127    /// Specializes a command for a specific replica by merging that replica's
1128    /// dyncfg override (if any) into an `UpdateConfiguration` command. All other
1129    /// commands, and replicas without an override, are returned unchanged.
1130    fn specialize_command_for_replica(
1131        mut cmd: ComputeCommand,
1132        replica_id: ReplicaId,
1133        overrides: &BTreeMap<ReplicaId, ConfigUpdates>,
1134    ) -> ComputeCommand {
1135        if let ComputeCommand::UpdateConfiguration(params) = &mut cmd
1136            && let Some(over) = overrides.get(&replica_id)
1137            && !over.updates.is_empty()
1138        {
1139            params.dyncfg_updates.extend(over.clone());
1140        }
1141        cmd
1142    }
1143
1144    /// Replaces the per-replica dyncfg overrides. Callers should follow this
1145    /// with a configuration push (e.g. `update_configuration`) so that existing
1146    /// replicas observe the new overrides.
1147    pub(super) fn update_replica_dyncfg_overrides(
1148        &mut self,
1149        overrides: BTreeMap<ReplicaId, ConfigUpdates>,
1150    ) {
1151        self.replica_dyncfg_overrides = overrides;
1152    }
1153
1154    /// Determine the target replica for a compute command. Retrieves the
1155    /// collection named by the command, and returns the target replica if
1156    /// it is set, and None if not set, or the command doesn't name a collection.
1157    ///
1158    /// Panics if a create-dataflow command names collections that have different
1159    /// target replicas. It is an error to construct such an object and would
1160    /// indicate a bug in [`Self::create_dataflow`].
1161    fn target_replica(&self, cmd: &ComputeCommand) -> Option<ReplicaId> {
1162        match &cmd {
1163            ComputeCommand::Schedule(id)
1164            | ComputeCommand::AllowWrites(id)
1165            | ComputeCommand::AllowCompaction { id, .. } => {
1166                self.expect_collection(*id).target_replica
1167            }
1168            ComputeCommand::CreateDataflow(desc) => {
1169                let mut target_replica = None;
1170                for id in desc.export_ids() {
1171                    if let Some(replica) = self.expect_collection(id).target_replica {
1172                        if target_replica.is_some() {
1173                            assert_eq!(target_replica, Some(replica));
1174                        }
1175                        target_replica = Some(replica);
1176                    }
1177                }
1178                target_replica
1179            }
1180            // Skip Peek as we don't allow replica-targeted indexes.
1181            ComputeCommand::Peek(_)
1182            | ComputeCommand::Hello { .. }
1183            | ComputeCommand::CreateInstance(_)
1184            | ComputeCommand::InitializationComplete
1185            | ComputeCommand::UpdateConfiguration(_)
1186            | ComputeCommand::CancelPeek { .. } => None,
1187        }
1188    }
1189
1190    /// Add a new instance replica, by ID.
1191    #[mz_ore::instrument(level = "debug")]
1192    pub fn add_replica(
1193        &mut self,
1194        id: ReplicaId,
1195        mut config: ReplicaConfig,
1196        epoch: Option<u64>,
1197    ) -> Result<(), ReplicaExists> {
1198        if self.replica_exists(id) {
1199            return Err(ReplicaExists(id));
1200        }
1201
1202        config.logging.index_logs = self.log_sources.clone();
1203
1204        let epoch = epoch.unwrap_or(1);
1205        let metrics = self.metrics.for_replica(id);
1206        let client = ReplicaClient::spawn(
1207            id,
1208            self.build_info,
1209            config.clone(),
1210            epoch,
1211            metrics.clone(),
1212            Arc::clone(&self.dyncfg),
1213            self.replica_tx.clone(),
1214        );
1215
1216        // Take this opportunity to clean up the history we should present.
1217        self.history.reduce();
1218
1219        // Advance the uppers of source imports
1220        self.history.update_source_uppers(&self.storage_collections);
1221
1222        // Replay the commands at the client, creating new dataflow identifiers.
1223        for command in self.history.iter() {
1224            // Skip `CreateDataflow` commands targeted at different replicas.
1225            if let Some(target_replica) = self.target_replica(command)
1226                && target_replica != id
1227            {
1228                continue;
1229            }
1230
1231            // Re-apply this replica's dyncfg override to replayed config commands.
1232            let command = Self::specialize_command_for_replica(
1233                command.clone(),
1234                id,
1235                &self.replica_dyncfg_overrides,
1236            );
1237            if client.send(command).is_err() {
1238                // We swallow the error here. On the next send, we will fail again, and
1239                // restart the connection as well as this rehydration.
1240                tracing::warn!("Replica {:?} connection terminated during hydration", id);
1241                break;
1242            }
1243        }
1244
1245        // Add replica to tracked state.
1246        if self.add_replica_state(id, client, config, epoch).is_err() {
1247            // A storage read hold issuer hung up, which only happens during process shutdown.
1248            // There is no way to correctly bring up the replica anymore, so we shut the instance
1249            // down instead of running on with half-initialized replica state. `add_replica_state`
1250            // has already logged the details and inserted the replica to keep our bookkeeping
1251            // consistent with the controller's.
1252            self.initiate_shutdown();
1253        }
1254
1255        Ok(())
1256    }
1257
1258    /// Remove an existing instance replica, by ID.
1259    #[mz_ore::instrument(level = "debug")]
1260    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
1261        let replica = self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;
1262
1263        // Before dropping the replica state (and the contained input read holds), log read holds
1264        // that are the last line of defense against compaction of a dataflow's storage inputs. If
1265        // the corresponding global read hold has already been released, dropping the per-replica
1266        // read hold will allow compaction, which can cause the replica to panic trying to install
1267        // the dataflow.
1268        //
1269        // This exists primarily to help diagnose incidents-and-escalations#39.
1270        for (collection_id, replica_collection) in &replica.collections {
1271            let collection = self.collections.get(collection_id);
1272            for replica_hold in &replica_collection.input_read_holds {
1273                let input_id = replica_hold.id();
1274                let global_hold = collection.and_then(|c| c.storage_dependencies.get(&input_id));
1275                let unprotected = global_hold
1276                    .is_none_or(|h| PartialOrder::less_than(replica_hold.since(), h.since()));
1277                if unprotected {
1278                    tracing::warn!(
1279                        replica_id = %id,
1280                        %collection_id,
1281                        %input_id,
1282                        replica_hold_since = ?replica_hold.since(),
1283                        global_hold_since = ?global_hold.map(|h| h.since()),
1284                        "dropping per-replica read hold without equivalent global read hold",
1285                    );
1286                }
1287            }
1288        }
1289        drop(replica);
1290
1291        // Subscribes targeting this replica either won't be served anymore (if the replica is
1292        // dropped) or might produce inconsistent output (if the target collection is an
1293        // introspection index). We produce an error to inform upstream.
1294        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
1295        for subscribe_id in to_drop {
1296            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
1297            let response = ComputeControllerResponse::SubscribeResponse(
1298                subscribe_id,
1299                SubscribeBatch {
1300                    lower: subscribe.frontier.clone(),
1301                    upper: subscribe.frontier,
1302                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
1303                },
1304            );
1305            self.deliver_response(response);
1306        }
1307
1308        // Peeks targeting this replica might not be served anymore (if the replica is dropped).
1309        // If the replica has failed it might come back and respond to the peek later, but it still
1310        // seems like a good idea to cancel the peek to inform the caller about the failure. This
1311        // is consistent with how we handle targeted subscribes above.
1312        let mut peek_responses = Vec::new();
1313        let mut to_drop = Vec::new();
1314        for (uuid, peek) in self.peeks_targeting(id) {
1315            peek_responses.push(ComputeControllerResponse::PeekNotification(
1316                uuid,
1317                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
1318                peek.otel_ctx.clone(),
1319            ));
1320            to_drop.push(uuid);
1321        }
1322        for response in peek_responses {
1323            self.deliver_response(response);
1324        }
1325        for uuid in to_drop {
1326            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
1327            self.finish_peek(uuid, response);
1328        }
1329
1330        // We might have a chance to forward implied capabilities and reduce the cost of bringing
1331        // up the next replica, if the dropped replica was the only one in the cluster.
1332        self.forward_implied_capabilities();
1333
1334        Ok(())
1335    }
1336
1337    /// Rehydrate the given instance replica.
1338    ///
1339    /// # Panics
1340    ///
1341    /// Panics if the specified replica does not exist.
1342    fn rehydrate_replica(&mut self, id: ReplicaId) {
1343        let config = self.replicas[&id].config.clone();
1344        let epoch = self.replicas[&id].epoch + 1;
1345
1346        self.remove_replica(id).expect("replica must exist");
1347        let result = self.add_replica(id, config, Some(epoch));
1348
1349        match result {
1350            Ok(()) => (),
1351            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1352        }
1353    }
1354
1355    /// Rehydrate any failed replicas of this instance.
1356    fn rehydrate_failed_replicas(&mut self) {
1357        let replicas = self.replicas.iter();
1358        let failed_replicas: Vec<_> = replicas
1359            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1360            .collect();
1361
1362        for replica_id in failed_replicas {
1363            self.rehydrate_replica(replica_id);
1364        }
1365    }
1366
1367    /// Creates the described dataflow and initializes state for its output.
1368    ///
1369    /// This method expects a `DataflowDescription` with an `as_of` frontier specified, as well as
1370    /// for each imported collection a read hold in `import_read_holds` at at least the `as_of`.
1371    #[mz_ore::instrument(level = "debug")]
1372    pub fn create_dataflow(
1373        &mut self,
1374        dataflow: DataflowDescription<mz_compute_types::plan::Plan, ()>,
1375        import_read_holds: Vec<ReadHold>,
1376        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState>,
1377        target_replica: Option<ReplicaId>,
1378    ) -> Result<(), DataflowCreationError> {
1379        use DataflowCreationError::*;
1380
1381        // Validate that the target replica, if specified, exists.
1382        // A targeted dataflow is only installed on a single replica; if that
1383        // replica doesn't exist, we can't create the dataflow.
1384        if let Some(replica_id) = target_replica {
1385            if !self.replica_exists(replica_id) {
1386                return Err(ReplicaMissing(replica_id));
1387            }
1388        }
1389
1390        // Simple sanity checks around `as_of`
1391        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
1392        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
1393            return Err(EmptyAsOfForSubscribe);
1394        }
1395        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
1396            return Err(EmptyAsOfForCopyTo);
1397        }
1398
1399        // Collect all dependencies of the dataflow, and read holds on them at the `as_of`.
1400        let mut storage_dependencies = BTreeMap::new();
1401        let mut compute_dependencies = BTreeMap::new();
1402
1403        // When we install per-replica input read holds, we cannot use the `as_of` because of
1404        // reconciliation: Existing slow replicas might be reading from the inputs at times before
1405        // the `as_of` and we would rather not crash them by allowing their inputs to compact too
1406        // far. So instead we take read holds at the least time available.
1407        let mut replica_input_read_holds = Vec::new();
1408
1409        let mut import_read_holds: BTreeMap<_, _> =
1410            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();
1411
1412        for &id in dataflow.source_imports.keys() {
1413            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1414            replica_input_read_holds.push(read_hold.clone());
1415
1416            read_hold
1417                .try_downgrade(as_of.clone())
1418                .map_err(|_| ReadHoldInsufficient(id))?;
1419            storage_dependencies.insert(id, read_hold);
1420        }
1421
1422        for &id in dataflow.index_imports.keys() {
1423            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1424            read_hold
1425                .try_downgrade(as_of.clone())
1426                .map_err(|_| ReadHoldInsufficient(id))?;
1427            compute_dependencies.insert(id, read_hold);
1428        }
1429
1430        // If the `as_of` is empty, we are not going to create a dataflow, so replicas won't read
1431        // from the inputs.
1432        if as_of.is_empty() {
1433            replica_input_read_holds = Default::default();
1434        }
1435
1436        // Install collection state for each of the exports.
1437        for export_id in dataflow.export_ids() {
1438            let shared = shared_collection_state
1439                .remove(&export_id)
1440                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
1441            let write_only = dataflow.sink_exports.contains_key(&export_id);
1442            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);
1443
1444            self.add_collection(
1445                export_id,
1446                as_of.clone(),
1447                shared,
1448                storage_dependencies.clone(),
1449                compute_dependencies.clone(),
1450                replica_input_read_holds.clone(),
1451                write_only,
1452                storage_sink,
1453                dataflow.initial_storage_as_of.clone(),
1454                dataflow.refresh_schedule.clone(),
1455                target_replica,
1456            );
1457
1458            // If the export is a storage sink, we can advance its write frontier to the write
1459            // frontier of the target storage collection.
1460            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
1461                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
1462            }
1463        }
1464
1465        // Initialize tracking of subscribes.
1466        for subscribe_id in dataflow.subscribe_ids() {
1467            self.subscribes
1468                .insert(subscribe_id, ActiveSubscribe::default());
1469        }
1470
1471        // Initialize tracking of copy tos.
1472        for copy_to_id in dataflow.copy_to_ids() {
1473            self.copy_tos.insert(copy_to_id);
1474        }
1475
1476        // Here we augment all imported sources and all exported sinks with the appropriate
1477        // storage metadata needed by the compute instance.
1478        let mut source_imports = BTreeMap::new();
1479        for (id, import) in dataflow.source_imports {
1480            let frontiers = self
1481                .storage_collections
1482                .collection_frontiers(id)
1483                .expect("collection exists");
1484
1485            let collection_metadata = self
1486                .storage_collections
1487                .collection_metadata(id)
1488                .expect("we have a read hold on this collection");
1489
1490            let desc = SourceInstanceDesc {
1491                storage_metadata: collection_metadata.clone(),
1492                arguments: import.desc.arguments,
1493                typ: import.desc.typ.clone(),
1494            };
1495            source_imports.insert(
1496                id,
1497                mz_compute_types::dataflows::SourceImport {
1498                    desc,
1499                    monotonic: import.monotonic,
1500                    with_snapshot: import.with_snapshot,
1501                    upper: frontiers.write_frontier,
1502                },
1503            );
1504        }
1505
1506        let mut sink_exports = BTreeMap::new();
1507        for (id, se) in dataflow.sink_exports {
1508            let connection = match se.connection {
1509                ComputeSinkConnection::MaterializedView(conn) => {
1510                    let metadata = self
1511                        .storage_collections
1512                        .collection_metadata(id)
1513                        .map_err(|_| CollectionMissing(id))?
1514                        .clone();
1515                    let conn = MaterializedViewSinkConnection {
1516                        value_desc: conn.value_desc,
1517                        storage_metadata: metadata,
1518                    };
1519                    ComputeSinkConnection::MaterializedView(conn)
1520                }
1521                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
1522                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1523                    ComputeSinkConnection::CopyToS3Oneshot(conn)
1524                }
1525            };
1526            let desc = ComputeSinkDesc {
1527                from: se.from,
1528                from_desc: se.from_desc,
1529                connection,
1530                with_snapshot: se.with_snapshot,
1531                up_to: se.up_to,
1532                non_null_assertions: se.non_null_assertions,
1533                refresh_schedule: se.refresh_schedule,
1534            };
1535            sink_exports.insert(id, desc);
1536        }
1537
1538        // Flatten the dataflow plans into the representation expected by replicas.
1539        let objects_to_build = dataflow
1540            .objects_to_build
1541            .into_iter()
1542            .map(|object| BuildDesc {
1543                id: object.id,
1544                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
1545            })
1546            .collect();
1547
1548        let augmented_dataflow = DataflowDescription {
1549            source_imports,
1550            sink_exports,
1551            objects_to_build,
1552            // The rest of the fields are identical
1553            index_imports: dataflow.index_imports,
1554            index_exports: dataflow.index_exports,
1555            as_of: dataflow.as_of.clone(),
1556            until: dataflow.until,
1557            initial_storage_as_of: dataflow.initial_storage_as_of,
1558            refresh_schedule: dataflow.refresh_schedule,
1559            debug_name: dataflow.debug_name,
1560            time_dependence: dataflow.time_dependence,
1561        };
1562
1563        if augmented_dataflow.is_transient() {
1564            tracing::debug!(
1565                name = %augmented_dataflow.debug_name,
1566                import_ids = %augmented_dataflow.display_import_ids(),
1567                export_ids = %augmented_dataflow.display_export_ids(),
1568                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1569                until = ?augmented_dataflow.until.elements(),
1570                "creating dataflow",
1571            );
1572        } else {
1573            tracing::info!(
1574                name = %augmented_dataflow.debug_name,
1575                import_ids = %augmented_dataflow.display_import_ids(),
1576                export_ids = %augmented_dataflow.display_export_ids(),
1577                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1578                until = ?augmented_dataflow.until.elements(),
1579                "creating dataflow",
1580            );
1581        }
1582
1583        // Skip the actual dataflow creation for an empty `as_of`. (Happens e.g. for the
1584        // bootstrapping of a REFRESH AT mat view that is past its last refresh.)
1585        if as_of.is_empty() {
1586            tracing::info!(
1587                name = %augmented_dataflow.debug_name,
1588                "not sending `CreateDataflow`, because of empty `as_of`",
1589            );
1590        } else {
1591            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
1592            self.send(ComputeCommand::CreateDataflow(Box::new(augmented_dataflow)));
1593
1594            for id in collections {
1595                self.maybe_schedule_collection(id);
1596            }
1597        }
1598
1599        Ok(())
1600    }
1601
1602    /// Schedule the identified collection if all its inputs are available.
1603    ///
1604    /// # Panics
1605    ///
1606    /// Panics if the identified collection does not exist.
1607    fn maybe_schedule_collection(&mut self, id: GlobalId) {
1608        let collection = self.expect_collection(id);
1609
1610        // Don't schedule collections twice.
1611        if collection.scheduled {
1612            return;
1613        }
1614
1615        let as_of = collection.read_frontier();
1616
1617        // If the collection has an empty `as_of`, it was either never installed on the replica or
1618        // has since been dropped. In either case the replica does not expect any commands for it.
1619        if as_of.is_empty() {
1620            return;
1621        }
1622
1623        let ready = if id.is_transient() {
1624            // Always schedule transient collections immediately. The assumption is that those are
1625            // created by interactive user commands and we want to schedule them as quickly as
1626            // possible. Inputs might not yet be available, but when they become available, we
1627            // don't need to wait for the controller to become aware and for the scheduling check
1628            // to run again.
1629            true
1630        } else {
1631            // Ignore self-dependencies. Any self-dependencies do not need to be
1632            // available at the as_of for the dataflow to make progress, so we
1633            // can ignore them here. At the moment, only continual tasks have
1634            // self-dependencies, but this logic is correct for any dataflow, so
1635            // we don't special case it to CTs.
1636            let not_self_dep = |x: &GlobalId| *x != id;
1637
1638            // Make sure we never schedule a collection before its input compute collections have
1639            // been scheduled. Scheduling in the wrong order can lead to deadlocks.
1640            let mut deps_scheduled = true;
1641
1642            // Check dependency frontiers to determine if all inputs are
1643            // available. An input is available when its frontier is greater
1644            // than the `as_of`, i.e., all input data up to and including the
1645            // `as_of` has been sealed.
1646            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
1647            let mut compute_frontiers = Vec::new();
1648            for id in compute_deps {
1649                let dep = &self.expect_collection(id);
1650                deps_scheduled &= dep.scheduled;
1651                compute_frontiers.push(dep.write_frontier());
1652            }
1653
1654            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
1655            let storage_frontiers = self
1656                .storage_collections
1657                .collections_frontiers(storage_deps.collect())
1658                .expect("must exist");
1659            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);
1660
1661            let mut frontiers = compute_frontiers.into_iter().chain(storage_frontiers);
1662            let frontiers_ready =
1663                frontiers.all(|frontier| PartialOrder::less_than(&as_of, &frontier));
1664
1665            deps_scheduled && frontiers_ready
1666        };
1667
1668        if ready {
1669            self.send(ComputeCommand::Schedule(id));
1670            let collection = self.expect_collection_mut(id);
1671            collection.scheduled = true;
1672        }
1673    }
1674
1675    /// Schedule any unscheduled collections that are ready.
1676    fn schedule_collections(&mut self) {
1677        let ids: Vec<_> = self.collections.keys().copied().collect();
1678        for id in ids {
1679            self.maybe_schedule_collection(id);
1680        }
1681    }
1682
1683    /// Drops the read capability for the given collections and allows their resources to be
1684    /// reclaimed.
1685    #[mz_ore::instrument(level = "debug")]
1686    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1687        for id in &ids {
1688            let collection = self.collection_mut(*id)?;
1689
1690            // Mark the collection as dropped to allow it to be removed from the controller state.
1691            collection.dropped = true;
1692
1693            // Drop the implied and warmup read holds to announce that clients are not
1694            // interested in the collection anymore.
1695            collection.implied_read_hold.release();
1696            collection.warmup_read_hold.release();
1697
1698            // If the collection is a subscribe, stop tracking it. This ensures that the controller
1699            // ceases to produce `SubscribeResponse`s for this subscribe.
1700            self.subscribes.remove(id);
1701            // If the collection is a copy to, stop tracking it. This ensures that the controller
1702            // ceases to produce `CopyToResponse`s` for this copy to.
1703            self.copy_tos.remove(id);
1704        }
1705
1706        Ok(())
1707    }
1708
1709    /// Initiate a peek request for the contents of `id` at `timestamp`.
1710    ///
1711    /// If this returns an error, then it didn't modify any `Instance` state.
1712    #[mz_ore::instrument(level = "debug")]
1713    pub fn peek(
1714        &mut self,
1715        peek_target: PeekTarget,
1716        literal_constraints: Option<Vec<Row>>,
1717        uuid: Uuid,
1718        timestamp: Timestamp,
1719        result_desc: RelationDesc,
1720        finishing: RowSetFinishing,
1721        map_filter_project: mz_expr::SafeMfpPlan,
1722        mut read_hold: ReadHold,
1723        target_replica: Option<ReplicaId>,
1724        peek_response_tx: oneshot::Sender<PeekResponse>,
1725    ) -> Result<(), PeekError> {
1726        use PeekError::*;
1727
1728        let target_id = peek_target.id();
1729
1730        // Downgrade the provided read hold to the peek time.
1731        if read_hold.id() != target_id {
1732            return Err(ReadHoldIdMismatch(read_hold.id()));
1733        }
1734        read_hold
1735            .try_downgrade(Antichain::from_elem(timestamp.clone()))
1736            .map_err(|_| ReadHoldInsufficient(target_id))?;
1737
1738        if let Some(target) = target_replica {
1739            if !self.replica_exists(target) {
1740                return Err(ReplicaMissing(target));
1741            }
1742        }
1743
1744        let otel_ctx = OpenTelemetryContext::obtain();
1745
1746        self.peeks.insert(
1747            uuid,
1748            PendingPeek {
1749                target_replica,
1750                // TODO(guswynn): can we just hold the `tracing::Span` here instead?
1751                otel_ctx: otel_ctx.clone(),
1752                requested_at: Instant::now(),
1753                read_hold,
1754                peek_response_tx,
1755                limit: finishing.limit.map(usize::cast_from),
1756                offset: finishing.offset,
1757            },
1758        );
1759
1760        let peek = Peek {
1761            literal_constraints,
1762            uuid,
1763            timestamp,
1764            finishing,
1765            map_filter_project,
1766            // Obtain an `OpenTelemetryContext` from the thread-local tracing
1767            // tree to forward it on to the compute worker.
1768            otel_ctx,
1769            target: peek_target,
1770            result_desc,
1771        };
1772        self.send(ComputeCommand::Peek(Box::new(peek)));
1773
1774        Ok(())
1775    }
1776
1777    /// Cancels an existing peek request.
1778    #[mz_ore::instrument(level = "debug")]
1779    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1780        let Some(peek) = self.peeks.get_mut(&uuid) else {
1781            tracing::warn!("did not find pending peek for {uuid}");
1782            return;
1783        };
1784
1785        let duration = peek.requested_at.elapsed();
1786        self.metrics
1787            .observe_peek_response(&PeekResponse::Canceled, duration);
1788
1789        // Enqueue a notification for the cancellation.
1790        let otel_ctx = peek.otel_ctx.clone();
1791        otel_ctx.attach_as_parent();
1792
1793        self.deliver_response(ComputeControllerResponse::PeekNotification(
1794            uuid,
1795            PeekNotification::Canceled,
1796            otel_ctx,
1797        ));
1798
1799        // Finish the peek.
1800        // This will also propagate the cancellation to the replicas.
1801        self.finish_peek(uuid, reason);
1802    }
1803
1804    /// Assigns a read policy to specific identifiers.
1805    ///
1806    /// The policies are assigned in the order presented, and repeated identifiers should
1807    /// conclude with the last policy. Changing a policy will immediately downgrade the read
1808    /// capability if appropriate, but it will not "recover" the read capability if the prior
1809    /// capability is already ahead of it.
1810    ///
1811    /// Identifiers not present in `policies` retain their existing read policies.
1812    ///
1813    /// It is an error to attempt to set a read policy for a collection that is not readable in the
1814    /// context of compute. At this time, only indexes are readable compute collections.
1815    #[mz_ore::instrument(level = "debug")]
1816    pub fn set_read_policy(
1817        &mut self,
1818        policies: Vec<(GlobalId, ReadPolicy)>,
1819    ) -> Result<(), ReadPolicyError> {
1820        // Do error checking upfront, to avoid introducing inconsistencies between a collection's
1821        // `implied_capability` and `read_capabilities`.
1822        for (id, _policy) in &policies {
1823            let collection = self.collection(*id)?;
1824            if collection.read_policy.is_none() {
1825                return Err(ReadPolicyError::WriteOnlyCollection(*id));
1826            }
1827        }
1828
1829        for (id, new_policy) in policies {
1830            let collection = self.expect_collection_mut(id);
1831            let new_since = new_policy.frontier(collection.write_frontier().borrow());
1832            let _ = collection.implied_read_hold.try_downgrade(new_since);
1833            collection.read_policy = Some(new_policy);
1834        }
1835
1836        Ok(())
1837    }
1838
1839    /// Advance the global write frontier of the given collection.
1840    ///
1841    /// Frontier regressions are gracefully ignored.
1842    ///
1843    /// # Panics
1844    ///
1845    /// Panics if the identified collection does not exist.
1846    #[mz_ore::instrument(level = "debug")]
1847    fn maybe_update_global_write_frontier(
1848        &mut self,
1849        id: GlobalId,
1850        new_frontier: Antichain<Timestamp>,
1851    ) {
1852        let collection = self.expect_collection_mut(id);
1853
1854        let advanced = collection.shared.lock_write_frontier(|f| {
1855            let advanced = PartialOrder::less_than(f, &new_frontier);
1856            if advanced {
1857                f.clone_from(&new_frontier);
1858            }
1859            advanced
1860        });
1861
1862        if !advanced {
1863            return;
1864        }
1865
1866        // Relax the implied read hold according to the read policy.
1867        let new_since = match &collection.read_policy {
1868            Some(read_policy) => {
1869                // For readable collections the read frontier is determined by applying the
1870                // client-provided read policy to the write frontier.
1871                read_policy.frontier(new_frontier.borrow())
1872            }
1873            None => {
1874                // Write-only collections cannot be read within the context of the compute
1875                // controller, so their read frontier only controls the read holds taken on their
1876                // inputs. We can safely downgrade the input read holds to any time less than the
1877                // write frontier.
1878                //
1879                // Note that some write-only collections (continual tasks) need to observe changes
1880                // at their current write frontier during hydration. Thus, we cannot downgrade the
1881                // read frontier to the write frontier and instead step it back by one.
1882                Antichain::from_iter(
1883                    new_frontier
1884                        .iter()
1885                        .map(|t| t.step_back().unwrap_or(Timestamp::MIN)),
1886                )
1887            }
1888        };
1889        let _ = collection.implied_read_hold.try_downgrade(new_since);
1890
1891        // Report the frontier advancement.
1892        self.deliver_response(ComputeControllerResponse::FrontierUpper {
1893            id,
1894            upper: new_frontier,
1895        });
1896    }
1897
1898    /// Apply a collection read hold change.
1899    pub(super) fn apply_read_hold_change(
1900        &mut self,
1901        id: GlobalId,
1902        mut update: ChangeBatch<Timestamp>,
1903    ) {
1904        let Some(collection) = self.collections.get_mut(&id) else {
1905            soft_panic_or_log!(
1906                "read hold change for absent collection (id={id}, changes={update:?})"
1907            );
1908            return;
1909        };
1910
1911        let new_since = collection.shared.lock_read_capabilities(|caps| {
1912            // Sanity check to prevent corrupted `read_capabilities`, which can cause hard-to-debug
1913            // issues (usually stuck read frontiers).
1914            let read_frontier = caps.frontier();
1915            for (time, diff) in update.iter() {
1916                let count = caps.count_for(time) + diff;
1917                assert!(
1918                    count >= 0,
1919                    "invalid read capabilities update: negative capability \
1920             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1921                );
1922                assert!(
1923                    count == 0 || read_frontier.less_equal(time),
1924                    "invalid read capabilities update: frontier regression \
1925             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1926                );
1927            }
1928
1929            // Apply read capability updates and learn about resulting changes to the read
1930            // frontier.
1931            let changes = caps.update_iter(update.drain());
1932
1933            let changed = changes.count() > 0;
1934            changed.then(|| caps.frontier().to_owned())
1935        });
1936
1937        let Some(new_since) = new_since else {
1938            return; // read frontier did not change
1939        };
1940
1941        // Propagate read frontier update to dependencies.
1942        for read_hold in collection.compute_dependencies.values_mut() {
1943            read_hold
1944                .try_downgrade(new_since.clone())
1945                .expect("frontiers don't regress");
1946        }
1947        for read_hold in collection.storage_dependencies.values_mut() {
1948            read_hold
1949                .try_downgrade(new_since.clone())
1950                .expect("frontiers don't regress");
1951        }
1952
1953        // Produce `AllowCompaction` command.
1954        self.send(ComputeCommand::AllowCompaction {
1955            id,
1956            frontier: new_since,
1957        });
1958    }
1959
1960    /// Fulfills a registered peek and cleans up associated state.
1961    ///
1962    /// As part of this we:
1963    ///  * Send a `PeekResponse` through the peek's response channel.
1964    ///  * Emit a `CancelPeek` command to instruct replicas to stop spending resources on this
1965    ///    peek, and to allow the `ComputeCommandHistory` to reduce away the corresponding `Peek`
1966    ///    command.
1967    ///  * Remove the read hold for this peek, unblocking compaction that might have waited on it.
1968    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1969        let Some(peek) = self.peeks.remove(&uuid) else {
1970            return;
1971        };
1972
1973        // The recipient might not be interested in the peek response anymore, which is fine.
1974        let _ = peek.peek_response_tx.send(response);
1975
1976        // NOTE: We need to send the `CancelPeek` command _before_ we release the peek's read hold
1977        // (by dropping it), to avoid the edge case that caused database-issues#4812.
1978        self.send(ComputeCommand::CancelPeek { uuid });
1979
1980        drop(peek.read_hold);
1981    }
1982
1983    /// Handles a response from a replica. Replica IDs are re-used across replica restarts, so we
1984    /// use the replica epoch to drop stale responses.
1985    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse) {
1986        // Filter responses from non-existing or stale replicas.
1987        if self
1988            .replicas
1989            .get(&replica_id)
1990            .filter(|replica| replica.epoch == epoch)
1991            .is_none()
1992        {
1993            return;
1994        }
1995
1996        // Invariant: the replica exists and has the expected epoch.
1997
1998        match response {
1999            ComputeResponse::Frontiers(id, frontiers) => {
2000                self.handle_frontiers_response(id, frontiers, replica_id);
2001            }
2002            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
2003                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
2004            }
2005            ComputeResponse::CopyToResponse(id, response) => {
2006                self.handle_copy_to_response(id, response, replica_id);
2007            }
2008            ComputeResponse::SubscribeResponse(id, response) => {
2009                self.handle_subscribe_response(id, response, replica_id);
2010            }
2011            ComputeResponse::Status(response) => {
2012                self.handle_status_response(response, replica_id);
2013            }
2014        }
2015    }
2016
2017    /// Handle new frontiers, returning any compute response that needs to
2018    /// be sent to the client.
2019    fn handle_frontiers_response(
2020        &mut self,
2021        id: GlobalId,
2022        frontiers: FrontiersResponse,
2023        replica_id: ReplicaId,
2024    ) {
2025        if !self.collections.contains_key(&id) {
2026            soft_panic_or_log!(
2027                "frontiers update for an unknown collection \
2028                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
2029            );
2030            return;
2031        }
2032        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2033            soft_panic_or_log!(
2034                "frontiers update for an unknown replica \
2035                 (replica_id={replica_id}, frontiers={frontiers:?})"
2036            );
2037            return;
2038        };
2039        let Some(replica_collection) = replica.collections.get_mut(&id) else {
2040            soft_panic_or_log!(
2041                "frontiers update for an unknown replica collection \
2042                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
2043            );
2044            return;
2045        };
2046
2047        if let Some(new_frontier) = frontiers.input_frontier {
2048            replica_collection.update_input_frontier(new_frontier.clone());
2049        }
2050        if let Some(new_frontier) = frontiers.output_frontier {
2051            replica_collection.update_output_frontier(new_frontier.clone());
2052        }
2053        if let Some(new_frontier) = frontiers.write_frontier {
2054            replica_collection.update_write_frontier(new_frontier.clone());
2055            self.maybe_update_global_write_frontier(id, new_frontier);
2056        }
2057    }
2058
2059    #[mz_ore::instrument(level = "debug")]
2060    fn handle_peek_response(
2061        &mut self,
2062        uuid: Uuid,
2063        response: PeekResponse,
2064        otel_ctx: OpenTelemetryContext,
2065        replica_id: ReplicaId,
2066    ) {
2067        otel_ctx.attach_as_parent();
2068
2069        // We might not be tracking this peek anymore, because we have served a response already or
2070        // because it was canceled. If this is the case, we ignore the response.
2071        let Some(peek) = self.peeks.get(&uuid) else {
2072            return;
2073        };
2074
2075        // If the peek is targeting a replica, ignore responses from other replicas.
2076        let target_replica = peek.target_replica.unwrap_or(replica_id);
2077        if target_replica != replica_id {
2078            return;
2079        }
2080
2081        let duration = peek.requested_at.elapsed();
2082        self.metrics.observe_peek_response(&response, duration);
2083
2084        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
2085        // NOTE: We use the `otel_ctx` from the response, not the pending peek, because we
2086        // currently want the parent to be whatever the compute worker did with this peek.
2087        self.deliver_response(ComputeControllerResponse::PeekNotification(
2088            uuid,
2089            notification,
2090            otel_ctx,
2091        ));
2092
2093        self.finish_peek(uuid, response)
2094    }
2095
2096    fn handle_copy_to_response(
2097        &mut self,
2098        sink_id: GlobalId,
2099        response: CopyToResponse,
2100        replica_id: ReplicaId,
2101    ) {
2102        if !self.collections.contains_key(&sink_id) {
2103            soft_panic_or_log!(
2104                "received response for an unknown copy-to \
2105                 (sink_id={sink_id}, replica_id={replica_id})",
2106            );
2107            return;
2108        }
2109        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2110            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
2111            return;
2112        };
2113        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
2114            soft_panic_or_log!(
2115                "copy-to response for an unknown replica collection \
2116                 (sink_id={sink_id}, replica_id={replica_id})"
2117            );
2118            return;
2119        };
2120
2121        // Downgrade the replica frontiers, to enable dropping of input read holds and clean up of
2122        // collection state.
2123        // TODO(database-issues#4701): report copy-to frontiers through `Frontiers` responses
2124        replica_collection.update_write_frontier(Antichain::new());
2125        replica_collection.update_input_frontier(Antichain::new());
2126        replica_collection.update_output_frontier(Antichain::new());
2127
2128        // We might not be tracking this COPY TO because we have already returned a response
2129        // from one of the replicas. In that case, we ignore the response.
2130        if !self.copy_tos.remove(&sink_id) {
2131            return;
2132        }
2133
2134        let result = match response {
2135            CopyToResponse::RowCount(count) => Ok(count),
2136            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
2137            // We should never get here: Replicas only drop copy to collections in response
2138            // to the controller allowing them to do so, and when the controller drops a
2139            // copy to it also removes it from the list of tracked copy_tos (see
2140            // [`Instance::drop_collections`]).
2141            CopyToResponse::Dropped => {
2142                tracing::error!(
2143                    %sink_id, %replica_id,
2144                    "received `Dropped` response for a tracked copy to",
2145                );
2146                return;
2147            }
2148        };
2149
2150        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
2151    }
2152
    /// Handles a `SubscribeResponse` that replica `replica_id` sent for `subscribe_id`.
    ///
    /// Responses for unknown subscribes, unknown replicas, or subscribes the replica does not
    /// have a replica collection for are reported through `soft_panic_or_log!` and otherwise
    /// ignored.
    ///
    /// Otherwise, the replica's write, input, and output frontiers for the subscribe are always
    /// advanced to the frontier carried by the response (the batch upper, or the empty frontier
    /// for `DroppedAt`). If the subscribe is still tracked in `self.subscribes`, that frontier is
    /// passed to `maybe_update_global_write_frontier`, and a batch that advances the tracked
    /// subscribe frontier is forwarded to the controller client as a
    /// `ComputeControllerResponse::SubscribeResponse`, with updates at times before the
    /// previously tracked frontier removed. A batch with an empty upper ends the tracking.
    fn handle_subscribe_response(
        &mut self,
        subscribe_id: GlobalId,
        response: SubscribeResponse,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&subscribe_id) {
            soft_panic_or_log!(
                "received response for an unknown subscribe \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica (replica_id={replica_id})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica collection \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
            );
            return;
        };

        // Always apply replica write frontier updates. Even if the subscribe is not tracked
        // anymore, there might still be replicas reading from its inputs, so we need to track the
        // frontiers until all replicas have advanced to the empty one.
        let write_frontier = match &response {
            SubscribeResponse::Batch(batch) => batch.upper.clone(),
            SubscribeResponse::DroppedAt(_) => Antichain::new(),
        };

        // For subscribes we downgrade all replica frontiers based on write frontiers. This should
        // be fine because the input and output frontier of a subscribe track its write frontier.
        // TODO(database-issues#4701): report subscribe frontiers through `Frontiers` responses
        replica_collection.update_write_frontier(write_frontier.clone());
        replica_collection.update_input_frontier(write_frontier.clone());
        replica_collection.update_output_frontier(write_frontier.clone());

        // If the subscribe is not tracked (anymore), there is nothing else to do.
        //
        // NOTE(review): this function performs no check that `replica_id` is the subscribe's
        // target replica. Presumably a replica-targeted subscribe is only installed on its target
        // replica, so responses from other replicas are already rejected by the replica
        // collection lookup above (or `maybe_update_global_write_frontier` handles it) -- confirm.
        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
            return;
        };

        // Apply a global frontier update.
        // If this is a replica-targeted subscribe, it is important that we advance the global
        // frontier only based on responses from the targeted replica. Otherwise, another replica
        // could advance to the empty frontier, making us drop the subscribe on the targeted
        // replica prematurely.
        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);

        match response {
            SubscribeResponse::Batch(batch) => {
                let upper = batch.upper;
                let mut updates = batch.updates;

                // If this batch advances the subscribe's frontier, we emit all updates at times
                // greater or equal to the last frontier (to avoid emitting duplicate updates).
                // Batches that don't advance the frontier are dropped without a response.
                if PartialOrder::less_than(&subscribe.frontier, &upper) {
                    // `lower` is the previously tracked frontier; the tracked copy now holds
                    // `upper`.
                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());

                    if upper.is_empty() {
                        // This subscribe cannot produce more data. Stop tracking it.
                        self.subscribes.remove(&subscribe_id);
                    } else {
                        // This subscribe can produce more data. Update our tracking of it.
                        self.subscribes.insert(subscribe_id, subscribe);
                    }

                    // `updates` is a `Result`; an error is forwarded to the client unchanged.
                    if let Ok(updates) = updates.as_mut() {
                        // Trim each chunk to the times at or beyond `lower`, and drop chunks that
                        // end up empty.
                        // NOTE(review): `partition_point` requires the times to be sorted;
                        // presumably each chunk is ordered by time -- confirm.
                        updates.retain_mut(|updates| {
                            let offset = updates.times().partition_point(|t| {
                                // True for times that are strictly less than lower (and should be skipped)
                                // and false otherwise.
                                !lower.less_equal(t)
                            });
                            let (_, past_lower) = std::mem::take(updates).split_at(offset);
                            *updates = past_lower;
                            updates.len() > 0
                        });
                    }
                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeBatch {
                            lower,
                            upper,
                            updates,
                        },
                    ));
                }
            }
            SubscribeResponse::DroppedAt(frontier) => {
                // We should never get here: Replicas only drop subscribe collections in response
                // to the controller allowing them to do so, and when the controller drops a
                // subscribe it also removes it from the list of tracked subscribes (see
                // [`Instance::drop_collections`]).
                tracing::error!(
                    %subscribe_id,
                    %replica_id,
                    frontier = ?frontier.elements(),
                    "received `DroppedAt` response for a tracked subscribe",
                );
                self.subscribes.remove(&subscribe_id);
            }
        }
    }
2262
2263    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
2264        match response {
2265            StatusResponse::Placeholder => {}
2266        }
2267    }
2268
2269    /// Return the write frontiers of the dependencies of the given collection.
2270    fn dependency_write_frontiers<'b>(
2271        &'b self,
2272        collection: &'b CollectionState,
2273    ) -> impl Iterator<Item = Antichain<Timestamp>> + 'b {
2274        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2275            let collection = self.collections.get(&dep_id);
2276            collection.map(|c| c.write_frontier())
2277        });
2278        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2279            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2280            frontiers.map(|f| f.write_frontier)
2281        });
2282
2283        compute_frontiers.chain(storage_frontiers)
2284    }
2285
2286    /// Return the write frontiers of transitive storage dependencies of the given collection.
2287    fn transitive_storage_dependency_write_frontiers<'b>(
2288        &'b self,
2289        collection: &'b CollectionState,
2290    ) -> impl Iterator<Item = Antichain<Timestamp>> + 'b {
2291        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2292        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2293        let mut done = BTreeSet::new();
2294
2295        while let Some(id) = todo.pop() {
2296            if done.contains(&id) {
2297                continue;
2298            }
2299            if let Some(dep) = self.collections.get(&id) {
2300                storage_ids.extend(dep.storage_dependency_ids());
2301                todo.extend(dep.compute_dependency_ids())
2302            }
2303            done.insert(id);
2304        }
2305
2306        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2307            let frontiers = self.storage_collections.collection_frontiers(id).ok();
2308            frontiers.map(|f| f.write_frontier)
2309        });
2310
2311        storage_frontiers
2312    }
2313
2314    /// Downgrade the warmup capabilities of collections as much as possible.
2315    ///
2316    /// The only requirement we have for a collection's warmup capability is that it is for a time
2317    /// that is available in all of the collection's inputs. For each input the latest time that is
2318    /// the case for is `write_frontier - 1`. So the farthest we can downgrade a collection's
2319    /// warmup capability is the minimum of `write_frontier - 1` of all its inputs.
2320    ///
2321    /// This method expects to be periodically called as part of instance maintenance work.
2322    /// We would like to instead update the warmup capabilities synchronously in response to
2323    /// frontier updates of dependency collections, but that is not generally possible because we
2324    /// don't learn about frontier updates of storage collections synchronously. We could do
2325    /// synchronous updates for compute dependencies, but we refrain from doing for simplicity.
2326    fn downgrade_warmup_capabilities(&mut self) {
2327        let mut new_capabilities = BTreeMap::new();
2328        for (id, collection) in &self.collections {
2329            // For write-only collections that have advanced to the empty frontier, we can drop the
2330            // warmup capability entirely. There is no reason why we would need to hydrate those
2331            // collections again, so being able to warm them up is not useful.
2332            if collection.read_policy.is_none()
2333                && collection.shared.lock_write_frontier(|f| f.is_empty())
2334            {
2335                new_capabilities.insert(*id, Antichain::new());
2336                continue;
2337            }
2338
2339            let mut new_capability = Antichain::new();
2340            for frontier in self.dependency_write_frontiers(collection) {
2341                for time in frontier {
2342                    new_capability.insert(time.step_back().unwrap_or(time));
2343                }
2344            }
2345
2346            new_capabilities.insert(*id, new_capability);
2347        }
2348
2349        for (id, new_capability) in new_capabilities {
2350            let collection = self.expect_collection_mut(id);
2351            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
2352        }
2353    }
2354
2355    /// Forward the implied capabilities of collections, if possible.
2356    ///
2357    /// The implied capability of a collection controls (a) which times are still readable (for
2358    /// indexes) and (b) with which as-of the collection gets installed on a new replica. We are
2359    /// usually not allowed to advance an implied capability beyond the frontier that follows from
2360    /// the collection's read policy applied to its write frontier:
2361    ///
2362    ///  * For sink collections, some external consumer might rely on seeing all distinct times in
2363    ///    the input reflected in the output. If we'd forward the implied capability of a sink,
2364    ///    we'd risk skipping times in the output across replica restarts.
2365    ///  * For index collections, we might make the index unreadable by advancing its read frontier
2366    ///    beyond its write frontier.
2367    ///
2368    /// There is one case where forwarding an implied capability is fine though: an index installed
2369    /// on a cluster that has no replicas. Such indexes are not readable anyway until a new replica
2370    /// is added, so advancing its read frontier can't make it unreadable. We can thus advance the
2371    /// implied capability as long as we make sure that when a new replica is added, the expected
2372    /// relationship between write frontier, read policy, and implied capability can be restored
2373    /// immediately (modulo computation time).
2374    ///
2375    /// Forwarding implied capabilities is not necessary for the correct functioning of the
2376    /// controller but an optimization that is beneficial in two ways:
2377    ///
2378    ///  * It relaxes read holds on inputs to forwarded collections, allowing their compaction.
2379    ///  * It reduces the amount of historical detail new replicas need to process when computing
2380    ///    forwarded collections, as forwarding the implied capability also forwards the corresponding
2381    ///    dataflow as-of.
2382    fn forward_implied_capabilities(&mut self) {
2383        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
2384            return;
2385        }
2386        if !self.replicas.is_empty() {
2387            return;
2388        }
2389
2390        let mut new_capabilities = BTreeMap::new();
2391        for (id, collection) in &self.collections {
2392            let Some(read_policy) = &collection.read_policy else {
2393                // Collection is write-only, i.e. a sink.
2394                continue;
2395            };
2396
2397            // When a new replica is started, it will immediately be able to compute all collection
2398            // output up to the write frontier of its transitive storage inputs. So the new implied
2399            // read capability should be the read policy applied to that frontier.
2400            let mut dep_frontier = Antichain::new();
2401            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
2402                dep_frontier.extend(frontier);
2403            }
2404
2405            let new_capability = read_policy.frontier(dep_frontier.borrow());
2406            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
2407                new_capabilities.insert(*id, new_capability);
2408            }
2409        }
2410
2411        for (id, new_capability) in new_capabilities {
2412            let collection = self.expect_collection_mut(id);
2413            let _ = collection.implied_read_hold.try_downgrade(new_capability);
2414        }
2415    }
2416
2417    /// Acquires a `ReadHold` for the identified compute collection.
2418    ///
2419    /// This mirrors the logic used by the controller-side `InstanceState::acquire_read_hold`,
2420    /// but executes on the instance task itself.
2421    pub(super) fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold, CollectionMissing> {
2422        // Similarly to InstanceState::acquire_read_hold and StorageCollections::acquire_read_holds,
2423        // we acquire read holds at the earliest possible time rather than returning a copy
2424        // of the implied read hold. This is so that dependents can acquire read holds on
2425        // compute dependencies at frontiers that are held back by other read holds the caller
2426        // has previously taken.
2427        let collection = self.collection(id)?;
2428        let since = collection.shared.lock_read_capabilities(|caps| {
2429            let since = caps.frontier().to_owned();
2430            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
2431            since
2432        });
2433        let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
2434        Ok(hold)
2435    }
2436
    /// Process pending maintenance work.
    ///
    /// This method is invoked periodically by the global controller.
    /// It is a good place to perform maintenance work that arises from various controller state
    /// changes and that cannot conveniently be handled synchronously with those state changes.
    ///
    /// The maintenance steps run in a fixed order. NOTE(review): presumably the order matters,
    /// e.g. read holds are downgraded before collections are scheduled and cleaned up, and
    /// introspection and metrics are refreshed last so they observe the earlier steps' effects --
    /// confirm before reordering.
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        self.rehydrate_failed_replicas();
        // Move warmup read holds towards `write_frontier - 1` of each collection's inputs.
        self.downgrade_warmup_capabilities();
        // Only has an effect if the dyncfg is enabled and the instance has no replicas.
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
2453}
2454
/// State maintained about individual compute collections.
///
/// A compute collection is either an index, or a storage sink, or a subscribe, exported by a
/// compute dataflow.
#[derive(Debug)]
struct CollectionState {
    /// If set, this collection is only maintained by the specified replica.
    target_replica: Option<ReplicaId>,
    /// Whether this collection is a log collection.
    ///
    /// Log collections are special in that they are only maintained by a subset of all replicas.
    log_collection: bool,
    /// Whether this collection has been dropped by a controller client.
    ///
    /// The controller is allowed to remove the `CollectionState` for a collection only when
    /// `dropped == true`. Otherwise, clients might still expect to be able to query information
    /// about this collection.
    dropped: bool,
    /// Whether this collection has been scheduled, i.e., the controller has sent a `Schedule`
    /// command for it.
    ///
    /// Log collections are marked as scheduled on creation, since replicas schedule them
    /// implicitly.
    scheduled: bool,

    /// Whether this collection is in read-only mode.
    ///
    /// When in read-only mode, the dataflow is not allowed to affect external state (largely persist).
    /// New collections start out read-only.
    read_only: bool,

    /// State shared with the `ComputeController`.
    shared: SharedCollectionState,

    /// A read hold maintaining the implicit capability of the collection.
    ///
    /// This capability is kept to ensure that the collection remains readable according to its
    /// `read_policy`. It also ensures that read holds on the collection's dependencies are kept at
    /// some time not greater than the collection's `write_frontier`, guaranteeing that the
    /// collection's next outputs can always be computed without skipping times.
    ///
    /// While the instance has no replicas, `Instance::forward_implied_capabilities` may advance
    /// this hold beyond what the read policy applied to the write frontier would allow.
    implied_read_hold: ReadHold,
    /// A read hold held to enable dataflow warmup.
    ///
    /// Dataflow warmup is an optimization that allows dataflows to immediately start hydrating
    /// even when their next output time (as implied by the `write_frontier`) is in the future.
    /// By installing a read capability derived from the write frontiers of the collection's
    /// inputs, we ensure that the as-of of new dataflows installed for the collection is at a time
    /// that is immediately available, so hydration can begin immediately too.
    ///
    /// Periodically downgraded by `Instance::downgrade_warmup_capabilities`.
    warmup_read_hold: ReadHold,
    /// The policy to use to downgrade `self.implied_read_hold`.
    ///
    /// If `None`, the collection is a write-only collection (i.e. a sink). For write-only
    /// collections, the `implied_read_hold` is only required for maintaining read holds on the
    /// inputs, so we can immediately downgrade it to the `write_frontier`.
    ///
    /// New collections start out with `ReadPolicy::ValidFrom(as_of)`.
    read_policy: Option<ReadPolicy>,

    /// Storage identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    storage_dependencies: BTreeMap<GlobalId, ReadHold>,
    /// Compute identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    compute_dependencies: BTreeMap<GlobalId, ReadHold>,

    /// Introspection state associated with this collection.
    introspection: CollectionIntrospection,

    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
    /// introspection update.
    ///
    /// Keys are `(period, lag, labels)` triples, values are counts.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection. This is the case for
    /// transient collections.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}
2534
2535impl CollectionState {
2536    /// Creates a new collection state, with an initial read policy valid from `since`.
2537    fn new(
2538        collection_id: GlobalId,
2539        as_of: Antichain<Timestamp>,
2540        shared: SharedCollectionState,
2541        storage_dependencies: BTreeMap<GlobalId, ReadHold>,
2542        compute_dependencies: BTreeMap<GlobalId, ReadHold>,
2543        read_hold_tx: read_holds::ChangeTx,
2544        introspection: CollectionIntrospection,
2545    ) -> Self {
2546        // A collection is not readable before the `as_of`.
2547        let since = as_of.clone();
2548        // A collection won't produce updates for times before the `as_of`.
2549        let upper = as_of;
2550
2551        // Ensure that the provided `shared` is valid for the given `as_of`.
2552        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
2553        assert!(shared.lock_write_frontier(|f| f == &upper));
2554
2555        // Initialize collection read holds.
2556        // Note that the implied read hold was already added to the `read_capabilities` when
2557        // `shared` was created, so we only need to add the warmup read hold here.
2558        let implied_read_hold =
2559            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
2560        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);
2561
2562        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
2563        shared.lock_read_capabilities(|c| {
2564            c.update_iter(updates);
2565        });
2566
2567        // In an effort to keep the produced wallclock lag introspection data small and
2568        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2569        // select indexes and subscribes.
2570        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
2571            true => None,
2572            false => Some(Default::default()),
2573        };
2574
2575        Self {
2576            target_replica: None,
2577            log_collection: false,
2578            dropped: false,
2579            scheduled: false,
2580            read_only: true,
2581            shared,
2582            implied_read_hold,
2583            warmup_read_hold,
2584            read_policy: Some(ReadPolicy::ValidFrom(since)),
2585            storage_dependencies,
2586            compute_dependencies,
2587            introspection,
2588            wallclock_lag_histogram_stash,
2589        }
2590    }
2591
2592    /// Creates a new collection state for a log collection.
2593    fn new_log_collection(
2594        id: GlobalId,
2595        shared: SharedCollectionState,
2596        read_hold_tx: read_holds::ChangeTx,
2597        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2598    ) -> Self {
2599        let since = Antichain::from_elem(Timestamp::MIN);
2600        let introspection = CollectionIntrospection::new(
2601            id,
2602            introspection_tx,
2603            since.clone(),
2604            false,
2605            None,
2606            None,
2607            Vec::new(),
2608        );
2609        let mut state = Self::new(
2610            id,
2611            since,
2612            shared,
2613            Default::default(),
2614            Default::default(),
2615            read_hold_tx,
2616            introspection,
2617        );
2618        state.log_collection = true;
2619        // Log collections are created and scheduled implicitly as part of replica initialization.
2620        state.scheduled = true;
2621        state
2622    }
2623
2624    /// Reports the current read frontier.
2625    fn read_frontier(&self) -> Antichain<Timestamp> {
2626        self.shared
2627            .lock_read_capabilities(|c| c.frontier().to_owned())
2628    }
2629
2630    /// Reports the current write frontier.
2631    fn write_frontier(&self) -> Antichain<Timestamp> {
2632        self.shared.lock_write_frontier(|f| f.clone())
2633    }
2634
2635    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2636        self.storage_dependencies.keys().copied()
2637    }
2638
2639    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2640        self.compute_dependencies.keys().copied()
2641    }
2642}
2643
/// Collection state shared with the `ComputeController`.
///
/// Having this allows certain controller APIs, such as `ComputeController::collection_frontiers`
/// and `ComputeController::acquire_read_hold` to be non-`async`. This comes at the cost of
/// complexity (by introducing shared mutable state) and performance (by introducing locking). We
/// should aim to reduce the amount of shared state over time, rather than expand it.
///
/// Note that [`SharedCollectionState`]s are initialized by the `ComputeController` prior to the
/// collection's creation in the [`Instance`]. This is to allow compute clients to query frontiers
/// and take new read holds immediately, without having to wait for the [`Instance`] to update.
///
/// Both fields are `Arc`s, so clones of a `SharedCollectionState` are cheap handles to the same
/// underlying state.
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState {
    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulation contains the capabilities held by all [`ReadHold`]s given out for the
    /// collection, including `implied_read_hold` and `warmup_read_hold`.
    ///
    /// NOTE: Apart from its initialization (the implied read capability added in
    /// `SharedCollectionState::new` and the warmup read capability added in
    /// `CollectionState::new`), this field may only be modified by
    /// [`Instance::apply_read_hold_change`], [`Instance::acquire_read_hold`], and
    /// `ComputeController::acquire_read_hold`.
    /// Nobody else should modify read capabilities directly. Instead, collection users should
    /// manage read holds through [`ReadHold`] objects acquired through
    /// `ComputeController::acquire_read_hold`.
    ///
    /// TODO(teskje): Restructure the code to enforce the above in the type system.
    read_capabilities: Arc<Mutex<MutableAntichain<Timestamp>>>,
    /// The write frontier of this collection.
    ///
    /// Starts out at the collection's `as_of`.
    write_frontier: Arc<Mutex<Antichain<Timestamp>>>,
}
2672
2673impl SharedCollectionState {
2674    pub fn new(as_of: Antichain<Timestamp>) -> Self {
2675        // A collection is not readable before the `as_of`.
2676        let since = as_of.clone();
2677        // A collection won't produce updates for times before the `as_of`.
2678        let upper = as_of;
2679
2680        // Initialize read capabilities to the `since`.
2681        // The is the implied read capability. The corresponding [`ReadHold`] is created in
2682        // [`CollectionState::new`].
2683        let mut read_capabilities = MutableAntichain::new();
2684        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2685
2686        Self {
2687            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2688            write_frontier: Arc::new(Mutex::new(upper)),
2689        }
2690    }
2691
2692    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2693    where
2694        F: FnOnce(&mut MutableAntichain<Timestamp>) -> R,
2695    {
2696        let mut caps = self.read_capabilities.lock().expect("poisoned");
2697        f(&mut *caps)
2698    }
2699
2700    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2701    where
2702        F: FnOnce(&mut Antichain<Timestamp>) -> R,
2703    {
2704        let mut frontier = self.write_frontier.lock().expect("poisoned");
2705        f(&mut *frontier)
2706    }
2707}
2708
/// Manages certain introspection relations associated with a collection. Upon creation, it adds
/// rows to introspection relations. When dropped, it retracts its managed rows.
#[derive(Debug)]
struct CollectionIntrospection {
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// A channel through which introspection updates are delivered.
    ///
    /// Send failures are ignored: they mean the `ComputeController` has been dropped.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Introspection state for `IntrospectionType::Frontiers`.
    ///
    /// `Some` if the collection does _not_ sink into a storage collection (i.e. is not an MV). If
    /// the collection sinks into storage, the storage controller reports its frontiers instead.
    frontiers: Option<FrontiersIntrospectionState>,
    /// Introspection state for `IntrospectionType::ComputeMaterializedViewRefreshes`.
    ///
    /// `Some` if the collection is a REFRESH MV, i.e. it was created with both a refresh
    /// schedule and an initial as-of.
    refresh: Option<RefreshIntrospectionState>,
    /// The IDs of the collection's dependencies, for `IntrospectionType::ComputeDependencies`.
    ///
    /// One `(collection_id, dependency_id)` row is reported per entry.
    dependency_ids: Vec<GlobalId>,
}
2729
2730impl CollectionIntrospection {
2731    fn new(
2732        collection_id: GlobalId,
2733        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2734        as_of: Antichain<Timestamp>,
2735        storage_sink: bool,
2736        initial_as_of: Option<Antichain<Timestamp>>,
2737        refresh_schedule: Option<RefreshSchedule>,
2738        dependency_ids: Vec<GlobalId>,
2739    ) -> Self {
2740        let refresh =
2741            match (refresh_schedule, initial_as_of) {
2742                (Some(refresh_schedule), Some(initial_as_of)) => Some(
2743                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2744                ),
2745                (refresh_schedule, _) => {
2746                    // If we have a `refresh_schedule`, then the collection is a MV, so we should also have
2747                    // an `initial_as_of`.
2748                    soft_assert_or_log!(
2749                        refresh_schedule.is_none(),
2750                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2751                    );
2752                    None
2753                }
2754            };
2755        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2756
2757        let self_ = Self {
2758            collection_id,
2759            introspection_tx,
2760            frontiers,
2761            refresh,
2762            dependency_ids,
2763        };
2764
2765        self_.report_initial_state();
2766        self_
2767    }
2768
2769    /// Reports the initial introspection state.
2770    fn report_initial_state(&self) {
2771        if let Some(frontiers) = &self.frontiers {
2772            let row = frontiers.row_for_collection(self.collection_id);
2773            let updates = vec![(row, Diff::ONE)];
2774            self.send(IntrospectionType::Frontiers, updates);
2775        }
2776
2777        if let Some(refresh) = &self.refresh {
2778            let row = refresh.row_for_collection(self.collection_id);
2779            let updates = vec![(row, Diff::ONE)];
2780            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2781        }
2782
2783        if !self.dependency_ids.is_empty() {
2784            let updates = self.dependency_rows(Diff::ONE);
2785            self.send(IntrospectionType::ComputeDependencies, updates);
2786        }
2787    }
2788
2789    /// Produces rows for the `ComputeDependencies` introspection relation.
2790    fn dependency_rows(&self, diff: Diff) -> Vec<(Row, Diff)> {
2791        self.dependency_ids
2792            .iter()
2793            .map(|dependency_id| {
2794                let row = Row::pack_slice(&[
2795                    Datum::String(&self.collection_id.to_string()),
2796                    Datum::String(&dependency_id.to_string()),
2797                ]);
2798                (row, diff)
2799            })
2800            .collect()
2801    }
2802
2803    /// Observe the given current collection frontiers and update the introspection state as
2804    /// necessary.
2805    fn observe_frontiers(
2806        &mut self,
2807        read_frontier: &Antichain<Timestamp>,
2808        write_frontier: &Antichain<Timestamp>,
2809    ) {
2810        self.update_frontier_introspection(read_frontier, write_frontier);
2811        self.update_refresh_introspection(write_frontier);
2812    }
2813
2814    fn update_frontier_introspection(
2815        &mut self,
2816        read_frontier: &Antichain<Timestamp>,
2817        write_frontier: &Antichain<Timestamp>,
2818    ) {
2819        let Some(frontiers) = &mut self.frontiers else {
2820            return;
2821        };
2822
2823        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2824        {
2825            return; // no change
2826        };
2827
2828        let retraction = frontiers.row_for_collection(self.collection_id);
2829        frontiers.update(read_frontier, write_frontier);
2830        let insertion = frontiers.row_for_collection(self.collection_id);
2831        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2832        self.send(IntrospectionType::Frontiers, updates);
2833    }
2834
2835    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<Timestamp>) {
2836        let Some(refresh) = &mut self.refresh else {
2837            return;
2838        };
2839
2840        let retraction = refresh.row_for_collection(self.collection_id);
2841        refresh.frontier_update(write_frontier);
2842        let insertion = refresh.row_for_collection(self.collection_id);
2843
2844        if retraction == insertion {
2845            return; // no change
2846        }
2847
2848        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2849        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2850    }
2851
2852    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2853        // Failure to send means the `ComputeController` has been dropped and doesn't care about
2854        // introspection updates anymore.
2855        let _ = self.introspection_tx.send((introspection_type, updates));
2856    }
2857}
2858
2859impl Drop for CollectionIntrospection {
2860    fn drop(&mut self) {
2861        // Retract collection frontiers.
2862        if let Some(frontiers) = &self.frontiers {
2863            let row = frontiers.row_for_collection(self.collection_id);
2864            let updates = vec![(row, Diff::MINUS_ONE)];
2865            self.send(IntrospectionType::Frontiers, updates);
2866        }
2867
2868        // Retract MV refresh state.
2869        if let Some(refresh) = &self.refresh {
2870            let retraction = refresh.row_for_collection(self.collection_id);
2871            let updates = vec![(retraction, Diff::MINUS_ONE)];
2872            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2873        }
2874
2875        // Retract collection dependencies.
2876        if !self.dependency_ids.is_empty() {
2877            let updates = self.dependency_rows(Diff::MINUS_ONE);
2878            self.send(IntrospectionType::ComputeDependencies, updates);
2879        }
2880    }
2881}
2882
2883#[derive(Debug)]
2884struct FrontiersIntrospectionState {
2885    read_frontier: Antichain<Timestamp>,
2886    write_frontier: Antichain<Timestamp>,
2887}
2888
2889impl FrontiersIntrospectionState {
2890    fn new(as_of: Antichain<Timestamp>) -> Self {
2891        Self {
2892            read_frontier: as_of.clone(),
2893            write_frontier: as_of,
2894        }
2895    }
2896
2897    /// Return a `Row` reflecting the current collection frontiers.
2898    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2899        let read_frontier = self
2900            .read_frontier
2901            .as_option()
2902            .map_or(Datum::Null, |ts| ts.clone().into());
2903        let write_frontier = self
2904            .write_frontier
2905            .as_option()
2906            .map_or(Datum::Null, |ts| ts.clone().into());
2907        Row::pack_slice(&[
2908            Datum::String(&collection_id.to_string()),
2909            read_frontier,
2910            write_frontier,
2911        ])
2912    }
2913
2914    /// Update the introspection state with the given new frontiers.
2915    fn update(
2916        &mut self,
2917        read_frontier: &Antichain<Timestamp>,
2918        write_frontier: &Antichain<Timestamp>,
2919    ) {
2920        if read_frontier != &self.read_frontier {
2921            self.read_frontier.clone_from(read_frontier);
2922        }
2923        if write_frontier != &self.write_frontier {
2924            self.write_frontier.clone_from(write_frontier);
2925        }
2926    }
2927}
2928
2929/// Information needed to compute introspection updates for a REFRESH materialized view when the
2930/// write frontier advances.
2931#[derive(Debug)]
2932struct RefreshIntrospectionState {
2933    // Immutable properties of the MV
2934    refresh_schedule: RefreshSchedule,
2935    initial_as_of: Antichain<Timestamp>,
2936    // Refresh state
2937    next_refresh: Datum<'static>,           // Null or an MzTimestamp
2938    last_completed_refresh: Datum<'static>, // Null or an MzTimestamp
2939}
2940
2941impl RefreshIntrospectionState {
2942    /// Return a `Row` reflecting the current refresh introspection state.
2943    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2944        Row::pack_slice(&[
2945            Datum::String(&collection_id.to_string()),
2946            self.last_completed_refresh,
2947            self.next_refresh,
2948        ])
2949    }
2950}
2951
2952impl RefreshIntrospectionState {
2953    /// Construct a new [`RefreshIntrospectionState`], and apply an initial `frontier_update()` at
2954    /// the `upper`.
2955    fn new(
2956        refresh_schedule: RefreshSchedule,
2957        initial_as_of: Antichain<Timestamp>,
2958        upper: &Antichain<Timestamp>,
2959    ) -> Self {
2960        let mut self_ = Self {
2961            refresh_schedule: refresh_schedule.clone(),
2962            initial_as_of: initial_as_of.clone(),
2963            next_refresh: Datum::Null,
2964            last_completed_refresh: Datum::Null,
2965        };
2966        self_.frontier_update(upper);
2967        self_
2968    }
2969
2970    /// Should be called whenever the write frontier of the collection advances. It updates the
2971    /// state that should be recorded in introspection relations, but doesn't send the updates yet.
2972    fn frontier_update(&mut self, write_frontier: &Antichain<Timestamp>) {
2973        if write_frontier.is_empty() {
2974            self.last_completed_refresh =
2975                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2976                    last_refresh.into()
2977                } else {
2978                    // If there is no last refresh, then we have a `REFRESH EVERY`, in which case
2979                    // the saturating roundup puts a refresh at the maximum possible timestamp.
2980                    Timestamp::MAX.into()
2981                };
2982            self.next_refresh = Datum::Null;
2983        } else {
2984            if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2985                // We are before the first refresh.
2986                self.last_completed_refresh = Datum::Null;
2987                let initial_as_of = self.initial_as_of.as_option().expect(
2988                    "initial_as_of can't be [], because then there would be no refreshes at all",
2989                );
2990                let first_refresh = self
2991                    .refresh_schedule
2992                    .round_up_timestamp(*initial_as_of)
2993                    .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2994                soft_assert_or_log!(
2995                    first_refresh == *initial_as_of,
2996                    "initial_as_of should be set to the first refresh"
2997                );
2998                self.next_refresh = first_refresh.into();
2999            } else {
3000                // The first refresh has already happened.
3001                let write_frontier = write_frontier.as_option().expect("checked above");
3002                self.last_completed_refresh = self
3003                    .refresh_schedule
3004                    .round_down_timestamp_m1(*write_frontier)
3005                    .map_or_else(
3006                        || {
3007                            soft_panic_or_log!(
3008                                "rounding down should have returned the first refresh or later"
3009                            );
3010                            Datum::Null
3011                        },
3012                        |last_completed_refresh| last_completed_refresh.into(),
3013                    );
3014                self.next_refresh = write_frontier.clone().into();
3015            }
3016        }
3017    }
3018}
3019
3020/// A note of an outstanding peek response.
3021#[derive(Debug)]
3022struct PendingPeek {
3023    /// For replica-targeted peeks, this specifies the replica whose response we should pass on.
3024    ///
3025    /// If this value is `None`, we pass on the first response.
3026    target_replica: Option<ReplicaId>,
3027    /// The OpenTelemetry context for this peek.
3028    otel_ctx: OpenTelemetryContext,
3029    /// The time at which the peek was requested.
3030    ///
3031    /// Used to track peek durations.
3032    requested_at: Instant,
3033    /// The read hold installed to serve this peek.
3034    read_hold: ReadHold,
3035    /// The channel to send peek results.
3036    peek_response_tx: oneshot::Sender<PeekResponse>,
3037    /// An optional limit of the peek's result size.
3038    limit: Option<usize>,
3039    /// The offset into the peek's result.
3040    offset: usize,
3041}
3042
3043#[derive(Debug, Clone)]
3044struct ActiveSubscribe {
3045    /// Current upper frontier of this subscribe.
3046    frontier: Antichain<Timestamp>,
3047}
3048
3049impl Default for ActiveSubscribe {
3050    fn default() -> Self {
3051        Self {
3052            frontier: Antichain::from_elem(Timestamp::MIN),
3053        }
3054    }
3055}
3056
3057/// State maintained about individual replicas.
3058#[derive(Debug)]
3059struct ReplicaState {
3060    /// The ID of the replica.
3061    id: ReplicaId,
3062    /// Client for the running replica task.
3063    client: ReplicaClient,
3064    /// The replica configuration.
3065    config: ReplicaConfig,
3066    /// Replica metrics.
3067    metrics: ReplicaMetrics,
3068    /// A channel through which introspection updates are delivered.
3069    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3070    /// Per-replica collection state.
3071    collections: BTreeMap<GlobalId, ReplicaCollectionState>,
3072    /// The epoch of the replica.
3073    epoch: u64,
3074}
3075
3076impl ReplicaState {
3077    fn new(
3078        id: ReplicaId,
3079        client: ReplicaClient,
3080        config: ReplicaConfig,
3081        metrics: ReplicaMetrics,
3082        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3083        epoch: u64,
3084    ) -> Self {
3085        Self {
3086            id,
3087            client,
3088            config,
3089            metrics,
3090            introspection_tx,
3091            epoch,
3092            collections: Default::default(),
3093        }
3094    }
3095
3096    /// Add a collection to the replica state.
3097    ///
3098    /// # Panics
3099    ///
3100    /// Panics if a collection with the same ID exists already.
3101    fn add_collection(
3102        &mut self,
3103        id: GlobalId,
3104        as_of: Antichain<Timestamp>,
3105        input_read_holds: Vec<ReadHold>,
3106    ) {
3107        let metrics = self.metrics.for_collection(id);
3108        let introspection = ReplicaCollectionIntrospection::new(
3109            self.id,
3110            id,
3111            self.introspection_tx.clone(),
3112            as_of.clone(),
3113        );
3114        let mut state =
3115            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
3116
3117        // In an effort to keep the produced wallclock lag introspection data small and
3118        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
3119        // select indexes and subscribes.
3120        if id.is_transient() {
3121            state.wallclock_lag_max = None;
3122        }
3123
3124        if let Some(previous) = self.collections.insert(id, state) {
3125            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
3126        }
3127    }
3128
3129    /// Remove state for a collection.
3130    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState> {
3131        self.collections.remove(&id)
3132    }
3133
3134    /// Returns whether all replica frontiers of the given collection are empty.
3135    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
3136        self.collections.get(&id).map_or(true, |c| {
3137            c.write_frontier.is_empty()
3138                && c.input_frontier.is_empty()
3139                && c.output_frontier.is_empty()
3140        })
3141    }
3142
3143    /// Returns the state of the [`ReplicaState`] formatted as JSON.
3144    ///
3145    /// The returned value is not guaranteed to be stable and may change at any point in time.
3146    #[mz_ore::instrument(level = "debug")]
3147    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3148        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
3149        // returned object as a tradeoff between usability and stability. `serde_json` will fail
3150        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
3151        // prevents a future unrelated change from silently breaking this method.
3152
3153        // Destructure `self` here so we don't forget to consider dumping newly added fields.
3154        let Self {
3155            id,
3156            client: _,
3157            config: _,
3158            metrics: _,
3159            introspection_tx: _,
3160            epoch,
3161            collections,
3162        } = self;
3163
3164        let collections: BTreeMap<_, _> = collections
3165            .iter()
3166            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
3167            .collect();
3168
3169        Ok(serde_json::json!({
3170            "id": id.to_string(),
3171            "collections": collections,
3172            "epoch": epoch,
3173        }))
3174    }
3175}
3176
3177#[derive(Debug)]
3178struct ReplicaCollectionState {
3179    /// The replica write frontier of this collection.
3180    ///
3181    /// See [`FrontiersResponse::write_frontier`].
3182    write_frontier: Antichain<Timestamp>,
3183    /// The replica input frontier of this collection.
3184    ///
3185    /// See [`FrontiersResponse::input_frontier`].
3186    input_frontier: Antichain<Timestamp>,
3187    /// The replica output frontier of this collection.
3188    ///
3189    /// See [`FrontiersResponse::output_frontier`].
3190    output_frontier: Antichain<Timestamp>,
3191
3192    /// Metrics tracked for this collection.
3193    ///
3194    /// If this is `None`, no metrics are collected.
3195    metrics: Option<ReplicaCollectionMetrics>,
3196    /// As-of frontier with which this collection was installed on the replica.
3197    as_of: Antichain<Timestamp>,
3198    /// Tracks introspection state for this collection.
3199    introspection: ReplicaCollectionIntrospection,
3200    /// Read holds on storage inputs to this collection.
3201    ///
3202    /// These read holds are kept to ensure that the replica is able to read from storage inputs at
3203    /// all times it hasn't read yet. We only need to install read holds for storage inputs since
3204    /// compaction of compute inputs is implicitly held back by Timely/DD.
3205    input_read_holds: Vec<ReadHold>,
3206
3207    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
3208    ///
3209    /// If this is `None`, wallclock lag is not tracked for this collection.
3210    wallclock_lag_max: Option<WallclockLag>,
3211}
3212
3213impl ReplicaCollectionState {
3214    fn new(
3215        metrics: Option<ReplicaCollectionMetrics>,
3216        as_of: Antichain<Timestamp>,
3217        introspection: ReplicaCollectionIntrospection,
3218        input_read_holds: Vec<ReadHold>,
3219    ) -> Self {
3220        Self {
3221            write_frontier: as_of.clone(),
3222            input_frontier: as_of.clone(),
3223            output_frontier: as_of.clone(),
3224            metrics,
3225            as_of,
3226            introspection,
3227            input_read_holds,
3228            wallclock_lag_max: Some(WallclockLag::MIN),
3229        }
3230    }
3231
3232    /// Returns whether this collection is hydrated.
3233    fn hydrated(&self) -> bool {
3234        // If the observed frontier is greater than the collection's as-of, the collection has
3235        // produced some output and is therefore hydrated.
3236        //
3237        // We need to consider the edge case where the as-of is the empty frontier. Such an as-of
3238        // is not useful for indexes, because they wouldn't be readable. For write-only
3239        // collections, an empty as-of means that the collection has been fully written and no new
3240        // dataflow needs to be created for it. Consequently, no hydration will happen either.
3241        //
3242        // Based on this, we could respond in two ways:
3243        //  * `false`, as in "the dataflow was never created"
3244        //  * `true`, as in "the dataflow completed immediately"
3245        //
3246        // Since hydration is often used as a measure of dataflow progress and we don't want to
3247        // give the impression that certain dataflows are somehow stuck when they are not, we go
3248        // with the second interpretation here.
3249        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3250    }
3251
3252    /// Updates the replica write frontier of this collection.
3253    fn update_write_frontier(&mut self, new_frontier: Antichain<Timestamp>) {
3254        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3255            soft_panic_or_log!(
3256                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3257                self.write_frontier,
3258            );
3259            return;
3260        } else if new_frontier == self.write_frontier {
3261            return;
3262        }
3263
3264        self.write_frontier = new_frontier;
3265    }
3266
3267    /// Updates the replica input frontier of this collection.
3268    fn update_input_frontier(&mut self, new_frontier: Antichain<Timestamp>) {
3269        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3270            soft_panic_or_log!(
3271                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3272                self.input_frontier,
3273            );
3274            return;
3275        } else if new_frontier == self.input_frontier {
3276            return;
3277        }
3278
3279        self.input_frontier = new_frontier;
3280
3281        // Relax our read holds on collection inputs.
3282        for read_hold in &mut self.input_read_holds {
3283            let result = read_hold.try_downgrade(self.input_frontier.clone());
3284            soft_assert_or_log!(
3285                result.is_ok(),
3286                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3287                self.input_frontier,
3288            );
3289        }
3290    }
3291
3292    /// Updates the replica output frontier of this collection.
3293    fn update_output_frontier(&mut self, new_frontier: Antichain<Timestamp>) {
3294        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3295            soft_panic_or_log!(
3296                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3297                self.output_frontier,
3298            );
3299            return;
3300        } else if new_frontier == self.output_frontier {
3301            return;
3302        }
3303
3304        self.output_frontier = new_frontier;
3305    }
3306}
3307
3308/// Maintains the introspection state for a given replica and collection, and ensures that reported
3309/// introspection data is retracted when the collection is dropped.
3310#[derive(Debug)]
3311struct ReplicaCollectionIntrospection {
3312    /// The ID of the replica.
3313    replica_id: ReplicaId,
3314    /// The ID of the compute collection.
3315    collection_id: GlobalId,
3316    /// The collection's reported replica write frontier.
3317    write_frontier: Antichain<Timestamp>,
3318    /// A channel through which introspection updates are delivered.
3319    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3320}
3321
3322impl ReplicaCollectionIntrospection {
3323    /// Create a new `HydrationState` and initialize introspection.
3324    fn new(
3325        replica_id: ReplicaId,
3326        collection_id: GlobalId,
3327        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3328        as_of: Antichain<Timestamp>,
3329    ) -> Self {
3330        let self_ = Self {
3331            replica_id,
3332            collection_id,
3333            write_frontier: as_of,
3334            introspection_tx,
3335        };
3336
3337        self_.report_initial_state();
3338        self_
3339    }
3340
3341    /// Reports the initial introspection state.
3342    fn report_initial_state(&self) {
3343        let row = self.write_frontier_row();
3344        let updates = vec![(row, Diff::ONE)];
3345        self.send(IntrospectionType::ReplicaFrontiers, updates);
3346    }
3347
3348    /// Observe the given current write frontier and update the introspection state as necessary.
3349    fn observe_frontier(&mut self, write_frontier: &Antichain<Timestamp>) {
3350        if self.write_frontier == *write_frontier {
3351            return; // no change
3352        }
3353
3354        let retraction = self.write_frontier_row();
3355        self.write_frontier.clone_from(write_frontier);
3356        let insertion = self.write_frontier_row();
3357
3358        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3359        self.send(IntrospectionType::ReplicaFrontiers, updates);
3360    }
3361
3362    /// Return a `Row` reflecting the current replica write frontier.
3363    fn write_frontier_row(&self) -> Row {
3364        let write_frontier = self
3365            .write_frontier
3366            .as_option()
3367            .map_or(Datum::Null, |ts| ts.clone().into());
3368        Row::pack_slice(&[
3369            Datum::String(&self.collection_id.to_string()),
3370            Datum::String(&self.replica_id.to_string()),
3371            write_frontier,
3372        ])
3373    }
3374
3375    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3376        // Failure to send means the `ComputeController` has been dropped and doesn't care about
3377        // introspection updates anymore.
3378        let _ = self.introspection_tx.send((introspection_type, updates));
3379    }
3380}
3381
3382impl Drop for ReplicaCollectionIntrospection {
3383    fn drop(&mut self) {
3384        // Retract the write frontier.
3385        let row = self.write_frontier_row();
3386        let updates = vec![(row, Diff::MINUS_ONE)];
3387        self.send(IntrospectionType::ReplicaFrontiers, updates);
3388    }
3389}