Skip to main content

mz_compute_client/controller/
instance.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller for a compute instance.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
21use mz_compute_types::plan::render_plan::RenderPlan;
22use mz_compute_types::sinks::{
23    ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
24};
25use mz_compute_types::sources::SourceInstanceDesc;
26use mz_controller_types::dyncfgs::{
27    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
28};
29use mz_dyncfg::ConfigSet;
30use mz_expr::RowSetFinishing;
31use mz_ore::cast::CastFrom;
32use mz_ore::channel::instrumented_unbounded_channel;
33use mz_ore::now::NowFn;
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{soft_assert_or_log, soft_panic_or_log};
36use mz_persist_types::PersistLocation;
37use mz_repr::adt::timestamp::CheckedTimestamp;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row, Timestamp};
40use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
41use mz_storage_types::read_holds::{self, ReadHold};
42use mz_storage_types::read_policy::ReadPolicy;
43use thiserror::Error;
44use timely::PartialOrder;
45use timely::progress::frontier::MutableAntichain;
46use timely::progress::{Antichain, ChangeBatch};
47use tokio::sync::{mpsc, oneshot};
48use uuid::Uuid;
49
50use crate::controller::error::{
51    CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
52};
53use crate::controller::instance_client::PeekError;
54use crate::controller::replica::{ReplicaClient, ReplicaConfig};
55use crate::controller::{
56    ComputeControllerResponse, IntrospectionUpdates, PeekNotification, ReplicaId,
57    StorageCollections,
58};
59use crate::logging::LogVariant;
60use crate::metrics::IntCounter;
61use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
62use crate::protocol::command::{
63    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
64};
65use crate::protocol::history::ComputeCommandHistory;
66use crate::protocol::response::{
67    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
68    SubscribeBatch, SubscribeResponse,
69};
70
/// Error signaling that a replica with the wrapped [`ReplicaId`] exists already.
#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);
74
/// Error signaling that no replica with the wrapped [`ReplicaId`] exists.
#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);
78
/// Errors describing why a dataflow could not be created.
#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    /// A collection referenced by the dataflow does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// A replica referenced by the dataflow does not exist.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The dataflow definition does not specify an `as_of`.
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    /// A subscribe dataflow was given an empty `as_of`.
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    /// A copy-to dataflow was given an empty `as_of`.
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    /// No read hold was provided for the identified dataflow import.
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    /// The read hold provided for the identified dataflow import is insufficient.
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}
96
97impl From<CollectionMissing> for DataflowCreationError {
98    fn from(error: CollectionMissing) -> Self {
99        Self::CollectionMissing(error.0)
100    }
101}
102
/// Errors describing why a read policy operation on a collection failed.
#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    /// The identified collection does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The identified collection is write-only.
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}
110
111impl From<CollectionMissing> for ReadPolicyError {
112    fn from(error: CollectionMissing) -> Self {
113        Self::CollectionMissing(error.0)
114    }
115}
116
/// A command sent to an [`Instance`] task.
///
/// Commands are boxed one-shot closures that receive mutable access to the [`Instance`]. They
/// must be `Send`.
pub(super) type Command = Box<dyn FnOnce(&mut Instance) + Send>;
119
/// A response from a replica, composed of (in this order) the replica ID, the replica's current
/// epoch, and the compute response itself.
pub(super) type ReplicaResponse = (ReplicaId, u64, ComputeResponse);
123
/// The state we keep for a compute instance.
pub(super) struct Instance {
    /// Build info for spawning replicas.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections,
    /// Whether instance initialization has been completed.
    initialized: bool,
    /// Whether this instance is in read-only mode.
    ///
    /// When in read-only mode, this instance will not update persistent state, such as
    /// wallclock lag introspection.
    read_only: bool,
    /// The workload class of this instance.
    ///
    /// This is currently only used to annotate metrics.
    workload_class: Option<String>,
    /// The replicas of this compute instance.
    replicas: BTreeMap<ReplicaId, ReplicaState>,
    /// Currently installed compute collections.
    ///
    /// New entries are added for all collections exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// Entries are removed by [`Instance::cleanup_collections`]. See that method's documentation
    /// about the conditions for removing collection state.
    collections: BTreeMap<GlobalId, CollectionState>,
    /// IDs of log sources maintained by this compute instance.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Currently outstanding peeks.
    ///
    /// New entries are added for all peeks initiated through [`Instance::peek`].
    ///
    /// The entry for a peek is only removed once all replicas have responded to the peek. This is
    /// currently required to ensure all replicas have stopped reading from the peeked collection's
    /// inputs before we allow them to compact. database-issues#4822 tracks changing this so we
    /// only have to wait for the first peek response.
    peeks: BTreeMap<Uuid, PendingPeek>,
    /// Currently in-progress subscribes.
    ///
    /// New entries are added for all subscribes exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// The entry for a subscribe is removed once at least one replica has reported the subscribe
    /// to have advanced to the empty frontier or to have been dropped, implying that no further
    /// updates will be emitted for this subscribe.
    ///
    /// Note that subscribes are tracked both in `collections` and `subscribes`. `collections`
    /// keeps track of the subscribe's upper and since frontiers and ensures appropriate read holds
    /// on the subscribe's input. `subscribes` is only used to track which updates have been
    /// emitted, to decide if new ones should be emitted or suppressed.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe>,
    /// Tracks all in-progress COPY TOs.
    ///
    /// New entries are added for all s3 oneshot sinks (corresponding to a COPY TO) exported from
    /// dataflows created through [`Instance::create_dataflow`].
    ///
    /// The entry for a copy to is removed once at least one replica has finished
    /// or the exporting collection is dropped.
    copy_tos: BTreeSet<GlobalId>,
    /// The command history, used when introducing new replicas or restarting existing replicas.
    history: ComputeCommandHistory<UIntGauge>,
    /// Receiver for commands to be executed.
    command_rx: mpsc::UnboundedReceiver<Command>,
    /// Sender for responses to be delivered.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
    /// Sender for introspection updates to be recorded.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// The registry the controller uses to report metrics.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,

    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<Timestamp>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for updates to collection read holds.
    ///
    /// Copies of this sender are given to [`ReadHold`]s that are created in
    /// [`CollectionState::new`].
    read_hold_tx: read_holds::ChangeTx,
    /// A sender for responses from replicas.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse, IntCounter>,
    /// A receiver for responses from replicas.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse, IntCounter>,
}
217
218impl Instance {
219    /// Acquire a handle to the collection state associated with `id`.
220    fn collection(&self, id: GlobalId) -> Result<&CollectionState, CollectionMissing> {
221        self.collections.get(&id).ok_or(CollectionMissing(id))
222    }
223
224    /// Acquire a mutable handle to the collection state associated with `id`.
225    fn collection_mut(&mut self, id: GlobalId) -> Result<&mut CollectionState, CollectionMissing> {
226        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
227    }
228
229    /// Acquire a handle to the collection state associated with `id`.
230    ///
231    /// # Panics
232    ///
233    /// Panics if the identified collection does not exist.
234    fn expect_collection(&self, id: GlobalId) -> &CollectionState {
235        self.collections.get(&id).expect("collection must exist")
236    }
237
238    /// Acquire a mutable handle to the collection state associated with `id`.
239    ///
240    /// # Panics
241    ///
242    /// Panics if the identified collection does not exist.
243    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
244        self.collections
245            .get_mut(&id)
246            .expect("collection must exist")
247    }
248
249    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState)> {
250        self.collections.iter().map(|(id, coll)| (*id, coll))
251    }
252
253    /// Returns an iterator over replicas that host the given collection.
254    ///
255    /// For replica-targeted collections, this returns only the target replica.
256    /// For non-targeted collections, this returns all replicas.
257    ///
258    /// Returns `Err` if the collection does not exist.
259    fn replicas_hosting(
260        &self,
261        id: GlobalId,
262    ) -> Result<impl Iterator<Item = &ReplicaState>, CollectionMissing> {
263        let target = self.collection(id)?.target_replica;
264        Ok(self
265            .replicas
266            .values()
267            .filter(move |r| target.map_or(true, |t| t == r.id)))
268    }
269
270    /// Add a collection to the instance state.
271    ///
272    /// # Panics
273    ///
274    /// Panics if a collection with the same ID exists already.
275    fn add_collection(
276        &mut self,
277        id: GlobalId,
278        as_of: Antichain<Timestamp>,
279        shared: SharedCollectionState,
280        storage_dependencies: BTreeMap<GlobalId, ReadHold>,
281        compute_dependencies: BTreeMap<GlobalId, ReadHold>,
282        replica_input_read_holds: Vec<ReadHold>,
283        write_only: bool,
284        storage_sink: bool,
285        initial_as_of: Option<Antichain<Timestamp>>,
286        refresh_schedule: Option<RefreshSchedule>,
287        target_replica: Option<ReplicaId>,
288    ) {
289        // Add global collection state.
290        let dependency_ids: Vec<GlobalId> = compute_dependencies
291            .keys()
292            .chain(storage_dependencies.keys())
293            .copied()
294            .collect();
295        let introspection = CollectionIntrospection::new(
296            id,
297            self.introspection_tx.clone(),
298            as_of.clone(),
299            storage_sink,
300            initial_as_of,
301            refresh_schedule,
302            dependency_ids,
303        );
304        let mut state = CollectionState::new(
305            id,
306            as_of.clone(),
307            shared,
308            storage_dependencies,
309            compute_dependencies,
310            Arc::clone(&self.read_hold_tx),
311            introspection,
312        );
313        state.target_replica = target_replica;
314        // If the collection is write-only, clear its read policy to reflect that.
315        if write_only {
316            state.read_policy = None;
317        }
318
319        if let Some(previous) = self.collections.insert(id, state) {
320            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
321        }
322
323        // Add per-replica collection state.
324        for replica in self.replicas.values_mut() {
325            if target_replica.is_some_and(|id| id != replica.id) {
326                continue;
327            }
328            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
329        }
330    }
331
332    fn remove_collection(&mut self, id: GlobalId) {
333        // Remove per-replica collection state.
334        for replica in self.replicas.values_mut() {
335            replica.remove_collection(id);
336        }
337
338        // Remove global collection state.
339        self.collections.remove(&id);
340    }
341
342    fn add_replica_state(
343        &mut self,
344        id: ReplicaId,
345        client: ReplicaClient,
346        config: ReplicaConfig,
347        epoch: u64,
348    ) {
349        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();
350
351        let metrics = self.metrics.for_replica(id);
352        let mut replica = ReplicaState::new(
353            id,
354            client,
355            config,
356            metrics,
357            self.introspection_tx.clone(),
358            epoch,
359        );
360
361        // Add per-replica collection state.
362        for (collection_id, collection) in &self.collections {
363            // Skip log collections not maintained by this replica,
364            // and collections targeted at a different replica.
365            if (collection.log_collection && !log_ids.contains(collection_id))
366                || collection.target_replica.is_some_and(|rid| rid != id)
367            {
368                continue;
369            }
370
371            let as_of = if collection.log_collection {
372                // For log collections, we don't send a `CreateDataflow` command to the replica, so
373                // it doesn't know which as-of the controler chose and defaults to the minimum
374                // frontier instead. We need to initialize the controller-side tracking with the
375                // same frontier, to avoid observing regressions in the reported frontiers.
376                Antichain::from_elem(Timestamp::MIN)
377            } else {
378                collection.read_frontier().to_owned()
379            };
380
381            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
382            replica.add_collection(*collection_id, as_of, input_read_holds);
383        }
384
385        self.replicas.insert(id, replica);
386    }
387
388    /// Enqueue the given response for delivery to the controller clients.
389    fn deliver_response(&self, response: ComputeControllerResponse) {
390        // Failure to send means the `ComputeController` has been dropped and doesn't care about
391        // responses anymore.
392        let _ = self.response_tx.send(response);
393    }
394
395    /// Enqueue the given introspection updates for recording.
396    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
397        // Failure to send means the `ComputeController` has been dropped and doesn't care about
398        // introspection updates anymore.
399        let _ = self.introspection_tx.send((type_, updates));
400    }
401
402    /// Returns whether the identified replica exists.
403    fn replica_exists(&self, id: ReplicaId) -> bool {
404        self.replicas.contains_key(&id)
405    }
406
407    /// Return the IDs of pending peeks targeting the specified replica.
408    fn peeks_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = (Uuid, &PendingPeek)> {
409        self.peeks.iter().filter_map(move |(uuid, peek)| {
410            if peek.target_replica == Some(replica_id) {
411                Some((*uuid, peek))
412            } else {
413                None
414            }
415        })
416    }
417
418    /// Return the IDs of in-progress subscribes targeting the specified replica.
419    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
420        self.subscribes.keys().copied().filter(move |id| {
421            let collection = self.expect_collection(*id);
422            collection.target_replica == Some(replica_id)
423        })
424    }
425
426    /// Update introspection with the current collection frontiers.
427    ///
428    /// We could also do this directly in response to frontier changes, but doing it periodically
429    /// lets us avoid emitting some introspection updates that can be consolidated (e.g. a write
430    /// frontier updated immediately followed by a read frontier update).
431    ///
432    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
433    /// per second during normal operation.
434    fn update_frontier_introspection(&mut self) {
435        for collection in self.collections.values_mut() {
436            collection
437                .introspection
438                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
439        }
440
441        for replica in self.replicas.values_mut() {
442            for collection in replica.collections.values_mut() {
443                collection
444                    .introspection
445                    .observe_frontier(&collection.write_frontier);
446            }
447        }
448    }
449
450    /// Refresh the controller state metrics for this instance.
451    ///
452    /// We could also do state metric updates directly in response to state changes, but that would
453    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
454    /// a single method is less noisy.
455    ///
456    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
457    /// per second during normal operation.
458    fn refresh_state_metrics(&self) {
459        let unscheduled_collections_count =
460            self.collections.values().filter(|c| !c.scheduled).count();
461        let connected_replica_count = self
462            .replicas
463            .values()
464            .filter(|r| r.client.is_connected())
465            .count();
466
467        self.metrics
468            .replica_count
469            .set(u64::cast_from(self.replicas.len()));
470        self.metrics
471            .collection_count
472            .set(u64::cast_from(self.collections.len()));
473        self.metrics
474            .collection_unscheduled_count
475            .set(u64::cast_from(unscheduled_collections_count));
476        self.metrics
477            .peek_count
478            .set(u64::cast_from(self.peeks.len()));
479        self.metrics
480            .subscribe_count
481            .set(u64::cast_from(self.subscribes.len()));
482        self.metrics
483            .copy_to_count
484            .set(u64::cast_from(self.copy_tos.len()));
485        self.metrics
486            .connected_replica_count
487            .set(u64::cast_from(connected_replica_count));
488    }
489
    /// Refresh the wallclock lag introspection and metrics with the current lag values.
    ///
    /// This method produces wallclock lag metrics of two different shapes:
    ///
    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
    ///   over the last minute, together with the current time.
    /// * Histograms: For each collection, we measure the lag of the write frontier behind
    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
    ///   together with the current histogram period.
    ///
    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
    /// Prometheus.
    ///
    /// This method only accumulates measurements; emitting them to introspection is delegated to
    /// `maybe_record_wallclock_lag`, which is called at the end.
    ///
    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
    /// per second during normal operation.
    fn refresh_wallclock_lag(&mut self) {
        // Computes the lag of a frontier behind wallclock time. A frontier for which
        // `as_option` yields no timestamp (e.g. the empty frontier) is assigned a lag of zero.
        let frontier_lag = |frontier: &Antichain<Timestamp>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        // Histogram measurements are labeled with the workload class, if one is set.
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        // For collections that sink into storage, we need to ask the storage controller to know
        // whether they're currently readable.
        let readable_storage_collections: BTreeSet<_> = self
            .collections
            .keys()
            .filter_map(|id| {
                let frontiers = self.storage_collections.collection_frontiers(*id).ok()?;
                PartialOrder::less_than(&frontiers.read_capabilities, &frontiers.write_frontier)
                    .then_some(*id)
            })
            .collect();

        // First, iterate over all collections and collect histogram measurements.
        for (id, collection) in &mut self.collections {
            let write_frontier = collection.write_frontier();
            // Collections known to the storage controller use the storage-side readability
            // computed above. All other collections are readable if their read frontier is
            // strictly less than their write frontier.
            let readable = if self.storage_collections.check_exists(*id).is_ok() {
                readable_storage_collections.contains(id)
            } else {
                PartialOrder::less_than(&collection.read_frontier(), &write_frontier)
            };

            // Only collections that have a histogram stash take part in histogram collection.
            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                let bucket = if readable {
                    let lag = frontier_lag(&write_frontier);
                    // Bucket the lag by rounding its whole seconds up to a power of two.
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                } else {
                    WallclockLag::Undefined
                };

                // Count one more observation of this (period, bucket, labels) combination.
                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        // Second, iterate over all per-replica collections and collect history measurements.
        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                // A per-replica collection is considered readable in the context of lag
                // measurement if either:
                //  (a) it sinks into a storage collection that is readable
                //  (b) it is hydrated
                let readable = readable_storage_collections.contains(id) || collection.hydrated();

                let lag = if readable {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                } else {
                    WallclockLag::Undefined
                };

                // Keep track of the maximum lag observed since the last recording.
                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    // No way to specify values as undefined in Prometheus metrics, so we use the
                    // maximum value instead.
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                };
            }
        }

        // Record lags to persist, if it's time.
        self.maybe_record_wallclock_lag();
    }
588
    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
    /// last recording.
    ///
    /// We emit new introspection updates if the system time has passed into a new multiple of the
    /// recording interval (typically 1 minute) since the last refresh. The storage controller uses
    /// the same approach, ensuring that both controllers commit their lags at roughly the same
    /// time, avoiding confusion caused by inconsistencies.
    ///
    /// Nothing is emitted while the instance is in read-only mode.
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        // Truncates `datetime` down to a multiple of `interval`. Yields `None` if the interval
        // cannot be converted to a `TimeDelta` or the truncation fails.
        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        // If truncating by the configured interval fails, fall back to the default interval.
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        // Still within the interval that was recorded last; nothing to do yet.
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        // Emit one history row per replica and collection, carrying the maximum lag observed
        // since the last recording.
        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                // Take the current maximum and reset it to the minimum value, so that tracking
                // starts over for the next recording.
                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        // Emit one histogram row per stashed (period, lag, labels) key, using the stashed count
        // as the row's diff.
        let mut histogram_updates = Vec::new();
        // Row buffer reused across iterations; each finished row is cloned out of it.
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            // `mem::take` leaves an empty stash behind for the next recording.
            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }
673
674    /// Returns `true` if the given collection is hydrated on at least one
675    /// replica.
676    ///
677    /// This also returns `true` in case this cluster does not have any
678    /// replicas that host the given collection.
679    #[mz_ore::instrument(level = "debug")]
680    pub fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, CollectionMissing> {
681        let mut hosting_replicas = self.replicas_hosting(collection_id)?.peekable();
682        if hosting_replicas.peek().is_none() {
683            return Ok(true);
684        }
685        for replica_state in hosting_replicas {
686            let collection_state = replica_state
687                .collections
688                .get(&collection_id)
689                .expect("hosting replica must have per-replica collection state");
690
691            if collection_state.hydrated() {
692                return Ok(true);
693            }
694        }
695
696        Ok(false)
697    }
698
699    /// Returns `true` if each non-transient, non-excluded collection is hydrated on at
700    /// least one replica.
701    ///
702    /// This also returns `true` in case this cluster does not have any
703    /// replicas.
704    #[mz_ore::instrument(level = "debug")]
705    pub fn collections_hydrated_on_replicas(
706        &self,
707        target_replica_ids: Option<Vec<ReplicaId>>,
708        exclude_collections: &BTreeSet<GlobalId>,
709    ) -> Result<bool, HydrationCheckBadTarget> {
710        if self.replicas.is_empty() {
711            return Ok(true);
712        }
713        let mut all_hydrated = true;
714        let target_replicas: BTreeSet<ReplicaId> = self
715            .replicas
716            .keys()
717            .filter_map(|id| match target_replica_ids {
718                None => Some(id.clone()),
719                Some(ref ids) if ids.contains(id) => Some(id.clone()),
720                Some(_) => None,
721            })
722            .collect();
723        if let Some(targets) = target_replica_ids {
724            if target_replicas.is_empty() {
725                return Err(HydrationCheckBadTarget(targets));
726            }
727        }
728
729        for (id, _collection) in self.collections_iter() {
730            if id.is_transient() || exclude_collections.contains(&id) {
731                continue;
732            }
733
734            let mut collection_hydrated = false;
735            // `replicas_hosting` cannot fail here because `collections_iter`
736            // only yields collections that exist.
737            for replica_state in self.replicas_hosting(id).expect("collection must exist") {
738                if !target_replicas.contains(&replica_state.id) {
739                    continue;
740                }
741                let collection_state = replica_state
742                    .collections
743                    .get(&id)
744                    .expect("hosting replica must have per-replica collection state");
745
746                if collection_state.hydrated() {
747                    collection_hydrated = true;
748                    break;
749                }
750            }
751
752            if !collection_hydrated {
753                tracing::info!("collection {id} is not hydrated on any replica");
754                all_hydrated = false;
755                // We continue with our loop instead of breaking out early, so
756                // that we log all non-hydrated replicas.
757            }
758        }
759
760        Ok(all_hydrated)
761    }
762
763    /// Clean up collection state that is not needed anymore.
764    ///
765    /// Three conditions need to be true before we can remove state for a collection:
766    ///
767    ///  1. A client must have explicitly dropped the collection. If that is not the case, clients
768    ///     can still reasonably assume that the controller knows about the collection and can
769    ///     answer queries about it.
770    ///  2. There must be no outstanding read capabilities on the collection. As long as someone
771    ///     still holds read capabilities on a collection, we need to keep it around to be able
772    ///     to properly handle downgrading of said capabilities.
773    ///  3. All replica frontiers for the collection must have advanced to the empty frontier.
774    ///     Advancement to the empty frontiers signals that replicas are done computing the
775    ///     collection and that they won't send more `ComputeResponse`s for it. As long as we might
776    ///     receive responses for a collection we want to keep it around to be able to validate and
777    ///     handle these responses.
778    fn cleanup_collections(&mut self) {
779        let to_remove: Vec<_> = self
780            .collections_iter()
781            .filter(|(id, collection)| {
782                collection.dropped
783                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
784                    && self
785                        .replicas
786                        .values()
787                        .all(|r| r.collection_frontiers_empty(*id))
788            })
789            .map(|(id, _collection)| id)
790            .collect();
791
792        for id in to_remove {
793            self.remove_collection(id);
794        }
795    }
796
797    /// Returns the state of the [`Instance`] formatted as JSON.
798    ///
799    /// The returned value is not guaranteed to be stable and may change at any point in time.
800    #[mz_ore::instrument(level = "debug")]
801    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
802        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
803        // returned object as a tradeoff between usability and stability. `serde_json` will fail
804        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
805        // prevents a future unrelated change from silently breaking this method.
806
807        // Destructure `self` here so we don't forget to consider dumping newly added fields.
808        let Self {
809            build_info: _,
810            storage_collections: _,
811            peek_stash_persist_location: _,
812            initialized,
813            read_only,
814            workload_class,
815            replicas,
816            collections,
817            log_sources: _,
818            peeks,
819            subscribes,
820            copy_tos,
821            history: _,
822            command_rx: _,
823            response_tx: _,
824            introspection_tx: _,
825            metrics: _,
826            dyncfg: _,
827            now: _,
828            wallclock_lag: _,
829            wallclock_lag_last_recorded,
830            read_hold_tx: _,
831            replica_tx: _,
832            replica_rx: _,
833        } = self;
834
835        let replicas: BTreeMap<_, _> = replicas
836            .iter()
837            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
838            .collect::<Result<_, anyhow::Error>>()?;
839        let collections: BTreeMap<_, _> = collections
840            .iter()
841            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
842            .collect();
843        let peeks: BTreeMap<_, _> = peeks
844            .iter()
845            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
846            .collect();
847        let subscribes: BTreeMap<_, _> = subscribes
848            .iter()
849            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
850            .collect();
851        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
852        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");
853
854        Ok(serde_json::json!({
855            "initialized": initialized,
856            "read_only": read_only,
857            "workload_class": workload_class,
858            "replicas": replicas,
859            "collections": collections,
860            "peeks": peeks,
861            "subscribes": subscribes,
862            "copy_tos": copy_tos,
863            "wallclock_lag_last_recorded": wallclock_lag_last_recorded,
864        }))
865    }
866
867    /// Reports the current write frontier for the identified compute collection.
868    pub(super) fn collection_write_frontier(
869        &self,
870        id: GlobalId,
871    ) -> Result<Antichain<Timestamp>, CollectionMissing> {
872        Ok(self.collection(id)?.write_frontier())
873    }
874}
875
876impl Instance {
877    pub(super) fn new(
878        build_info: &'static BuildInfo,
879        storage: StorageCollections,
880        peek_stash_persist_location: PersistLocation,
881        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState)>,
882        metrics: InstanceMetrics,
883        now: NowFn,
884        wallclock_lag: WallclockLagFn<Timestamp>,
885        dyncfg: Arc<ConfigSet>,
886        command_rx: mpsc::UnboundedReceiver<Command>,
887        response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
888        read_hold_tx: read_holds::ChangeTx,
889        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
890        read_only: bool,
891    ) -> Self {
892        let mut collections = BTreeMap::new();
893        let mut log_sources = BTreeMap::new();
894        for (log, id, shared) in arranged_logs {
895            let collection = CollectionState::new_log_collection(
896                id,
897                shared,
898                Arc::clone(&read_hold_tx),
899                introspection_tx.clone(),
900            );
901            collections.insert(id, collection);
902            log_sources.insert(log, id);
903        }
904
905        let history = ComputeCommandHistory::new(metrics.for_history());
906
907        let send_count = metrics.response_send_count.clone();
908        let recv_count = metrics.response_recv_count.clone();
909        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);
910
911        let now_dt = mz_ore::now::to_datetime(now());
912
913        Self {
914            build_info,
915            storage_collections: storage,
916            peek_stash_persist_location,
917            initialized: false,
918            read_only,
919            workload_class: None,
920            replicas: Default::default(),
921            collections,
922            log_sources,
923            peeks: Default::default(),
924            subscribes: Default::default(),
925            copy_tos: Default::default(),
926            history,
927            command_rx,
928            response_tx,
929            introspection_tx,
930            metrics,
931            dyncfg,
932            now,
933            wallclock_lag,
934            wallclock_lag_last_recorded: now_dt,
935            read_hold_tx,
936            replica_tx,
937            replica_rx,
938        }
939    }
940
941    pub(super) async fn run(mut self) {
942        self.send(ComputeCommand::Hello {
943            // The nonce is protocol iteration-specific and will be set in
944            // `ReplicaTask::specialize_command`.
945            nonce: Uuid::default(),
946        });
947
948        let instance_config = InstanceConfig {
949            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
950            // The remaining fields are replica-specific and will be set in
951            // `ReplicaTask::specialize_command`.
952            logging: Default::default(),
953            expiration_offset: Default::default(),
954        };
955
956        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));
957
958        loop {
959            tokio::select! {
960                command = self.command_rx.recv() => match command {
961                    Some(cmd) => cmd(&mut self),
962                    None => break,
963                },
964                response = self.replica_rx.recv() => match response {
965                    Some(response) => self.handle_response(response),
966                    None => unreachable!("self owns a sender side of the channel"),
967                }
968            }
969        }
970    }
971
972    /// Update instance configuration.
973    #[mz_ore::instrument(level = "debug")]
974    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
975        if let Some(workload_class) = &config_params.workload_class {
976            self.workload_class = workload_class.clone();
977        }
978
979        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
980        self.send(command);
981    }
982
983    /// Marks the end of any initialization commands.
984    ///
985    /// Intended to be called by `Controller`, rather than by other code.
986    /// Calling this method repeatedly has no effect.
987    #[mz_ore::instrument(level = "debug")]
988    pub fn initialization_complete(&mut self) {
989        // The compute protocol requires that `InitializationComplete` is sent only once.
990        if !self.initialized {
991            self.send(ComputeCommand::InitializationComplete);
992            self.initialized = true;
993        }
994    }
995
996    /// Allows collections to affect writes to external systems (persist).
997    ///
998    /// Calling this method repeatedly has no effect.
999    #[mz_ore::instrument(level = "debug")]
1000    pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
1001        let collection = self.collection_mut(collection_id)?;
1002
1003        // Do not send redundant allow-writes commands.
1004        if !collection.read_only {
1005            return Ok(());
1006        }
1007
1008        // Don't send allow-writes for collections that are not installed.
1009        let as_of = collection.read_frontier();
1010
1011        // If the collection has an empty `as_of`, it was either never installed on the replica or
1012        // has since been dropped. In either case the replica does not expect any commands for it.
1013        if as_of.is_empty() {
1014            return Ok(());
1015        }
1016
1017        collection.read_only = false;
1018        self.send(ComputeCommand::AllowWrites(collection_id));
1019
1020        Ok(())
1021    }
1022
1023    /// Shut down this instance.
1024    ///
1025    /// This method asserts that the instance has no replicas left. It exists to help
1026    /// us find bugs where the client drops a compute instance that still has replicas
1027    /// installed, and later assumes that said replicas still exist.
1028    ///
1029    /// # Panics
1030    ///
1031    /// Soft-panics if the compute instance still has active replicas.
1032    #[mz_ore::instrument(level = "debug")]
1033    pub fn shutdown(&mut self) {
1034        // Taking the `command_rx` ensures that the [`Instance::run`] loop terminates.
1035        let (_tx, rx) = mpsc::unbounded_channel();
1036        self.command_rx = rx;
1037
1038        let stray_replicas: Vec<_> = self.replicas.keys().collect();
1039        soft_assert_or_log!(
1040            stray_replicas.is_empty(),
1041            "dropped instance still has provisioned replicas: {stray_replicas:?}",
1042        );
1043    }
1044
1045    /// Sends a command to replicas of this instance.
1046    #[mz_ore::instrument(level = "debug")]
1047    fn send(&mut self, cmd: ComputeCommand) {
1048        // Record the command so that new replicas can be brought up to speed.
1049        self.history.push(cmd.clone());
1050
1051        let target_replica = self.target_replica(&cmd);
1052
1053        if let Some(rid) = target_replica {
1054            if let Some(replica) = self.replicas.get_mut(&rid) {
1055                let _ = replica.client.send(cmd);
1056            }
1057        } else {
1058            for replica in self.replicas.values_mut() {
1059                let _ = replica.client.send(cmd.clone());
1060            }
1061        }
1062    }
1063
1064    /// Determine the target replica for a compute command. Retrieves the
1065    /// collection named by the command, and returns the target replica if
1066    /// it is set, and None if not set, or the command doesn't name a collection.
1067    ///
1068    /// Panics if a create-dataflow command names collections that have different
1069    /// target replicas. It is an error to construct such an object and would
1070    /// indicate a bug in [`Self::create_dataflow`].
1071    fn target_replica(&self, cmd: &ComputeCommand) -> Option<ReplicaId> {
1072        match &cmd {
1073            ComputeCommand::Schedule(id)
1074            | ComputeCommand::AllowWrites(id)
1075            | ComputeCommand::AllowCompaction { id, .. } => {
1076                self.expect_collection(*id).target_replica
1077            }
1078            ComputeCommand::CreateDataflow(desc) => {
1079                let mut target_replica = None;
1080                for id in desc.export_ids() {
1081                    if let Some(replica) = self.expect_collection(id).target_replica {
1082                        if target_replica.is_some() {
1083                            assert_eq!(target_replica, Some(replica));
1084                        }
1085                        target_replica = Some(replica);
1086                    }
1087                }
1088                target_replica
1089            }
1090            // Skip Peek as we don't allow replica-targeted indexes.
1091            ComputeCommand::Peek(_)
1092            | ComputeCommand::Hello { .. }
1093            | ComputeCommand::CreateInstance(_)
1094            | ComputeCommand::InitializationComplete
1095            | ComputeCommand::UpdateConfiguration(_)
1096            | ComputeCommand::CancelPeek { .. } => None,
1097        }
1098    }
1099
1100    /// Add a new instance replica, by ID.
1101    #[mz_ore::instrument(level = "debug")]
1102    pub fn add_replica(
1103        &mut self,
1104        id: ReplicaId,
1105        mut config: ReplicaConfig,
1106        epoch: Option<u64>,
1107    ) -> Result<(), ReplicaExists> {
1108        if self.replica_exists(id) {
1109            return Err(ReplicaExists(id));
1110        }
1111
1112        config.logging.index_logs = self.log_sources.clone();
1113
1114        let epoch = epoch.unwrap_or(1);
1115        let metrics = self.metrics.for_replica(id);
1116        let client = ReplicaClient::spawn(
1117            id,
1118            self.build_info,
1119            config.clone(),
1120            epoch,
1121            metrics.clone(),
1122            Arc::clone(&self.dyncfg),
1123            self.replica_tx.clone(),
1124        );
1125
1126        // Take this opportunity to clean up the history we should present.
1127        self.history.reduce();
1128
1129        // Advance the uppers of source imports
1130        self.history.update_source_uppers(&self.storage_collections);
1131
1132        // Replay the commands at the client, creating new dataflow identifiers.
1133        for command in self.history.iter() {
1134            // Skip `CreateDataflow` commands targeted at different replicas.
1135            if let Some(target_replica) = self.target_replica(command)
1136                && target_replica != id
1137            {
1138                continue;
1139            }
1140
1141            if client.send(command.clone()).is_err() {
1142                // We swallow the error here. On the next send, we will fail again, and
1143                // restart the connection as well as this rehydration.
1144                tracing::warn!("Replica {:?} connection terminated during hydration", id);
1145                break;
1146            }
1147        }
1148
1149        // Add replica to tracked state.
1150        self.add_replica_state(id, client, config, epoch);
1151
1152        Ok(())
1153    }
1154
1155    /// Remove an existing instance replica, by ID.
1156    #[mz_ore::instrument(level = "debug")]
1157    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
1158        let replica = self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;
1159
1160        // Before dropping the replica state (and the contained input read holds), log read holds
1161        // that are the last line of defense against compaction of a dataflow's storage inputs. If
1162        // the corresponding global read hold has already been released, dropping the per-replica
1163        // read hold will allow compaction, which can cause the replica to panic trying to install
1164        // the dataflow.
1165        //
1166        // This exists primarily to help diagnose incidents-and-escalations#39.
1167        for (collection_id, replica_collection) in &replica.collections {
1168            let collection = self.collections.get(collection_id);
1169            for replica_hold in &replica_collection.input_read_holds {
1170                let input_id = replica_hold.id();
1171                let global_hold = collection.and_then(|c| c.storage_dependencies.get(&input_id));
1172                let unprotected = global_hold
1173                    .is_none_or(|h| PartialOrder::less_than(replica_hold.since(), h.since()));
1174                if unprotected {
1175                    tracing::warn!(
1176                        replica_id = %id,
1177                        %collection_id,
1178                        %input_id,
1179                        replica_hold_since = ?replica_hold.since(),
1180                        global_hold_since = ?global_hold.map(|h| h.since()),
1181                        "dropping per-replica read hold without equivalent global read hold",
1182                    );
1183                }
1184            }
1185        }
1186        drop(replica);
1187
1188        // Subscribes targeting this replica either won't be served anymore (if the replica is
1189        // dropped) or might produce inconsistent output (if the target collection is an
1190        // introspection index). We produce an error to inform upstream.
1191        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
1192        for subscribe_id in to_drop {
1193            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
1194            let response = ComputeControllerResponse::SubscribeResponse(
1195                subscribe_id,
1196                SubscribeBatch {
1197                    lower: subscribe.frontier.clone(),
1198                    upper: subscribe.frontier,
1199                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
1200                },
1201            );
1202            self.deliver_response(response);
1203        }
1204
1205        // Peeks targeting this replica might not be served anymore (if the replica is dropped).
1206        // If the replica has failed it might come back and respond to the peek later, but it still
1207        // seems like a good idea to cancel the peek to inform the caller about the failure. This
1208        // is consistent with how we handle targeted subscribes above.
1209        let mut peek_responses = Vec::new();
1210        let mut to_drop = Vec::new();
1211        for (uuid, peek) in self.peeks_targeting(id) {
1212            peek_responses.push(ComputeControllerResponse::PeekNotification(
1213                uuid,
1214                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
1215                peek.otel_ctx.clone(),
1216            ));
1217            to_drop.push(uuid);
1218        }
1219        for response in peek_responses {
1220            self.deliver_response(response);
1221        }
1222        for uuid in to_drop {
1223            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
1224            self.finish_peek(uuid, response);
1225        }
1226
1227        // We might have a chance to forward implied capabilities and reduce the cost of bringing
1228        // up the next replica, if the dropped replica was the only one in the cluster.
1229        self.forward_implied_capabilities();
1230
1231        Ok(())
1232    }
1233
1234    /// Rehydrate the given instance replica.
1235    ///
1236    /// # Panics
1237    ///
1238    /// Panics if the specified replica does not exist.
1239    fn rehydrate_replica(&mut self, id: ReplicaId) {
1240        let config = self.replicas[&id].config.clone();
1241        let epoch = self.replicas[&id].epoch + 1;
1242
1243        self.remove_replica(id).expect("replica must exist");
1244        let result = self.add_replica(id, config, Some(epoch));
1245
1246        match result {
1247            Ok(()) => (),
1248            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1249        }
1250    }
1251
1252    /// Rehydrate any failed replicas of this instance.
1253    fn rehydrate_failed_replicas(&mut self) {
1254        let replicas = self.replicas.iter();
1255        let failed_replicas: Vec<_> = replicas
1256            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1257            .collect();
1258
1259        for replica_id in failed_replicas {
1260            self.rehydrate_replica(replica_id);
1261        }
1262    }
1263
1264    /// Creates the described dataflow and initializes state for its output.
1265    ///
1266    /// This method expects a `DataflowDescription` with an `as_of` frontier specified, as well as
1267    /// for each imported collection a read hold in `import_read_holds` at at least the `as_of`.
1268    #[mz_ore::instrument(level = "debug")]
1269    pub fn create_dataflow(
1270        &mut self,
1271        dataflow: DataflowDescription<mz_compute_types::plan::Plan, ()>,
1272        import_read_holds: Vec<ReadHold>,
1273        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState>,
1274        target_replica: Option<ReplicaId>,
1275    ) -> Result<(), DataflowCreationError> {
1276        use DataflowCreationError::*;
1277
1278        // Validate that the target replica, if specified, exists.
1279        // A targeted dataflow is only installed on a single replica; if that
1280        // replica doesn't exist, we can't create the dataflow.
1281        if let Some(replica_id) = target_replica {
1282            if !self.replica_exists(replica_id) {
1283                return Err(ReplicaMissing(replica_id));
1284            }
1285        }
1286
1287        // Simple sanity checks around `as_of`
1288        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
1289        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
1290            return Err(EmptyAsOfForSubscribe);
1291        }
1292        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
1293            return Err(EmptyAsOfForCopyTo);
1294        }
1295
1296        // Collect all dependencies of the dataflow, and read holds on them at the `as_of`.
1297        let mut storage_dependencies = BTreeMap::new();
1298        let mut compute_dependencies = BTreeMap::new();
1299
1300        // When we install per-replica input read holds, we cannot use the `as_of` because of
1301        // reconciliation: Existing slow replicas might be reading from the inputs at times before
1302        // the `as_of` and we would rather not crash them by allowing their inputs to compact too
1303        // far. So instead we take read holds at the least time available.
1304        let mut replica_input_read_holds = Vec::new();
1305
1306        let mut import_read_holds: BTreeMap<_, _> =
1307            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();
1308
1309        for &id in dataflow.source_imports.keys() {
1310            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1311            replica_input_read_holds.push(read_hold.clone());
1312
1313            read_hold
1314                .try_downgrade(as_of.clone())
1315                .map_err(|_| ReadHoldInsufficient(id))?;
1316            storage_dependencies.insert(id, read_hold);
1317        }
1318
1319        for &id in dataflow.index_imports.keys() {
1320            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1321            read_hold
1322                .try_downgrade(as_of.clone())
1323                .map_err(|_| ReadHoldInsufficient(id))?;
1324            compute_dependencies.insert(id, read_hold);
1325        }
1326
1327        // If the `as_of` is empty, we are not going to create a dataflow, so replicas won't read
1328        // from the inputs.
1329        if as_of.is_empty() {
1330            replica_input_read_holds = Default::default();
1331        }
1332
1333        // Install collection state for each of the exports.
1334        for export_id in dataflow.export_ids() {
1335            let shared = shared_collection_state
1336                .remove(&export_id)
1337                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
1338            let write_only = dataflow.sink_exports.contains_key(&export_id);
1339            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);
1340
1341            self.add_collection(
1342                export_id,
1343                as_of.clone(),
1344                shared,
1345                storage_dependencies.clone(),
1346                compute_dependencies.clone(),
1347                replica_input_read_holds.clone(),
1348                write_only,
1349                storage_sink,
1350                dataflow.initial_storage_as_of.clone(),
1351                dataflow.refresh_schedule.clone(),
1352                target_replica,
1353            );
1354
1355            // If the export is a storage sink, we can advance its write frontier to the write
1356            // frontier of the target storage collection.
1357            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
1358                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
1359            }
1360        }
1361
1362        // Initialize tracking of subscribes.
1363        for subscribe_id in dataflow.subscribe_ids() {
1364            self.subscribes
1365                .insert(subscribe_id, ActiveSubscribe::default());
1366        }
1367
1368        // Initialize tracking of copy tos.
1369        for copy_to_id in dataflow.copy_to_ids() {
1370            self.copy_tos.insert(copy_to_id);
1371        }
1372
1373        // Here we augment all imported sources and all exported sinks with the appropriate
1374        // storage metadata needed by the compute instance.
1375        let mut source_imports = BTreeMap::new();
1376        for (id, import) in dataflow.source_imports {
1377            let frontiers = self
1378                .storage_collections
1379                .collection_frontiers(id)
1380                .expect("collection exists");
1381
1382            let collection_metadata = self
1383                .storage_collections
1384                .collection_metadata(id)
1385                .expect("we have a read hold on this collection");
1386
1387            let desc = SourceInstanceDesc {
1388                storage_metadata: collection_metadata.clone(),
1389                arguments: import.desc.arguments,
1390                typ: import.desc.typ.clone(),
1391            };
1392            source_imports.insert(
1393                id,
1394                mz_compute_types::dataflows::SourceImport {
1395                    desc,
1396                    monotonic: import.monotonic,
1397                    with_snapshot: import.with_snapshot,
1398                    upper: frontiers.write_frontier,
1399                },
1400            );
1401        }
1402
1403        let mut sink_exports = BTreeMap::new();
1404        for (id, se) in dataflow.sink_exports {
1405            let connection = match se.connection {
1406                ComputeSinkConnection::MaterializedView(conn) => {
1407                    let metadata = self
1408                        .storage_collections
1409                        .collection_metadata(id)
1410                        .map_err(|_| CollectionMissing(id))?
1411                        .clone();
1412                    let conn = MaterializedViewSinkConnection {
1413                        value_desc: conn.value_desc,
1414                        storage_metadata: metadata,
1415                    };
1416                    ComputeSinkConnection::MaterializedView(conn)
1417                }
1418                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
1419                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1420                    ComputeSinkConnection::CopyToS3Oneshot(conn)
1421                }
1422            };
1423            let desc = ComputeSinkDesc {
1424                from: se.from,
1425                from_desc: se.from_desc,
1426                connection,
1427                with_snapshot: se.with_snapshot,
1428                up_to: se.up_to,
1429                non_null_assertions: se.non_null_assertions,
1430                refresh_schedule: se.refresh_schedule,
1431            };
1432            sink_exports.insert(id, desc);
1433        }
1434
1435        // Flatten the dataflow plans into the representation expected by replicas.
1436        let objects_to_build = dataflow
1437            .objects_to_build
1438            .into_iter()
1439            .map(|object| BuildDesc {
1440                id: object.id,
1441                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
1442            })
1443            .collect();
1444
1445        let augmented_dataflow = DataflowDescription {
1446            source_imports,
1447            sink_exports,
1448            objects_to_build,
1449            // The rest of the fields are identical
1450            index_imports: dataflow.index_imports,
1451            index_exports: dataflow.index_exports,
1452            as_of: dataflow.as_of.clone(),
1453            until: dataflow.until,
1454            initial_storage_as_of: dataflow.initial_storage_as_of,
1455            refresh_schedule: dataflow.refresh_schedule,
1456            debug_name: dataflow.debug_name,
1457            time_dependence: dataflow.time_dependence,
1458        };
1459
1460        if augmented_dataflow.is_transient() {
1461            tracing::debug!(
1462                name = %augmented_dataflow.debug_name,
1463                import_ids = %augmented_dataflow.display_import_ids(),
1464                export_ids = %augmented_dataflow.display_export_ids(),
1465                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1466                until = ?augmented_dataflow.until.elements(),
1467                "creating dataflow",
1468            );
1469        } else {
1470            tracing::info!(
1471                name = %augmented_dataflow.debug_name,
1472                import_ids = %augmented_dataflow.display_import_ids(),
1473                export_ids = %augmented_dataflow.display_export_ids(),
1474                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1475                until = ?augmented_dataflow.until.elements(),
1476                "creating dataflow",
1477            );
1478        }
1479
1480        // Skip the actual dataflow creation for an empty `as_of`. (Happens e.g. for the
1481        // bootstrapping of a REFRESH AT mat view that is past its last refresh.)
1482        if as_of.is_empty() {
1483            tracing::info!(
1484                name = %augmented_dataflow.debug_name,
1485                "not sending `CreateDataflow`, because of empty `as_of`",
1486            );
1487        } else {
1488            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
1489            self.send(ComputeCommand::CreateDataflow(Box::new(augmented_dataflow)));
1490
1491            for id in collections {
1492                self.maybe_schedule_collection(id);
1493            }
1494        }
1495
1496        Ok(())
1497    }
1498
1499    /// Schedule the identified collection if all its inputs are available.
1500    ///
1501    /// # Panics
1502    ///
1503    /// Panics if the identified collection does not exist.
1504    fn maybe_schedule_collection(&mut self, id: GlobalId) {
1505        let collection = self.expect_collection(id);
1506
1507        // Don't schedule collections twice.
1508        if collection.scheduled {
1509            return;
1510        }
1511
1512        let as_of = collection.read_frontier();
1513
1514        // If the collection has an empty `as_of`, it was either never installed on the replica or
1515        // has since been dropped. In either case the replica does not expect any commands for it.
1516        if as_of.is_empty() {
1517            return;
1518        }
1519
1520        let ready = if id.is_transient() {
1521            // Always schedule transient collections immediately. The assumption is that those are
1522            // created by interactive user commands and we want to schedule them as quickly as
1523            // possible. Inputs might not yet be available, but when they become available, we
1524            // don't need to wait for the controller to become aware and for the scheduling check
1525            // to run again.
1526            true
1527        } else {
1528            // Ignore self-dependencies. Any self-dependencies do not need to be
1529            // available at the as_of for the dataflow to make progress, so we
1530            // can ignore them here. At the moment, only continual tasks have
1531            // self-dependencies, but this logic is correct for any dataflow, so
1532            // we don't special case it to CTs.
1533            let not_self_dep = |x: &GlobalId| *x != id;
1534
1535            // Make sure we never schedule a collection before its input compute collections have
1536            // been scheduled. Scheduling in the wrong order can lead to deadlocks.
1537            let mut deps_scheduled = true;
1538
1539            // Check dependency frontiers to determine if all inputs are
1540            // available. An input is available when its frontier is greater
1541            // than the `as_of`, i.e., all input data up to and including the
1542            // `as_of` has been sealed.
1543            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
1544            let mut compute_frontiers = Vec::new();
1545            for id in compute_deps {
1546                let dep = &self.expect_collection(id);
1547                deps_scheduled &= dep.scheduled;
1548                compute_frontiers.push(dep.write_frontier());
1549            }
1550
1551            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
1552            let storage_frontiers = self
1553                .storage_collections
1554                .collections_frontiers(storage_deps.collect())
1555                .expect("must exist");
1556            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);
1557
1558            let mut frontiers = compute_frontiers.into_iter().chain(storage_frontiers);
1559            let frontiers_ready =
1560                frontiers.all(|frontier| PartialOrder::less_than(&as_of, &frontier));
1561
1562            deps_scheduled && frontiers_ready
1563        };
1564
1565        if ready {
1566            self.send(ComputeCommand::Schedule(id));
1567            let collection = self.expect_collection_mut(id);
1568            collection.scheduled = true;
1569        }
1570    }
1571
1572    /// Schedule any unscheduled collections that are ready.
1573    fn schedule_collections(&mut self) {
1574        let ids: Vec<_> = self.collections.keys().copied().collect();
1575        for id in ids {
1576            self.maybe_schedule_collection(id);
1577        }
1578    }
1579
1580    /// Drops the read capability for the given collections and allows their resources to be
1581    /// reclaimed.
1582    #[mz_ore::instrument(level = "debug")]
1583    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1584        for id in &ids {
1585            let collection = self.collection_mut(*id)?;
1586
1587            // Mark the collection as dropped to allow it to be removed from the controller state.
1588            collection.dropped = true;
1589
1590            // Drop the implied and warmup read holds to announce that clients are not
1591            // interested in the collection anymore.
1592            collection.implied_read_hold.release();
1593            collection.warmup_read_hold.release();
1594
1595            // If the collection is a subscribe, stop tracking it. This ensures that the controller
1596            // ceases to produce `SubscribeResponse`s for this subscribe.
1597            self.subscribes.remove(id);
1598            // If the collection is a copy to, stop tracking it. This ensures that the controller
1599            // ceases to produce `CopyToResponse`s` for this copy to.
1600            self.copy_tos.remove(id);
1601        }
1602
1603        Ok(())
1604    }
1605
1606    /// Initiate a peek request for the contents of `id` at `timestamp`.
1607    ///
1608    /// If this returns an error, then it didn't modify any `Instance` state.
1609    #[mz_ore::instrument(level = "debug")]
1610    pub fn peek(
1611        &mut self,
1612        peek_target: PeekTarget,
1613        literal_constraints: Option<Vec<Row>>,
1614        uuid: Uuid,
1615        timestamp: Timestamp,
1616        result_desc: RelationDesc,
1617        finishing: RowSetFinishing,
1618        map_filter_project: mz_expr::SafeMfpPlan,
1619        mut read_hold: ReadHold,
1620        target_replica: Option<ReplicaId>,
1621        peek_response_tx: oneshot::Sender<PeekResponse>,
1622    ) -> Result<(), PeekError> {
1623        use PeekError::*;
1624
1625        let target_id = peek_target.id();
1626
1627        // Downgrade the provided read hold to the peek time.
1628        if read_hold.id() != target_id {
1629            return Err(ReadHoldIdMismatch(read_hold.id()));
1630        }
1631        read_hold
1632            .try_downgrade(Antichain::from_elem(timestamp.clone()))
1633            .map_err(|_| ReadHoldInsufficient(target_id))?;
1634
1635        if let Some(target) = target_replica {
1636            if !self.replica_exists(target) {
1637                return Err(ReplicaMissing(target));
1638            }
1639        }
1640
1641        let otel_ctx = OpenTelemetryContext::obtain();
1642
1643        self.peeks.insert(
1644            uuid,
1645            PendingPeek {
1646                target_replica,
1647                // TODO(guswynn): can we just hold the `tracing::Span` here instead?
1648                otel_ctx: otel_ctx.clone(),
1649                requested_at: Instant::now(),
1650                read_hold,
1651                peek_response_tx,
1652                limit: finishing.limit.map(usize::cast_from),
1653                offset: finishing.offset,
1654            },
1655        );
1656
1657        let peek = Peek {
1658            literal_constraints,
1659            uuid,
1660            timestamp,
1661            finishing,
1662            map_filter_project,
1663            // Obtain an `OpenTelemetryContext` from the thread-local tracing
1664            // tree to forward it on to the compute worker.
1665            otel_ctx,
1666            target: peek_target,
1667            result_desc,
1668        };
1669        self.send(ComputeCommand::Peek(Box::new(peek)));
1670
1671        Ok(())
1672    }
1673
1674    /// Cancels an existing peek request.
1675    #[mz_ore::instrument(level = "debug")]
1676    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1677        let Some(peek) = self.peeks.get_mut(&uuid) else {
1678            tracing::warn!("did not find pending peek for {uuid}");
1679            return;
1680        };
1681
1682        let duration = peek.requested_at.elapsed();
1683        self.metrics
1684            .observe_peek_response(&PeekResponse::Canceled, duration);
1685
1686        // Enqueue a notification for the cancellation.
1687        let otel_ctx = peek.otel_ctx.clone();
1688        otel_ctx.attach_as_parent();
1689
1690        self.deliver_response(ComputeControllerResponse::PeekNotification(
1691            uuid,
1692            PeekNotification::Canceled,
1693            otel_ctx,
1694        ));
1695
1696        // Finish the peek.
1697        // This will also propagate the cancellation to the replicas.
1698        self.finish_peek(uuid, reason);
1699    }
1700
1701    /// Assigns a read policy to specific identifiers.
1702    ///
1703    /// The policies are assigned in the order presented, and repeated identifiers should
1704    /// conclude with the last policy. Changing a policy will immediately downgrade the read
1705    /// capability if appropriate, but it will not "recover" the read capability if the prior
1706    /// capability is already ahead of it.
1707    ///
1708    /// Identifiers not present in `policies` retain their existing read policies.
1709    ///
1710    /// It is an error to attempt to set a read policy for a collection that is not readable in the
1711    /// context of compute. At this time, only indexes are readable compute collections.
1712    #[mz_ore::instrument(level = "debug")]
1713    pub fn set_read_policy(
1714        &mut self,
1715        policies: Vec<(GlobalId, ReadPolicy)>,
1716    ) -> Result<(), ReadPolicyError> {
1717        // Do error checking upfront, to avoid introducing inconsistencies between a collection's
1718        // `implied_capability` and `read_capabilities`.
1719        for (id, _policy) in &policies {
1720            let collection = self.collection(*id)?;
1721            if collection.read_policy.is_none() {
1722                return Err(ReadPolicyError::WriteOnlyCollection(*id));
1723            }
1724        }
1725
1726        for (id, new_policy) in policies {
1727            let collection = self.expect_collection_mut(id);
1728            let new_since = new_policy.frontier(collection.write_frontier().borrow());
1729            let _ = collection.implied_read_hold.try_downgrade(new_since);
1730            collection.read_policy = Some(new_policy);
1731        }
1732
1733        Ok(())
1734    }
1735
1736    /// Advance the global write frontier of the given collection.
1737    ///
1738    /// Frontier regressions are gracefully ignored.
1739    ///
1740    /// # Panics
1741    ///
1742    /// Panics if the identified collection does not exist.
1743    #[mz_ore::instrument(level = "debug")]
1744    fn maybe_update_global_write_frontier(
1745        &mut self,
1746        id: GlobalId,
1747        new_frontier: Antichain<Timestamp>,
1748    ) {
1749        let collection = self.expect_collection_mut(id);
1750
1751        let advanced = collection.shared.lock_write_frontier(|f| {
1752            let advanced = PartialOrder::less_than(f, &new_frontier);
1753            if advanced {
1754                f.clone_from(&new_frontier);
1755            }
1756            advanced
1757        });
1758
1759        if !advanced {
1760            return;
1761        }
1762
1763        // Relax the implied read hold according to the read policy.
1764        let new_since = match &collection.read_policy {
1765            Some(read_policy) => {
1766                // For readable collections the read frontier is determined by applying the
1767                // client-provided read policy to the write frontier.
1768                read_policy.frontier(new_frontier.borrow())
1769            }
1770            None => {
1771                // Write-only collections cannot be read within the context of the compute
1772                // controller, so their read frontier only controls the read holds taken on their
1773                // inputs. We can safely downgrade the input read holds to any time less than the
1774                // write frontier.
1775                //
1776                // Note that some write-only collections (continual tasks) need to observe changes
1777                // at their current write frontier during hydration. Thus, we cannot downgrade the
1778                // read frontier to the write frontier and instead step it back by one.
1779                Antichain::from_iter(
1780                    new_frontier
1781                        .iter()
1782                        .map(|t| t.step_back().unwrap_or(Timestamp::MIN)),
1783                )
1784            }
1785        };
1786        let _ = collection.implied_read_hold.try_downgrade(new_since);
1787
1788        // Report the frontier advancement.
1789        self.deliver_response(ComputeControllerResponse::FrontierUpper {
1790            id,
1791            upper: new_frontier,
1792        });
1793    }
1794
1795    /// Apply a collection read hold change.
1796    pub(super) fn apply_read_hold_change(
1797        &mut self,
1798        id: GlobalId,
1799        mut update: ChangeBatch<Timestamp>,
1800    ) {
1801        let Some(collection) = self.collections.get_mut(&id) else {
1802            soft_panic_or_log!(
1803                "read hold change for absent collection (id={id}, changes={update:?})"
1804            );
1805            return;
1806        };
1807
1808        let new_since = collection.shared.lock_read_capabilities(|caps| {
1809            // Sanity check to prevent corrupted `read_capabilities`, which can cause hard-to-debug
1810            // issues (usually stuck read frontiers).
1811            let read_frontier = caps.frontier();
1812            for (time, diff) in update.iter() {
1813                let count = caps.count_for(time) + diff;
1814                assert!(
1815                    count >= 0,
1816                    "invalid read capabilities update: negative capability \
1817             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1818                );
1819                assert!(
1820                    count == 0 || read_frontier.less_equal(time),
1821                    "invalid read capabilities update: frontier regression \
1822             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1823                );
1824            }
1825
1826            // Apply read capability updates and learn about resulting changes to the read
1827            // frontier.
1828            let changes = caps.update_iter(update.drain());
1829
1830            let changed = changes.count() > 0;
1831            changed.then(|| caps.frontier().to_owned())
1832        });
1833
1834        let Some(new_since) = new_since else {
1835            return; // read frontier did not change
1836        };
1837
1838        // Propagate read frontier update to dependencies.
1839        for read_hold in collection.compute_dependencies.values_mut() {
1840            read_hold
1841                .try_downgrade(new_since.clone())
1842                .expect("frontiers don't regress");
1843        }
1844        for read_hold in collection.storage_dependencies.values_mut() {
1845            read_hold
1846                .try_downgrade(new_since.clone())
1847                .expect("frontiers don't regress");
1848        }
1849
1850        // Produce `AllowCompaction` command.
1851        self.send(ComputeCommand::AllowCompaction {
1852            id,
1853            frontier: new_since,
1854        });
1855    }
1856
1857    /// Fulfills a registered peek and cleans up associated state.
1858    ///
1859    /// As part of this we:
1860    ///  * Send a `PeekResponse` through the peek's response channel.
1861    ///  * Emit a `CancelPeek` command to instruct replicas to stop spending resources on this
1862    ///    peek, and to allow the `ComputeCommandHistory` to reduce away the corresponding `Peek`
1863    ///    command.
1864    ///  * Remove the read hold for this peek, unblocking compaction that might have waited on it.
1865    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1866        let Some(peek) = self.peeks.remove(&uuid) else {
1867            return;
1868        };
1869
1870        // The recipient might not be interested in the peek response anymore, which is fine.
1871        let _ = peek.peek_response_tx.send(response);
1872
1873        // NOTE: We need to send the `CancelPeek` command _before_ we release the peek's read hold
1874        // (by dropping it), to avoid the edge case that caused database-issues#4812.
1875        self.send(ComputeCommand::CancelPeek { uuid });
1876
1877        drop(peek.read_hold);
1878    }
1879
1880    /// Handles a response from a replica. Replica IDs are re-used across replica restarts, so we
1881    /// use the replica epoch to drop stale responses.
1882    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse) {
1883        // Filter responses from non-existing or stale replicas.
1884        if self
1885            .replicas
1886            .get(&replica_id)
1887            .filter(|replica| replica.epoch == epoch)
1888            .is_none()
1889        {
1890            return;
1891        }
1892
1893        // Invariant: the replica exists and has the expected epoch.
1894
1895        match response {
1896            ComputeResponse::Frontiers(id, frontiers) => {
1897                self.handle_frontiers_response(id, frontiers, replica_id);
1898            }
1899            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
1900                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
1901            }
1902            ComputeResponse::CopyToResponse(id, response) => {
1903                self.handle_copy_to_response(id, response, replica_id);
1904            }
1905            ComputeResponse::SubscribeResponse(id, response) => {
1906                self.handle_subscribe_response(id, response, replica_id);
1907            }
1908            ComputeResponse::Status(response) => {
1909                self.handle_status_response(response, replica_id);
1910            }
1911        }
1912    }
1913
1914    /// Handle new frontiers, returning any compute response that needs to
1915    /// be sent to the client.
1916    fn handle_frontiers_response(
1917        &mut self,
1918        id: GlobalId,
1919        frontiers: FrontiersResponse,
1920        replica_id: ReplicaId,
1921    ) {
1922        if !self.collections.contains_key(&id) {
1923            soft_panic_or_log!(
1924                "frontiers update for an unknown collection \
1925                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1926            );
1927            return;
1928        }
1929        let Some(replica) = self.replicas.get_mut(&replica_id) else {
1930            soft_panic_or_log!(
1931                "frontiers update for an unknown replica \
1932                 (replica_id={replica_id}, frontiers={frontiers:?})"
1933            );
1934            return;
1935        };
1936        let Some(replica_collection) = replica.collections.get_mut(&id) else {
1937            soft_panic_or_log!(
1938                "frontiers update for an unknown replica collection \
1939                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1940            );
1941            return;
1942        };
1943
1944        if let Some(new_frontier) = frontiers.input_frontier {
1945            replica_collection.update_input_frontier(new_frontier.clone());
1946        }
1947        if let Some(new_frontier) = frontiers.output_frontier {
1948            replica_collection.update_output_frontier(new_frontier.clone());
1949        }
1950        if let Some(new_frontier) = frontiers.write_frontier {
1951            replica_collection.update_write_frontier(new_frontier.clone());
1952            self.maybe_update_global_write_frontier(id, new_frontier);
1953        }
1954    }
1955
1956    #[mz_ore::instrument(level = "debug")]
1957    fn handle_peek_response(
1958        &mut self,
1959        uuid: Uuid,
1960        response: PeekResponse,
1961        otel_ctx: OpenTelemetryContext,
1962        replica_id: ReplicaId,
1963    ) {
1964        otel_ctx.attach_as_parent();
1965
1966        // We might not be tracking this peek anymore, because we have served a response already or
1967        // because it was canceled. If this is the case, we ignore the response.
1968        let Some(peek) = self.peeks.get(&uuid) else {
1969            return;
1970        };
1971
1972        // If the peek is targeting a replica, ignore responses from other replicas.
1973        let target_replica = peek.target_replica.unwrap_or(replica_id);
1974        if target_replica != replica_id {
1975            return;
1976        }
1977
1978        let duration = peek.requested_at.elapsed();
1979        self.metrics.observe_peek_response(&response, duration);
1980
1981        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
1982        // NOTE: We use the `otel_ctx` from the response, not the pending peek, because we
1983        // currently want the parent to be whatever the compute worker did with this peek.
1984        self.deliver_response(ComputeControllerResponse::PeekNotification(
1985            uuid,
1986            notification,
1987            otel_ctx,
1988        ));
1989
1990        self.finish_peek(uuid, response)
1991    }
1992
1993    fn handle_copy_to_response(
1994        &mut self,
1995        sink_id: GlobalId,
1996        response: CopyToResponse,
1997        replica_id: ReplicaId,
1998    ) {
1999        if !self.collections.contains_key(&sink_id) {
2000            soft_panic_or_log!(
2001                "received response for an unknown copy-to \
2002                 (sink_id={sink_id}, replica_id={replica_id})",
2003            );
2004            return;
2005        }
2006        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2007            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
2008            return;
2009        };
2010        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
2011            soft_panic_or_log!(
2012                "copy-to response for an unknown replica collection \
2013                 (sink_id={sink_id}, replica_id={replica_id})"
2014            );
2015            return;
2016        };
2017
2018        // Downgrade the replica frontiers, to enable dropping of input read holds and clean up of
2019        // collection state.
2020        // TODO(database-issues#4701): report copy-to frontiers through `Frontiers` responses
2021        replica_collection.update_write_frontier(Antichain::new());
2022        replica_collection.update_input_frontier(Antichain::new());
2023        replica_collection.update_output_frontier(Antichain::new());
2024
2025        // We might not be tracking this COPY TO because we have already returned a response
2026        // from one of the replicas. In that case, we ignore the response.
2027        if !self.copy_tos.remove(&sink_id) {
2028            return;
2029        }
2030
2031        let result = match response {
2032            CopyToResponse::RowCount(count) => Ok(count),
2033            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
2034            // We should never get here: Replicas only drop copy to collections in response
2035            // to the controller allowing them to do so, and when the controller drops a
2036            // copy to it also removes it from the list of tracked copy_tos (see
2037            // [`Instance::drop_collections`]).
2038            CopyToResponse::Dropped => {
2039                tracing::error!(
2040                    %sink_id, %replica_id,
2041                    "received `Dropped` response for a tracked copy to",
2042                );
2043                return;
2044            }
2045        };
2046
2047        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
2048    }
2049
2050    fn handle_subscribe_response(
2051        &mut self,
2052        subscribe_id: GlobalId,
2053        response: SubscribeResponse,
2054        replica_id: ReplicaId,
2055    ) {
2056        if !self.collections.contains_key(&subscribe_id) {
2057            soft_panic_or_log!(
2058                "received response for an unknown subscribe \
2059                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
2060            );
2061            return;
2062        }
2063        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2064            soft_panic_or_log!(
2065                "subscribe response for an unknown replica (replica_id={replica_id})"
2066            );
2067            return;
2068        };
2069        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
2070            soft_panic_or_log!(
2071                "subscribe response for an unknown replica collection \
2072                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
2073            );
2074            return;
2075        };
2076
2077        // Always apply replica write frontier updates. Even if the subscribe is not tracked
2078        // anymore, there might still be replicas reading from its inputs, so we need to track the
2079        // frontiers until all replicas have advanced to the empty one.
2080        let write_frontier = match &response {
2081            SubscribeResponse::Batch(batch) => batch.upper.clone(),
2082            SubscribeResponse::DroppedAt(_) => Antichain::new(),
2083        };
2084
2085        // For subscribes we downgrade all replica frontiers based on write frontiers. This should
2086        // be fine because the input and output frontier of a subscribe track its write frontier.
2087        // TODO(database-issues#4701): report subscribe frontiers through `Frontiers` responses
2088        replica_collection.update_write_frontier(write_frontier.clone());
2089        replica_collection.update_input_frontier(write_frontier.clone());
2090        replica_collection.update_output_frontier(write_frontier.clone());
2091
2092        // If the subscribe is not tracked, or targets a different replica, there is nothing to do.
2093        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
2094            return;
2095        };
2096
2097        // Apply a global frontier update.
2098        // If this is a replica-targeted subscribe, it is important that we advance the global
2099        // frontier only based on responses from the targeted replica. Otherwise, another replica
2100        // could advance to the empty frontier, making us drop the subscribe on the targeted
2101        // replica prematurely.
2102        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);
2103
2104        match response {
2105            SubscribeResponse::Batch(batch) => {
2106                let upper = batch.upper;
2107                let mut updates = batch.updates;
2108
2109                // If this batch advances the subscribe's frontier, we emit all updates at times
2110                // greater or equal to the last frontier (to avoid emitting duplicate updates).
2111                if PartialOrder::less_than(&subscribe.frontier, &upper) {
2112                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());
2113
2114                    if upper.is_empty() {
2115                        // This subscribe cannot produce more data. Stop tracking it.
2116                        self.subscribes.remove(&subscribe_id);
2117                    } else {
2118                        // This subscribe can produce more data. Update our tracking of it.
2119                        self.subscribes.insert(subscribe_id, subscribe);
2120                    }
2121
2122                    if let Ok(updates) = updates.as_mut() {
2123                        updates.retain_mut(|updates| {
2124                            let offset = updates.times().partition_point(|t| {
2125                                // True for times that are strictly less than lower (and should be skipped)
2126                                // and false otherwise.
2127                                !lower.less_equal(t)
2128                            });
2129                            let (_, past_lower) = std::mem::take(updates).split_at(offset);
2130                            *updates = past_lower;
2131                            updates.len() > 0
2132                        });
2133                    }
2134                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
2135                        subscribe_id,
2136                        SubscribeBatch {
2137                            lower,
2138                            upper,
2139                            updates,
2140                        },
2141                    ));
2142                }
2143            }
2144            SubscribeResponse::DroppedAt(frontier) => {
2145                // We should never get here: Replicas only drop subscribe collections in response
2146                // to the controller allowing them to do so, and when the controller drops a
2147                // subscribe it also removes it from the list of tracked subscribes (see
2148                // [`Instance::drop_collections`]).
2149                tracing::error!(
2150                    %subscribe_id,
2151                    %replica_id,
2152                    frontier = ?frontier.elements(),
2153                    "received `DroppedAt` response for a tracked subscribe",
2154                );
2155                self.subscribes.remove(&subscribe_id);
2156            }
2157        }
2158    }
2159
2160    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
2161        match response {
2162            StatusResponse::Placeholder => {}
2163        }
2164    }
2165
2166    /// Return the write frontiers of the dependencies of the given collection.
2167    fn dependency_write_frontiers<'b>(
2168        &'b self,
2169        collection: &'b CollectionState,
2170    ) -> impl Iterator<Item = Antichain<Timestamp>> + 'b {
2171        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2172            let collection = self.collections.get(&dep_id);
2173            collection.map(|c| c.write_frontier())
2174        });
2175        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2176            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2177            frontiers.map(|f| f.write_frontier)
2178        });
2179
2180        compute_frontiers.chain(storage_frontiers)
2181    }
2182
2183    /// Return the write frontiers of transitive storage dependencies of the given collection.
2184    fn transitive_storage_dependency_write_frontiers<'b>(
2185        &'b self,
2186        collection: &'b CollectionState,
2187    ) -> impl Iterator<Item = Antichain<Timestamp>> + 'b {
2188        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2189        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2190        let mut done = BTreeSet::new();
2191
2192        while let Some(id) = todo.pop() {
2193            if done.contains(&id) {
2194                continue;
2195            }
2196            if let Some(dep) = self.collections.get(&id) {
2197                storage_ids.extend(dep.storage_dependency_ids());
2198                todo.extend(dep.compute_dependency_ids())
2199            }
2200            done.insert(id);
2201        }
2202
2203        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2204            let frontiers = self.storage_collections.collection_frontiers(id).ok();
2205            frontiers.map(|f| f.write_frontier)
2206        });
2207
2208        storage_frontiers
2209    }
2210
2211    /// Downgrade the warmup capabilities of collections as much as possible.
2212    ///
2213    /// The only requirement we have for a collection's warmup capability is that it is for a time
2214    /// that is available in all of the collection's inputs. For each input the latest time that is
2215    /// the case for is `write_frontier - 1`. So the farthest we can downgrade a collection's
2216    /// warmup capability is the minimum of `write_frontier - 1` of all its inputs.
2217    ///
2218    /// This method expects to be periodically called as part of instance maintenance work.
2219    /// We would like to instead update the warmup capabilities synchronously in response to
2220    /// frontier updates of dependency collections, but that is not generally possible because we
2221    /// don't learn about frontier updates of storage collections synchronously. We could do
2222    /// synchronous updates for compute dependencies, but we refrain from doing for simplicity.
2223    fn downgrade_warmup_capabilities(&mut self) {
2224        let mut new_capabilities = BTreeMap::new();
2225        for (id, collection) in &self.collections {
2226            // For write-only collections that have advanced to the empty frontier, we can drop the
2227            // warmup capability entirely. There is no reason why we would need to hydrate those
2228            // collections again, so being able to warm them up is not useful.
2229            if collection.read_policy.is_none()
2230                && collection.shared.lock_write_frontier(|f| f.is_empty())
2231            {
2232                new_capabilities.insert(*id, Antichain::new());
2233                continue;
2234            }
2235
2236            let mut new_capability = Antichain::new();
2237            for frontier in self.dependency_write_frontiers(collection) {
2238                for time in frontier {
2239                    new_capability.insert(time.step_back().unwrap_or(time));
2240                }
2241            }
2242
2243            new_capabilities.insert(*id, new_capability);
2244        }
2245
2246        for (id, new_capability) in new_capabilities {
2247            let collection = self.expect_collection_mut(id);
2248            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
2249        }
2250    }
2251
2252    /// Forward the implied capabilities of collections, if possible.
2253    ///
2254    /// The implied capability of a collection controls (a) which times are still readable (for
2255    /// indexes) and (b) with which as-of the collection gets installed on a new replica. We are
2256    /// usually not allowed to advance an implied capability beyond the frontier that follows from
2257    /// the collection's read policy applied to its write frontier:
2258    ///
2259    ///  * For sink collections, some external consumer might rely on seeing all distinct times in
2260    ///    the input reflected in the output. If we'd forward the implied capability of a sink,
2261    ///    we'd risk skipping times in the output across replica restarts.
2262    ///  * For index collections, we might make the index unreadable by advancing its read frontier
2263    ///    beyond its write frontier.
2264    ///
2265    /// There is one case where forwarding an implied capability is fine though: an index installed
2266    /// on a cluster that has no replicas. Such indexes are not readable anyway until a new replica
2267    /// is added, so advancing its read frontier can't make it unreadable. We can thus advance the
2268    /// implied capability as long as we make sure that when a new replica is added, the expected
2269    /// relationship between write frontier, read policy, and implied capability can be restored
2270    /// immediately (modulo computation time).
2271    ///
2272    /// Forwarding implied capabilities is not necessary for the correct functioning of the
2273    /// controller but an optimization that is beneficial in two ways:
2274    ///
2275    ///  * It relaxes read holds on inputs to forwarded collections, allowing their compaction.
2276    ///  * It reduces the amount of historical detail new replicas need to process when computing
2277    ///    forwarded collections, as forwarding the implied capability also forwards the corresponding
2278    ///    dataflow as-of.
2279    fn forward_implied_capabilities(&mut self) {
2280        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
2281            return;
2282        }
2283        if !self.replicas.is_empty() {
2284            return;
2285        }
2286
2287        let mut new_capabilities = BTreeMap::new();
2288        for (id, collection) in &self.collections {
2289            let Some(read_policy) = &collection.read_policy else {
2290                // Collection is write-only, i.e. a sink.
2291                continue;
2292            };
2293
2294            // When a new replica is started, it will immediately be able to compute all collection
2295            // output up to the write frontier of its transitive storage inputs. So the new implied
2296            // read capability should be the read policy applied to that frontier.
2297            let mut dep_frontier = Antichain::new();
2298            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
2299                dep_frontier.extend(frontier);
2300            }
2301
2302            let new_capability = read_policy.frontier(dep_frontier.borrow());
2303            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
2304                new_capabilities.insert(*id, new_capability);
2305            }
2306        }
2307
2308        for (id, new_capability) in new_capabilities {
2309            let collection = self.expect_collection_mut(id);
2310            let _ = collection.implied_read_hold.try_downgrade(new_capability);
2311        }
2312    }
2313
2314    /// Acquires a `ReadHold` for the identified compute collection.
2315    ///
2316    /// This mirrors the logic used by the controller-side `InstanceState::acquire_read_hold`,
2317    /// but executes on the instance task itself.
2318    pub(super) fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold, CollectionMissing> {
2319        // Similarly to InstanceState::acquire_read_hold and StorageCollections::acquire_read_holds,
2320        // we acquire read holds at the earliest possible time rather than returning a copy
2321        // of the implied read hold. This is so that dependents can acquire read holds on
2322        // compute dependencies at frontiers that are held back by other read holds the caller
2323        // has previously taken.
2324        let collection = self.collection(id)?;
2325        let since = collection.shared.lock_read_capabilities(|caps| {
2326            let since = caps.frontier().to_owned();
2327            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
2328            since
2329        });
2330        let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
2331        Ok(hold)
2332    }
2333
    /// Process pending maintenance work.
    ///
    /// This method is invoked periodically by the global controller.
    /// It is a good place to perform maintenance work that arises from various controller state
    /// changes and that cannot conveniently be handled synchronously with those state changes.
    ///
    /// The individual maintenance steps run sequentially, in the order they appear in the body.
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        self.rehydrate_failed_replicas();
        // Read hold maintenance: relax warmup capabilities first, then forward implied
        // capabilities. The latter returns early unless the feature flag is enabled and the
        // instance has no replicas.
        self.downgrade_warmup_capabilities();
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        // Introspection and metrics reporting.
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
2350}
2351
/// State maintained about individual compute collections.
///
/// A compute collection is either an index, or a storage sink, or a subscribe, exported by a
/// compute dataflow.
#[derive(Debug)]
struct CollectionState {
    /// If set, this collection is only maintained by the specified replica.
    ///
    /// Initialized to `None` by [`CollectionState::new`].
    target_replica: Option<ReplicaId>,
    /// Whether this collection is a log collection.
    ///
    /// Log collections are special in that they are only maintained by a subset of all replicas.
    log_collection: bool,
    /// Whether this collection has been dropped by a controller client.
    ///
    /// The controller is allowed to remove the `CollectionState` for a collection only when
    /// `dropped == true`. Otherwise, clients might still expect to be able to query information
    /// about this collection.
    dropped: bool,
    /// Whether this collection has been scheduled, i.e., the controller has sent a `Schedule`
    /// command for it.
    ///
    /// Log collections are marked as scheduled right at creation, see
    /// [`CollectionState::new_log_collection`].
    scheduled: bool,

    /// Whether this collection is in read-only mode.
    ///
    /// When in read-only mode, the dataflow is not allowed to affect external state (largely persist).
    ///
    /// Collections created through [`CollectionState::new`] start out in read-only mode.
    read_only: bool,

    /// State shared with the `ComputeController`.
    shared: SharedCollectionState,

    /// A read hold maintaining the implicit capability of the collection.
    ///
    /// This capability is kept to ensure that the collection remains readable according to its
    /// `read_policy`. It also ensures that read holds on the collection's dependencies are kept at
    /// some time not greater than the collection's `write_frontier`, guaranteeing that the
    /// collection's next outputs can always be computed without skipping times.
    implied_read_hold: ReadHold,
    /// A read hold held to enable dataflow warmup.
    ///
    /// Dataflow warmup is an optimization that allows dataflows to immediately start hydrating
    /// even when their next output time (as implied by the `write_frontier`) is in the future.
    /// By installing a read capability derived from the write frontiers of the collection's
    /// inputs, we ensure that the as-of of new dataflows installed for the collection is at a time
    /// that is immediately available, so hydration can begin immediately too.
    warmup_read_hold: ReadHold,
    /// The policy to use to downgrade `self.implied_read_hold`.
    ///
    /// If `None`, the collection is a write-only collection (i.e. a sink). For write-only
    /// collections, the `implied_read_hold` is only required for maintaining read holds on the
    /// inputs, so we can immediately downgrade it to the `write_frontier`.
    ///
    /// [`CollectionState::new`] initializes this to `ReadPolicy::ValidFrom` the collection's
    /// `as_of`.
    read_policy: Option<ReadPolicy>,

    /// Storage identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    storage_dependencies: BTreeMap<GlobalId, ReadHold>,
    /// Compute identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    compute_dependencies: BTreeMap<GlobalId, ReadHold>,

    /// Introspection state associated with this collection.
    introspection: CollectionIntrospection,

    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
    /// introspection update.
    ///
    /// Keys are `(period, lag, labels)` triples, values are counts.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection. This is the case for
    /// transient collections, see [`CollectionState::new`].
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}
2431
2432impl CollectionState {
2433    /// Creates a new collection state, with an initial read policy valid from `since`.
2434    fn new(
2435        collection_id: GlobalId,
2436        as_of: Antichain<Timestamp>,
2437        shared: SharedCollectionState,
2438        storage_dependencies: BTreeMap<GlobalId, ReadHold>,
2439        compute_dependencies: BTreeMap<GlobalId, ReadHold>,
2440        read_hold_tx: read_holds::ChangeTx,
2441        introspection: CollectionIntrospection,
2442    ) -> Self {
2443        // A collection is not readable before the `as_of`.
2444        let since = as_of.clone();
2445        // A collection won't produce updates for times before the `as_of`.
2446        let upper = as_of;
2447
2448        // Ensure that the provided `shared` is valid for the given `as_of`.
2449        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
2450        assert!(shared.lock_write_frontier(|f| f == &upper));
2451
2452        // Initialize collection read holds.
2453        // Note that the implied read hold was already added to the `read_capabilities` when
2454        // `shared` was created, so we only need to add the warmup read hold here.
2455        let implied_read_hold =
2456            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
2457        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);
2458
2459        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
2460        shared.lock_read_capabilities(|c| {
2461            c.update_iter(updates);
2462        });
2463
2464        // In an effort to keep the produced wallclock lag introspection data small and
2465        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2466        // select indexes and subscribes.
2467        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
2468            true => None,
2469            false => Some(Default::default()),
2470        };
2471
2472        Self {
2473            target_replica: None,
2474            log_collection: false,
2475            dropped: false,
2476            scheduled: false,
2477            read_only: true,
2478            shared,
2479            implied_read_hold,
2480            warmup_read_hold,
2481            read_policy: Some(ReadPolicy::ValidFrom(since)),
2482            storage_dependencies,
2483            compute_dependencies,
2484            introspection,
2485            wallclock_lag_histogram_stash,
2486        }
2487    }
2488
2489    /// Creates a new collection state for a log collection.
2490    fn new_log_collection(
2491        id: GlobalId,
2492        shared: SharedCollectionState,
2493        read_hold_tx: read_holds::ChangeTx,
2494        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2495    ) -> Self {
2496        let since = Antichain::from_elem(Timestamp::MIN);
2497        let introspection = CollectionIntrospection::new(
2498            id,
2499            introspection_tx,
2500            since.clone(),
2501            false,
2502            None,
2503            None,
2504            Vec::new(),
2505        );
2506        let mut state = Self::new(
2507            id,
2508            since,
2509            shared,
2510            Default::default(),
2511            Default::default(),
2512            read_hold_tx,
2513            introspection,
2514        );
2515        state.log_collection = true;
2516        // Log collections are created and scheduled implicitly as part of replica initialization.
2517        state.scheduled = true;
2518        state
2519    }
2520
2521    /// Reports the current read frontier.
2522    fn read_frontier(&self) -> Antichain<Timestamp> {
2523        self.shared
2524            .lock_read_capabilities(|c| c.frontier().to_owned())
2525    }
2526
2527    /// Reports the current write frontier.
2528    fn write_frontier(&self) -> Antichain<Timestamp> {
2529        self.shared.lock_write_frontier(|f| f.clone())
2530    }
2531
2532    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2533        self.storage_dependencies.keys().copied()
2534    }
2535
2536    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2537        self.compute_dependencies.keys().copied()
2538    }
2539}
2540
/// Collection state shared with the `ComputeController`.
///
/// Having this allows certain controller APIs, such as `ComputeController::collection_frontiers`
/// and `ComputeController::acquire_read_hold` to be non-`async`. This comes at the cost of
/// complexity (by introducing shared mutable state) and performance (by introducing locking). We
/// should aim to reduce the amount of shared state over time, rather than expand it.
///
/// Note that [`SharedCollectionState`]s are initialized by the `ComputeController` prior to the
/// collection's creation in the [`Instance`]. This is to allow compute clients to query frontiers
/// and take new read holds immediately, without having to wait for the [`Instance`] to update.
///
/// Cloning a [`SharedCollectionState`] clones the `Arc`s, so all clones refer to the same
/// underlying state.
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState {
    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulation contains the capabilities held by all [`ReadHold`]s given out for the
    /// collection, including `implied_read_hold` and `warmup_read_hold`.
    ///
    /// NOTE: This field may only be modified by [`Instance::apply_read_hold_change`],
    /// [`Instance::acquire_read_hold`], and `ComputeController::acquire_read_hold`.
    /// Nobody else should modify read capabilities directly. Instead, collection users should
    /// manage read holds through [`ReadHold`] objects acquired through
    /// `ComputeController::acquire_read_hold`.
    ///
    /// TODO(teskje): Restructure the code to enforce the above in the type system.
    read_capabilities: Arc<Mutex<MutableAntichain<Timestamp>>>,
    /// The write frontier of this collection.
    ///
    /// Initialized to the collection's `as_of` in [`SharedCollectionState::new`].
    write_frontier: Arc<Mutex<Antichain<Timestamp>>>,
}
2569
2570impl SharedCollectionState {
2571    pub fn new(as_of: Antichain<Timestamp>) -> Self {
2572        // A collection is not readable before the `as_of`.
2573        let since = as_of.clone();
2574        // A collection won't produce updates for times before the `as_of`.
2575        let upper = as_of;
2576
2577        // Initialize read capabilities to the `since`.
2578        // The is the implied read capability. The corresponding [`ReadHold`] is created in
2579        // [`CollectionState::new`].
2580        let mut read_capabilities = MutableAntichain::new();
2581        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2582
2583        Self {
2584            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2585            write_frontier: Arc::new(Mutex::new(upper)),
2586        }
2587    }
2588
2589    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2590    where
2591        F: FnOnce(&mut MutableAntichain<Timestamp>) -> R,
2592    {
2593        let mut caps = self.read_capabilities.lock().expect("poisoned");
2594        f(&mut *caps)
2595    }
2596
2597    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2598    where
2599        F: FnOnce(&mut Antichain<Timestamp>) -> R,
2600    {
2601        let mut frontier = self.write_frontier.lock().expect("poisoned");
2602        f(&mut *frontier)
2603    }
2604}
2605
/// Manages certain introspection relations associated with a collection. Upon creation, it adds
/// rows to introspection relations. When dropped, it retracts its managed rows.
///
/// The managed relations are `IntrospectionType::Frontiers`,
/// `IntrospectionType::ComputeMaterializedViewRefreshes`, and
/// `IntrospectionType::ComputeDependencies`.
#[derive(Debug)]
struct CollectionIntrospection {
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// A channel through which introspection updates are delivered.
    ///
    /// Send failures are ignored.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Introspection state for `IntrospectionType::Frontiers`.
    ///
    /// `Some` if the collection does _not_ sink into a storage collection (i.e. is not an MV). If
    /// the collection sinks into storage, the storage controller reports its frontiers instead.
    frontiers: Option<FrontiersIntrospectionState>,
    /// Introspection state for `IntrospectionType::ComputeMaterializedViewRefreshes`.
    ///
    /// `Some` if the collection is a REFRESH MV.
    refresh: Option<RefreshIntrospectionState>,
    /// The IDs of the collection's dependencies, for `IntrospectionType::ComputeDependencies`.
    ///
    /// If empty, no dependency rows are reported or retracted.
    dependency_ids: Vec<GlobalId>,
}
2626
2627impl CollectionIntrospection {
2628    fn new(
2629        collection_id: GlobalId,
2630        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2631        as_of: Antichain<Timestamp>,
2632        storage_sink: bool,
2633        initial_as_of: Option<Antichain<Timestamp>>,
2634        refresh_schedule: Option<RefreshSchedule>,
2635        dependency_ids: Vec<GlobalId>,
2636    ) -> Self {
2637        let refresh =
2638            match (refresh_schedule, initial_as_of) {
2639                (Some(refresh_schedule), Some(initial_as_of)) => Some(
2640                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2641                ),
2642                (refresh_schedule, _) => {
2643                    // If we have a `refresh_schedule`, then the collection is a MV, so we should also have
2644                    // an `initial_as_of`.
2645                    soft_assert_or_log!(
2646                        refresh_schedule.is_none(),
2647                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2648                    );
2649                    None
2650                }
2651            };
2652        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2653
2654        let self_ = Self {
2655            collection_id,
2656            introspection_tx,
2657            frontiers,
2658            refresh,
2659            dependency_ids,
2660        };
2661
2662        self_.report_initial_state();
2663        self_
2664    }
2665
2666    /// Reports the initial introspection state.
2667    fn report_initial_state(&self) {
2668        if let Some(frontiers) = &self.frontiers {
2669            let row = frontiers.row_for_collection(self.collection_id);
2670            let updates = vec![(row, Diff::ONE)];
2671            self.send(IntrospectionType::Frontiers, updates);
2672        }
2673
2674        if let Some(refresh) = &self.refresh {
2675            let row = refresh.row_for_collection(self.collection_id);
2676            let updates = vec![(row, Diff::ONE)];
2677            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2678        }
2679
2680        if !self.dependency_ids.is_empty() {
2681            let updates = self.dependency_rows(Diff::ONE);
2682            self.send(IntrospectionType::ComputeDependencies, updates);
2683        }
2684    }
2685
2686    /// Produces rows for the `ComputeDependencies` introspection relation.
2687    fn dependency_rows(&self, diff: Diff) -> Vec<(Row, Diff)> {
2688        self.dependency_ids
2689            .iter()
2690            .map(|dependency_id| {
2691                let row = Row::pack_slice(&[
2692                    Datum::String(&self.collection_id.to_string()),
2693                    Datum::String(&dependency_id.to_string()),
2694                ]);
2695                (row, diff)
2696            })
2697            .collect()
2698    }
2699
2700    /// Observe the given current collection frontiers and update the introspection state as
2701    /// necessary.
2702    fn observe_frontiers(
2703        &mut self,
2704        read_frontier: &Antichain<Timestamp>,
2705        write_frontier: &Antichain<Timestamp>,
2706    ) {
2707        self.update_frontier_introspection(read_frontier, write_frontier);
2708        self.update_refresh_introspection(write_frontier);
2709    }
2710
2711    fn update_frontier_introspection(
2712        &mut self,
2713        read_frontier: &Antichain<Timestamp>,
2714        write_frontier: &Antichain<Timestamp>,
2715    ) {
2716        let Some(frontiers) = &mut self.frontiers else {
2717            return;
2718        };
2719
2720        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2721        {
2722            return; // no change
2723        };
2724
2725        let retraction = frontiers.row_for_collection(self.collection_id);
2726        frontiers.update(read_frontier, write_frontier);
2727        let insertion = frontiers.row_for_collection(self.collection_id);
2728        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2729        self.send(IntrospectionType::Frontiers, updates);
2730    }
2731
2732    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<Timestamp>) {
2733        let Some(refresh) = &mut self.refresh else {
2734            return;
2735        };
2736
2737        let retraction = refresh.row_for_collection(self.collection_id);
2738        refresh.frontier_update(write_frontier);
2739        let insertion = refresh.row_for_collection(self.collection_id);
2740
2741        if retraction == insertion {
2742            return; // no change
2743        }
2744
2745        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2746        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2747    }
2748
2749    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2750        // Failure to send means the `ComputeController` has been dropped and doesn't care about
2751        // introspection updates anymore.
2752        let _ = self.introspection_tx.send((introspection_type, updates));
2753    }
2754}
2755
2756impl Drop for CollectionIntrospection {
2757    fn drop(&mut self) {
2758        // Retract collection frontiers.
2759        if let Some(frontiers) = &self.frontiers {
2760            let row = frontiers.row_for_collection(self.collection_id);
2761            let updates = vec![(row, Diff::MINUS_ONE)];
2762            self.send(IntrospectionType::Frontiers, updates);
2763        }
2764
2765        // Retract MV refresh state.
2766        if let Some(refresh) = &self.refresh {
2767            let retraction = refresh.row_for_collection(self.collection_id);
2768            let updates = vec![(retraction, Diff::MINUS_ONE)];
2769            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2770        }
2771
2772        // Retract collection dependencies.
2773        if !self.dependency_ids.is_empty() {
2774            let updates = self.dependency_rows(Diff::MINUS_ONE);
2775            self.send(IntrospectionType::ComputeDependencies, updates);
2776        }
2777    }
2778}
2779
/// The collection frontiers most recently recorded for the `IntrospectionType::Frontiers`
/// introspection relation.
///
/// Both frontiers start out at the collection's `as_of`.
#[derive(Debug)]
struct FrontiersIntrospectionState {
    /// The last recorded read frontier.
    read_frontier: Antichain<Timestamp>,
    /// The last recorded write frontier.
    write_frontier: Antichain<Timestamp>,
}
2785
2786impl FrontiersIntrospectionState {
2787    fn new(as_of: Antichain<Timestamp>) -> Self {
2788        Self {
2789            read_frontier: as_of.clone(),
2790            write_frontier: as_of,
2791        }
2792    }
2793
2794    /// Return a `Row` reflecting the current collection frontiers.
2795    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2796        let read_frontier = self
2797            .read_frontier
2798            .as_option()
2799            .map_or(Datum::Null, |ts| ts.clone().into());
2800        let write_frontier = self
2801            .write_frontier
2802            .as_option()
2803            .map_or(Datum::Null, |ts| ts.clone().into());
2804        Row::pack_slice(&[
2805            Datum::String(&collection_id.to_string()),
2806            read_frontier,
2807            write_frontier,
2808        ])
2809    }
2810
2811    /// Update the introspection state with the given new frontiers.
2812    fn update(
2813        &mut self,
2814        read_frontier: &Antichain<Timestamp>,
2815        write_frontier: &Antichain<Timestamp>,
2816    ) {
2817        if read_frontier != &self.read_frontier {
2818            self.read_frontier.clone_from(read_frontier);
2819        }
2820        if write_frontier != &self.write_frontier {
2821            self.write_frontier.clone_from(write_frontier);
2822        }
2823    }
2824}
2825
/// Information needed to compute introspection updates for a REFRESH materialized view when the
/// write frontier advances.
///
/// The refresh state fields are recomputed by `frontier_update()` and rendered into a row by
/// `row_for_collection()`.
#[derive(Debug)]
struct RefreshIntrospectionState {
    // Immutable properties of the MV: these are set at construction time.
    refresh_schedule: RefreshSchedule,
    initial_as_of: Antichain<Timestamp>,
    // Refresh state: both fields start out as `Datum::Null` and are then updated based on the
    // collection's write frontier.
    next_refresh: Datum<'static>,           // Null or an MzTimestamp
    last_completed_refresh: Datum<'static>, // Null or an MzTimestamp
}
2837
2838impl RefreshIntrospectionState {
2839    /// Return a `Row` reflecting the current refresh introspection state.
2840    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2841        Row::pack_slice(&[
2842            Datum::String(&collection_id.to_string()),
2843            self.last_completed_refresh,
2844            self.next_refresh,
2845        ])
2846    }
2847}
2848
2849impl RefreshIntrospectionState {
2850    /// Construct a new [`RefreshIntrospectionState`], and apply an initial `frontier_update()` at
2851    /// the `upper`.
2852    fn new(
2853        refresh_schedule: RefreshSchedule,
2854        initial_as_of: Antichain<Timestamp>,
2855        upper: &Antichain<Timestamp>,
2856    ) -> Self {
2857        let mut self_ = Self {
2858            refresh_schedule: refresh_schedule.clone(),
2859            initial_as_of: initial_as_of.clone(),
2860            next_refresh: Datum::Null,
2861            last_completed_refresh: Datum::Null,
2862        };
2863        self_.frontier_update(upper);
2864        self_
2865    }
2866
2867    /// Should be called whenever the write frontier of the collection advances. It updates the
2868    /// state that should be recorded in introspection relations, but doesn't send the updates yet.
2869    fn frontier_update(&mut self, write_frontier: &Antichain<Timestamp>) {
2870        if write_frontier.is_empty() {
2871            self.last_completed_refresh =
2872                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2873                    last_refresh.into()
2874                } else {
2875                    // If there is no last refresh, then we have a `REFRESH EVERY`, in which case
2876                    // the saturating roundup puts a refresh at the maximum possible timestamp.
2877                    Timestamp::MAX.into()
2878                };
2879            self.next_refresh = Datum::Null;
2880        } else {
2881            if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2882                // We are before the first refresh.
2883                self.last_completed_refresh = Datum::Null;
2884                let initial_as_of = self.initial_as_of.as_option().expect(
2885                    "initial_as_of can't be [], because then there would be no refreshes at all",
2886                );
2887                let first_refresh = self
2888                    .refresh_schedule
2889                    .round_up_timestamp(*initial_as_of)
2890                    .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2891                soft_assert_or_log!(
2892                    first_refresh == *initial_as_of,
2893                    "initial_as_of should be set to the first refresh"
2894                );
2895                self.next_refresh = first_refresh.into();
2896            } else {
2897                // The first refresh has already happened.
2898                let write_frontier = write_frontier.as_option().expect("checked above");
2899                self.last_completed_refresh = self
2900                    .refresh_schedule
2901                    .round_down_timestamp_m1(*write_frontier)
2902                    .map_or_else(
2903                        || {
2904                            soft_panic_or_log!(
2905                                "rounding down should have returned the first refresh or later"
2906                            );
2907                            Datum::Null
2908                        },
2909                        |last_completed_refresh| last_completed_refresh.into(),
2910                    );
2911                self.next_refresh = write_frontier.clone().into();
2912            }
2913        }
2914    }
2915}
2916
2917/// A note of an outstanding peek response.
2918#[derive(Debug)]
2919struct PendingPeek {
2920    /// For replica-targeted peeks, this specifies the replica whose response we should pass on.
2921    ///
2922    /// If this value is `None`, we pass on the first response.
2923    target_replica: Option<ReplicaId>,
2924    /// The OpenTelemetry context for this peek.
2925    otel_ctx: OpenTelemetryContext,
2926    /// The time at which the peek was requested.
2927    ///
2928    /// Used to track peek durations.
2929    requested_at: Instant,
2930    /// The read hold installed to serve this peek.
2931    read_hold: ReadHold,
2932    /// The channel to send peek results.
2933    peek_response_tx: oneshot::Sender<PeekResponse>,
2934    /// An optional limit of the peek's result size.
2935    limit: Option<usize>,
2936    /// The offset into the peek's result.
2937    offset: usize,
2938}
2939
2940#[derive(Debug, Clone)]
2941struct ActiveSubscribe {
2942    /// Current upper frontier of this subscribe.
2943    frontier: Antichain<Timestamp>,
2944}
2945
2946impl Default for ActiveSubscribe {
2947    fn default() -> Self {
2948        Self {
2949            frontier: Antichain::from_elem(Timestamp::MIN),
2950        }
2951    }
2952}
2953
2954/// State maintained about individual replicas.
2955#[derive(Debug)]
2956struct ReplicaState {
2957    /// The ID of the replica.
2958    id: ReplicaId,
2959    /// Client for the running replica task.
2960    client: ReplicaClient,
2961    /// The replica configuration.
2962    config: ReplicaConfig,
2963    /// Replica metrics.
2964    metrics: ReplicaMetrics,
2965    /// A channel through which introspection updates are delivered.
2966    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2967    /// Per-replica collection state.
2968    collections: BTreeMap<GlobalId, ReplicaCollectionState>,
2969    /// The epoch of the replica.
2970    epoch: u64,
2971}
2972
2973impl ReplicaState {
2974    fn new(
2975        id: ReplicaId,
2976        client: ReplicaClient,
2977        config: ReplicaConfig,
2978        metrics: ReplicaMetrics,
2979        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2980        epoch: u64,
2981    ) -> Self {
2982        Self {
2983            id,
2984            client,
2985            config,
2986            metrics,
2987            introspection_tx,
2988            epoch,
2989            collections: Default::default(),
2990        }
2991    }
2992
2993    /// Add a collection to the replica state.
2994    ///
2995    /// # Panics
2996    ///
2997    /// Panics if a collection with the same ID exists already.
2998    fn add_collection(
2999        &mut self,
3000        id: GlobalId,
3001        as_of: Antichain<Timestamp>,
3002        input_read_holds: Vec<ReadHold>,
3003    ) {
3004        let metrics = self.metrics.for_collection(id);
3005        let introspection = ReplicaCollectionIntrospection::new(
3006            self.id,
3007            id,
3008            self.introspection_tx.clone(),
3009            as_of.clone(),
3010        );
3011        let mut state =
3012            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
3013
3014        // In an effort to keep the produced wallclock lag introspection data small and
3015        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
3016        // select indexes and subscribes.
3017        if id.is_transient() {
3018            state.wallclock_lag_max = None;
3019        }
3020
3021        if let Some(previous) = self.collections.insert(id, state) {
3022            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
3023        }
3024    }
3025
3026    /// Remove state for a collection.
3027    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState> {
3028        self.collections.remove(&id)
3029    }
3030
3031    /// Returns whether all replica frontiers of the given collection are empty.
3032    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
3033        self.collections.get(&id).map_or(true, |c| {
3034            c.write_frontier.is_empty()
3035                && c.input_frontier.is_empty()
3036                && c.output_frontier.is_empty()
3037        })
3038    }
3039
3040    /// Returns the state of the [`ReplicaState`] formatted as JSON.
3041    ///
3042    /// The returned value is not guaranteed to be stable and may change at any point in time.
3043    #[mz_ore::instrument(level = "debug")]
3044    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3045        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
3046        // returned object as a tradeoff between usability and stability. `serde_json` will fail
3047        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
3048        // prevents a future unrelated change from silently breaking this method.
3049
3050        // Destructure `self` here so we don't forget to consider dumping newly added fields.
3051        let Self {
3052            id,
3053            client: _,
3054            config: _,
3055            metrics: _,
3056            introspection_tx: _,
3057            epoch,
3058            collections,
3059        } = self;
3060
3061        let collections: BTreeMap<_, _> = collections
3062            .iter()
3063            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
3064            .collect();
3065
3066        Ok(serde_json::json!({
3067            "id": id.to_string(),
3068            "collections": collections,
3069            "epoch": epoch,
3070        }))
3071    }
3072}
3073
3074#[derive(Debug)]
3075struct ReplicaCollectionState {
3076    /// The replica write frontier of this collection.
3077    ///
3078    /// See [`FrontiersResponse::write_frontier`].
3079    write_frontier: Antichain<Timestamp>,
3080    /// The replica input frontier of this collection.
3081    ///
3082    /// See [`FrontiersResponse::input_frontier`].
3083    input_frontier: Antichain<Timestamp>,
3084    /// The replica output frontier of this collection.
3085    ///
3086    /// See [`FrontiersResponse::output_frontier`].
3087    output_frontier: Antichain<Timestamp>,
3088
3089    /// Metrics tracked for this collection.
3090    ///
3091    /// If this is `None`, no metrics are collected.
3092    metrics: Option<ReplicaCollectionMetrics>,
3093    /// As-of frontier with which this collection was installed on the replica.
3094    as_of: Antichain<Timestamp>,
3095    /// Tracks introspection state for this collection.
3096    introspection: ReplicaCollectionIntrospection,
3097    /// Read holds on storage inputs to this collection.
3098    ///
3099    /// These read holds are kept to ensure that the replica is able to read from storage inputs at
3100    /// all times it hasn't read yet. We only need to install read holds for storage inputs since
3101    /// compaction of compute inputs is implicitly held back by Timely/DD.
3102    input_read_holds: Vec<ReadHold>,
3103
3104    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
3105    ///
3106    /// If this is `None`, wallclock lag is not tracked for this collection.
3107    wallclock_lag_max: Option<WallclockLag>,
3108}
3109
3110impl ReplicaCollectionState {
3111    fn new(
3112        metrics: Option<ReplicaCollectionMetrics>,
3113        as_of: Antichain<Timestamp>,
3114        introspection: ReplicaCollectionIntrospection,
3115        input_read_holds: Vec<ReadHold>,
3116    ) -> Self {
3117        Self {
3118            write_frontier: as_of.clone(),
3119            input_frontier: as_of.clone(),
3120            output_frontier: as_of.clone(),
3121            metrics,
3122            as_of,
3123            introspection,
3124            input_read_holds,
3125            wallclock_lag_max: Some(WallclockLag::MIN),
3126        }
3127    }
3128
3129    /// Returns whether this collection is hydrated.
3130    fn hydrated(&self) -> bool {
3131        // If the observed frontier is greater than the collection's as-of, the collection has
3132        // produced some output and is therefore hydrated.
3133        //
3134        // We need to consider the edge case where the as-of is the empty frontier. Such an as-of
3135        // is not useful for indexes, because they wouldn't be readable. For write-only
3136        // collections, an empty as-of means that the collection has been fully written and no new
3137        // dataflow needs to be created for it. Consequently, no hydration will happen either.
3138        //
3139        // Based on this, we could respond in two ways:
3140        //  * `false`, as in "the dataflow was never created"
3141        //  * `true`, as in "the dataflow completed immediately"
3142        //
3143        // Since hydration is often used as a measure of dataflow progress and we don't want to
3144        // give the impression that certain dataflows are somehow stuck when they are not, we go
3145        // with the second interpretation here.
3146        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3147    }
3148
3149    /// Updates the replica write frontier of this collection.
3150    fn update_write_frontier(&mut self, new_frontier: Antichain<Timestamp>) {
3151        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3152            soft_panic_or_log!(
3153                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3154                self.write_frontier,
3155            );
3156            return;
3157        } else if new_frontier == self.write_frontier {
3158            return;
3159        }
3160
3161        self.write_frontier = new_frontier;
3162    }
3163
3164    /// Updates the replica input frontier of this collection.
3165    fn update_input_frontier(&mut self, new_frontier: Antichain<Timestamp>) {
3166        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3167            soft_panic_or_log!(
3168                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3169                self.input_frontier,
3170            );
3171            return;
3172        } else if new_frontier == self.input_frontier {
3173            return;
3174        }
3175
3176        self.input_frontier = new_frontier;
3177
3178        // Relax our read holds on collection inputs.
3179        for read_hold in &mut self.input_read_holds {
3180            let result = read_hold.try_downgrade(self.input_frontier.clone());
3181            soft_assert_or_log!(
3182                result.is_ok(),
3183                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3184                self.input_frontier,
3185            );
3186        }
3187    }
3188
3189    /// Updates the replica output frontier of this collection.
3190    fn update_output_frontier(&mut self, new_frontier: Antichain<Timestamp>) {
3191        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3192            soft_panic_or_log!(
3193                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3194                self.output_frontier,
3195            );
3196            return;
3197        } else if new_frontier == self.output_frontier {
3198            return;
3199        }
3200
3201        self.output_frontier = new_frontier;
3202    }
3203}
3204
3205/// Maintains the introspection state for a given replica and collection, and ensures that reported
3206/// introspection data is retracted when the collection is dropped.
3207#[derive(Debug)]
3208struct ReplicaCollectionIntrospection {
3209    /// The ID of the replica.
3210    replica_id: ReplicaId,
3211    /// The ID of the compute collection.
3212    collection_id: GlobalId,
3213    /// The collection's reported replica write frontier.
3214    write_frontier: Antichain<Timestamp>,
3215    /// A channel through which introspection updates are delivered.
3216    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3217}
3218
3219impl ReplicaCollectionIntrospection {
3220    /// Create a new `HydrationState` and initialize introspection.
3221    fn new(
3222        replica_id: ReplicaId,
3223        collection_id: GlobalId,
3224        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3225        as_of: Antichain<Timestamp>,
3226    ) -> Self {
3227        let self_ = Self {
3228            replica_id,
3229            collection_id,
3230            write_frontier: as_of,
3231            introspection_tx,
3232        };
3233
3234        self_.report_initial_state();
3235        self_
3236    }
3237
3238    /// Reports the initial introspection state.
3239    fn report_initial_state(&self) {
3240        let row = self.write_frontier_row();
3241        let updates = vec![(row, Diff::ONE)];
3242        self.send(IntrospectionType::ReplicaFrontiers, updates);
3243    }
3244
3245    /// Observe the given current write frontier and update the introspection state as necessary.
3246    fn observe_frontier(&mut self, write_frontier: &Antichain<Timestamp>) {
3247        if self.write_frontier == *write_frontier {
3248            return; // no change
3249        }
3250
3251        let retraction = self.write_frontier_row();
3252        self.write_frontier.clone_from(write_frontier);
3253        let insertion = self.write_frontier_row();
3254
3255        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3256        self.send(IntrospectionType::ReplicaFrontiers, updates);
3257    }
3258
3259    /// Return a `Row` reflecting the current replica write frontier.
3260    fn write_frontier_row(&self) -> Row {
3261        let write_frontier = self
3262            .write_frontier
3263            .as_option()
3264            .map_or(Datum::Null, |ts| ts.clone().into());
3265        Row::pack_slice(&[
3266            Datum::String(&self.collection_id.to_string()),
3267            Datum::String(&self.replica_id.to_string()),
3268            write_frontier,
3269        ])
3270    }
3271
3272    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3273        // Failure to send means the `ComputeController` has been dropped and doesn't care about
3274        // introspection updates anymore.
3275        let _ = self.introspection_tx.send((introspection_type, updates));
3276    }
3277}
3278
3279impl Drop for ReplicaCollectionIntrospection {
3280    fn drop(&mut self) {
3281        // Retract the write frontier.
3282        let row = self.write_frontier_row();
3283        let updates = vec![(row, Diff::MINUS_ONE)];
3284        self.send(IntrospectionType::ReplicaFrontiers, updates);
3285    }
3286}