Skip to main content

mz_compute_client/controller/
instance.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller for a compute instance.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
21use mz_compute_types::plan::render_plan::RenderPlan;
22use mz_compute_types::sinks::{
23    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
24};
25use mz_compute_types::sources::SourceInstanceDesc;
26use mz_controller_types::dyncfgs::{
27    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
28};
29use mz_dyncfg::ConfigSet;
30use mz_expr::RowSetFinishing;
31use mz_ore::cast::CastFrom;
32use mz_ore::channel::instrumented_unbounded_channel;
33use mz_ore::now::NowFn;
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{soft_assert_or_log, soft_panic_or_log};
36use mz_persist_types::PersistLocation;
37use mz_repr::adt::timestamp::CheckedTimestamp;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row, Timestamp};
40use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
41use mz_storage_types::read_holds::{self, ReadHold};
42use mz_storage_types::read_policy::ReadPolicy;
43use thiserror::Error;
44use timely::PartialOrder;
45use timely::progress::frontier::MutableAntichain;
46use timely::progress::{Antichain, ChangeBatch};
47use tokio::sync::{mpsc, oneshot};
48use uuid::Uuid;
49
50use crate::controller::error::{
51    CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
52};
53use crate::controller::instance_client::PeekError;
54use crate::controller::replica::{ReplicaClient, ReplicaConfig};
55use crate::controller::{
56    ComputeControllerResponse, IntrospectionUpdates, PeekNotification, ReplicaId,
57    StorageCollections,
58};
59use crate::logging::LogVariant;
60use crate::metrics::IntCounter;
61use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
62use crate::protocol::command::{
63    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
64};
65use crate::protocol::history::ComputeCommandHistory;
66use crate::protocol::response::{
67    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
68    SubscribeBatch, SubscribeResponse,
69};
70
/// Error returned when attempting to add a replica with an ID that is already in use.
#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);
74
/// Error returned when an operation references a replica ID unknown to the instance.
#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);
78
/// Errors arising while creating a dataflow on a compute instance.
#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    /// The dataflow references a collection unknown to the instance.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The dataflow targets a replica unknown to the instance.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The dataflow description does not specify an as-of frontier.
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    /// Subscribe dataflows must have a non-empty as-of.
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    /// Copy-to dataflows must have a non-empty as-of.
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    /// The caller did not provide a read hold for one of the dataflow's imports.
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    /// A provided read hold does not cover the dataflow's as-of.
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}
96
97impl From<CollectionMissing> for DataflowCreationError {
98    fn from(error: CollectionMissing) -> Self {
99        Self::CollectionMissing(error.0)
100    }
101}
102
/// Errors arising while updating a collection's read policy.
#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    /// The referenced collection is unknown to the instance.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// Write-only collections have no read policy to update.
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}
110
111impl From<CollectionMissing> for ReadPolicyError {
112    fn from(error: CollectionMissing) -> Self {
113        Self::CollectionMissing(error.0)
114    }
115}
116
/// A command sent to an [`Instance`] task.
///
/// Commands are boxed closures, which lets callers ship arbitrary work to the instance task and
/// run it with mutable access to the instance state.
pub(super) type Command = Box<dyn FnOnce(&mut Instance) + Send>;

/// A response from a replica, composed of a replica ID, the replica's current epoch, and the
/// compute response itself.
pub(super) type ReplicaResponse = (ReplicaId, u64, ComputeResponse);
123
/// The state we keep for a compute instance.
pub(super) struct Instance {
    /// Build info for spawning replicas.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections,
    /// Whether instance initialization has been completed.
    initialized: bool,
    /// Whether this instance is in read-only mode.
    ///
    /// When in read-only mode, this instance will not update persistent state, such as
    /// wallclock lag introspection.
    read_only: bool,
    /// The workload class of this instance.
    ///
    /// This is currently only used to annotate metrics.
    workload_class: Option<String>,
    /// The replicas of this compute instance, keyed by replica ID.
    replicas: BTreeMap<ReplicaId, ReplicaState>,
    /// Currently installed compute collections.
    ///
    /// New entries are added for all collections exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// Entries are removed by [`Instance::cleanup_collections`]. See that method's documentation
    /// about the conditions for removing collection state.
    collections: BTreeMap<GlobalId, CollectionState>,
    /// IDs of log sources maintained by this compute instance.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Currently outstanding peeks.
    ///
    /// New entries are added for all peeks initiated through [`Instance::peek`].
    ///
    /// The entry for a peek is only removed once all replicas have responded to the peek. This is
    /// currently required to ensure all replicas have stopped reading from the peeked collection's
    /// inputs before we allow them to compact. database-issues#4822 tracks changing this so we only have to wait
    /// for the first peek response.
    peeks: BTreeMap<Uuid, PendingPeek>,
    /// Currently in-progress subscribes.
    ///
    /// New entries are added for all subscribes exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// The entry for a subscribe is removed once at least one replica has reported the subscribe
    /// to have advanced to the empty frontier or to have been dropped, implying that no further
    /// updates will be emitted for this subscribe.
    ///
    /// Note that subscribes are tracked both in `collections` and `subscribes`. `collections`
    /// keeps track of the subscribe's upper and since frontiers and ensures appropriate read holds
    /// on the subscribe's input. `subscribes` is only used to track which updates have been
    /// emitted, to decide if new ones should be emitted or suppressed.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe>,
    /// Tracks all in-progress COPY TOs.
    ///
    /// New entries are added for all s3 oneshot sinks (corresponding to a COPY TO) exported from
    /// dataflows created through [`Instance::create_dataflow`].
    ///
    /// The entry for a copy to is removed once at least one replica has finished
    /// or the exporting collection is dropped.
    copy_tos: BTreeSet<GlobalId>,
    /// The command history, used when introducing new replicas or restarting existing replicas.
    history: ComputeCommandHistory<UIntGauge>,
    /// Receiver for commands to be executed.
    command_rx: mpsc::UnboundedReceiver<Command>,
    /// Sender for responses to be delivered.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
    /// Sender for introspection updates to be recorded.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// The registry the controller uses to report metrics.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,

    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<Timestamp>,
    /// The last time wallclock lag introspection was recorded.
    ///
    /// Used by `maybe_record_wallclock_lag` to rate-limit recordings to the configured interval.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for updates to collection read holds.
    ///
    /// Copies of this sender are given to [`ReadHold`]s that are created in
    /// [`CollectionState::new`].
    read_hold_tx: read_holds::ChangeTx,
    /// A sender for responses from replicas.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse, IntCounter>,
    /// A receiver for responses from replicas.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse, IntCounter>,
}
217
218impl Instance {
219    /// Acquire a handle to the collection state associated with `id`.
220    fn collection(&self, id: GlobalId) -> Result<&CollectionState, CollectionMissing> {
221        self.collections.get(&id).ok_or(CollectionMissing(id))
222    }
223
224    /// Acquire a mutable handle to the collection state associated with `id`.
225    fn collection_mut(&mut self, id: GlobalId) -> Result<&mut CollectionState, CollectionMissing> {
226        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
227    }
228
229    /// Acquire a handle to the collection state associated with `id`.
230    ///
231    /// # Panics
232    ///
233    /// Panics if the identified collection does not exist.
234    fn expect_collection(&self, id: GlobalId) -> &CollectionState {
235        self.collections.get(&id).expect("collection must exist")
236    }
237
238    /// Acquire a mutable handle to the collection state associated with `id`.
239    ///
240    /// # Panics
241    ///
242    /// Panics if the identified collection does not exist.
243    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
244        self.collections
245            .get_mut(&id)
246            .expect("collection must exist")
247    }
248
249    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState)> {
250        self.collections.iter().map(|(id, coll)| (*id, coll))
251    }
252
253    /// Returns an iterator over replicas that host the given collection.
254    ///
255    /// For replica-targeted collections, this returns only the target replica.
256    /// For non-targeted collections, this returns all replicas.
257    ///
258    /// Returns `Err` if the collection does not exist.
259    fn replicas_hosting(
260        &self,
261        id: GlobalId,
262    ) -> Result<impl Iterator<Item = &ReplicaState>, CollectionMissing> {
263        let target = self.collection(id)?.target_replica;
264        Ok(self
265            .replicas
266            .values()
267            .filter(move |r| target.map_or(true, |t| t == r.id)))
268    }
269
270    /// Add a collection to the instance state.
271    ///
272    /// # Panics
273    ///
274    /// Panics if a collection with the same ID exists already.
275    fn add_collection(
276        &mut self,
277        id: GlobalId,
278        as_of: Antichain<Timestamp>,
279        shared: SharedCollectionState,
280        storage_dependencies: BTreeMap<GlobalId, ReadHold>,
281        compute_dependencies: BTreeMap<GlobalId, ReadHold>,
282        replica_input_read_holds: Vec<ReadHold>,
283        write_only: bool,
284        storage_sink: bool,
285        initial_as_of: Option<Antichain<Timestamp>>,
286        refresh_schedule: Option<RefreshSchedule>,
287        target_replica: Option<ReplicaId>,
288    ) {
289        // Add global collection state.
290        let dependency_ids: Vec<GlobalId> = compute_dependencies
291            .keys()
292            .chain(storage_dependencies.keys())
293            .copied()
294            .collect();
295        let introspection = CollectionIntrospection::new(
296            id,
297            self.introspection_tx.clone(),
298            as_of.clone(),
299            storage_sink,
300            initial_as_of,
301            refresh_schedule,
302            dependency_ids,
303        );
304        let mut state = CollectionState::new(
305            id,
306            as_of.clone(),
307            shared,
308            storage_dependencies,
309            compute_dependencies,
310            Arc::clone(&self.read_hold_tx),
311            introspection,
312        );
313        state.target_replica = target_replica;
314        // If the collection is write-only, clear its read policy to reflect that.
315        if write_only {
316            state.read_policy = None;
317        }
318
319        if let Some(previous) = self.collections.insert(id, state) {
320            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
321        }
322
323        // Add per-replica collection state.
324        for replica in self.replicas.values_mut() {
325            if target_replica.is_some_and(|id| id != replica.id) {
326                continue;
327            }
328            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
329        }
330    }
331
332    fn remove_collection(&mut self, id: GlobalId) {
333        // Remove per-replica collection state.
334        for replica in self.replicas.values_mut() {
335            replica.remove_collection(id);
336        }
337
338        // Remove global collection state.
339        self.collections.remove(&id);
340    }
341
    /// Create and register state for a new replica.
    ///
    /// Initializes per-replica collection state for every existing collection the replica will
    /// host, skipping log collections it does not maintain and collections targeted at other
    /// replicas.
    fn add_replica_state(
        &mut self,
        id: ReplicaId,
        client: ReplicaClient,
        config: ReplicaConfig,
        epoch: u64,
    ) {
        // The log collections this replica maintains, per its logging config.
        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();

        let metrics = self.metrics.for_replica(id);
        let mut replica = ReplicaState::new(
            id,
            client,
            config,
            metrics,
            self.introspection_tx.clone(),
            epoch,
        );

        // Add per-replica collection state.
        for (collection_id, collection) in &self.collections {
            // Skip log collections not maintained by this replica,
            // and collections targeted at a different replica.
            if (collection.log_collection && !log_ids.contains(collection_id))
                || collection.target_replica.is_some_and(|rid| rid != id)
            {
                continue;
            }

            let as_of = if collection.log_collection {
                // For log collections, we don't send a `CreateDataflow` command to the replica, so
                // it doesn't know which as-of the controller chose and defaults to the minimum
                // frontier instead. We need to initialize the controller-side tracking with the
                // same frontier, to avoid observing regressions in the reported frontiers.
                Antichain::from_elem(Timestamp::MIN)
            } else {
                collection.read_frontier().to_owned()
            };

            // The replica needs read holds on all storage inputs of the collection.
            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
            replica.add_collection(*collection_id, as_of, input_read_holds);
        }

        self.replicas.insert(id, replica);
    }
387
388    /// Enqueue the given response for delivery to the controller clients.
389    fn deliver_response(&self, response: ComputeControllerResponse) {
390        // Failure to send means the `ComputeController` has been dropped and doesn't care about
391        // responses anymore.
392        let _ = self.response_tx.send(response);
393    }
394
395    /// Enqueue the given introspection updates for recording.
396    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
397        // Failure to send means the `ComputeController` has been dropped and doesn't care about
398        // introspection updates anymore.
399        let _ = self.introspection_tx.send((type_, updates));
400    }
401
402    /// Returns whether the identified replica exists.
403    fn replica_exists(&self, id: ReplicaId) -> bool {
404        self.replicas.contains_key(&id)
405    }
406
407    /// Return the IDs of pending peeks targeting the specified replica.
408    fn peeks_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = (Uuid, &PendingPeek)> {
409        self.peeks.iter().filter_map(move |(uuid, peek)| {
410            if peek.target_replica == Some(replica_id) {
411                Some((*uuid, peek))
412            } else {
413                None
414            }
415        })
416    }
417
418    /// Return the IDs of in-progress subscribes targeting the specified replica.
419    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
420        self.subscribes.keys().copied().filter(move |id| {
421            let collection = self.expect_collection(*id);
422            collection.target_replica == Some(replica_id)
423        })
424    }
425
426    /// Update introspection with the current collection frontiers.
427    ///
428    /// We could also do this directly in response to frontier changes, but doing it periodically
429    /// lets us avoid emitting some introspection updates that can be consolidated (e.g. a write
430    /// frontier updated immediately followed by a read frontier update).
431    ///
432    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
433    /// per second during normal operation.
434    fn update_frontier_introspection(&mut self) {
435        for collection in self.collections.values_mut() {
436            collection
437                .introspection
438                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
439        }
440
441        for replica in self.replicas.values_mut() {
442            for collection in replica.collections.values_mut() {
443                collection
444                    .introspection
445                    .observe_frontier(&collection.write_frontier);
446            }
447        }
448    }
449
450    /// Refresh the controller state metrics for this instance.
451    ///
452    /// We could also do state metric updates directly in response to state changes, but that would
453    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
454    /// a single method is less noisy.
455    ///
456    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
457    /// per second during normal operation.
458    fn refresh_state_metrics(&self) {
459        let unscheduled_collections_count =
460            self.collections.values().filter(|c| !c.scheduled).count();
461        let connected_replica_count = self
462            .replicas
463            .values()
464            .filter(|r| r.client.is_connected())
465            .count();
466
467        self.metrics
468            .replica_count
469            .set(u64::cast_from(self.replicas.len()));
470        self.metrics
471            .collection_count
472            .set(u64::cast_from(self.collections.len()));
473        self.metrics
474            .collection_unscheduled_count
475            .set(u64::cast_from(unscheduled_collections_count));
476        self.metrics
477            .peek_count
478            .set(u64::cast_from(self.peeks.len()));
479        self.metrics
480            .subscribe_count
481            .set(u64::cast_from(self.subscribes.len()));
482        self.metrics
483            .copy_to_count
484            .set(u64::cast_from(self.copy_tos.len()));
485        self.metrics
486            .connected_replica_count
487            .set(u64::cast_from(connected_replica_count));
488    }
489
    /// Refresh the wallclock lag introspection and metrics with the current lag values.
    ///
    /// This method produces wallclock lag metrics of two different shapes:
    ///
    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
    ///   over the last minute, together with the current time.
    /// * Histograms: For each collection, we measure the lag of the write frontier behind
    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
    ///   together with the current histogram period.
    ///
    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
    /// Prometheus.
    ///
    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
    /// per second during normal operation.
    fn refresh_wallclock_lag(&mut self) {
        // Lag of a frontier behind wallclock time; an empty frontier (completed collection) has
        // zero lag by definition.
        let frontier_lag = |frontier: &Antichain<Timestamp>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        // Histogram entries are labeled with the workload class, when one is set.
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        // For collections that sink into storage, we need to ask the storage controller to know
        // whether they're currently readable.
        let readable_storage_collections: BTreeSet<_> = self
            .collections
            .keys()
            .filter_map(|id| {
                let frontiers = self.storage_collections.collection_frontiers(*id).ok()?;
                PartialOrder::less_than(&frontiers.read_capabilities, &frontiers.write_frontier)
                    .then_some(*id)
            })
            .collect();

        // First, iterate over all collections and collect histogram measurements.
        for (id, collection) in &mut self.collections {
            let write_frontier = collection.write_frontier();
            // Storage-backed collections are readable per the storage controller; others are
            // readable when their read frontier is strictly less than their write frontier.
            let readable = if self.storage_collections.check_exists(*id).is_ok() {
                readable_storage_collections.contains(id)
            } else {
                PartialOrder::less_than(&collection.read_frontier(), &write_frontier)
            };

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                // Bucket lags by rounding up to the next power of two of seconds; unreadable
                // collections get an explicit "undefined" bucket.
                let bucket = if readable {
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                } else {
                    WallclockLag::Undefined
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        // Second, iterate over all per-replica collections and collect history measurements.
        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                // A per-replica collection is considered readable in the context of lag
                // measurement if either:
                //  (a) it sinks into a storage collection that is readable
                //  (b) it is hydrated
                let readable = readable_storage_collections.contains(id) || collection.hydrated();

                let lag = if readable {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                } else {
                    WallclockLag::Undefined
                };

                // Track the maximum lag seen since the last history recording.
                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    // No way to specify values as undefined in Prometheus metrics, so we use the
                    // maximum value instead.
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                };
            }
        }

        // Record lags to persist, if it's time.
        self.maybe_record_wallclock_lag();
    }
588
    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
    /// last recording.
    ///
    /// We emit new introspection updates if the system time has passed into a new multiple of the
    /// recording interval (typically 1 minute) since the last refresh. The storage controller uses
    /// the same approach, ensuring that both controllers commit their lags at roughly the same
    /// time, avoiding confusion caused by inconsistencies.
    fn maybe_record_wallclock_lag(&mut self) {
        // Read-only instances must not write introspection updates.
        if self.read_only {
            return;
        }

        // Truncate a datetime down to the nearest multiple of `interval`; `None` if the interval
        // cannot be represented as a `TimeDelta` or truncation fails.
        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        // Fall back to the default interval if the configured one is unusable.
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        // Only record once per interval boundary.
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        // Emit one history row per (collection, replica), carrying the max lag observed since the
        // last recording, and reset the max.
        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        // Emit one histogram row per (collection, period, lag bucket, labels), draining the
        // per-collection stashes accumulated by `refresh_wallclock_lag`.
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }
673
674    /// Returns `true` if the given collection is hydrated on at least one
675    /// replica.
676    ///
677    /// This also returns `true` in case this cluster does not have any
678    /// replicas that host the given collection.
679    #[mz_ore::instrument(level = "debug")]
680    pub fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, CollectionMissing> {
681        let mut hosting_replicas = self.replicas_hosting(collection_id)?.peekable();
682        if hosting_replicas.peek().is_none() {
683            return Ok(true);
684        }
685        for replica_state in hosting_replicas {
686            let collection_state = replica_state
687                .collections
688                .get(&collection_id)
689                .expect("hosting replica must have per-replica collection state");
690
691            if collection_state.hydrated() {
692                return Ok(true);
693            }
694        }
695
696        Ok(false)
697    }
698
699    /// Returns `true` if each non-transient, non-excluded collection is hydrated on at
700    /// least one replica.
701    ///
702    /// This also returns `true` in case this cluster does not have any
703    /// replicas.
704    #[mz_ore::instrument(level = "debug")]
705    pub fn collections_hydrated_on_replicas(
706        &self,
707        target_replica_ids: Option<Vec<ReplicaId>>,
708        exclude_collections: &BTreeSet<GlobalId>,
709    ) -> Result<bool, HydrationCheckBadTarget> {
710        if self.replicas.is_empty() {
711            return Ok(true);
712        }
713        let mut all_hydrated = true;
714        let target_replicas: BTreeSet<ReplicaId> = self
715            .replicas
716            .keys()
717            .filter_map(|id| match target_replica_ids {
718                None => Some(id.clone()),
719                Some(ref ids) if ids.contains(id) => Some(id.clone()),
720                Some(_) => None,
721            })
722            .collect();
723        if let Some(targets) = target_replica_ids {
724            if target_replicas.is_empty() {
725                return Err(HydrationCheckBadTarget(targets));
726            }
727        }
728
729        for (id, _collection) in self.collections_iter() {
730            if id.is_transient() || exclude_collections.contains(&id) {
731                continue;
732            }
733
734            let mut collection_hydrated = false;
735            // `replicas_hosting` cannot fail here because `collections_iter`
736            // only yields collections that exist.
737            for replica_state in self.replicas_hosting(id).expect("collection must exist") {
738                if !target_replicas.contains(&replica_state.id) {
739                    continue;
740                }
741                let collection_state = replica_state
742                    .collections
743                    .get(&id)
744                    .expect("hosting replica must have per-replica collection state");
745
746                if collection_state.hydrated() {
747                    collection_hydrated = true;
748                    break;
749                }
750            }
751
752            if !collection_hydrated {
753                tracing::info!("collection {id} is not hydrated on any replica");
754                all_hydrated = false;
755                // We continue with our loop instead of breaking out early, so
756                // that we log all non-hydrated replicas.
757            }
758        }
759
760        Ok(all_hydrated)
761    }
762
763    /// Clean up collection state that is not needed anymore.
764    ///
765    /// Three conditions need to be true before we can remove state for a collection:
766    ///
767    ///  1. A client must have explicitly dropped the collection. If that is not the case, clients
768    ///     can still reasonably assume that the controller knows about the collection and can
769    ///     answer queries about it.
770    ///  2. There must be no outstanding read capabilities on the collection. As long as someone
771    ///     still holds read capabilities on a collection, we need to keep it around to be able
772    ///     to properly handle downgrading of said capabilities.
773    ///  3. All replica frontiers for the collection must have advanced to the empty frontier.
774    ///     Advancement to the empty frontiers signals that replicas are done computing the
775    ///     collection and that they won't send more `ComputeResponse`s for it. As long as we might
776    ///     receive responses for a collection we want to keep it around to be able to validate and
777    ///     handle these responses.
778    fn cleanup_collections(&mut self) {
779        let to_remove: Vec<_> = self
780            .collections_iter()
781            .filter(|(id, collection)| {
782                collection.dropped
783                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
784                    && self
785                        .replicas
786                        .values()
787                        .all(|r| r.collection_frontiers_empty(*id))
788            })
789            .map(|(id, _collection)| id)
790            .collect();
791
792        for id in to_remove {
793            self.remove_collection(id);
794        }
795    }
796
797    /// Returns the state of the [`Instance`] formatted as JSON.
798    ///
799    /// The returned value is not guaranteed to be stable and may change at any point in time.
800    #[mz_ore::instrument(level = "debug")]
801    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
802        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
803        // returned object as a tradeoff between usability and stability. `serde_json` will fail
804        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
805        // prevents a future unrelated change from silently breaking this method.
806
807        // Destructure `self` here so we don't forget to consider dumping newly added fields.
808        let Self {
809            build_info: _,
810            storage_collections: _,
811            peek_stash_persist_location: _,
812            initialized,
813            read_only,
814            workload_class,
815            replicas,
816            collections,
817            log_sources: _,
818            peeks,
819            subscribes,
820            copy_tos,
821            history: _,
822            command_rx: _,
823            response_tx: _,
824            introspection_tx: _,
825            metrics: _,
826            dyncfg: _,
827            now: _,
828            wallclock_lag: _,
829            wallclock_lag_last_recorded,
830            read_hold_tx: _,
831            replica_tx: _,
832            replica_rx: _,
833        } = self;
834
835        let replicas: BTreeMap<_, _> = replicas
836            .iter()
837            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
838            .collect::<Result<_, anyhow::Error>>()?;
839        let collections: BTreeMap<_, _> = collections
840            .iter()
841            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
842            .collect();
843        let peeks: BTreeMap<_, _> = peeks
844            .iter()
845            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
846            .collect();
847        let subscribes: BTreeMap<_, _> = subscribes
848            .iter()
849            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
850            .collect();
851        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
852        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");
853
854        Ok(serde_json::json!({
855            "initialized": initialized,
856            "read_only": read_only,
857            "workload_class": workload_class,
858            "replicas": replicas,
859            "collections": collections,
860            "peeks": peeks,
861            "subscribes": subscribes,
862            "copy_tos": copy_tos,
863            "wallclock_lag_last_recorded": wallclock_lag_last_recorded,
864        }))
865    }
866
867    /// Reports the current write frontier for the identified compute collection.
868    pub(super) fn collection_write_frontier(
869        &self,
870        id: GlobalId,
871    ) -> Result<Antichain<Timestamp>, CollectionMissing> {
872        Ok(self.collection(id)?.write_frontier())
873    }
874}
875
876impl Instance {
877    pub(super) fn new(
878        build_info: &'static BuildInfo,
879        storage: StorageCollections,
880        peek_stash_persist_location: PersistLocation,
881        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState)>,
882        metrics: InstanceMetrics,
883        now: NowFn,
884        wallclock_lag: WallclockLagFn<Timestamp>,
885        dyncfg: Arc<ConfigSet>,
886        command_rx: mpsc::UnboundedReceiver<Command>,
887        response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
888        read_hold_tx: read_holds::ChangeTx,
889        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
890        read_only: bool,
891    ) -> Self {
892        let mut collections = BTreeMap::new();
893        let mut log_sources = BTreeMap::new();
894        for (log, id, shared) in arranged_logs {
895            let collection = CollectionState::new_log_collection(
896                id,
897                shared,
898                Arc::clone(&read_hold_tx),
899                introspection_tx.clone(),
900            );
901            collections.insert(id, collection);
902            log_sources.insert(log, id);
903        }
904
905        let history = ComputeCommandHistory::new(metrics.for_history());
906
907        let send_count = metrics.response_send_count.clone();
908        let recv_count = metrics.response_recv_count.clone();
909        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);
910
911        let now_dt = mz_ore::now::to_datetime(now());
912
913        Self {
914            build_info,
915            storage_collections: storage,
916            peek_stash_persist_location,
917            initialized: false,
918            read_only,
919            workload_class: None,
920            replicas: Default::default(),
921            collections,
922            log_sources,
923            peeks: Default::default(),
924            subscribes: Default::default(),
925            copy_tos: Default::default(),
926            history,
927            command_rx,
928            response_tx,
929            introspection_tx,
930            metrics,
931            dyncfg,
932            now,
933            wallclock_lag,
934            wallclock_lag_last_recorded: now_dt,
935            read_hold_tx,
936            replica_tx,
937            replica_rx,
938        }
939    }
940
    /// Runs this instance's event loop until the command channel is closed.
    ///
    /// Sends the protocol lead-in (`Hello`, then `CreateInstance`), then
    /// serves client commands and replica responses until all command senders
    /// have been dropped.
    pub(super) async fn run(mut self) {
        self.send(ComputeCommand::Hello {
            // The nonce is protocol iteration-specific and will be set in
            // `ReplicaTask::specialize_command`.
            nonce: Uuid::default(),
        });

        let instance_config = InstanceConfig {
            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
            // The remaining fields are replica-specific and will be set in
            // `ReplicaTask::specialize_command`.
            logging: Default::default(),
            expiration_offset: Default::default(),
        };

        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));

        // Serve commands and replica responses until shutdown.
        loop {
            tokio::select! {
                // A client command; `None` means all senders were dropped, so
                // the instance shuts down.
                command = self.command_rx.recv() => match command {
                    Some(cmd) => cmd(&mut self),
                    None => break,
                },
                // A response from a replica, forwarded via `replica_tx`.
                response = self.replica_rx.recv() => match response {
                    Some(response) => self.handle_response(response),
                    None => unreachable!("self owns a sender side of the channel"),
                }
            }
        }
    }
971
972    /// Update instance configuration.
973    #[mz_ore::instrument(level = "debug")]
974    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
975        if let Some(workload_class) = &config_params.workload_class {
976            self.workload_class = workload_class.clone();
977        }
978
979        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
980        self.send(command);
981    }
982
983    /// Marks the end of any initialization commands.
984    ///
985    /// Intended to be called by `Controller`, rather than by other code.
986    /// Calling this method repeatedly has no effect.
987    #[mz_ore::instrument(level = "debug")]
988    pub fn initialization_complete(&mut self) {
989        // The compute protocol requires that `InitializationComplete` is sent only once.
990        if !self.initialized {
991            self.send(ComputeCommand::InitializationComplete);
992            self.initialized = true;
993        }
994    }
995
996    /// Allows collections to affect writes to external systems (persist).
997    ///
998    /// Calling this method repeatedly has no effect.
999    #[mz_ore::instrument(level = "debug")]
1000    pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
1001        let collection = self.collection_mut(collection_id)?;
1002
1003        // Do not send redundant allow-writes commands.
1004        if !collection.read_only {
1005            return Ok(());
1006        }
1007
1008        // Don't send allow-writes for collections that are not installed.
1009        let as_of = collection.read_frontier();
1010
1011        // If the collection has an empty `as_of`, it was either never installed on the replica or
1012        // has since been dropped. In either case the replica does not expect any commands for it.
1013        if as_of.is_empty() {
1014            return Ok(());
1015        }
1016
1017        collection.read_only = false;
1018        self.send(ComputeCommand::AllowWrites(collection_id));
1019
1020        Ok(())
1021    }
1022
1023    /// Shut down this instance.
1024    ///
1025    /// This method asserts that the instance has no replicas left. It exists to help
1026    /// us find bugs where the client drops a compute instance that still has replicas
1027    /// installed, and later assumes that said replicas still exist.
1028    ///
1029    /// # Panics
1030    ///
1031    /// Soft-panics if the compute instance still has active replicas.
1032    #[mz_ore::instrument(level = "debug")]
1033    pub fn shutdown(&mut self) {
1034        // Taking the `command_rx` ensures that the [`Instance::run`] loop terminates.
1035        let (_tx, rx) = mpsc::unbounded_channel();
1036        self.command_rx = rx;
1037
1038        let stray_replicas: Vec<_> = self.replicas.keys().collect();
1039        soft_assert_or_log!(
1040            stray_replicas.is_empty(),
1041            "dropped instance still has provisioned replicas: {stray_replicas:?}",
1042        );
1043    }
1044
1045    /// Sends a command to replicas of this instance.
1046    #[mz_ore::instrument(level = "debug")]
1047    fn send(&mut self, cmd: ComputeCommand) {
1048        // Record the command so that new replicas can be brought up to speed.
1049        self.history.push(cmd.clone());
1050
1051        let target_replica = self.target_replica(&cmd);
1052
1053        if let Some(rid) = target_replica {
1054            if let Some(replica) = self.replicas.get_mut(&rid) {
1055                let _ = replica.client.send(cmd);
1056            }
1057        } else {
1058            for replica in self.replicas.values_mut() {
1059                let _ = replica.client.send(cmd.clone());
1060            }
1061        }
1062    }
1063
1064    /// Determine the target replica for a compute command. Retrieves the
1065    /// collection named by the command, and returns the target replica if
1066    /// it is set, and None if not set, or the command doesn't name a collection.
1067    ///
1068    /// Panics if a create-dataflow command names collections that have different
1069    /// target replicas. It is an error to construct such an object and would
1070    /// indicate a bug in [`Self::create_dataflow`].
1071    fn target_replica(&self, cmd: &ComputeCommand) -> Option<ReplicaId> {
1072        match &cmd {
1073            ComputeCommand::Schedule(id)
1074            | ComputeCommand::AllowWrites(id)
1075            | ComputeCommand::AllowCompaction { id, .. } => {
1076                self.expect_collection(*id).target_replica
1077            }
1078            ComputeCommand::CreateDataflow(desc) => {
1079                let mut target_replica = None;
1080                for id in desc.export_ids() {
1081                    if let Some(replica) = self.expect_collection(id).target_replica {
1082                        if target_replica.is_some() {
1083                            assert_eq!(target_replica, Some(replica));
1084                        }
1085                        target_replica = Some(replica);
1086                    }
1087                }
1088                target_replica
1089            }
1090            // Skip Peek as we don't allow replica-targeted indexes.
1091            ComputeCommand::Peek(_)
1092            | ComputeCommand::Hello { .. }
1093            | ComputeCommand::CreateInstance(_)
1094            | ComputeCommand::InitializationComplete
1095            | ComputeCommand::UpdateConfiguration(_)
1096            | ComputeCommand::CancelPeek { .. } => None,
1097        }
1098    }
1099
1100    /// Add a new instance replica, by ID.
1101    #[mz_ore::instrument(level = "debug")]
1102    pub fn add_replica(
1103        &mut self,
1104        id: ReplicaId,
1105        mut config: ReplicaConfig,
1106        epoch: Option<u64>,
1107    ) -> Result<(), ReplicaExists> {
1108        if self.replica_exists(id) {
1109            return Err(ReplicaExists(id));
1110        }
1111
1112        config.logging.index_logs = self.log_sources.clone();
1113
1114        let epoch = epoch.unwrap_or(1);
1115        let metrics = self.metrics.for_replica(id);
1116        let client = ReplicaClient::spawn(
1117            id,
1118            self.build_info,
1119            config.clone(),
1120            epoch,
1121            metrics.clone(),
1122            Arc::clone(&self.dyncfg),
1123            self.replica_tx.clone(),
1124        );
1125
1126        // Take this opportunity to clean up the history we should present.
1127        self.history.reduce();
1128
1129        // Advance the uppers of source imports
1130        self.history.update_source_uppers(&self.storage_collections);
1131
1132        // Replay the commands at the client, creating new dataflow identifiers.
1133        for command in self.history.iter() {
1134            // Skip `CreateDataflow` commands targeted at different replicas.
1135            if let Some(target_replica) = self.target_replica(command)
1136                && target_replica != id
1137            {
1138                continue;
1139            }
1140
1141            if client.send(command.clone()).is_err() {
1142                // We swallow the error here. On the next send, we will fail again, and
1143                // restart the connection as well as this rehydration.
1144                tracing::warn!("Replica {:?} connection terminated during hydration", id);
1145                break;
1146            }
1147        }
1148
1149        // Add replica to tracked state.
1150        self.add_replica_state(id, client, config, epoch);
1151
1152        Ok(())
1153    }
1154
    /// Remove an existing instance replica, by ID.
    ///
    /// Returns `ReplicaMissing` if no replica with the given ID exists.
    /// Subscribes and peeks targeting the removed replica are failed with
    /// `ERROR_TARGET_REPLICA_FAILED` to inform upstream callers.
    #[mz_ore::instrument(level = "debug")]
    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
        let replica = self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;

        // Before dropping the replica state (and the contained input read holds), log read holds
        // that are the last line of defense against compaction of a dataflow's storage inputs. If
        // the corresponding global read hold has already been released, dropping the per-replica
        // read hold will allow compaction, which can cause the replica to panic trying to install
        // the dataflow.
        //
        // This exists primarily to help diagnose incidents-and-escalations#39.
        for (collection_id, replica_collection) in &replica.collections {
            let collection = self.collections.get(collection_id);
            for replica_hold in &replica_collection.input_read_holds {
                let input_id = replica_hold.id();
                let global_hold = collection.and_then(|c| c.storage_dependencies.get(&input_id));
                // The input is unprotected if there is no global hold for it, or if the global
                // hold's since is already ahead of the per-replica hold's since.
                let unprotected = global_hold
                    .is_none_or(|h| PartialOrder::less_than(replica_hold.since(), h.since()));
                if unprotected {
                    tracing::warn!(
                        replica_id = %id,
                        %collection_id,
                        %input_id,
                        replica_hold_since = ?replica_hold.since(),
                        global_hold_since = ?global_hold.map(|h| h.since()),
                        "dropping per-replica read hold without equivalent global read hold",
                    );
                }
            }
        }
        drop(replica);

        // Subscribes targeting this replica either won't be served anymore (if the replica is
        // dropped) or might produce inconsistent output (if the target collection is an
        // introspection index). We produce an error to inform upstream.
        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
        for subscribe_id in to_drop {
            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
            let response = ComputeControllerResponse::SubscribeResponse(
                subscribe_id,
                SubscribeBatch {
                    // `lower == upper`: an empty batch that carries only the error.
                    lower: subscribe.frontier.clone(),
                    upper: subscribe.frontier,
                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
                },
            );
            self.deliver_response(response);
        }

        // Peeks targeting this replica might not be served anymore (if the replica is dropped).
        // If the replica has failed it might come back and respond to the peek later, but it still
        // seems like a good idea to cancel the peek to inform the caller about the failure. This
        // is consistent with how we handle targeted subscribes above.
        //
        // Notifications are buffered first, since `peeks_targeting` borrows peek state that the
        // subsequent delivery/finish calls need `&mut self` for.
        let mut peek_responses = Vec::new();
        let mut to_drop = Vec::new();
        for (uuid, peek) in self.peeks_targeting(id) {
            peek_responses.push(ComputeControllerResponse::PeekNotification(
                uuid,
                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
                peek.otel_ctx.clone(),
            ));
            to_drop.push(uuid);
        }
        // Deliver all notifications before finishing the peeks themselves.
        for response in peek_responses {
            self.deliver_response(response);
        }
        for uuid in to_drop {
            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
            self.finish_peek(uuid, response);
        }

        // We might have a chance to forward implied capabilities and reduce the cost of bringing
        // up the next replica, if the dropped replica was the only one in the cluster.
        self.forward_implied_capabilities();

        Ok(())
    }
1233
1234    /// Rehydrate the given instance replica.
1235    ///
1236    /// # Panics
1237    ///
1238    /// Panics if the specified replica does not exist.
1239    fn rehydrate_replica(&mut self, id: ReplicaId) {
1240        let config = self.replicas[&id].config.clone();
1241        let epoch = self.replicas[&id].epoch + 1;
1242
1243        self.remove_replica(id).expect("replica must exist");
1244        let result = self.add_replica(id, config, Some(epoch));
1245
1246        match result {
1247            Ok(()) => (),
1248            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1249        }
1250    }
1251
1252    /// Rehydrate any failed replicas of this instance.
1253    fn rehydrate_failed_replicas(&mut self) {
1254        let replicas = self.replicas.iter();
1255        let failed_replicas: Vec<_> = replicas
1256            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1257            .collect();
1258
1259        for replica_id in failed_replicas {
1260            self.rehydrate_replica(replica_id);
1261        }
1262    }
1263
1264    /// Creates the described dataflow and initializes state for its output.
1265    ///
1266    /// This method expects a `DataflowDescription` with an `as_of` frontier specified, as well as
1267    /// for each imported collection a read hold in `import_read_holds` at at least the `as_of`.
1268    #[mz_ore::instrument(level = "debug")]
1269    pub fn create_dataflow(
1270        &mut self,
1271        dataflow: DataflowDescription<mz_compute_types::plan::Plan, ()>,
1272        import_read_holds: Vec<ReadHold>,
1273        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState>,
1274        target_replica: Option<ReplicaId>,
1275    ) -> Result<(), DataflowCreationError> {
1276        use DataflowCreationError::*;
1277
1278        // Validate that the target replica, if specified, exists.
1279        // A targeted dataflow is only installed on a single replica; if that
1280        // replica doesn't exist, we can't create the dataflow.
1281        if let Some(replica_id) = target_replica {
1282            if !self.replica_exists(replica_id) {
1283                return Err(ReplicaMissing(replica_id));
1284            }
1285        }
1286
1287        // Simple sanity checks around `as_of`
1288        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
1289        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
1290            return Err(EmptyAsOfForSubscribe);
1291        }
1292        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
1293            return Err(EmptyAsOfForCopyTo);
1294        }
1295
1296        // Collect all dependencies of the dataflow, and read holds on them at the `as_of`.
1297        let mut storage_dependencies = BTreeMap::new();
1298        let mut compute_dependencies = BTreeMap::new();
1299
1300        // When we install per-replica input read holds, we cannot use the `as_of` because of
1301        // reconciliation: Existing slow replicas might be reading from the inputs at times before
1302        // the `as_of` and we would rather not crash them by allowing their inputs to compact too
1303        // far. So instead we take read holds at the least time available.
1304        let mut replica_input_read_holds = Vec::new();
1305
1306        let mut import_read_holds: BTreeMap<_, _> =
1307            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();
1308
1309        for &id in dataflow.source_imports.keys() {
1310            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1311            replica_input_read_holds.push(read_hold.clone());
1312
1313            read_hold
1314                .try_downgrade(as_of.clone())
1315                .map_err(|_| ReadHoldInsufficient(id))?;
1316            storage_dependencies.insert(id, read_hold);
1317        }
1318
1319        for &id in dataflow.index_imports.keys() {
1320            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1321            read_hold
1322                .try_downgrade(as_of.clone())
1323                .map_err(|_| ReadHoldInsufficient(id))?;
1324            compute_dependencies.insert(id, read_hold);
1325        }
1326
1327        // If the `as_of` is empty, we are not going to create a dataflow, so replicas won't read
1328        // from the inputs.
1329        if as_of.is_empty() {
1330            replica_input_read_holds = Default::default();
1331        }
1332
1333        // Install collection state for each of the exports.
1334        for export_id in dataflow.export_ids() {
1335            let shared = shared_collection_state
1336                .remove(&export_id)
1337                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
1338            let write_only = dataflow.sink_exports.contains_key(&export_id);
1339            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);
1340
1341            self.add_collection(
1342                export_id,
1343                as_of.clone(),
1344                shared,
1345                storage_dependencies.clone(),
1346                compute_dependencies.clone(),
1347                replica_input_read_holds.clone(),
1348                write_only,
1349                storage_sink,
1350                dataflow.initial_storage_as_of.clone(),
1351                dataflow.refresh_schedule.clone(),
1352                target_replica,
1353            );
1354
1355            // If the export is a storage sink, we can advance its write frontier to the write
1356            // frontier of the target storage collection.
1357            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
1358                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
1359            }
1360        }
1361
1362        // Initialize tracking of subscribes.
1363        for subscribe_id in dataflow.subscribe_ids() {
1364            self.subscribes
1365                .insert(subscribe_id, ActiveSubscribe::default());
1366        }
1367
1368        // Initialize tracking of copy tos.
1369        for copy_to_id in dataflow.copy_to_ids() {
1370            self.copy_tos.insert(copy_to_id);
1371        }
1372
1373        // Here we augment all imported sources and all exported sinks with the appropriate
1374        // storage metadata needed by the compute instance.
1375        let mut source_imports = BTreeMap::new();
1376        for (id, import) in dataflow.source_imports {
1377            let frontiers = self
1378                .storage_collections
1379                .collection_frontiers(id)
1380                .expect("collection exists");
1381
1382            let collection_metadata = self
1383                .storage_collections
1384                .collection_metadata(id)
1385                .expect("we have a read hold on this collection");
1386
1387            let desc = SourceInstanceDesc {
1388                storage_metadata: collection_metadata.clone(),
1389                arguments: import.desc.arguments,
1390                typ: import.desc.typ.clone(),
1391            };
1392            source_imports.insert(
1393                id,
1394                mz_compute_types::dataflows::SourceImport {
1395                    desc,
1396                    monotonic: import.monotonic,
1397                    with_snapshot: import.with_snapshot,
1398                    upper: frontiers.write_frontier,
1399                },
1400            );
1401        }
1402
1403        let mut sink_exports = BTreeMap::new();
1404        for (id, se) in dataflow.sink_exports {
1405            let connection = match se.connection {
1406                ComputeSinkConnection::MaterializedView(conn) => {
1407                    let metadata = self
1408                        .storage_collections
1409                        .collection_metadata(id)
1410                        .map_err(|_| CollectionMissing(id))?
1411                        .clone();
1412                    let conn = MaterializedViewSinkConnection {
1413                        value_desc: conn.value_desc,
1414                        storage_metadata: metadata,
1415                    };
1416                    ComputeSinkConnection::MaterializedView(conn)
1417                }
1418                ComputeSinkConnection::ContinualTask(conn) => {
1419                    let metadata = self
1420                        .storage_collections
1421                        .collection_metadata(id)
1422                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
1423                        .clone();
1424                    let conn = ContinualTaskConnection {
1425                        input_id: conn.input_id,
1426                        storage_metadata: metadata,
1427                    };
1428                    ComputeSinkConnection::ContinualTask(conn)
1429                }
1430                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
1431                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1432                    ComputeSinkConnection::CopyToS3Oneshot(conn)
1433                }
1434            };
1435            let desc = ComputeSinkDesc {
1436                from: se.from,
1437                from_desc: se.from_desc,
1438                connection,
1439                with_snapshot: se.with_snapshot,
1440                up_to: se.up_to,
1441                non_null_assertions: se.non_null_assertions,
1442                refresh_schedule: se.refresh_schedule,
1443            };
1444            sink_exports.insert(id, desc);
1445        }
1446
1447        // Flatten the dataflow plans into the representation expected by replicas.
1448        let objects_to_build = dataflow
1449            .objects_to_build
1450            .into_iter()
1451            .map(|object| BuildDesc {
1452                id: object.id,
1453                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
1454            })
1455            .collect();
1456
1457        let augmented_dataflow = DataflowDescription {
1458            source_imports,
1459            sink_exports,
1460            objects_to_build,
1461            // The rest of the fields are identical
1462            index_imports: dataflow.index_imports,
1463            index_exports: dataflow.index_exports,
1464            as_of: dataflow.as_of.clone(),
1465            until: dataflow.until,
1466            initial_storage_as_of: dataflow.initial_storage_as_of,
1467            refresh_schedule: dataflow.refresh_schedule,
1468            debug_name: dataflow.debug_name,
1469            time_dependence: dataflow.time_dependence,
1470        };
1471
1472        if augmented_dataflow.is_transient() {
1473            tracing::debug!(
1474                name = %augmented_dataflow.debug_name,
1475                import_ids = %augmented_dataflow.display_import_ids(),
1476                export_ids = %augmented_dataflow.display_export_ids(),
1477                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1478                until = ?augmented_dataflow.until.elements(),
1479                "creating dataflow",
1480            );
1481        } else {
1482            tracing::info!(
1483                name = %augmented_dataflow.debug_name,
1484                import_ids = %augmented_dataflow.display_import_ids(),
1485                export_ids = %augmented_dataflow.display_export_ids(),
1486                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1487                until = ?augmented_dataflow.until.elements(),
1488                "creating dataflow",
1489            );
1490        }
1491
1492        // Skip the actual dataflow creation for an empty `as_of`. (Happens e.g. for the
1493        // bootstrapping of a REFRESH AT mat view that is past its last refresh.)
1494        if as_of.is_empty() {
1495            tracing::info!(
1496                name = %augmented_dataflow.debug_name,
1497                "not sending `CreateDataflow`, because of empty `as_of`",
1498            );
1499        } else {
1500            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
1501            self.send(ComputeCommand::CreateDataflow(Box::new(augmented_dataflow)));
1502
1503            for id in collections {
1504                self.maybe_schedule_collection(id);
1505            }
1506        }
1507
1508        Ok(())
1509    }
1510
1511    /// Schedule the identified collection if all its inputs are available.
1512    ///
1513    /// # Panics
1514    ///
1515    /// Panics if the identified collection does not exist.
1516    fn maybe_schedule_collection(&mut self, id: GlobalId) {
1517        let collection = self.expect_collection(id);
1518
1519        // Don't schedule collections twice.
1520        if collection.scheduled {
1521            return;
1522        }
1523
1524        let as_of = collection.read_frontier();
1525
1526        // If the collection has an empty `as_of`, it was either never installed on the replica or
1527        // has since been dropped. In either case the replica does not expect any commands for it.
1528        if as_of.is_empty() {
1529            return;
1530        }
1531
1532        let ready = if id.is_transient() {
1533            // Always schedule transient collections immediately. The assumption is that those are
1534            // created by interactive user commands and we want to schedule them as quickly as
1535            // possible. Inputs might not yet be available, but when they become available, we
1536            // don't need to wait for the controller to become aware and for the scheduling check
1537            // to run again.
1538            true
1539        } else {
1540            // Ignore self-dependencies. Any self-dependencies do not need to be
1541            // available at the as_of for the dataflow to make progress, so we
1542            // can ignore them here. At the moment, only continual tasks have
1543            // self-dependencies, but this logic is correct for any dataflow, so
1544            // we don't special case it to CTs.
1545            let not_self_dep = |x: &GlobalId| *x != id;
1546
1547            // Make sure we never schedule a collection before its input compute collections have
1548            // been scheduled. Scheduling in the wrong order can lead to deadlocks.
1549            let mut deps_scheduled = true;
1550
1551            // Check dependency frontiers to determine if all inputs are
1552            // available. An input is available when its frontier is greater
1553            // than the `as_of`, i.e., all input data up to and including the
1554            // `as_of` has been sealed.
1555            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
1556            let mut compute_frontiers = Vec::new();
1557            for id in compute_deps {
1558                let dep = &self.expect_collection(id);
1559                deps_scheduled &= dep.scheduled;
1560                compute_frontiers.push(dep.write_frontier());
1561            }
1562
1563            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
1564            let storage_frontiers = self
1565                .storage_collections
1566                .collections_frontiers(storage_deps.collect())
1567                .expect("must exist");
1568            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);
1569
1570            let mut frontiers = compute_frontiers.into_iter().chain(storage_frontiers);
1571            let frontiers_ready =
1572                frontiers.all(|frontier| PartialOrder::less_than(&as_of, &frontier));
1573
1574            deps_scheduled && frontiers_ready
1575        };
1576
1577        if ready {
1578            self.send(ComputeCommand::Schedule(id));
1579            let collection = self.expect_collection_mut(id);
1580            collection.scheduled = true;
1581        }
1582    }
1583
1584    /// Schedule any unscheduled collections that are ready.
1585    fn schedule_collections(&mut self) {
1586        let ids: Vec<_> = self.collections.keys().copied().collect();
1587        for id in ids {
1588            self.maybe_schedule_collection(id);
1589        }
1590    }
1591
1592    /// Drops the read capability for the given collections and allows their resources to be
1593    /// reclaimed.
1594    #[mz_ore::instrument(level = "debug")]
1595    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1596        for id in &ids {
1597            let collection = self.collection_mut(*id)?;
1598
1599            // Mark the collection as dropped to allow it to be removed from the controller state.
1600            collection.dropped = true;
1601
1602            // Drop the implied and warmup read holds to announce that clients are not
1603            // interested in the collection anymore.
1604            collection.implied_read_hold.release();
1605            collection.warmup_read_hold.release();
1606
1607            // If the collection is a subscribe, stop tracking it. This ensures that the controller
1608            // ceases to produce `SubscribeResponse`s for this subscribe.
1609            self.subscribes.remove(id);
1610            // If the collection is a copy to, stop tracking it. This ensures that the controller
1611            // ceases to produce `CopyToResponse`s` for this copy to.
1612            self.copy_tos.remove(id);
1613        }
1614
1615        Ok(())
1616    }
1617
1618    /// Initiate a peek request for the contents of `id` at `timestamp`.
1619    ///
1620    /// If this returns an error, then it didn't modify any `Instance` state.
1621    #[mz_ore::instrument(level = "debug")]
1622    pub fn peek(
1623        &mut self,
1624        peek_target: PeekTarget,
1625        literal_constraints: Option<Vec<Row>>,
1626        uuid: Uuid,
1627        timestamp: Timestamp,
1628        result_desc: RelationDesc,
1629        finishing: RowSetFinishing,
1630        map_filter_project: mz_expr::SafeMfpPlan,
1631        mut read_hold: ReadHold,
1632        target_replica: Option<ReplicaId>,
1633        peek_response_tx: oneshot::Sender<PeekResponse>,
1634    ) -> Result<(), PeekError> {
1635        use PeekError::*;
1636
1637        let target_id = peek_target.id();
1638
1639        // Downgrade the provided read hold to the peek time.
1640        if read_hold.id() != target_id {
1641            return Err(ReadHoldIdMismatch(read_hold.id()));
1642        }
1643        read_hold
1644            .try_downgrade(Antichain::from_elem(timestamp.clone()))
1645            .map_err(|_| ReadHoldInsufficient(target_id))?;
1646
1647        if let Some(target) = target_replica {
1648            if !self.replica_exists(target) {
1649                return Err(ReplicaMissing(target));
1650            }
1651        }
1652
1653        let otel_ctx = OpenTelemetryContext::obtain();
1654
1655        self.peeks.insert(
1656            uuid,
1657            PendingPeek {
1658                target_replica,
1659                // TODO(guswynn): can we just hold the `tracing::Span` here instead?
1660                otel_ctx: otel_ctx.clone(),
1661                requested_at: Instant::now(),
1662                read_hold,
1663                peek_response_tx,
1664                limit: finishing.limit.map(usize::cast_from),
1665                offset: finishing.offset,
1666            },
1667        );
1668
1669        let peek = Peek {
1670            literal_constraints,
1671            uuid,
1672            timestamp,
1673            finishing,
1674            map_filter_project,
1675            // Obtain an `OpenTelemetryContext` from the thread-local tracing
1676            // tree to forward it on to the compute worker.
1677            otel_ctx,
1678            target: peek_target,
1679            result_desc,
1680        };
1681        self.send(ComputeCommand::Peek(Box::new(peek)));
1682
1683        Ok(())
1684    }
1685
1686    /// Cancels an existing peek request.
1687    #[mz_ore::instrument(level = "debug")]
1688    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1689        let Some(peek) = self.peeks.get_mut(&uuid) else {
1690            tracing::warn!("did not find pending peek for {uuid}");
1691            return;
1692        };
1693
1694        let duration = peek.requested_at.elapsed();
1695        self.metrics
1696            .observe_peek_response(&PeekResponse::Canceled, duration);
1697
1698        // Enqueue a notification for the cancellation.
1699        let otel_ctx = peek.otel_ctx.clone();
1700        otel_ctx.attach_as_parent();
1701
1702        self.deliver_response(ComputeControllerResponse::PeekNotification(
1703            uuid,
1704            PeekNotification::Canceled,
1705            otel_ctx,
1706        ));
1707
1708        // Finish the peek.
1709        // This will also propagate the cancellation to the replicas.
1710        self.finish_peek(uuid, reason);
1711    }
1712
1713    /// Assigns a read policy to specific identifiers.
1714    ///
1715    /// The policies are assigned in the order presented, and repeated identifiers should
1716    /// conclude with the last policy. Changing a policy will immediately downgrade the read
1717    /// capability if appropriate, but it will not "recover" the read capability if the prior
1718    /// capability is already ahead of it.
1719    ///
1720    /// Identifiers not present in `policies` retain their existing read policies.
1721    ///
1722    /// It is an error to attempt to set a read policy for a collection that is not readable in the
1723    /// context of compute. At this time, only indexes are readable compute collections.
1724    #[mz_ore::instrument(level = "debug")]
1725    pub fn set_read_policy(
1726        &mut self,
1727        policies: Vec<(GlobalId, ReadPolicy)>,
1728    ) -> Result<(), ReadPolicyError> {
1729        // Do error checking upfront, to avoid introducing inconsistencies between a collection's
1730        // `implied_capability` and `read_capabilities`.
1731        for (id, _policy) in &policies {
1732            let collection = self.collection(*id)?;
1733            if collection.read_policy.is_none() {
1734                return Err(ReadPolicyError::WriteOnlyCollection(*id));
1735            }
1736        }
1737
1738        for (id, new_policy) in policies {
1739            let collection = self.expect_collection_mut(id);
1740            let new_since = new_policy.frontier(collection.write_frontier().borrow());
1741            let _ = collection.implied_read_hold.try_downgrade(new_since);
1742            collection.read_policy = Some(new_policy);
1743        }
1744
1745        Ok(())
1746    }
1747
1748    /// Advance the global write frontier of the given collection.
1749    ///
1750    /// Frontier regressions are gracefully ignored.
1751    ///
1752    /// # Panics
1753    ///
1754    /// Panics if the identified collection does not exist.
1755    #[mz_ore::instrument(level = "debug")]
1756    fn maybe_update_global_write_frontier(
1757        &mut self,
1758        id: GlobalId,
1759        new_frontier: Antichain<Timestamp>,
1760    ) {
1761        let collection = self.expect_collection_mut(id);
1762
1763        let advanced = collection.shared.lock_write_frontier(|f| {
1764            let advanced = PartialOrder::less_than(f, &new_frontier);
1765            if advanced {
1766                f.clone_from(&new_frontier);
1767            }
1768            advanced
1769        });
1770
1771        if !advanced {
1772            return;
1773        }
1774
1775        // Relax the implied read hold according to the read policy.
1776        let new_since = match &collection.read_policy {
1777            Some(read_policy) => {
1778                // For readable collections the read frontier is determined by applying the
1779                // client-provided read policy to the write frontier.
1780                read_policy.frontier(new_frontier.borrow())
1781            }
1782            None => {
1783                // Write-only collections cannot be read within the context of the compute
1784                // controller, so their read frontier only controls the read holds taken on their
1785                // inputs. We can safely downgrade the input read holds to any time less than the
1786                // write frontier.
1787                //
1788                // Note that some write-only collections (continual tasks) need to observe changes
1789                // at their current write frontier during hydration. Thus, we cannot downgrade the
1790                // read frontier to the write frontier and instead step it back by one.
1791                Antichain::from_iter(
1792                    new_frontier
1793                        .iter()
1794                        .map(|t| t.step_back().unwrap_or(Timestamp::MIN)),
1795                )
1796            }
1797        };
1798        let _ = collection.implied_read_hold.try_downgrade(new_since);
1799
1800        // Report the frontier advancement.
1801        self.deliver_response(ComputeControllerResponse::FrontierUpper {
1802            id,
1803            upper: new_frontier,
1804        });
1805    }
1806
    /// Apply a collection read hold change.
    ///
    /// Applies the given batch of read-capability changes to the identified collection. If the
    /// collection's overall read frontier advances as a result, the new frontier is propagated
    /// to the read holds on the collection's compute and storage dependencies, and an
    /// `AllowCompaction` command is sent to allow replicas to compact the collection.
    ///
    /// If the collection is unknown, this logs (or soft-panics) and returns without effect.
    pub(super) fn apply_read_hold_change(
        &mut self,
        id: GlobalId,
        mut update: ChangeBatch<Timestamp>,
    ) {
        let Some(collection) = self.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "read hold change for absent collection (id={id}, changes={update:?})"
            );
            return;
        };

        // Compute the new read frontier under the capabilities lock; `None` means the frontier
        // did not change.
        let new_since = collection.shared.lock_read_capabilities(|caps| {
            // Sanity check to prevent corrupted `read_capabilities`, which can cause hard-to-debug
            // issues (usually stuck read frontiers).
            let read_frontier = caps.frontier();
            for (time, diff) in update.iter() {
                let count = caps.count_for(time) + diff;
                assert!(
                    count >= 0,
                    "invalid read capabilities update: negative capability \
             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
                assert!(
                    count == 0 || read_frontier.less_equal(time),
                    "invalid read capabilities update: frontier regression \
             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
            }

            // Apply read capability updates and learn about resulting changes to the read
            // frontier.
            let changes = caps.update_iter(update.drain());

            // Only report a new frontier when the capability changes actually moved it.
            let changed = changes.count() > 0;
            changed.then(|| caps.frontier().to_owned())
        });

        let Some(new_since) = new_since else {
            return; // read frontier did not change
        };

        // Propagate read frontier update to dependencies.
        for read_hold in collection.compute_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }
        for read_hold in collection.storage_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }

        // Produce `AllowCompaction` command.
        self.send(ComputeCommand::AllowCompaction {
            id,
            frontier: new_since,
        });
    }
1868
1869    /// Fulfills a registered peek and cleans up associated state.
1870    ///
1871    /// As part of this we:
1872    ///  * Send a `PeekResponse` through the peek's response channel.
1873    ///  * Emit a `CancelPeek` command to instruct replicas to stop spending resources on this
1874    ///    peek, and to allow the `ComputeCommandHistory` to reduce away the corresponding `Peek`
1875    ///    command.
1876    ///  * Remove the read hold for this peek, unblocking compaction that might have waited on it.
1877    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1878        let Some(peek) = self.peeks.remove(&uuid) else {
1879            return;
1880        };
1881
1882        // The recipient might not be interested in the peek response anymore, which is fine.
1883        let _ = peek.peek_response_tx.send(response);
1884
1885        // NOTE: We need to send the `CancelPeek` command _before_ we release the peek's read hold
1886        // (by dropping it), to avoid the edge case that caused database-issues#4812.
1887        self.send(ComputeCommand::CancelPeek { uuid });
1888
1889        drop(peek.read_hold);
1890    }
1891
1892    /// Handles a response from a replica. Replica IDs are re-used across replica restarts, so we
1893    /// use the replica epoch to drop stale responses.
1894    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse) {
1895        // Filter responses from non-existing or stale replicas.
1896        if self
1897            .replicas
1898            .get(&replica_id)
1899            .filter(|replica| replica.epoch == epoch)
1900            .is_none()
1901        {
1902            return;
1903        }
1904
1905        // Invariant: the replica exists and has the expected epoch.
1906
1907        match response {
1908            ComputeResponse::Frontiers(id, frontiers) => {
1909                self.handle_frontiers_response(id, frontiers, replica_id);
1910            }
1911            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
1912                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
1913            }
1914            ComputeResponse::CopyToResponse(id, response) => {
1915                self.handle_copy_to_response(id, response, replica_id);
1916            }
1917            ComputeResponse::SubscribeResponse(id, response) => {
1918                self.handle_subscribe_response(id, response, replica_id);
1919            }
1920            ComputeResponse::Status(response) => {
1921                self.handle_status_response(response, replica_id);
1922            }
1923        }
1924    }
1925
1926    /// Handle new frontiers, returning any compute response that needs to
1927    /// be sent to the client.
1928    fn handle_frontiers_response(
1929        &mut self,
1930        id: GlobalId,
1931        frontiers: FrontiersResponse,
1932        replica_id: ReplicaId,
1933    ) {
1934        if !self.collections.contains_key(&id) {
1935            soft_panic_or_log!(
1936                "frontiers update for an unknown collection \
1937                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1938            );
1939            return;
1940        }
1941        let Some(replica) = self.replicas.get_mut(&replica_id) else {
1942            soft_panic_or_log!(
1943                "frontiers update for an unknown replica \
1944                 (replica_id={replica_id}, frontiers={frontiers:?})"
1945            );
1946            return;
1947        };
1948        let Some(replica_collection) = replica.collections.get_mut(&id) else {
1949            soft_panic_or_log!(
1950                "frontiers update for an unknown replica collection \
1951                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1952            );
1953            return;
1954        };
1955
1956        if let Some(new_frontier) = frontiers.input_frontier {
1957            replica_collection.update_input_frontier(new_frontier.clone());
1958        }
1959        if let Some(new_frontier) = frontiers.output_frontier {
1960            replica_collection.update_output_frontier(new_frontier.clone());
1961        }
1962        if let Some(new_frontier) = frontiers.write_frontier {
1963            replica_collection.update_write_frontier(new_frontier.clone());
1964            self.maybe_update_global_write_frontier(id, new_frontier);
1965        }
1966    }
1967
1968    #[mz_ore::instrument(level = "debug")]
1969    fn handle_peek_response(
1970        &mut self,
1971        uuid: Uuid,
1972        response: PeekResponse,
1973        otel_ctx: OpenTelemetryContext,
1974        replica_id: ReplicaId,
1975    ) {
1976        otel_ctx.attach_as_parent();
1977
1978        // We might not be tracking this peek anymore, because we have served a response already or
1979        // because it was canceled. If this is the case, we ignore the response.
1980        let Some(peek) = self.peeks.get(&uuid) else {
1981            return;
1982        };
1983
1984        // If the peek is targeting a replica, ignore responses from other replicas.
1985        let target_replica = peek.target_replica.unwrap_or(replica_id);
1986        if target_replica != replica_id {
1987            return;
1988        }
1989
1990        let duration = peek.requested_at.elapsed();
1991        self.metrics.observe_peek_response(&response, duration);
1992
1993        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
1994        // NOTE: We use the `otel_ctx` from the response, not the pending peek, because we
1995        // currently want the parent to be whatever the compute worker did with this peek.
1996        self.deliver_response(ComputeControllerResponse::PeekNotification(
1997            uuid,
1998            notification,
1999            otel_ctx,
2000        ));
2001
2002        self.finish_peek(uuid, response)
2003    }
2004
2005    fn handle_copy_to_response(
2006        &mut self,
2007        sink_id: GlobalId,
2008        response: CopyToResponse,
2009        replica_id: ReplicaId,
2010    ) {
2011        if !self.collections.contains_key(&sink_id) {
2012            soft_panic_or_log!(
2013                "received response for an unknown copy-to \
2014                 (sink_id={sink_id}, replica_id={replica_id})",
2015            );
2016            return;
2017        }
2018        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2019            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
2020            return;
2021        };
2022        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
2023            soft_panic_or_log!(
2024                "copy-to response for an unknown replica collection \
2025                 (sink_id={sink_id}, replica_id={replica_id})"
2026            );
2027            return;
2028        };
2029
2030        // Downgrade the replica frontiers, to enable dropping of input read holds and clean up of
2031        // collection state.
2032        // TODO(database-issues#4701): report copy-to frontiers through `Frontiers` responses
2033        replica_collection.update_write_frontier(Antichain::new());
2034        replica_collection.update_input_frontier(Antichain::new());
2035        replica_collection.update_output_frontier(Antichain::new());
2036
2037        // We might not be tracking this COPY TO because we have already returned a response
2038        // from one of the replicas. In that case, we ignore the response.
2039        if !self.copy_tos.remove(&sink_id) {
2040            return;
2041        }
2042
2043        let result = match response {
2044            CopyToResponse::RowCount(count) => Ok(count),
2045            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
2046            // We should never get here: Replicas only drop copy to collections in response
2047            // to the controller allowing them to do so, and when the controller drops a
2048            // copy to it also removes it from the list of tracked copy_tos (see
2049            // [`Instance::drop_collections`]).
2050            CopyToResponse::Dropped => {
2051                tracing::error!(
2052                    %sink_id, %replica_id,
2053                    "received `Dropped` response for a tracked copy to",
2054                );
2055                return;
2056            }
2057        };
2058
2059        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
2060    }
2061
    /// Handle a `SubscribeResponse` reported by a replica.
    ///
    /// Updates the replica's tracked frontiers for the subscribe and, if the subscribe is
    /// still tracked and the batch advances its frontier, forwards the batch to the client
    /// with already-emitted updates filtered out.
    fn handle_subscribe_response(
        &mut self,
        subscribe_id: GlobalId,
        response: SubscribeResponse,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&subscribe_id) {
            soft_panic_or_log!(
                "received response for an unknown subscribe \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica (replica_id={replica_id})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica collection \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
            );
            return;
        };

        // Always apply replica write frontier updates. Even if the subscribe is not tracked
        // anymore, there might still be replicas reading from its inputs, so we need to track the
        // frontiers until all replicas have advanced to the empty one.
        let write_frontier = match &response {
            SubscribeResponse::Batch(batch) => batch.upper.clone(),
            SubscribeResponse::DroppedAt(_) => Antichain::new(),
        };

        // For subscribes we downgrade all replica frontiers based on write frontiers. This should
        // be fine because the input and output frontier of a subscribe track its write frontier.
        // TODO(database-issues#4701): report subscribe frontiers through `Frontiers` responses
        replica_collection.update_write_frontier(write_frontier.clone());
        replica_collection.update_input_frontier(write_frontier.clone());
        replica_collection.update_output_frontier(write_frontier.clone());

        // If the subscribe is not tracked, or targets a different replica, there is nothing to do.
        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
            return;
        };

        // Apply a global frontier update.
        // If this is a replica-targeted subscribe, it is important that we advance the global
        // frontier only based on responses from the targeted replica. Otherwise, another replica
        // could advance to the empty frontier, making us drop the subscribe on the targeted
        // replica prematurely.
        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);

        match response {
            SubscribeResponse::Batch(batch) => {
                let upper = batch.upper;
                let mut updates = batch.updates;

                // If this batch advances the subscribe's frontier, we emit all updates at times
                // greater or equal to the last frontier (to avoid emitting duplicate updates).
                if PartialOrder::less_than(&subscribe.frontier, &upper) {
                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());

                    if upper.is_empty() {
                        // This subscribe cannot produce more data. Stop tracking it.
                        self.subscribes.remove(&subscribe_id);
                    } else {
                        // This subscribe can produce more data. Update our tracking of it.
                        self.subscribes.insert(subscribe_id, subscribe);
                    }

                    // Drop updates at times before `lower`, which were already emitted to the
                    // client in previous batches.
                    if let Ok(updates) = updates.as_mut() {
                        updates.retain_mut(|updates| {
                            let offset = updates.times().partition_point(|t| {
                                // True for times that are strictly less than lower (and should be skipped)
                                // and false otherwise.
                                !lower.less_equal(t)
                            });
                            let (_, past_lower) = std::mem::take(updates).split_at(offset);
                            *updates = past_lower;
                            updates.len() > 0
                        });
                    }
                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeBatch {
                            lower,
                            upper,
                            updates,
                        },
                    ));
                }
            }
            SubscribeResponse::DroppedAt(frontier) => {
                // We should never get here: Replicas only drop subscribe collections in response
                // to the controller allowing them to do so, and when the controller drops a
                // subscribe it also removes it from the list of tracked subscribes (see
                // [`Instance::drop_collections`]).
                tracing::error!(
                    %subscribe_id,
                    %replica_id,
                    frontier = ?frontier.elements(),
                    "received `DroppedAt` response for a tracked subscribe",
                );
                self.subscribes.remove(&subscribe_id);
            }
        }
    }
2171
2172    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
2173        match response {
2174            StatusResponse::Placeholder => {}
2175        }
2176    }
2177
2178    /// Return the write frontiers of the dependencies of the given collection.
2179    fn dependency_write_frontiers<'b>(
2180        &'b self,
2181        collection: &'b CollectionState,
2182    ) -> impl Iterator<Item = Antichain<Timestamp>> + 'b {
2183        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2184            let collection = self.collections.get(&dep_id);
2185            collection.map(|c| c.write_frontier())
2186        });
2187        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2188            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2189            frontiers.map(|f| f.write_frontier)
2190        });
2191
2192        compute_frontiers.chain(storage_frontiers)
2193    }
2194
2195    /// Return the write frontiers of transitive storage dependencies of the given collection.
2196    fn transitive_storage_dependency_write_frontiers<'b>(
2197        &'b self,
2198        collection: &'b CollectionState,
2199    ) -> impl Iterator<Item = Antichain<Timestamp>> + 'b {
2200        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2201        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2202        let mut done = BTreeSet::new();
2203
2204        while let Some(id) = todo.pop() {
2205            if done.contains(&id) {
2206                continue;
2207            }
2208            if let Some(dep) = self.collections.get(&id) {
2209                storage_ids.extend(dep.storage_dependency_ids());
2210                todo.extend(dep.compute_dependency_ids())
2211            }
2212            done.insert(id);
2213        }
2214
2215        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2216            let frontiers = self.storage_collections.collection_frontiers(id).ok();
2217            frontiers.map(|f| f.write_frontier)
2218        });
2219
2220        storage_frontiers
2221    }
2222
2223    /// Downgrade the warmup capabilities of collections as much as possible.
2224    ///
2225    /// The only requirement we have for a collection's warmup capability is that it is for a time
2226    /// that is available in all of the collection's inputs. For each input the latest time that is
2227    /// the case for is `write_frontier - 1`. So the farthest we can downgrade a collection's
2228    /// warmup capability is the minimum of `write_frontier - 1` of all its inputs.
2229    ///
2230    /// This method expects to be periodically called as part of instance maintenance work.
2231    /// We would like to instead update the warmup capabilities synchronously in response to
2232    /// frontier updates of dependency collections, but that is not generally possible because we
2233    /// don't learn about frontier updates of storage collections synchronously. We could do
2234    /// synchronous updates for compute dependencies, but we refrain from doing for simplicity.
2235    fn downgrade_warmup_capabilities(&mut self) {
2236        let mut new_capabilities = BTreeMap::new();
2237        for (id, collection) in &self.collections {
2238            // For write-only collections that have advanced to the empty frontier, we can drop the
2239            // warmup capability entirely. There is no reason why we would need to hydrate those
2240            // collections again, so being able to warm them up is not useful.
2241            if collection.read_policy.is_none()
2242                && collection.shared.lock_write_frontier(|f| f.is_empty())
2243            {
2244                new_capabilities.insert(*id, Antichain::new());
2245                continue;
2246            }
2247
2248            let mut new_capability = Antichain::new();
2249            for frontier in self.dependency_write_frontiers(collection) {
2250                for time in frontier {
2251                    new_capability.insert(time.step_back().unwrap_or(time));
2252                }
2253            }
2254
2255            new_capabilities.insert(*id, new_capability);
2256        }
2257
2258        for (id, new_capability) in new_capabilities {
2259            let collection = self.expect_collection_mut(id);
2260            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
2261        }
2262    }
2263
2264    /// Forward the implied capabilities of collections, if possible.
2265    ///
2266    /// The implied capability of a collection controls (a) which times are still readable (for
2267    /// indexes) and (b) with which as-of the collection gets installed on a new replica. We are
2268    /// usually not allowed to advance an implied capability beyond the frontier that follows from
2269    /// the collection's read policy applied to its write frontier:
2270    ///
2271    ///  * For sink collections, some external consumer might rely on seeing all distinct times in
2272    ///    the input reflected in the output. If we'd forward the implied capability of a sink,
2273    ///    we'd risk skipping times in the output across replica restarts.
2274    ///  * For index collections, we might make the index unreadable by advancing its read frontier
2275    ///    beyond its write frontier.
2276    ///
2277    /// There is one case where forwarding an implied capability is fine though: an index installed
2278    /// on a cluster that has no replicas. Such indexes are not readable anyway until a new replica
2279    /// is added, so advancing its read frontier can't make it unreadable. We can thus advance the
2280    /// implied capability as long as we make sure that when a new replica is added, the expected
2281    /// relationship between write frontier, read policy, and implied capability can be restored
2282    /// immediately (modulo computation time).
2283    ///
2284    /// Forwarding implied capabilities is not necessary for the correct functioning of the
2285    /// controller but an optimization that is beneficial in two ways:
2286    ///
2287    ///  * It relaxes read holds on inputs to forwarded collections, allowing their compaction.
2288    ///  * It reduces the amount of historical detail new replicas need to process when computing
2289    ///    forwarded collections, as forwarding the implied capability also forwards the corresponding
2290    ///    dataflow as-of.
2291    fn forward_implied_capabilities(&mut self) {
2292        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
2293            return;
2294        }
2295        if !self.replicas.is_empty() {
2296            return;
2297        }
2298
2299        let mut new_capabilities = BTreeMap::new();
2300        for (id, collection) in &self.collections {
2301            let Some(read_policy) = &collection.read_policy else {
2302                // Collection is write-only, i.e. a sink.
2303                continue;
2304            };
2305
2306            // When a new replica is started, it will immediately be able to compute all collection
2307            // output up to the write frontier of its transitive storage inputs. So the new implied
2308            // read capability should be the read policy applied to that frontier.
2309            let mut dep_frontier = Antichain::new();
2310            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
2311                dep_frontier.extend(frontier);
2312            }
2313
2314            let new_capability = read_policy.frontier(dep_frontier.borrow());
2315            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
2316                new_capabilities.insert(*id, new_capability);
2317            }
2318        }
2319
2320        for (id, new_capability) in new_capabilities {
2321            let collection = self.expect_collection_mut(id);
2322            let _ = collection.implied_read_hold.try_downgrade(new_capability);
2323        }
2324    }
2325
2326    /// Acquires a `ReadHold` for the identified compute collection.
2327    ///
2328    /// This mirrors the logic used by the controller-side `InstanceState::acquire_read_hold`,
2329    /// but executes on the instance task itself.
2330    pub(super) fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold, CollectionMissing> {
2331        // Similarly to InstanceState::acquire_read_hold and StorageCollections::acquire_read_holds,
2332        // we acquire read holds at the earliest possible time rather than returning a copy
2333        // of the implied read hold. This is so that dependents can acquire read holds on
2334        // compute dependencies at frontiers that are held back by other read holds the caller
2335        // has previously taken.
2336        let collection = self.collection(id)?;
2337        let since = collection.shared.lock_read_capabilities(|caps| {
2338            let since = caps.frontier().to_owned();
2339            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
2340            since
2341        });
2342        let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
2343        Ok(hold)
2344    }
2345
    /// Process pending maintenance work.
    ///
    /// This method is invoked periodically by the global controller.
    /// It is a good place to perform maintenance work that arises from various controller state
    /// changes and that cannot conveniently be handled synchronously with those state changes.
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        // NOTE(review): the relative order of these steps looks deliberate (e.g. capabilities are
        // downgraded and collections cleaned up before introspection and metrics are refreshed) —
        // preserve it when adding new steps.
        self.rehydrate_failed_replicas();
        self.downgrade_warmup_capabilities();
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
2362}
2363
/// State maintained about individual compute collections.
///
/// A compute collection is either an index, or a storage sink, or a subscribe, exported by a
/// compute dataflow.
#[derive(Debug)]
struct CollectionState {
    /// If set, this collection is only maintained by the specified replica.
    target_replica: Option<ReplicaId>,
    /// Whether this collection is a log collection.
    ///
    /// Log collections are special in that they are only maintained by a subset of all replicas.
    log_collection: bool,
    /// Whether this collection has been dropped by a controller client.
    ///
    /// The controller is allowed to remove the `CollectionState` for a collection only when
    /// `dropped == true`. Otherwise, clients might still expect to be able to query information
    /// about this collection.
    dropped: bool,
    /// Whether this collection has been scheduled, i.e., the controller has sent a `Schedule`
    /// command for it.
    scheduled: bool,

    /// Whether this collection is in read-only mode.
    ///
    /// When in read-only mode, the dataflow is not allowed to affect external state (largely persist).
    read_only: bool,

    /// State shared with the `ComputeController`.
    shared: SharedCollectionState,

    /// A read hold maintaining the implicit capability of the collection.
    ///
    /// This capability is kept to ensure that the collection remains readable according to its
    /// `read_policy`. It also ensures that read holds on the collection's dependencies are kept at
    /// some time not greater than the collection's `write_frontier`, guaranteeing that the
    /// collection's next outputs can always be computed without skipping times.
    implied_read_hold: ReadHold,
    /// A read hold held to enable dataflow warmup.
    ///
    /// Dataflow warmup is an optimization that allows dataflows to immediately start hydrating
    /// even when their next output time (as implied by the `write_frontier`) is in the future.
    /// By installing a read capability derived from the write frontiers of the collection's
    /// inputs, we ensure that the as-of of new dataflows installed for the collection is at a time
    /// that is immediately available, so hydration can begin immediately too.
    warmup_read_hold: ReadHold,
    /// The policy to use to downgrade `self.implied_read_hold`.
    ///
    /// If `None`, the collection is a write-only collection (i.e. a sink). For write-only
    /// collections, the `implied_read_hold` is only required for maintaining read holds on the
    /// inputs, so we can immediately downgrade it to the `write_frontier`.
    read_policy: Option<ReadPolicy>,

    /// Storage identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    storage_dependencies: BTreeMap<GlobalId, ReadHold>,
    /// Compute identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    compute_dependencies: BTreeMap<GlobalId, ReadHold>,

    /// Introspection state associated with this collection.
    introspection: CollectionIntrospection,

    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
    /// introspection update.
    ///
    /// Keys are `(period, lag, labels)` triples, values are counts.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection. Tracking is disabled
    /// for transient collections; see [`CollectionState::new`].
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}
2443
2444impl CollectionState {
2445    /// Creates a new collection state, with an initial read policy valid from `since`.
2446    fn new(
2447        collection_id: GlobalId,
2448        as_of: Antichain<Timestamp>,
2449        shared: SharedCollectionState,
2450        storage_dependencies: BTreeMap<GlobalId, ReadHold>,
2451        compute_dependencies: BTreeMap<GlobalId, ReadHold>,
2452        read_hold_tx: read_holds::ChangeTx,
2453        introspection: CollectionIntrospection,
2454    ) -> Self {
2455        // A collection is not readable before the `as_of`.
2456        let since = as_of.clone();
2457        // A collection won't produce updates for times before the `as_of`.
2458        let upper = as_of;
2459
2460        // Ensure that the provided `shared` is valid for the given `as_of`.
2461        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
2462        assert!(shared.lock_write_frontier(|f| f == &upper));
2463
2464        // Initialize collection read holds.
2465        // Note that the implied read hold was already added to the `read_capabilities` when
2466        // `shared` was created, so we only need to add the warmup read hold here.
2467        let implied_read_hold =
2468            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
2469        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);
2470
2471        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
2472        shared.lock_read_capabilities(|c| {
2473            c.update_iter(updates);
2474        });
2475
2476        // In an effort to keep the produced wallclock lag introspection data small and
2477        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2478        // select indexes and subscribes.
2479        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
2480            true => None,
2481            false => Some(Default::default()),
2482        };
2483
2484        Self {
2485            target_replica: None,
2486            log_collection: false,
2487            dropped: false,
2488            scheduled: false,
2489            read_only: true,
2490            shared,
2491            implied_read_hold,
2492            warmup_read_hold,
2493            read_policy: Some(ReadPolicy::ValidFrom(since)),
2494            storage_dependencies,
2495            compute_dependencies,
2496            introspection,
2497            wallclock_lag_histogram_stash,
2498        }
2499    }
2500
2501    /// Creates a new collection state for a log collection.
2502    fn new_log_collection(
2503        id: GlobalId,
2504        shared: SharedCollectionState,
2505        read_hold_tx: read_holds::ChangeTx,
2506        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2507    ) -> Self {
2508        let since = Antichain::from_elem(Timestamp::MIN);
2509        let introspection = CollectionIntrospection::new(
2510            id,
2511            introspection_tx,
2512            since.clone(),
2513            false,
2514            None,
2515            None,
2516            Vec::new(),
2517        );
2518        let mut state = Self::new(
2519            id,
2520            since,
2521            shared,
2522            Default::default(),
2523            Default::default(),
2524            read_hold_tx,
2525            introspection,
2526        );
2527        state.log_collection = true;
2528        // Log collections are created and scheduled implicitly as part of replica initialization.
2529        state.scheduled = true;
2530        state
2531    }
2532
2533    /// Reports the current read frontier.
2534    fn read_frontier(&self) -> Antichain<Timestamp> {
2535        self.shared
2536            .lock_read_capabilities(|c| c.frontier().to_owned())
2537    }
2538
2539    /// Reports the current write frontier.
2540    fn write_frontier(&self) -> Antichain<Timestamp> {
2541        self.shared.lock_write_frontier(|f| f.clone())
2542    }
2543
2544    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2545        self.storage_dependencies.keys().copied()
2546    }
2547
2548    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2549        self.compute_dependencies.keys().copied()
2550    }
2551}
2552
/// Collection state shared with the `ComputeController`.
///
/// Having this allows certain controller APIs, such as `ComputeController::collection_frontiers`
/// and `ComputeController::acquire_read_hold` to be non-`async`. This comes at the cost of
/// complexity (by introducing shared mutable state) and performance (by introducing locking). We
/// should aim to reduce the amount of shared state over time, rather than expand it.
///
/// Note that [`SharedCollectionState`]s are initialized by the `ComputeController` prior to the
/// collection's creation in the [`Instance`]. This is to allow compute clients to query frontiers
/// and take new read holds immediately, without having to wait for the [`Instance`] to update.
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState {
    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulation contains the capabilities held by all [`ReadHold`]s given out for the
    /// collection, including `implied_read_hold` and `warmup_read_hold`.
    ///
    /// NOTE: This field may only be modified by [`Instance::apply_read_hold_change`],
    /// [`Instance::acquire_read_hold`], and `ComputeController::acquire_read_hold`.
    /// Nobody else should modify read capabilities directly. Instead, collection users should
    /// manage read holds through [`ReadHold`] objects acquired through
    /// `ComputeController::acquire_read_hold`.
    ///
    /// TODO(teskje): Restructure the code to enforce the above in the type system.
    read_capabilities: Arc<Mutex<MutableAntichain<Timestamp>>>,
    /// The write frontier of this collection.
    ///
    /// Initialized to the collection's `as_of` in [`SharedCollectionState::new`].
    write_frontier: Arc<Mutex<Antichain<Timestamp>>>,
}
2581
2582impl SharedCollectionState {
2583    pub fn new(as_of: Antichain<Timestamp>) -> Self {
2584        // A collection is not readable before the `as_of`.
2585        let since = as_of.clone();
2586        // A collection won't produce updates for times before the `as_of`.
2587        let upper = as_of;
2588
2589        // Initialize read capabilities to the `since`.
2590        // The is the implied read capability. The corresponding [`ReadHold`] is created in
2591        // [`CollectionState::new`].
2592        let mut read_capabilities = MutableAntichain::new();
2593        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2594
2595        Self {
2596            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2597            write_frontier: Arc::new(Mutex::new(upper)),
2598        }
2599    }
2600
2601    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2602    where
2603        F: FnOnce(&mut MutableAntichain<Timestamp>) -> R,
2604    {
2605        let mut caps = self.read_capabilities.lock().expect("poisoned");
2606        f(&mut *caps)
2607    }
2608
2609    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2610    where
2611        F: FnOnce(&mut Antichain<Timestamp>) -> R,
2612    {
2613        let mut frontier = self.write_frontier.lock().expect("poisoned");
2614        f(&mut *frontier)
2615    }
2616}
2617
/// Manages certain introspection relations associated with a collection. Upon creation, it adds
/// rows to introspection relations. When dropped, it retracts its managed rows.
#[derive(Debug)]
struct CollectionIntrospection {
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Introspection state for `IntrospectionType::Frontiers`.
    ///
    /// `Some` if the collection does _not_ sink into a storage collection (i.e. is not an MV). If
    /// the collection sinks into storage, the storage controller reports its frontiers instead.
    frontiers: Option<FrontiersIntrospectionState>,
    /// Introspection state for `IntrospectionType::ComputeMaterializedViewRefreshes`.
    ///
    /// `Some` if the collection is a REFRESH MV.
    refresh: Option<RefreshIntrospectionState>,
    /// The IDs of the collection's dependencies, for `IntrospectionType::ComputeDependencies`.
    ///
    /// If empty, no `ComputeDependencies` rows are reported for this collection.
    dependency_ids: Vec<GlobalId>,
}
2638
2639impl CollectionIntrospection {
2640    fn new(
2641        collection_id: GlobalId,
2642        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2643        as_of: Antichain<Timestamp>,
2644        storage_sink: bool,
2645        initial_as_of: Option<Antichain<Timestamp>>,
2646        refresh_schedule: Option<RefreshSchedule>,
2647        dependency_ids: Vec<GlobalId>,
2648    ) -> Self {
2649        let refresh =
2650            match (refresh_schedule, initial_as_of) {
2651                (Some(refresh_schedule), Some(initial_as_of)) => Some(
2652                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2653                ),
2654                (refresh_schedule, _) => {
2655                    // If we have a `refresh_schedule`, then the collection is a MV, so we should also have
2656                    // an `initial_as_of`.
2657                    soft_assert_or_log!(
2658                        refresh_schedule.is_none(),
2659                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2660                    );
2661                    None
2662                }
2663            };
2664        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2665
2666        let self_ = Self {
2667            collection_id,
2668            introspection_tx,
2669            frontiers,
2670            refresh,
2671            dependency_ids,
2672        };
2673
2674        self_.report_initial_state();
2675        self_
2676    }
2677
2678    /// Reports the initial introspection state.
2679    fn report_initial_state(&self) {
2680        if let Some(frontiers) = &self.frontiers {
2681            let row = frontiers.row_for_collection(self.collection_id);
2682            let updates = vec![(row, Diff::ONE)];
2683            self.send(IntrospectionType::Frontiers, updates);
2684        }
2685
2686        if let Some(refresh) = &self.refresh {
2687            let row = refresh.row_for_collection(self.collection_id);
2688            let updates = vec![(row, Diff::ONE)];
2689            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2690        }
2691
2692        if !self.dependency_ids.is_empty() {
2693            let updates = self.dependency_rows(Diff::ONE);
2694            self.send(IntrospectionType::ComputeDependencies, updates);
2695        }
2696    }
2697
2698    /// Produces rows for the `ComputeDependencies` introspection relation.
2699    fn dependency_rows(&self, diff: Diff) -> Vec<(Row, Diff)> {
2700        self.dependency_ids
2701            .iter()
2702            .map(|dependency_id| {
2703                let row = Row::pack_slice(&[
2704                    Datum::String(&self.collection_id.to_string()),
2705                    Datum::String(&dependency_id.to_string()),
2706                ]);
2707                (row, diff)
2708            })
2709            .collect()
2710    }
2711
2712    /// Observe the given current collection frontiers and update the introspection state as
2713    /// necessary.
2714    fn observe_frontiers(
2715        &mut self,
2716        read_frontier: &Antichain<Timestamp>,
2717        write_frontier: &Antichain<Timestamp>,
2718    ) {
2719        self.update_frontier_introspection(read_frontier, write_frontier);
2720        self.update_refresh_introspection(write_frontier);
2721    }
2722
2723    fn update_frontier_introspection(
2724        &mut self,
2725        read_frontier: &Antichain<Timestamp>,
2726        write_frontier: &Antichain<Timestamp>,
2727    ) {
2728        let Some(frontiers) = &mut self.frontiers else {
2729            return;
2730        };
2731
2732        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2733        {
2734            return; // no change
2735        };
2736
2737        let retraction = frontiers.row_for_collection(self.collection_id);
2738        frontiers.update(read_frontier, write_frontier);
2739        let insertion = frontiers.row_for_collection(self.collection_id);
2740        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2741        self.send(IntrospectionType::Frontiers, updates);
2742    }
2743
2744    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<Timestamp>) {
2745        let Some(refresh) = &mut self.refresh else {
2746            return;
2747        };
2748
2749        let retraction = refresh.row_for_collection(self.collection_id);
2750        refresh.frontier_update(write_frontier);
2751        let insertion = refresh.row_for_collection(self.collection_id);
2752
2753        if retraction == insertion {
2754            return; // no change
2755        }
2756
2757        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2758        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2759    }
2760
2761    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2762        // Failure to send means the `ComputeController` has been dropped and doesn't care about
2763        // introspection updates anymore.
2764        let _ = self.introspection_tx.send((introspection_type, updates));
2765    }
2766}
2767
2768impl Drop for CollectionIntrospection {
2769    fn drop(&mut self) {
2770        // Retract collection frontiers.
2771        if let Some(frontiers) = &self.frontiers {
2772            let row = frontiers.row_for_collection(self.collection_id);
2773            let updates = vec![(row, Diff::MINUS_ONE)];
2774            self.send(IntrospectionType::Frontiers, updates);
2775        }
2776
2777        // Retract MV refresh state.
2778        if let Some(refresh) = &self.refresh {
2779            let retraction = refresh.row_for_collection(self.collection_id);
2780            let updates = vec![(retraction, Diff::MINUS_ONE)];
2781            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2782        }
2783
2784        // Retract collection dependencies.
2785        if !self.dependency_ids.is_empty() {
2786            let updates = self.dependency_rows(Diff::MINUS_ONE);
2787            self.send(IntrospectionType::ComputeDependencies, updates);
2788        }
2789    }
2790}
2791
/// Per-collection state backing the `Frontiers` introspection relation.
#[derive(Debug)]
struct FrontiersIntrospectionState {
    /// The most recently observed read frontier of the collection.
    read_frontier: Antichain<Timestamp>,
    /// The most recently observed write frontier of the collection.
    write_frontier: Antichain<Timestamp>,
}
2797
2798impl FrontiersIntrospectionState {
2799    fn new(as_of: Antichain<Timestamp>) -> Self {
2800        Self {
2801            read_frontier: as_of.clone(),
2802            write_frontier: as_of,
2803        }
2804    }
2805
2806    /// Return a `Row` reflecting the current collection frontiers.
2807    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2808        let read_frontier = self
2809            .read_frontier
2810            .as_option()
2811            .map_or(Datum::Null, |ts| ts.clone().into());
2812        let write_frontier = self
2813            .write_frontier
2814            .as_option()
2815            .map_or(Datum::Null, |ts| ts.clone().into());
2816        Row::pack_slice(&[
2817            Datum::String(&collection_id.to_string()),
2818            read_frontier,
2819            write_frontier,
2820        ])
2821    }
2822
2823    /// Update the introspection state with the given new frontiers.
2824    fn update(
2825        &mut self,
2826        read_frontier: &Antichain<Timestamp>,
2827        write_frontier: &Antichain<Timestamp>,
2828    ) {
2829        if read_frontier != &self.read_frontier {
2830            self.read_frontier.clone_from(read_frontier);
2831        }
2832        if write_frontier != &self.write_frontier {
2833            self.write_frontier.clone_from(write_frontier);
2834        }
2835    }
2836}
2837
/// Information needed to compute introspection updates for a REFRESH materialized view when the
/// write frontier advances.
#[derive(Debug)]
struct RefreshIntrospectionState {
    // Immutable properties of the MV
    /// The MV's refresh schedule.
    refresh_schedule: RefreshSchedule,
    /// The MV's initial as-of, used to determine whether the first refresh has happened yet.
    initial_as_of: Antichain<Timestamp>,
    // Refresh state, updated by `frontier_update` as the write frontier advances.
    next_refresh: Datum<'static>,           // Null or an MzTimestamp
    last_completed_refresh: Datum<'static>, // Null or an MzTimestamp
}
2849
2850impl RefreshIntrospectionState {
2851    /// Return a `Row` reflecting the current refresh introspection state.
2852    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2853        Row::pack_slice(&[
2854            Datum::String(&collection_id.to_string()),
2855            self.last_completed_refresh,
2856            self.next_refresh,
2857        ])
2858    }
2859}
2860
2861impl RefreshIntrospectionState {
2862    /// Construct a new [`RefreshIntrospectionState`], and apply an initial `frontier_update()` at
2863    /// the `upper`.
2864    fn new(
2865        refresh_schedule: RefreshSchedule,
2866        initial_as_of: Antichain<Timestamp>,
2867        upper: &Antichain<Timestamp>,
2868    ) -> Self {
2869        let mut self_ = Self {
2870            refresh_schedule: refresh_schedule.clone(),
2871            initial_as_of: initial_as_of.clone(),
2872            next_refresh: Datum::Null,
2873            last_completed_refresh: Datum::Null,
2874        };
2875        self_.frontier_update(upper);
2876        self_
2877    }
2878
2879    /// Should be called whenever the write frontier of the collection advances. It updates the
2880    /// state that should be recorded in introspection relations, but doesn't send the updates yet.
2881    fn frontier_update(&mut self, write_frontier: &Antichain<Timestamp>) {
2882        if write_frontier.is_empty() {
2883            self.last_completed_refresh =
2884                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2885                    last_refresh.into()
2886                } else {
2887                    // If there is no last refresh, then we have a `REFRESH EVERY`, in which case
2888                    // the saturating roundup puts a refresh at the maximum possible timestamp.
2889                    Timestamp::MAX.into()
2890                };
2891            self.next_refresh = Datum::Null;
2892        } else {
2893            if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2894                // We are before the first refresh.
2895                self.last_completed_refresh = Datum::Null;
2896                let initial_as_of = self.initial_as_of.as_option().expect(
2897                    "initial_as_of can't be [], because then there would be no refreshes at all",
2898                );
2899                let first_refresh = self
2900                    .refresh_schedule
2901                    .round_up_timestamp(*initial_as_of)
2902                    .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2903                soft_assert_or_log!(
2904                    first_refresh == *initial_as_of,
2905                    "initial_as_of should be set to the first refresh"
2906                );
2907                self.next_refresh = first_refresh.into();
2908            } else {
2909                // The first refresh has already happened.
2910                let write_frontier = write_frontier.as_option().expect("checked above");
2911                self.last_completed_refresh = self
2912                    .refresh_schedule
2913                    .round_down_timestamp_m1(*write_frontier)
2914                    .map_or_else(
2915                        || {
2916                            soft_panic_or_log!(
2917                                "rounding down should have returned the first refresh or later"
2918                            );
2919                            Datum::Null
2920                        },
2921                        |last_completed_refresh| last_completed_refresh.into(),
2922                    );
2923                self.next_refresh = write_frontier.clone().into();
2924            }
2925        }
2926    }
2927}
2928
/// A note of an outstanding peek response.
#[derive(Debug)]
struct PendingPeek {
    /// For replica-targeted peeks, this specifies the replica whose response we should pass on.
    ///
    /// If this value is `None`, we pass on the first response.
    target_replica: Option<ReplicaId>,
    /// The OpenTelemetry context for this peek.
    otel_ctx: OpenTelemetryContext,
    /// The time at which the peek was requested.
    ///
    /// Used to track peek durations.
    requested_at: Instant,
    /// The read hold installed to serve this peek.
    ///
    /// NOTE(review): presumably keeps the peeked collection readable at the peek timestamp
    /// until the response arrives — confirm at the peek issuing site.
    read_hold: ReadHold,
    /// The channel to send peek results.
    peek_response_tx: oneshot::Sender<PeekResponse>,
    /// An optional limit of the peek's result size.
    ///
    /// `None` means no limit.
    limit: Option<usize>,
    /// The offset into the peek's result.
    offset: usize,
}
2951
/// State tracked for an active subscribe.
#[derive(Debug, Clone)]
struct ActiveSubscribe {
    /// Current upper frontier of this subscribe.
    frontier: Antichain<Timestamp>,
}
2957
2958impl Default for ActiveSubscribe {
2959    fn default() -> Self {
2960        Self {
2961            frontier: Antichain::from_elem(Timestamp::MIN),
2962        }
2963    }
2964}
2965
/// State maintained about individual replicas.
#[derive(Debug)]
struct ReplicaState {
    /// The ID of the replica.
    id: ReplicaId,
    /// Client for the running replica task.
    client: ReplicaClient,
    /// The replica configuration.
    config: ReplicaConfig,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Per-replica collection state, keyed by collection ID.
    collections: BTreeMap<GlobalId, ReplicaCollectionState>,
    /// The epoch of the replica.
    ///
    /// NOTE(review): presumably identifies the replica's incarnation, to fence out messages
    /// from earlier incarnations — confirm against the replica task implementation.
    epoch: u64,
}
2984
2985impl ReplicaState {
2986    fn new(
2987        id: ReplicaId,
2988        client: ReplicaClient,
2989        config: ReplicaConfig,
2990        metrics: ReplicaMetrics,
2991        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2992        epoch: u64,
2993    ) -> Self {
2994        Self {
2995            id,
2996            client,
2997            config,
2998            metrics,
2999            introspection_tx,
3000            epoch,
3001            collections: Default::default(),
3002        }
3003    }
3004
3005    /// Add a collection to the replica state.
3006    ///
3007    /// # Panics
3008    ///
3009    /// Panics if a collection with the same ID exists already.
3010    fn add_collection(
3011        &mut self,
3012        id: GlobalId,
3013        as_of: Antichain<Timestamp>,
3014        input_read_holds: Vec<ReadHold>,
3015    ) {
3016        let metrics = self.metrics.for_collection(id);
3017        let introspection = ReplicaCollectionIntrospection::new(
3018            self.id,
3019            id,
3020            self.introspection_tx.clone(),
3021            as_of.clone(),
3022        );
3023        let mut state =
3024            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
3025
3026        // In an effort to keep the produced wallclock lag introspection data small and
3027        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
3028        // select indexes and subscribes.
3029        if id.is_transient() {
3030            state.wallclock_lag_max = None;
3031        }
3032
3033        if let Some(previous) = self.collections.insert(id, state) {
3034            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
3035        }
3036    }
3037
3038    /// Remove state for a collection.
3039    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState> {
3040        self.collections.remove(&id)
3041    }
3042
3043    /// Returns whether all replica frontiers of the given collection are empty.
3044    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
3045        self.collections.get(&id).map_or(true, |c| {
3046            c.write_frontier.is_empty()
3047                && c.input_frontier.is_empty()
3048                && c.output_frontier.is_empty()
3049        })
3050    }
3051
3052    /// Returns the state of the [`ReplicaState`] formatted as JSON.
3053    ///
3054    /// The returned value is not guaranteed to be stable and may change at any point in time.
3055    #[mz_ore::instrument(level = "debug")]
3056    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3057        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
3058        // returned object as a tradeoff between usability and stability. `serde_json` will fail
3059        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
3060        // prevents a future unrelated change from silently breaking this method.
3061
3062        // Destructure `self` here so we don't forget to consider dumping newly added fields.
3063        let Self {
3064            id,
3065            client: _,
3066            config: _,
3067            metrics: _,
3068            introspection_tx: _,
3069            epoch,
3070            collections,
3071        } = self;
3072
3073        let collections: BTreeMap<_, _> = collections
3074            .iter()
3075            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
3076            .collect();
3077
3078        Ok(serde_json::json!({
3079            "id": id.to_string(),
3080            "collections": collections,
3081            "epoch": epoch,
3082        }))
3083    }
3084}
3085
/// Per-replica state maintained for a single compute collection.
#[derive(Debug)]
struct ReplicaCollectionState {
    /// The replica write frontier of this collection.
    ///
    /// See [`FrontiersResponse::write_frontier`].
    write_frontier: Antichain<Timestamp>,
    /// The replica input frontier of this collection.
    ///
    /// See [`FrontiersResponse::input_frontier`].
    input_frontier: Antichain<Timestamp>,
    /// The replica output frontier of this collection.
    ///
    /// See [`FrontiersResponse::output_frontier`].
    output_frontier: Antichain<Timestamp>,

    /// Metrics tracked for this collection.
    ///
    /// If this is `None`, no metrics are collected.
    metrics: Option<ReplicaCollectionMetrics>,
    /// As-of frontier with which this collection was installed on the replica.
    as_of: Antichain<Timestamp>,
    /// Tracks introspection state for this collection.
    introspection: ReplicaCollectionIntrospection,
    /// Read holds on storage inputs to this collection.
    ///
    /// These read holds are kept to ensure that the replica is able to read from storage inputs at
    /// all times it hasn't read yet. We only need to install read holds for storage inputs since
    /// compaction of compute inputs is implicitly held back by Timely/DD.
    input_read_holds: Vec<ReadHold>,

    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection. (Tracking is
    /// disabled for transient collections; see `ReplicaState::add_collection`.)
    wallclock_lag_max: Option<WallclockLag>,
}
3121
3122impl ReplicaCollectionState {
3123    fn new(
3124        metrics: Option<ReplicaCollectionMetrics>,
3125        as_of: Antichain<Timestamp>,
3126        introspection: ReplicaCollectionIntrospection,
3127        input_read_holds: Vec<ReadHold>,
3128    ) -> Self {
3129        Self {
3130            write_frontier: as_of.clone(),
3131            input_frontier: as_of.clone(),
3132            output_frontier: as_of.clone(),
3133            metrics,
3134            as_of,
3135            introspection,
3136            input_read_holds,
3137            wallclock_lag_max: Some(WallclockLag::MIN),
3138        }
3139    }
3140
3141    /// Returns whether this collection is hydrated.
3142    fn hydrated(&self) -> bool {
3143        // If the observed frontier is greater than the collection's as-of, the collection has
3144        // produced some output and is therefore hydrated.
3145        //
3146        // We need to consider the edge case where the as-of is the empty frontier. Such an as-of
3147        // is not useful for indexes, because they wouldn't be readable. For write-only
3148        // collections, an empty as-of means that the collection has been fully written and no new
3149        // dataflow needs to be created for it. Consequently, no hydration will happen either.
3150        //
3151        // Based on this, we could respond in two ways:
3152        //  * `false`, as in "the dataflow was never created"
3153        //  * `true`, as in "the dataflow completed immediately"
3154        //
3155        // Since hydration is often used as a measure of dataflow progress and we don't want to
3156        // give the impression that certain dataflows are somehow stuck when they are not, we go
3157        // with the second interpretation here.
3158        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3159    }
3160
3161    /// Updates the replica write frontier of this collection.
3162    fn update_write_frontier(&mut self, new_frontier: Antichain<Timestamp>) {
3163        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3164            soft_panic_or_log!(
3165                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3166                self.write_frontier,
3167            );
3168            return;
3169        } else if new_frontier == self.write_frontier {
3170            return;
3171        }
3172
3173        self.write_frontier = new_frontier;
3174    }
3175
3176    /// Updates the replica input frontier of this collection.
3177    fn update_input_frontier(&mut self, new_frontier: Antichain<Timestamp>) {
3178        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3179            soft_panic_or_log!(
3180                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3181                self.input_frontier,
3182            );
3183            return;
3184        } else if new_frontier == self.input_frontier {
3185            return;
3186        }
3187
3188        self.input_frontier = new_frontier;
3189
3190        // Relax our read holds on collection inputs.
3191        for read_hold in &mut self.input_read_holds {
3192            let result = read_hold.try_downgrade(self.input_frontier.clone());
3193            soft_assert_or_log!(
3194                result.is_ok(),
3195                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3196                self.input_frontier,
3197            );
3198        }
3199    }
3200
3201    /// Updates the replica output frontier of this collection.
3202    fn update_output_frontier(&mut self, new_frontier: Antichain<Timestamp>) {
3203        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3204            soft_panic_or_log!(
3205                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3206                self.output_frontier,
3207            );
3208            return;
3209        } else if new_frontier == self.output_frontier {
3210            return;
3211        }
3212
3213        self.output_frontier = new_frontier;
3214    }
3215}
3216
/// Maintains the introspection state for a given replica and collection, and ensures that reported
/// introspection data is retracted when the collection is dropped.
#[derive(Debug)]
struct ReplicaCollectionIntrospection {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// The collection's reported replica write frontier.
    ///
    /// Kept so the previously reported row can be retracted when the frontier changes
    /// and when this state is dropped.
    write_frontier: Antichain<Timestamp>,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
}
3230
3231impl ReplicaCollectionIntrospection {
3232    /// Create a new `HydrationState` and initialize introspection.
3233    fn new(
3234        replica_id: ReplicaId,
3235        collection_id: GlobalId,
3236        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3237        as_of: Antichain<Timestamp>,
3238    ) -> Self {
3239        let self_ = Self {
3240            replica_id,
3241            collection_id,
3242            write_frontier: as_of,
3243            introspection_tx,
3244        };
3245
3246        self_.report_initial_state();
3247        self_
3248    }
3249
3250    /// Reports the initial introspection state.
3251    fn report_initial_state(&self) {
3252        let row = self.write_frontier_row();
3253        let updates = vec![(row, Diff::ONE)];
3254        self.send(IntrospectionType::ReplicaFrontiers, updates);
3255    }
3256
3257    /// Observe the given current write frontier and update the introspection state as necessary.
3258    fn observe_frontier(&mut self, write_frontier: &Antichain<Timestamp>) {
3259        if self.write_frontier == *write_frontier {
3260            return; // no change
3261        }
3262
3263        let retraction = self.write_frontier_row();
3264        self.write_frontier.clone_from(write_frontier);
3265        let insertion = self.write_frontier_row();
3266
3267        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3268        self.send(IntrospectionType::ReplicaFrontiers, updates);
3269    }
3270
3271    /// Return a `Row` reflecting the current replica write frontier.
3272    fn write_frontier_row(&self) -> Row {
3273        let write_frontier = self
3274            .write_frontier
3275            .as_option()
3276            .map_or(Datum::Null, |ts| ts.clone().into());
3277        Row::pack_slice(&[
3278            Datum::String(&self.collection_id.to_string()),
3279            Datum::String(&self.replica_id.to_string()),
3280            write_frontier,
3281        ])
3282    }
3283
3284    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3285        // Failure to send means the `ComputeController` has been dropped and doesn't care about
3286        // introspection updates anymore.
3287        let _ = self.introspection_tx.send((introspection_type, updates));
3288    }
3289}
3290
3291impl Drop for ReplicaCollectionIntrospection {
3292    fn drop(&mut self) {
3293        // Retract the write frontier.
3294        let row = self.write_frontier_row();
3295        let updates = vec![(row, Diff::MINUS_ONE)];
3296        self.send(IntrospectionType::ReplicaFrontiers, updates);
3297    }
3298}