Skip to main content

mz_compute_client/controller/
instance.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller for a compute instance.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
21use mz_compute_types::plan::render_plan::RenderPlan;
22use mz_compute_types::sinks::{
23    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
24};
25use mz_compute_types::sources::SourceInstanceDesc;
26use mz_controller_types::dyncfgs::{
27    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
28};
29use mz_dyncfg::ConfigSet;
30use mz_expr::RowSetFinishing;
31use mz_ore::cast::CastFrom;
32use mz_ore::channel::instrumented_unbounded_channel;
33use mz_ore::now::NowFn;
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{soft_assert_or_log, soft_panic_or_log};
36use mz_persist_types::PersistLocation;
37use mz_repr::adt::timestamp::CheckedTimestamp;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
40use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
41use mz_storage_types::read_holds::{self, ReadHold};
42use mz_storage_types::read_policy::ReadPolicy;
43use thiserror::Error;
44use timely::PartialOrder;
45use timely::progress::frontier::MutableAntichain;
46use timely::progress::{Antichain, ChangeBatch, Timestamp};
47use tokio::sync::{mpsc, oneshot};
48use uuid::Uuid;
49
50use crate::controller::error::{
51    CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
52};
53use crate::controller::instance_client::PeekError;
54use crate::controller::replica::{ReplicaClient, ReplicaConfig};
55use crate::controller::{
56    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
57    ReplicaId, StorageCollections,
58};
59use crate::logging::LogVariant;
60use crate::metrics::IntCounter;
61use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
62use crate::protocol::command::{
63    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
64};
65use crate::protocol::history::ComputeCommandHistory;
66use crate::protocol::response::{
67    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
68    SubscribeBatch, SubscribeResponse,
69};
70
/// Error returned when attempting to add a replica whose ID is already registered.
#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);
74
/// Error returned when an operation references a replica unknown to the instance.
#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);
78
/// Errors that can occur while creating a dataflow on this instance.
#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    /// A collection imported or exported by the dataflow is unknown.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The dataflow targets a replica that is unknown.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The dataflow description carries no as-of frontier.
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    /// A subscribe dataflow was given an empty (i.e. already complete) as-of.
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    /// A COPY TO dataflow was given an empty (i.e. already complete) as-of.
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    /// The caller provided no read hold for one of the dataflow's imports.
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    /// The read hold provided for a dataflow import was insufficient.
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}
96
97impl From<CollectionMissing> for DataflowCreationError {
98    fn from(error: CollectionMissing) -> Self {
99        Self::CollectionMissing(error.0)
100    }
101}
102
/// Errors that can occur while setting a read policy on a collection.
#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    /// The targeted collection is unknown.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The targeted collection is write-only and thus has no read policy to set.
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}
110
111impl From<CollectionMissing> for ReadPolicyError {
112    fn from(error: CollectionMissing) -> Self {
113        Self::CollectionMissing(error.0)
114    }
115}
116
/// A command sent to an [`Instance`] task.
///
/// Commands are boxed closures that the instance task applies to its own state.
pub(super) type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;
119
/// A response from a replica, composed of a replica ID, the replica's current epoch, and the
/// compute response itself.
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);
123
/// The state we keep for a compute instance.
pub(super) struct Instance<T: ComputeControllerTimestamp> {
    /// Build info for spawning replicas.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections<T>,
    /// Whether instance initialization has been completed.
    initialized: bool,
    /// Whether this instance is in read-only mode.
    ///
    /// When in read-only mode, this instance will not update persistent state, such as
    /// wallclock lag introspection.
    read_only: bool,
    /// The workload class of this instance.
    ///
    /// This is currently only used to annotate metrics.
    workload_class: Option<String>,
    /// The replicas of this compute instance.
    replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
    /// Currently installed compute collections.
    ///
    /// New entries are added for all collections exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// Entries are removed by [`Instance::cleanup_collections`]. See that method's documentation
    /// about the conditions for removing collection state.
    collections: BTreeMap<GlobalId, CollectionState<T>>,
    /// IDs of log sources maintained by this compute instance.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Currently outstanding peeks.
    ///
    /// New entries are added for all peeks initiated through [`Instance::peek`].
    ///
    /// The entry for a peek is only removed once all replicas have responded to the peek. This is
    /// currently required to ensure all replicas have stopped reading from the peeked collection's
    /// inputs before we allow them to compact. database-issues#4822 tracks changing this so we only have to wait
    /// for the first peek response.
    peeks: BTreeMap<Uuid, PendingPeek<T>>,
    /// Currently in-progress subscribes.
    ///
    /// New entries are added for all subscribes exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// The entry for a subscribe is removed once at least one replica has reported the subscribe
    /// to have advanced to the empty frontier or to have been dropped, implying that no further
    /// updates will be emitted for this subscribe.
    ///
    /// Note that subscribes are tracked both in `collections` and `subscribes`. `collections`
    /// keeps track of the subscribe's upper and since frontiers and ensures appropriate read holds
    /// on the subscribe's input. `subscribes` is only used to track which updates have been
    /// emitted, to decide if new ones should be emitted or suppressed.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
    /// Tracks all in-progress COPY TOs.
    ///
    /// New entries are added for all s3 oneshot sinks (corresponding to a COPY TO) exported from
    /// dataflows created through [`Instance::create_dataflow`].
    ///
    /// The entry for a copy to is removed once at least one replica has finished
    /// or the exporting collection is dropped.
    copy_tos: BTreeSet<GlobalId>,
    /// The command history, used when introducing new replicas or restarting existing replicas.
    history: ComputeCommandHistory<UIntGauge, T>,
    /// Receiver for commands to be executed.
    command_rx: mpsc::UnboundedReceiver<Command<T>>,
    /// Sender for responses to be delivered.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Sender for introspection updates to be recorded.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// The registry the controller uses to report metrics.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,

    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for updates to collection read holds.
    ///
    /// Copies of this sender are given to [`ReadHold`]s that are created in
    /// [`CollectionState::new`].
    read_hold_tx: read_holds::ChangeTx<T>,
    /// A sender for responses from replicas.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// A receiver for responses from replicas.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
}
217
218impl<T: ComputeControllerTimestamp> Instance<T> {
219    /// Acquire a handle to the collection state associated with `id`.
220    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
221        self.collections.get(&id).ok_or(CollectionMissing(id))
222    }
223
224    /// Acquire a mutable handle to the collection state associated with `id`.
225    fn collection_mut(
226        &mut self,
227        id: GlobalId,
228    ) -> Result<&mut CollectionState<T>, CollectionMissing> {
229        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
230    }
231
232    /// Acquire a handle to the collection state associated with `id`.
233    ///
234    /// # Panics
235    ///
236    /// Panics if the identified collection does not exist.
237    fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
238        self.collections.get(&id).expect("collection must exist")
239    }
240
241    /// Acquire a mutable handle to the collection state associated with `id`.
242    ///
243    /// # Panics
244    ///
245    /// Panics if the identified collection does not exist.
246    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
247        self.collections
248            .get_mut(&id)
249            .expect("collection must exist")
250    }
251
252    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
253        self.collections.iter().map(|(id, coll)| (*id, coll))
254    }
255
256    /// Returns an iterator over replicas that host the given collection.
257    ///
258    /// For replica-targeted collections, this returns only the target replica.
259    /// For non-targeted collections, this returns all replicas.
260    ///
261    /// Returns `Err` if the collection does not exist.
262    fn replicas_hosting(
263        &self,
264        id: GlobalId,
265    ) -> Result<impl Iterator<Item = &ReplicaState<T>>, CollectionMissing> {
266        let target = self.collection(id)?.target_replica;
267        Ok(self
268            .replicas
269            .values()
270            .filter(move |r| target.map_or(true, |t| t == r.id)))
271    }
272
273    /// Add a collection to the instance state.
274    ///
275    /// # Panics
276    ///
277    /// Panics if a collection with the same ID exists already.
278    fn add_collection(
279        &mut self,
280        id: GlobalId,
281        as_of: Antichain<T>,
282        shared: SharedCollectionState<T>,
283        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
284        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
285        replica_input_read_holds: Vec<ReadHold<T>>,
286        write_only: bool,
287        storage_sink: bool,
288        initial_as_of: Option<Antichain<T>>,
289        refresh_schedule: Option<RefreshSchedule>,
290        target_replica: Option<ReplicaId>,
291    ) {
292        // Add global collection state.
293        let dependency_ids: Vec<GlobalId> = compute_dependencies
294            .keys()
295            .chain(storage_dependencies.keys())
296            .copied()
297            .collect();
298        let introspection = CollectionIntrospection::new(
299            id,
300            self.introspection_tx.clone(),
301            as_of.clone(),
302            storage_sink,
303            initial_as_of,
304            refresh_schedule,
305            dependency_ids,
306        );
307        let mut state = CollectionState::new(
308            id,
309            as_of.clone(),
310            shared,
311            storage_dependencies,
312            compute_dependencies,
313            Arc::clone(&self.read_hold_tx),
314            introspection,
315        );
316        state.target_replica = target_replica;
317        // If the collection is write-only, clear its read policy to reflect that.
318        if write_only {
319            state.read_policy = None;
320        }
321
322        if let Some(previous) = self.collections.insert(id, state) {
323            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
324        }
325
326        // Add per-replica collection state.
327        for replica in self.replicas.values_mut() {
328            if target_replica.is_some_and(|id| id != replica.id) {
329                continue;
330            }
331            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
332        }
333    }
334
335    fn remove_collection(&mut self, id: GlobalId) {
336        // Remove per-replica collection state.
337        for replica in self.replicas.values_mut() {
338            replica.remove_collection(id);
339        }
340
341        // Remove global collection state.
342        self.collections.remove(&id);
343    }
344
    /// Register state for a newly added replica and seed it with per-replica collection state
    /// for every collection the replica will host.
    fn add_replica_state(
        &mut self,
        id: ReplicaId,
        client: ReplicaClient<T>,
        config: ReplicaConfig,
        epoch: u64,
    ) {
        // The set of log collections this replica maintains, per its logging config.
        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();

        let metrics = self.metrics.for_replica(id);
        let mut replica = ReplicaState::new(
            id,
            client,
            config,
            metrics,
            self.introspection_tx.clone(),
            epoch,
        );

        // Add per-replica collection state.
        for (collection_id, collection) in &self.collections {
            // Skip log collections not maintained by this replica,
            // and collections targeted at a different replica.
            if (collection.log_collection && !log_ids.contains(collection_id))
                || collection.target_replica.is_some_and(|rid| rid != id)
            {
                continue;
            }

            let as_of = if collection.log_collection {
                // For log collections, we don't send a `CreateDataflow` command to the replica, so
                // it doesn't know which as-of the controller chose and defaults to the minimum
                // frontier instead. We need to initialize the controller-side tracking with the
                // same frontier, to avoid observing regressions in the reported frontiers.
                Antichain::from_elem(T::minimum())
            } else {
                collection.read_frontier().to_owned()
            };

            // The replica needs its own copies of the input read holds.
            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
            replica.add_collection(*collection_id, as_of, input_read_holds);
        }

        self.replicas.insert(id, replica);
    }
390
391    /// Enqueue the given response for delivery to the controller clients.
392    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
393        // Failure to send means the `ComputeController` has been dropped and doesn't care about
394        // responses anymore.
395        let _ = self.response_tx.send(response);
396    }
397
398    /// Enqueue the given introspection updates for recording.
399    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
400        // Failure to send means the `ComputeController` has been dropped and doesn't care about
401        // introspection updates anymore.
402        let _ = self.introspection_tx.send((type_, updates));
403    }
404
405    /// Returns whether the identified replica exists.
406    fn replica_exists(&self, id: ReplicaId) -> bool {
407        self.replicas.contains_key(&id)
408    }
409
410    /// Return the IDs of pending peeks targeting the specified replica.
411    fn peeks_targeting(
412        &self,
413        replica_id: ReplicaId,
414    ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
415        self.peeks.iter().filter_map(move |(uuid, peek)| {
416            if peek.target_replica == Some(replica_id) {
417                Some((*uuid, peek))
418            } else {
419                None
420            }
421        })
422    }
423
424    /// Return the IDs of in-progress subscribes targeting the specified replica.
425    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
426        self.subscribes.keys().copied().filter(move |id| {
427            let collection = self.expect_collection(*id);
428            collection.target_replica == Some(replica_id)
429        })
430    }
431
    /// Update introspection with the current collection frontiers.
    ///
    /// We could also do this directly in response to frontier changes, but doing it periodically
    /// lets us avoid emitting some introspection updates that can be consolidated (e.g. a write
    /// frontier updated immediately followed by a read frontier update).
    ///
    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
    /// per second during normal operation.
    fn update_frontier_introspection(&mut self) {
        // Report global per-collection read/write frontiers.
        for collection in self.collections.values_mut() {
            collection
                .introspection
                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
        }

        // Report per-replica write frontiers.
        for replica in self.replicas.values_mut() {
            for collection in replica.collections.values_mut() {
                collection
                    .introspection
                    .observe_frontier(&collection.write_frontier);
            }
        }
    }
455
456    /// Refresh the controller state metrics for this instance.
457    ///
458    /// We could also do state metric updates directly in response to state changes, but that would
459    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
460    /// a single method is less noisy.
461    ///
462    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
463    /// per second during normal operation.
464    fn refresh_state_metrics(&self) {
465        let unscheduled_collections_count =
466            self.collections.values().filter(|c| !c.scheduled).count();
467        let connected_replica_count = self
468            .replicas
469            .values()
470            .filter(|r| r.client.is_connected())
471            .count();
472
473        self.metrics
474            .replica_count
475            .set(u64::cast_from(self.replicas.len()));
476        self.metrics
477            .collection_count
478            .set(u64::cast_from(self.collections.len()));
479        self.metrics
480            .collection_unscheduled_count
481            .set(u64::cast_from(unscheduled_collections_count));
482        self.metrics
483            .peek_count
484            .set(u64::cast_from(self.peeks.len()));
485        self.metrics
486            .subscribe_count
487            .set(u64::cast_from(self.subscribes.len()));
488        self.metrics
489            .copy_to_count
490            .set(u64::cast_from(self.copy_tos.len()));
491        self.metrics
492            .connected_replica_count
493            .set(u64::cast_from(connected_replica_count));
494    }
495
    /// Refresh the wallclock lag introspection and metrics with the current lag values.
    ///
    /// This method produces wallclock lag metrics of two different shapes:
    ///
    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
    ///   over the last minute, together with the current time.
    /// * Histograms: For each collection, we measure the lag of the write frontier behind
    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
    ///   together with the current histogram period.
    ///
    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
    /// Prometheus.
    ///
    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
    /// per second during normal operation.
    fn refresh_wallclock_lag(&mut self) {
        // Lag of a frontier behind wallclock time; an empty frontier (fully closed
        // collection) is treated as having zero lag.
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        // Histogram entries are labeled with the workload class, when one is set.
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        // For collections that sink into storage, we need to ask the storage controller to know
        // whether they're currently readable.
        let readable_storage_collections: BTreeSet<_> = self
            .collections
            .keys()
            .filter_map(|id| {
                let frontiers = self.storage_collections.collection_frontiers(*id).ok()?;
                PartialOrder::less_than(&frontiers.read_capabilities, &frontiers.write_frontier)
                    .then_some(*id)
            })
            .collect();

        // First, iterate over all collections and collect histogram measurements.
        for (id, collection) in &mut self.collections {
            let write_frontier = collection.write_frontier();
            let readable = if self.storage_collections.check_exists(*id).is_ok() {
                // Storage sink: defer to the storage controller's frontiers.
                readable_storage_collections.contains(id)
            } else {
                // Compute collection: readable iff some time lies strictly between the read
                // and write frontiers.
                PartialOrder::less_than(&collection.read_frontier(), &write_frontier)
            };

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                let bucket = if readable {
                    // Bucket lags into powers of two, in seconds.
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                } else {
                    WallclockLag::Undefined
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        // Second, iterate over all per-replica collections and collect history measurements.
        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                // A per-replica collection is considered readable in the context of lag
                // measurement if either:
                //  (a) it sinks into a storage collection that is readable
                //  (b) it is hydrated
                let readable = readable_storage_collections.contains(id) || collection.hydrated();

                let lag = if readable {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                } else {
                    WallclockLag::Undefined
                };

                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    // No way to specify values as undefined in Prometheus metrics, so we use the
                    // maximum value instead.
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                };
            }
        }

        // Record lags to persist, if it's time.
        self.maybe_record_wallclock_lag();
    }
594
    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
    /// last recording.
    ///
    /// We emit new introspection updates if the system time has passed into a new multiple of the
    /// recording interval (typically 1 minute) since the last refresh. The storage controller uses
    /// the same approach, ensuring that both controllers commit their lags at roughly the same
    /// time, avoiding confusion caused by inconsistencies.
    fn maybe_record_wallclock_lag(&mut self) {
        // In read-only mode we must not write persistent introspection state.
        if self.read_only {
            return;
        }

        // Truncate a datetime down to the previous multiple of `interval`; `None` if the
        // interval cannot be converted or the truncation fails.
        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            // Fall back to the default interval if the configured one is unusable.
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        // Nothing to do until we cross into a new interval multiple.
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        // Emit one history row per (replica, collection), carrying the maximum lag
        // observed since the last recording.
        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                // Take the max and reset the accumulator for the next period.
                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        // Emit histogram rows by draining each collection's stash; each stash entry
        // becomes one row with its accumulated count as the diff.
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }
679
680    /// Returns `true` if the given collection is hydrated on at least one
681    /// replica.
682    ///
683    /// This also returns `true` in case this cluster does not have any
684    /// replicas that host the given collection.
685    #[mz_ore::instrument(level = "debug")]
686    pub fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, CollectionMissing> {
687        let mut hosting_replicas = self.replicas_hosting(collection_id)?.peekable();
688        if hosting_replicas.peek().is_none() {
689            return Ok(true);
690        }
691        for replica_state in hosting_replicas {
692            let collection_state = replica_state
693                .collections
694                .get(&collection_id)
695                .expect("hosting replica must have per-replica collection state");
696
697            if collection_state.hydrated() {
698                return Ok(true);
699            }
700        }
701
702        Ok(false)
703    }
704
    /// Returns `true` if each non-transient, non-excluded collection is hydrated on at
    /// least one replica.
    ///
    /// This also returns `true` in case this cluster does not have any
    /// replicas.
    #[mz_ore::instrument(level = "debug")]
    pub fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, HydrationCheckBadTarget> {
        // No replicas at all: trivially hydrated.
        if self.replicas.is_empty() {
            return Ok(true);
        }
        let mut all_hydrated = true;
        // Restrict to the requested replicas; `None` means all replicas.
        let target_replicas: BTreeSet<ReplicaId> = self
            .replicas
            .keys()
            .filter_map(|id| match target_replica_ids {
                None => Some(id.clone()),
                Some(ref ids) if ids.contains(id) => Some(id.clone()),
                Some(_) => None,
            })
            .collect();
        // If specific targets were requested but none of them exist, that's an error.
        if let Some(targets) = target_replica_ids {
            if target_replicas.is_empty() {
                return Err(HydrationCheckBadTarget(targets));
            }
        }

        for (id, _collection) in self.collections_iter() {
            if id.is_transient() || exclude_collections.contains(&id) {
                continue;
            }

            let mut collection_hydrated = false;
            // `replicas_hosting` cannot fail here because `collections_iter`
            // only yields collections that exist.
            for replica_state in self.replicas_hosting(id).expect("collection must exist") {
                if !target_replicas.contains(&replica_state.id) {
                    continue;
                }
                let collection_state = replica_state
                    .collections
                    .get(&id)
                    .expect("hosting replica must have per-replica collection state");

                if collection_state.hydrated() {
                    collection_hydrated = true;
                    break;
                }
            }

            if !collection_hydrated {
                tracing::info!("collection {id} is not hydrated on any replica");
                all_hydrated = false;
                // We continue with our loop instead of breaking out early, so
                // that we log all non-hydrated replicas.
            }
        }

        Ok(all_hydrated)
    }
768
769    /// Clean up collection state that is not needed anymore.
770    ///
771    /// Three conditions need to be true before we can remove state for a collection:
772    ///
773    ///  1. A client must have explicitly dropped the collection. If that is not the case, clients
774    ///     can still reasonably assume that the controller knows about the collection and can
775    ///     answer queries about it.
776    ///  2. There must be no outstanding read capabilities on the collection. As long as someone
777    ///     still holds read capabilities on a collection, we need to keep it around to be able
778    ///     to properly handle downgrading of said capabilities.
779    ///  3. All replica frontiers for the collection must have advanced to the empty frontier.
780    ///     Advancement to the empty frontiers signals that replicas are done computing the
781    ///     collection and that they won't send more `ComputeResponse`s for it. As long as we might
782    ///     receive responses for a collection we want to keep it around to be able to validate and
783    ///     handle these responses.
784    fn cleanup_collections(&mut self) {
785        let to_remove: Vec<_> = self
786            .collections_iter()
787            .filter(|(id, collection)| {
788                collection.dropped
789                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
790                    && self
791                        .replicas
792                        .values()
793                        .all(|r| r.collection_frontiers_empty(*id))
794            })
795            .map(|(id, _collection)| id)
796            .collect();
797
798        for id in to_remove {
799            self.remove_collection(id);
800        }
801    }
802
    /// Returns the state of the [`Instance`] formatted as JSON.
    ///
    /// The returned value is not guaranteed to be stable and may change at any point in time.
    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
        // returned object as a tradeoff between usability and stability. `serde_json` will fail
        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
        // prevents a future unrelated change from silently breaking this method.

        // Destructure `self` here so we don't forget to consider dumping newly added fields.
        let Self {
            build_info: _,
            storage_collections: _,
            peek_stash_persist_location: _,
            initialized,
            read_only,
            workload_class,
            replicas,
            collections,
            log_sources: _,
            peeks,
            subscribes,
            copy_tos,
            history: _,
            command_rx: _,
            response_tx: _,
            introspection_tx: _,
            metrics: _,
            dyncfg: _,
            now: _,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            read_hold_tx: _,
            replica_tx: _,
            replica_rx: _,
        } = self;

        // Keyed state is pre-rendered with string keys, since `serde_json`
        // requires object keys to be strings. Replica dumping is itself
        // fallible, hence the `Result` collect here.
        let replicas: BTreeMap<_, _> = replicas
            .iter()
            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
            .collect::<Result<_, anyhow::Error>>()?;
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();
        let peeks: BTreeMap<_, _> = peeks
            .iter()
            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
            .collect();
        let subscribes: BTreeMap<_, _> = subscribes
            .iter()
            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
            .collect();
        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");

        Ok(serde_json::json!({
            "initialized": initialized,
            "read_only": read_only,
            "workload_class": workload_class,
            "replicas": replicas,
            "collections": collections,
            "peeks": peeks,
            "subscribes": subscribes,
            "copy_tos": copy_tos,
            "wallclock_lag_last_recorded": wallclock_lag_last_recorded,
        }))
    }
872
873    /// Reports the current write frontier for the identified compute collection.
874    pub(super) fn collection_write_frontier(
875        &self,
876        id: GlobalId,
877    ) -> Result<Antichain<T>, CollectionMissing> {
878        Ok(self.collection(id)?.write_frontier())
879    }
880}
881
882impl<T> Instance<T>
883where
884    T: ComputeControllerTimestamp,
885{
    /// Creates a new [`Instance`].
    ///
    /// Seeds per-collection state for the given arranged log collections and
    /// wires up the channels used to communicate with the surrounding
    /// controller (`command_rx`, `response_tx`, `introspection_tx`) and with
    /// replica tasks (`replica_tx`/`replica_rx`).
    pub(super) fn new(
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        command_rx: mpsc::UnboundedReceiver<Command<T>>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        read_only: bool,
    ) -> Self {
        // Install collection state for each arranged log collection and
        // remember which log variant maps to which collection ID.
        let mut collections = BTreeMap::new();
        let mut log_sources = BTreeMap::new();
        for (log, id, shared) in arranged_logs {
            let collection = CollectionState::new_log_collection(
                id,
                shared,
                Arc::clone(&read_hold_tx),
                introspection_tx.clone(),
            );
            collections.insert(id, collection);
            log_sources.insert(log, id);
        }

        let history = ComputeCommandHistory::new(metrics.for_history());

        // Instrumented channel over which replica tasks report responses.
        let send_count = metrics.response_send_count.clone();
        let recv_count = metrics.response_recv_count.clone();
        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            storage_collections: storage,
            peek_stash_persist_location,
            initialized: false,
            read_only,
            workload_class: None,
            replicas: Default::default(),
            collections,
            log_sources,
            peeks: Default::default(),
            subscribes: Default::default(),
            copy_tos: Default::default(),
            history,
            command_rx,
            response_tx,
            introspection_tx,
            metrics,
            dyncfg,
            now,
            wallclock_lag,
            // Initialize "last recorded" to the current time, so recording
            // starts relative to instance creation.
            wallclock_lag_last_recorded: now_dt,
            read_hold_tx,
            replica_tx,
            replica_rx,
        }
    }
949
    /// Runs this instance's command/response loop.
    ///
    /// First records the protocol lead-in commands (`Hello` and
    /// `CreateInstance`), then serves controller commands from `command_rx`
    /// and replica responses from `replica_rx` until the command channel is
    /// closed (see [`Instance::shutdown`], which swaps in a closed channel).
    pub(super) async fn run(mut self) {
        self.send(ComputeCommand::Hello {
            // The nonce is protocol iteration-specific and will be set in
            // `ReplicaTask::specialize_command`.
            nonce: Uuid::default(),
        });

        let instance_config = InstanceConfig {
            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
            // The remaining fields are replica-specific and will be set in
            // `ReplicaTask::specialize_command`.
            logging: Default::default(),
            expiration_offset: Default::default(),
        };

        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));

        loop {
            tokio::select! {
                // Commands are boxed closures that get invoked on `self`.
                command = self.command_rx.recv() => match command {
                    Some(cmd) => cmd(&mut self),
                    None => break,
                },
                response = self.replica_rx.recv() => match response {
                    Some(response) => self.handle_response(response),
                    None => unreachable!("self owns a sender side of the channel"),
                }
            }
        }
    }
980
981    /// Update instance configuration.
982    #[mz_ore::instrument(level = "debug")]
983    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
984        if let Some(workload_class) = &config_params.workload_class {
985            self.workload_class = workload_class.clone();
986        }
987
988        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
989        self.send(command);
990    }
991
992    /// Marks the end of any initialization commands.
993    ///
994    /// Intended to be called by `Controller`, rather than by other code.
995    /// Calling this method repeatedly has no effect.
996    #[mz_ore::instrument(level = "debug")]
997    pub fn initialization_complete(&mut self) {
998        // The compute protocol requires that `InitializationComplete` is sent only once.
999        if !self.initialized {
1000            self.send(ComputeCommand::InitializationComplete);
1001            self.initialized = true;
1002        }
1003    }
1004
1005    /// Allows collections to affect writes to external systems (persist).
1006    ///
1007    /// Calling this method repeatedly has no effect.
1008    #[mz_ore::instrument(level = "debug")]
1009    pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
1010        let collection = self.collection_mut(collection_id)?;
1011
1012        // Do not send redundant allow-writes commands.
1013        if !collection.read_only {
1014            return Ok(());
1015        }
1016
1017        // Don't send allow-writes for collections that are not installed.
1018        let as_of = collection.read_frontier();
1019
1020        // If the collection has an empty `as_of`, it was either never installed on the replica or
1021        // has since been dropped. In either case the replica does not expect any commands for it.
1022        if as_of.is_empty() {
1023            return Ok(());
1024        }
1025
1026        collection.read_only = false;
1027        self.send(ComputeCommand::AllowWrites(collection_id));
1028
1029        Ok(())
1030    }
1031
1032    /// Shut down this instance.
1033    ///
1034    /// This method asserts that the instance has no replicas left. It exists to help
1035    /// us find bugs where the client drops a compute instance that still has replicas
1036    /// installed, and later assumes that said replicas still exist.
1037    ///
1038    /// # Panics
1039    ///
1040    /// Soft-panics if the compute instance still has active replicas.
1041    #[mz_ore::instrument(level = "debug")]
1042    pub fn shutdown(&mut self) {
1043        // Taking the `command_rx` ensures that the [`Instance::run`] loop terminates.
1044        let (_tx, rx) = mpsc::unbounded_channel();
1045        self.command_rx = rx;
1046
1047        let stray_replicas: Vec<_> = self.replicas.keys().collect();
1048        soft_assert_or_log!(
1049            stray_replicas.is_empty(),
1050            "dropped instance still has provisioned replicas: {stray_replicas:?}",
1051        );
1052    }
1053
1054    /// Sends a command to replicas of this instance.
1055    #[mz_ore::instrument(level = "debug")]
1056    fn send(&mut self, cmd: ComputeCommand<T>) {
1057        // Record the command so that new replicas can be brought up to speed.
1058        self.history.push(cmd.clone());
1059
1060        let target_replica = self.target_replica(&cmd);
1061
1062        if let Some(rid) = target_replica {
1063            if let Some(replica) = self.replicas.get_mut(&rid) {
1064                let _ = replica.client.send(cmd);
1065            }
1066        } else {
1067            for replica in self.replicas.values_mut() {
1068                let _ = replica.client.send(cmd.clone());
1069            }
1070        }
1071    }
1072
1073    /// Determine the target replica for a compute command. Retrieves the
1074    /// collection named by the command, and returns the target replica if
1075    /// it is set, and None if not set, or the command doesn't name a collection.
1076    ///
1077    /// Panics if a create-dataflow command names collections that have different
1078    /// target replicas. It is an error to construct such an object and would
1079    /// indicate a bug in [`Self::create_dataflow`].
1080    fn target_replica(&self, cmd: &ComputeCommand<T>) -> Option<ReplicaId> {
1081        match &cmd {
1082            ComputeCommand::Schedule(id)
1083            | ComputeCommand::AllowWrites(id)
1084            | ComputeCommand::AllowCompaction { id, .. } => {
1085                self.expect_collection(*id).target_replica
1086            }
1087            ComputeCommand::CreateDataflow(desc) => {
1088                let mut target_replica = None;
1089                for id in desc.export_ids() {
1090                    if let Some(replica) = self.expect_collection(id).target_replica {
1091                        if target_replica.is_some() {
1092                            assert_eq!(target_replica, Some(replica));
1093                        }
1094                        target_replica = Some(replica);
1095                    }
1096                }
1097                target_replica
1098            }
1099            // Skip Peek as we don't allow replica-targeted indexes.
1100            ComputeCommand::Peek(_)
1101            | ComputeCommand::Hello { .. }
1102            | ComputeCommand::CreateInstance(_)
1103            | ComputeCommand::InitializationComplete
1104            | ComputeCommand::UpdateConfiguration(_)
1105            | ComputeCommand::CancelPeek { .. } => None,
1106        }
1107    }
1108
    /// Add a new instance replica, by ID.
    ///
    /// Spawns a client task for the replica, replays the recorded command
    /// history to bring it up to speed, and registers it in the instance's
    /// tracked state.
    ///
    /// # Errors
    ///
    /// Returns [`ReplicaExists`] if a replica with the given ID is already
    /// registered.
    #[mz_ore::instrument(level = "debug")]
    pub fn add_replica(
        &mut self,
        id: ReplicaId,
        mut config: ReplicaConfig,
        epoch: Option<u64>,
    ) -> Result<(), ReplicaExists> {
        if self.replica_exists(id) {
            return Err(ReplicaExists(id));
        }

        config.logging.index_logs = self.log_sources.clone();

        // Fresh replicas start at epoch 1; rehydration passes a higher epoch.
        let epoch = epoch.unwrap_or(1);
        let metrics = self.metrics.for_replica(id);
        let client = ReplicaClient::spawn(
            id,
            self.build_info,
            config.clone(),
            epoch,
            metrics.clone(),
            Arc::clone(&self.dyncfg),
            self.replica_tx.clone(),
        );

        // Take this opportunity to clean up the history we should present.
        self.history.reduce();

        // Advance the uppers of source imports
        self.history.update_source_uppers(&self.storage_collections);

        // Replay the commands at the client, creating new dataflow identifiers.
        for command in self.history.iter() {
            // Skip `CreateDataflow` commands targeted at different replicas.
            if let Some(target_replica) = self.target_replica(command)
                && target_replica != id
            {
                continue;
            }

            if client.send(command.clone()).is_err() {
                // We swallow the error here. On the next send, we will fail again, and
                // restart the connection as well as this rehydration.
                tracing::warn!("Replica {:?} connection terminated during hydration", id);
                break;
            }
        }

        // Add replica to tracked state.
        self.add_replica_state(id, client, config, epoch);

        Ok(())
    }
1163
    /// Remove an existing instance replica, by ID.
    ///
    /// Fails targeted subscribes and peeks that were served by this replica,
    /// notifying upstream of the failure.
    ///
    /// # Errors
    ///
    /// Returns [`ReplicaMissing`] if no replica with the given ID is
    /// registered.
    #[mz_ore::instrument(level = "debug")]
    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;

        // Subscribes targeting this replica either won't be served anymore (if the replica is
        // dropped) or might produce inconsistent output (if the target collection is an
        // introspection index). We produce an error to inform upstream.
        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
        for subscribe_id in to_drop {
            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
            let response = ComputeControllerResponse::SubscribeResponse(
                subscribe_id,
                SubscribeBatch {
                    // An error batch has equal lower and upper frontiers.
                    lower: subscribe.frontier.clone(),
                    upper: subscribe.frontier,
                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
                },
            );
            self.deliver_response(response);
        }

        // Peeks targeting this replica might not be served anymore (if the replica is dropped).
        // If the replica has failed it might come back and respond to the peek later, but it still
        // seems like a good idea to cancel the peek to inform the caller about the failure. This
        // is consistent with how we handle targeted subscribes above.
        let mut peek_responses = Vec::new();
        let mut to_drop = Vec::new();
        for (uuid, peek) in self.peeks_targeting(id) {
            peek_responses.push(ComputeControllerResponse::PeekNotification(
                uuid,
                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
                peek.otel_ctx.clone(),
            ));
            to_drop.push(uuid);
        }
        // Deliver all notifications before finishing the peeks; `peeks_targeting`
        // borrows `self`, so both steps happen after the collection loop above.
        for response in peek_responses {
            self.deliver_response(response);
        }
        for uuid in to_drop {
            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
            self.finish_peek(uuid, response);
        }

        // We might have a chance to forward implied capabilities and reduce the cost of bringing
        // up the next replica, if the dropped replica was the only one in the cluster.
        self.forward_implied_capabilities();

        Ok(())
    }
1214
1215    /// Rehydrate the given instance replica.
1216    ///
1217    /// # Panics
1218    ///
1219    /// Panics if the specified replica does not exist.
1220    fn rehydrate_replica(&mut self, id: ReplicaId) {
1221        let config = self.replicas[&id].config.clone();
1222        let epoch = self.replicas[&id].epoch + 1;
1223
1224        self.remove_replica(id).expect("replica must exist");
1225        let result = self.add_replica(id, config, Some(epoch));
1226
1227        match result {
1228            Ok(()) => (),
1229            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1230        }
1231    }
1232
1233    /// Rehydrate any failed replicas of this instance.
1234    fn rehydrate_failed_replicas(&mut self) {
1235        let replicas = self.replicas.iter();
1236        let failed_replicas: Vec<_> = replicas
1237            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1238            .collect();
1239
1240        for replica_id in failed_replicas {
1241            self.rehydrate_replica(replica_id);
1242        }
1243    }
1244
1245    /// Creates the described dataflow and initializes state for its output.
1246    ///
1247    /// This method expects a `DataflowDescription` with an `as_of` frontier specified, as well as
1248    /// for each imported collection a read hold in `import_read_holds` at at least the `as_of`.
1249    #[mz_ore::instrument(level = "debug")]
1250    pub fn create_dataflow(
1251        &mut self,
1252        dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
1253        import_read_holds: Vec<ReadHold<T>>,
1254        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
1255        target_replica: Option<ReplicaId>,
1256    ) -> Result<(), DataflowCreationError> {
1257        use DataflowCreationError::*;
1258
1259        // Validate that the target replica, if specified, exists.
1260        // A targeted dataflow is only installed on a single replica; if that
1261        // replica doesn't exist, we can't create the dataflow.
1262        if let Some(replica_id) = target_replica {
1263            if !self.replica_exists(replica_id) {
1264                return Err(ReplicaMissing(replica_id));
1265            }
1266        }
1267
1268        // Simple sanity checks around `as_of`
1269        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
1270        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
1271            return Err(EmptyAsOfForSubscribe);
1272        }
1273        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
1274            return Err(EmptyAsOfForCopyTo);
1275        }
1276
1277        // Collect all dependencies of the dataflow, and read holds on them at the `as_of`.
1278        let mut storage_dependencies = BTreeMap::new();
1279        let mut compute_dependencies = BTreeMap::new();
1280
1281        // When we install per-replica input read holds, we cannot use the `as_of` because of
1282        // reconciliation: Existing slow replicas might be reading from the inputs at times before
1283        // the `as_of` and we would rather not crash them by allowing their inputs to compact too
1284        // far. So instead we take read holds at the least time available.
1285        let mut replica_input_read_holds = Vec::new();
1286
1287        let mut import_read_holds: BTreeMap<_, _> =
1288            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();
1289
1290        for &id in dataflow.source_imports.keys() {
1291            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1292            replica_input_read_holds.push(read_hold.clone());
1293
1294            read_hold
1295                .try_downgrade(as_of.clone())
1296                .map_err(|_| ReadHoldInsufficient(id))?;
1297            storage_dependencies.insert(id, read_hold);
1298        }
1299
1300        for &id in dataflow.index_imports.keys() {
1301            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1302            read_hold
1303                .try_downgrade(as_of.clone())
1304                .map_err(|_| ReadHoldInsufficient(id))?;
1305            compute_dependencies.insert(id, read_hold);
1306        }
1307
1308        // If the `as_of` is empty, we are not going to create a dataflow, so replicas won't read
1309        // from the inputs.
1310        if as_of.is_empty() {
1311            replica_input_read_holds = Default::default();
1312        }
1313
1314        // Install collection state for each of the exports.
1315        for export_id in dataflow.export_ids() {
1316            let shared = shared_collection_state
1317                .remove(&export_id)
1318                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
1319            let write_only = dataflow.sink_exports.contains_key(&export_id);
1320            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);
1321
1322            self.add_collection(
1323                export_id,
1324                as_of.clone(),
1325                shared,
1326                storage_dependencies.clone(),
1327                compute_dependencies.clone(),
1328                replica_input_read_holds.clone(),
1329                write_only,
1330                storage_sink,
1331                dataflow.initial_storage_as_of.clone(),
1332                dataflow.refresh_schedule.clone(),
1333                target_replica,
1334            );
1335
1336            // If the export is a storage sink, we can advance its write frontier to the write
1337            // frontier of the target storage collection.
1338            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
1339                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
1340            }
1341        }
1342
1343        // Initialize tracking of subscribes.
1344        for subscribe_id in dataflow.subscribe_ids() {
1345            self.subscribes
1346                .insert(subscribe_id, ActiveSubscribe::default());
1347        }
1348
1349        // Initialize tracking of copy tos.
1350        for copy_to_id in dataflow.copy_to_ids() {
1351            self.copy_tos.insert(copy_to_id);
1352        }
1353
1354        // Here we augment all imported sources and all exported sinks with the appropriate
1355        // storage metadata needed by the compute instance.
1356        let mut source_imports = BTreeMap::new();
1357        for (id, import) in dataflow.source_imports {
1358            let frontiers = self
1359                .storage_collections
1360                .collection_frontiers(id)
1361                .expect("collection exists");
1362
1363            let collection_metadata = self
1364                .storage_collections
1365                .collection_metadata(id)
1366                .expect("we have a read hold on this collection");
1367
1368            let desc = SourceInstanceDesc {
1369                storage_metadata: collection_metadata.clone(),
1370                arguments: import.desc.arguments,
1371                typ: import.desc.typ.clone(),
1372            };
1373            source_imports.insert(
1374                id,
1375                mz_compute_types::dataflows::SourceImport {
1376                    desc,
1377                    monotonic: import.monotonic,
1378                    with_snapshot: import.with_snapshot,
1379                    upper: frontiers.write_frontier,
1380                },
1381            );
1382        }
1383
1384        let mut sink_exports = BTreeMap::new();
1385        for (id, se) in dataflow.sink_exports {
1386            let connection = match se.connection {
1387                ComputeSinkConnection::MaterializedView(conn) => {
1388                    let metadata = self
1389                        .storage_collections
1390                        .collection_metadata(id)
1391                        .map_err(|_| CollectionMissing(id))?
1392                        .clone();
1393                    let conn = MaterializedViewSinkConnection {
1394                        value_desc: conn.value_desc,
1395                        storage_metadata: metadata,
1396                    };
1397                    ComputeSinkConnection::MaterializedView(conn)
1398                }
1399                ComputeSinkConnection::ContinualTask(conn) => {
1400                    let metadata = self
1401                        .storage_collections
1402                        .collection_metadata(id)
1403                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
1404                        .clone();
1405                    let conn = ContinualTaskConnection {
1406                        input_id: conn.input_id,
1407                        storage_metadata: metadata,
1408                    };
1409                    ComputeSinkConnection::ContinualTask(conn)
1410                }
1411                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
1412                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1413                    ComputeSinkConnection::CopyToS3Oneshot(conn)
1414                }
1415            };
1416            let desc = ComputeSinkDesc {
1417                from: se.from,
1418                from_desc: se.from_desc,
1419                connection,
1420                with_snapshot: se.with_snapshot,
1421                up_to: se.up_to,
1422                non_null_assertions: se.non_null_assertions,
1423                refresh_schedule: se.refresh_schedule,
1424            };
1425            sink_exports.insert(id, desc);
1426        }
1427
1428        // Flatten the dataflow plans into the representation expected by replicas.
1429        let objects_to_build = dataflow
1430            .objects_to_build
1431            .into_iter()
1432            .map(|object| BuildDesc {
1433                id: object.id,
1434                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
1435            })
1436            .collect();
1437
1438        let augmented_dataflow = DataflowDescription {
1439            source_imports,
1440            sink_exports,
1441            objects_to_build,
1442            // The rest of the fields are identical
1443            index_imports: dataflow.index_imports,
1444            index_exports: dataflow.index_exports,
1445            as_of: dataflow.as_of.clone(),
1446            until: dataflow.until,
1447            initial_storage_as_of: dataflow.initial_storage_as_of,
1448            refresh_schedule: dataflow.refresh_schedule,
1449            debug_name: dataflow.debug_name,
1450            time_dependence: dataflow.time_dependence,
1451        };
1452
1453        if augmented_dataflow.is_transient() {
1454            tracing::debug!(
1455                name = %augmented_dataflow.debug_name,
1456                import_ids = %augmented_dataflow.display_import_ids(),
1457                export_ids = %augmented_dataflow.display_export_ids(),
1458                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1459                until = ?augmented_dataflow.until.elements(),
1460                "creating dataflow",
1461            );
1462        } else {
1463            tracing::info!(
1464                name = %augmented_dataflow.debug_name,
1465                import_ids = %augmented_dataflow.display_import_ids(),
1466                export_ids = %augmented_dataflow.display_export_ids(),
1467                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1468                until = ?augmented_dataflow.until.elements(),
1469                "creating dataflow",
1470            );
1471        }
1472
1473        // Skip the actual dataflow creation for an empty `as_of`. (Happens e.g. for the
1474        // bootstrapping of a REFRESH AT mat view that is past its last refresh.)
1475        if as_of.is_empty() {
1476            tracing::info!(
1477                name = %augmented_dataflow.debug_name,
1478                "not sending `CreateDataflow`, because of empty `as_of`",
1479            );
1480        } else {
1481            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
1482            self.send(ComputeCommand::CreateDataflow(Box::new(augmented_dataflow)));
1483
1484            for id in collections {
1485                self.maybe_schedule_collection(id);
1486            }
1487        }
1488
1489        Ok(())
1490    }
1491
1492    /// Schedule the identified collection if all its inputs are available.
1493    ///
1494    /// # Panics
1495    ///
1496    /// Panics if the identified collection does not exist.
1497    fn maybe_schedule_collection(&mut self, id: GlobalId) {
1498        let collection = self.expect_collection(id);
1499
1500        // Don't schedule collections twice.
1501        if collection.scheduled {
1502            return;
1503        }
1504
1505        let as_of = collection.read_frontier();
1506
1507        // If the collection has an empty `as_of`, it was either never installed on the replica or
1508        // has since been dropped. In either case the replica does not expect any commands for it.
1509        if as_of.is_empty() {
1510            return;
1511        }
1512
1513        let ready = if id.is_transient() {
1514            // Always schedule transient collections immediately. The assumption is that those are
1515            // created by interactive user commands and we want to schedule them as quickly as
1516            // possible. Inputs might not yet be available, but when they become available, we
1517            // don't need to wait for the controller to become aware and for the scheduling check
1518            // to run again.
1519            true
1520        } else {
1521            // Ignore self-dependencies. Any self-dependencies do not need to be
1522            // available at the as_of for the dataflow to make progress, so we
1523            // can ignore them here. At the moment, only continual tasks have
1524            // self-dependencies, but this logic is correct for any dataflow, so
1525            // we don't special case it to CTs.
1526            let not_self_dep = |x: &GlobalId| *x != id;
1527
1528            // Make sure we never schedule a collection before its input compute collections have
1529            // been scheduled. Scheduling in the wrong order can lead to deadlocks.
1530            let mut deps_scheduled = true;
1531
1532            // Check dependency frontiers to determine if all inputs are
1533            // available. An input is available when its frontier is greater
1534            // than the `as_of`, i.e., all input data up to and including the
1535            // `as_of` has been sealed.
1536            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
1537            let mut compute_frontiers = Vec::new();
1538            for id in compute_deps {
1539                let dep = &self.expect_collection(id);
1540                deps_scheduled &= dep.scheduled;
1541                compute_frontiers.push(dep.write_frontier());
1542            }
1543
1544            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
1545            let storage_frontiers = self
1546                .storage_collections
1547                .collections_frontiers(storage_deps.collect())
1548                .expect("must exist");
1549            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);
1550
1551            let mut frontiers = compute_frontiers.into_iter().chain(storage_frontiers);
1552            let frontiers_ready =
1553                frontiers.all(|frontier| PartialOrder::less_than(&as_of, &frontier));
1554
1555            deps_scheduled && frontiers_ready
1556        };
1557
1558        if ready {
1559            self.send(ComputeCommand::Schedule(id));
1560            let collection = self.expect_collection_mut(id);
1561            collection.scheduled = true;
1562        }
1563    }
1564
1565    /// Schedule any unscheduled collections that are ready.
1566    fn schedule_collections(&mut self) {
1567        let ids: Vec<_> = self.collections.keys().copied().collect();
1568        for id in ids {
1569            self.maybe_schedule_collection(id);
1570        }
1571    }
1572
1573    /// Drops the read capability for the given collections and allows their resources to be
1574    /// reclaimed.
1575    #[mz_ore::instrument(level = "debug")]
1576    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1577        for id in &ids {
1578            let collection = self.collection_mut(*id)?;
1579
1580            // Mark the collection as dropped to allow it to be removed from the controller state.
1581            collection.dropped = true;
1582
1583            // Drop the implied and warmup read holds to announce that clients are not
1584            // interested in the collection anymore.
1585            collection.implied_read_hold.release();
1586            collection.warmup_read_hold.release();
1587
1588            // If the collection is a subscribe, stop tracking it. This ensures that the controller
1589            // ceases to produce `SubscribeResponse`s for this subscribe.
1590            self.subscribes.remove(id);
1591            // If the collection is a copy to, stop tracking it. This ensures that the controller
1592            // ceases to produce `CopyToResponse`s` for this copy to.
1593            self.copy_tos.remove(id);
1594        }
1595
1596        Ok(())
1597    }
1598
    /// Initiate a peek request for the contents of `id` at `timestamp`.
    ///
    /// If this returns an error, then it didn't modify any `Instance` state.
    ///
    /// The caller-provided `read_hold` must be held on the peek target collection; it is
    /// downgraded to `timestamp` and stored with the pending peek, keeping the target readable
    /// at the peek time until the peek is finished.
    ///
    /// The eventual `PeekResponse` is delivered through `peek_response_tx`.
    #[mz_ore::instrument(level = "debug")]
    pub fn peek(
        &mut self,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        mut read_hold: ReadHold<T>,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let target_id = peek_target.id();

        // Downgrade the provided read hold to the peek time.
        if read_hold.id() != target_id {
            return Err(ReadHoldIdMismatch(read_hold.id()));
        }
        read_hold
            .try_downgrade(Antichain::from_elem(timestamp.clone()))
            .map_err(|_| ReadHoldInsufficient(target_id))?;

        // A replica-targeted peek requires its target replica to exist.
        if let Some(target) = target_replica {
            if !self.replica_exists(target) {
                return Err(ReplicaMissing(target));
            }
        }

        let otel_ctx = OpenTelemetryContext::obtain();

        // Track the pending peek, so responses can be routed and cancellation handled later.
        self.peeks.insert(
            uuid,
            PendingPeek {
                target_replica,
                // TODO(guswynn): can we just hold the `tracing::Span` here instead?
                otel_ctx: otel_ctx.clone(),
                requested_at: Instant::now(),
                read_hold,
                peek_response_tx,
                // `limit` and `offset` are kept around to build the peek notification later.
                limit: finishing.limit.map(usize::cast_from),
                offset: finishing.offset,
            },
        );

        let peek = Peek {
            literal_constraints,
            uuid,
            timestamp,
            finishing,
            map_filter_project,
            // Obtain an `OpenTelemetryContext` from the thread-local tracing
            // tree to forward it on to the compute worker.
            otel_ctx,
            target: peek_target,
            result_desc,
        };
        self.send(ComputeCommand::Peek(Box::new(peek)));

        Ok(())
    }
1666
1667    /// Cancels an existing peek request.
1668    #[mz_ore::instrument(level = "debug")]
1669    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1670        let Some(peek) = self.peeks.get_mut(&uuid) else {
1671            tracing::warn!("did not find pending peek for {uuid}");
1672            return;
1673        };
1674
1675        let duration = peek.requested_at.elapsed();
1676        self.metrics
1677            .observe_peek_response(&PeekResponse::Canceled, duration);
1678
1679        // Enqueue a notification for the cancellation.
1680        let otel_ctx = peek.otel_ctx.clone();
1681        otel_ctx.attach_as_parent();
1682
1683        self.deliver_response(ComputeControllerResponse::PeekNotification(
1684            uuid,
1685            PeekNotification::Canceled,
1686            otel_ctx,
1687        ));
1688
1689        // Finish the peek.
1690        // This will also propagate the cancellation to the replicas.
1691        self.finish_peek(uuid, reason);
1692    }
1693
1694    /// Assigns a read policy to specific identifiers.
1695    ///
1696    /// The policies are assigned in the order presented, and repeated identifiers should
1697    /// conclude with the last policy. Changing a policy will immediately downgrade the read
1698    /// capability if appropriate, but it will not "recover" the read capability if the prior
1699    /// capability is already ahead of it.
1700    ///
1701    /// Identifiers not present in `policies` retain their existing read policies.
1702    ///
1703    /// It is an error to attempt to set a read policy for a collection that is not readable in the
1704    /// context of compute. At this time, only indexes are readable compute collections.
1705    #[mz_ore::instrument(level = "debug")]
1706    pub fn set_read_policy(
1707        &mut self,
1708        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1709    ) -> Result<(), ReadPolicyError> {
1710        // Do error checking upfront, to avoid introducing inconsistencies between a collection's
1711        // `implied_capability` and `read_capabilities`.
1712        for (id, _policy) in &policies {
1713            let collection = self.collection(*id)?;
1714            if collection.read_policy.is_none() {
1715                return Err(ReadPolicyError::WriteOnlyCollection(*id));
1716            }
1717        }
1718
1719        for (id, new_policy) in policies {
1720            let collection = self.expect_collection_mut(id);
1721            let new_since = new_policy.frontier(collection.write_frontier().borrow());
1722            let _ = collection.implied_read_hold.try_downgrade(new_since);
1723            collection.read_policy = Some(new_policy);
1724        }
1725
1726        Ok(())
1727    }
1728
1729    /// Advance the global write frontier of the given collection.
1730    ///
1731    /// Frontier regressions are gracefully ignored.
1732    ///
1733    /// # Panics
1734    ///
1735    /// Panics if the identified collection does not exist.
1736    #[mz_ore::instrument(level = "debug")]
1737    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
1738        let collection = self.expect_collection_mut(id);
1739
1740        let advanced = collection.shared.lock_write_frontier(|f| {
1741            let advanced = PartialOrder::less_than(f, &new_frontier);
1742            if advanced {
1743                f.clone_from(&new_frontier);
1744            }
1745            advanced
1746        });
1747
1748        if !advanced {
1749            return;
1750        }
1751
1752        // Relax the implied read hold according to the read policy.
1753        let new_since = match &collection.read_policy {
1754            Some(read_policy) => {
1755                // For readable collections the read frontier is determined by applying the
1756                // client-provided read policy to the write frontier.
1757                read_policy.frontier(new_frontier.borrow())
1758            }
1759            None => {
1760                // Write-only collections cannot be read within the context of the compute
1761                // controller, so their read frontier only controls the read holds taken on their
1762                // inputs. We can safely downgrade the input read holds to any time less than the
1763                // write frontier.
1764                //
1765                // Note that some write-only collections (continual tasks) need to observe changes
1766                // at their current write frontier during hydration. Thus, we cannot downgrade the
1767                // read frontier to the write frontier and instead step it back by one.
1768                Antichain::from_iter(
1769                    new_frontier
1770                        .iter()
1771                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
1772                )
1773            }
1774        };
1775        let _ = collection.implied_read_hold.try_downgrade(new_since);
1776
1777        // Report the frontier advancement.
1778        self.deliver_response(ComputeControllerResponse::FrontierUpper {
1779            id,
1780            upper: new_frontier,
1781        });
1782    }
1783
    /// Apply a collection read hold change.
    ///
    /// Applies the `update` batch to the collection's shared read capabilities. If that advances
    /// the collection's read frontier, the downgrade is propagated to the read holds on its
    /// compute and storage dependencies, and an `AllowCompaction` command is sent to replicas.
    pub(super) fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
        let Some(collection) = self.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "read hold change for absent collection (id={id}, changes={update:?})"
            );
            return;
        };

        // `new_since` is `Some` iff applying the update changed the read frontier.
        let new_since = collection.shared.lock_read_capabilities(|caps| {
            // Sanity check to prevent corrupted `read_capabilities`, which can cause hard-to-debug
            // issues (usually stuck read frontiers).
            let read_frontier = caps.frontier();
            for (time, diff) in update.iter() {
                // A negative count would mean more holds released than were ever acquired.
                let count = caps.count_for(time) + diff;
                assert!(
                    count >= 0,
                    "invalid read capabilities update: negative capability \
             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
                // A positive count at a time already behind the read frontier would regress it.
                assert!(
                    count == 0 || read_frontier.less_equal(time),
                    "invalid read capabilities update: frontier regression \
             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
            }

            // Apply read capability updates and learn about resulting changes to the read
            // frontier.
            let changes = caps.update_iter(update.drain());

            // Only report a new frontier when the capabilities actually changed.
            let changed = changes.count() > 0;
            changed.then(|| caps.frontier().to_owned())
        });

        let Some(new_since) = new_since else {
            return; // read frontier did not change
        };

        // Propagate read frontier update to dependencies.
        for read_hold in collection.compute_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }
        for read_hold in collection.storage_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }

        // Produce `AllowCompaction` command.
        self.send(ComputeCommand::AllowCompaction {
            id,
            frontier: new_since,
        });
    }
1841
1842    /// Fulfills a registered peek and cleans up associated state.
1843    ///
1844    /// As part of this we:
1845    ///  * Send a `PeekResponse` through the peek's response channel.
1846    ///  * Emit a `CancelPeek` command to instruct replicas to stop spending resources on this
1847    ///    peek, and to allow the `ComputeCommandHistory` to reduce away the corresponding `Peek`
1848    ///    command.
1849    ///  * Remove the read hold for this peek, unblocking compaction that might have waited on it.
1850    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1851        let Some(peek) = self.peeks.remove(&uuid) else {
1852            return;
1853        };
1854
1855        // The recipient might not be interested in the peek response anymore, which is fine.
1856        let _ = peek.peek_response_tx.send(response);
1857
1858        // NOTE: We need to send the `CancelPeek` command _before_ we release the peek's read hold
1859        // (by dropping it), to avoid the edge case that caused database-issues#4812.
1860        self.send(ComputeCommand::CancelPeek { uuid });
1861
1862        drop(peek.read_hold);
1863    }
1864
1865    /// Handles a response from a replica. Replica IDs are re-used across replica restarts, so we
1866    /// use the replica epoch to drop stale responses.
1867    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
1868        // Filter responses from non-existing or stale replicas.
1869        if self
1870            .replicas
1871            .get(&replica_id)
1872            .filter(|replica| replica.epoch == epoch)
1873            .is_none()
1874        {
1875            return;
1876        }
1877
1878        // Invariant: the replica exists and has the expected epoch.
1879
1880        match response {
1881            ComputeResponse::Frontiers(id, frontiers) => {
1882                self.handle_frontiers_response(id, frontiers, replica_id);
1883            }
1884            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
1885                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
1886            }
1887            ComputeResponse::CopyToResponse(id, response) => {
1888                self.handle_copy_to_response(id, response, replica_id);
1889            }
1890            ComputeResponse::SubscribeResponse(id, response) => {
1891                self.handle_subscribe_response(id, response, replica_id);
1892            }
1893            ComputeResponse::Status(response) => {
1894                self.handle_status_response(response, replica_id);
1895            }
1896        }
1897    }
1898
1899    /// Handle new frontiers, returning any compute response that needs to
1900    /// be sent to the client.
1901    fn handle_frontiers_response(
1902        &mut self,
1903        id: GlobalId,
1904        frontiers: FrontiersResponse<T>,
1905        replica_id: ReplicaId,
1906    ) {
1907        if !self.collections.contains_key(&id) {
1908            soft_panic_or_log!(
1909                "frontiers update for an unknown collection \
1910                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1911            );
1912            return;
1913        }
1914        let Some(replica) = self.replicas.get_mut(&replica_id) else {
1915            soft_panic_or_log!(
1916                "frontiers update for an unknown replica \
1917                 (replica_id={replica_id}, frontiers={frontiers:?})"
1918            );
1919            return;
1920        };
1921        let Some(replica_collection) = replica.collections.get_mut(&id) else {
1922            soft_panic_or_log!(
1923                "frontiers update for an unknown replica collection \
1924                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1925            );
1926            return;
1927        };
1928
1929        if let Some(new_frontier) = frontiers.input_frontier {
1930            replica_collection.update_input_frontier(new_frontier.clone());
1931        }
1932        if let Some(new_frontier) = frontiers.output_frontier {
1933            replica_collection.update_output_frontier(new_frontier.clone());
1934        }
1935        if let Some(new_frontier) = frontiers.write_frontier {
1936            replica_collection.update_write_frontier(new_frontier.clone());
1937            self.maybe_update_global_write_frontier(id, new_frontier);
1938        }
1939    }
1940
1941    #[mz_ore::instrument(level = "debug")]
1942    fn handle_peek_response(
1943        &mut self,
1944        uuid: Uuid,
1945        response: PeekResponse,
1946        otel_ctx: OpenTelemetryContext,
1947        replica_id: ReplicaId,
1948    ) {
1949        otel_ctx.attach_as_parent();
1950
1951        // We might not be tracking this peek anymore, because we have served a response already or
1952        // because it was canceled. If this is the case, we ignore the response.
1953        let Some(peek) = self.peeks.get(&uuid) else {
1954            return;
1955        };
1956
1957        // If the peek is targeting a replica, ignore responses from other replicas.
1958        let target_replica = peek.target_replica.unwrap_or(replica_id);
1959        if target_replica != replica_id {
1960            return;
1961        }
1962
1963        let duration = peek.requested_at.elapsed();
1964        self.metrics.observe_peek_response(&response, duration);
1965
1966        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
1967        // NOTE: We use the `otel_ctx` from the response, not the pending peek, because we
1968        // currently want the parent to be whatever the compute worker did with this peek.
1969        self.deliver_response(ComputeControllerResponse::PeekNotification(
1970            uuid,
1971            notification,
1972            otel_ctx,
1973        ));
1974
1975        self.finish_peek(uuid, response)
1976    }
1977
1978    fn handle_copy_to_response(
1979        &mut self,
1980        sink_id: GlobalId,
1981        response: CopyToResponse,
1982        replica_id: ReplicaId,
1983    ) {
1984        if !self.collections.contains_key(&sink_id) {
1985            soft_panic_or_log!(
1986                "received response for an unknown copy-to \
1987                 (sink_id={sink_id}, replica_id={replica_id})",
1988            );
1989            return;
1990        }
1991        let Some(replica) = self.replicas.get_mut(&replica_id) else {
1992            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
1993            return;
1994        };
1995        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
1996            soft_panic_or_log!(
1997                "copy-to response for an unknown replica collection \
1998                 (sink_id={sink_id}, replica_id={replica_id})"
1999            );
2000            return;
2001        };
2002
2003        // Downgrade the replica frontiers, to enable dropping of input read holds and clean up of
2004        // collection state.
2005        // TODO(database-issues#4701): report copy-to frontiers through `Frontiers` responses
2006        replica_collection.update_write_frontier(Antichain::new());
2007        replica_collection.update_input_frontier(Antichain::new());
2008        replica_collection.update_output_frontier(Antichain::new());
2009
2010        // We might not be tracking this COPY TO because we have already returned a response
2011        // from one of the replicas. In that case, we ignore the response.
2012        if !self.copy_tos.remove(&sink_id) {
2013            return;
2014        }
2015
2016        let result = match response {
2017            CopyToResponse::RowCount(count) => Ok(count),
2018            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
2019            // We should never get here: Replicas only drop copy to collections in response
2020            // to the controller allowing them to do so, and when the controller drops a
2021            // copy to it also removes it from the list of tracked copy_tos (see
2022            // [`Instance::drop_collections`]).
2023            CopyToResponse::Dropped => {
2024                tracing::error!(
2025                    %sink_id, %replica_id,
2026                    "received `Dropped` response for a tracked copy to",
2027                );
2028                return;
2029            }
2030        };
2031
2032        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
2033    }
2034
    /// Handle a `SubscribeResponse` from a replica, updating replica and global frontier
    /// tracking, and forwarding newly sealed updates to the client if the subscribe is still
    /// tracked.
    fn handle_subscribe_response(
        &mut self,
        subscribe_id: GlobalId,
        response: SubscribeResponse<T>,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&subscribe_id) {
            soft_panic_or_log!(
                "received response for an unknown subscribe \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica (replica_id={replica_id})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica collection \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
            );
            return;
        };

        // Always apply replica write frontier updates. Even if the subscribe is not tracked
        // anymore, there might still be replicas reading from its inputs, so we need to track the
        // frontiers until all replicas have advanced to the empty one.
        let write_frontier = match &response {
            SubscribeResponse::Batch(batch) => batch.upper.clone(),
            // A dropped subscribe produces no more data, so its frontier is the empty one.
            SubscribeResponse::DroppedAt(_) => Antichain::new(),
        };

        // For subscribes we downgrade all replica frontiers based on write frontiers. This should
        // be fine because the input and output frontier of a subscribe track its write frontier.
        // TODO(database-issues#4701): report subscribe frontiers through `Frontiers` responses
        replica_collection.update_write_frontier(write_frontier.clone());
        replica_collection.update_input_frontier(write_frontier.clone());
        replica_collection.update_output_frontier(write_frontier.clone());

        // If the subscribe is not tracked, or targets a different replica, there is nothing to do.
        // Note that we clone the tracked state here and write it back below only if this batch
        // advanced the subscribe's frontier.
        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
            return;
        };

        // Apply a global frontier update.
        // If this is a replica-targeted subscribe, it is important that we advance the global
        // frontier only based on responses from the targeted replica. Otherwise, another replica
        // could advance to the empty frontier, making us drop the subscribe on the targeted
        // replica prematurely.
        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);

        match response {
            SubscribeResponse::Batch(batch) => {
                let upper = batch.upper;
                let mut updates = batch.updates;

                // If this batch advances the subscribe's frontier, we emit all updates at times
                // greater or equal to the last frontier (to avoid emitting duplicate updates).
                if PartialOrder::less_than(&subscribe.frontier, &upper) {
                    // `lower` is the previously emitted frontier; the batch we deliver to the
                    // client covers `[lower, upper)`.
                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());

                    if upper.is_empty() {
                        // This subscribe cannot produce more data. Stop tracking it.
                        self.subscribes.remove(&subscribe_id);
                    } else {
                        // This subscribe can produce more data. Update our tracking of it.
                        self.subscribes.insert(subscribe_id, subscribe);
                    }

                    // Trim updates at times before `lower`, which were already emitted as part of
                    // a previous batch, and drop update chunks that become empty.
                    if let Ok(updates) = updates.as_mut() {
                        updates.retain_mut(|updates| {
                            let offset = updates.times().partition_point(|t| {
                                // True for times that are strictly less than lower (and should be skipped)
                                // and false otherwise.
                                !lower.less_equal(t)
                            });
                            let (_, past_lower) = std::mem::take(updates).split_at(offset);
                            *updates = past_lower;
                            updates.len() > 0
                        });
                    }
                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeBatch {
                            lower,
                            upper,
                            updates,
                        },
                    ));
                }
            }
            SubscribeResponse::DroppedAt(frontier) => {
                // We should never get here: Replicas only drop subscribe collections in response
                // to the controller allowing them to do so, and when the controller drops a
                // subscribe it also removes it from the list of tracked subscribes (see
                // [`Instance::drop_collections`]).
                tracing::error!(
                    %subscribe_id,
                    %replica_id,
                    frontier = ?frontier.elements(),
                    "received `DroppedAt` response for a tracked subscribe",
                );
                self.subscribes.remove(&subscribe_id);
            }
        }
    }
2144
2145    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
2146        match response {
2147            StatusResponse::Placeholder => {}
2148        }
2149    }
2150
2151    /// Return the write frontiers of the dependencies of the given collection.
2152    fn dependency_write_frontiers<'b>(
2153        &'b self,
2154        collection: &'b CollectionState<T>,
2155    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2156        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2157            let collection = self.collections.get(&dep_id);
2158            collection.map(|c| c.write_frontier())
2159        });
2160        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2161            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2162            frontiers.map(|f| f.write_frontier)
2163        });
2164
2165        compute_frontiers.chain(storage_frontiers)
2166    }
2167
2168    /// Return the write frontiers of transitive storage dependencies of the given collection.
2169    fn transitive_storage_dependency_write_frontiers<'b>(
2170        &'b self,
2171        collection: &'b CollectionState<T>,
2172    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2173        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2174        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2175        let mut done = BTreeSet::new();
2176
2177        while let Some(id) = todo.pop() {
2178            if done.contains(&id) {
2179                continue;
2180            }
2181            if let Some(dep) = self.collections.get(&id) {
2182                storage_ids.extend(dep.storage_dependency_ids());
2183                todo.extend(dep.compute_dependency_ids())
2184            }
2185            done.insert(id);
2186        }
2187
2188        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2189            let frontiers = self.storage_collections.collection_frontiers(id).ok();
2190            frontiers.map(|f| f.write_frontier)
2191        });
2192
2193        storage_frontiers
2194    }
2195
2196    /// Downgrade the warmup capabilities of collections as much as possible.
2197    ///
2198    /// The only requirement we have for a collection's warmup capability is that it is for a time
2199    /// that is available in all of the collection's inputs. For each input the latest time that is
2200    /// the case for is `write_frontier - 1`. So the farthest we can downgrade a collection's
2201    /// warmup capability is the minimum of `write_frontier - 1` of all its inputs.
2202    ///
2203    /// This method expects to be periodically called as part of instance maintenance work.
2204    /// We would like to instead update the warmup capabilities synchronously in response to
2205    /// frontier updates of dependency collections, but that is not generally possible because we
2206    /// don't learn about frontier updates of storage collections synchronously. We could do
2207    /// synchronous updates for compute dependencies, but we refrain from doing for simplicity.
2208    fn downgrade_warmup_capabilities(&mut self) {
2209        let mut new_capabilities = BTreeMap::new();
2210        for (id, collection) in &self.collections {
2211            // For write-only collections that have advanced to the empty frontier, we can drop the
2212            // warmup capability entirely. There is no reason why we would need to hydrate those
2213            // collections again, so being able to warm them up is not useful.
2214            if collection.read_policy.is_none()
2215                && collection.shared.lock_write_frontier(|f| f.is_empty())
2216            {
2217                new_capabilities.insert(*id, Antichain::new());
2218                continue;
2219            }
2220
2221            let mut new_capability = Antichain::new();
2222            for frontier in self.dependency_write_frontiers(collection) {
2223                for time in frontier {
2224                    new_capability.insert(time.step_back().unwrap_or(time));
2225                }
2226            }
2227
2228            new_capabilities.insert(*id, new_capability);
2229        }
2230
2231        for (id, new_capability) in new_capabilities {
2232            let collection = self.expect_collection_mut(id);
2233            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
2234        }
2235    }
2236
2237    /// Forward the implied capabilities of collections, if possible.
2238    ///
2239    /// The implied capability of a collection controls (a) which times are still readable (for
2240    /// indexes) and (b) with which as-of the collection gets installed on a new replica. We are
2241    /// usually not allowed to advance an implied capability beyond the frontier that follows from
2242    /// the collection's read policy applied to its write frontier:
2243    ///
2244    ///  * For sink collections, some external consumer might rely on seeing all distinct times in
2245    ///    the input reflected in the output. If we'd forward the implied capability of a sink,
2246    ///    we'd risk skipping times in the output across replica restarts.
2247    ///  * For index collections, we might make the index unreadable by advancing its read frontier
2248    ///    beyond its write frontier.
2249    ///
2250    /// There is one case where forwarding an implied capability is fine though: an index installed
2251    /// on a cluster that has no replicas. Such indexes are not readable anyway until a new replica
2252    /// is added, so advancing its read frontier can't make it unreadable. We can thus advance the
2253    /// implied capability as long as we make sure that when a new replica is added, the expected
2254    /// relationship between write frontier, read policy, and implied capability can be restored
2255    /// immediately (modulo computation time).
2256    ///
2257    /// Forwarding implied capabilities is not necessary for the correct functioning of the
2258    /// controller but an optimization that is beneficial in two ways:
2259    ///
2260    ///  * It relaxes read holds on inputs to forwarded collections, allowing their compaction.
2261    ///  * It reduces the amount of historical detail new replicas need to process when computing
2262    ///    forwarded collections, as forwarding the implied capability also forwards the corresponding
2263    ///    dataflow as-of.
2264    fn forward_implied_capabilities(&mut self) {
2265        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
2266            return;
2267        }
2268        if !self.replicas.is_empty() {
2269            return;
2270        }
2271
2272        let mut new_capabilities = BTreeMap::new();
2273        for (id, collection) in &self.collections {
2274            let Some(read_policy) = &collection.read_policy else {
2275                // Collection is write-only, i.e. a sink.
2276                continue;
2277            };
2278
2279            // When a new replica is started, it will immediately be able to compute all collection
2280            // output up to the write frontier of its transitive storage inputs. So the new implied
2281            // read capability should be the read policy applied to that frontier.
2282            let mut dep_frontier = Antichain::new();
2283            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
2284                dep_frontier.extend(frontier);
2285            }
2286
2287            let new_capability = read_policy.frontier(dep_frontier.borrow());
2288            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
2289                new_capabilities.insert(*id, new_capability);
2290            }
2291        }
2292
2293        for (id, new_capability) in new_capabilities {
2294            let collection = self.expect_collection_mut(id);
2295            let _ = collection.implied_read_hold.try_downgrade(new_capability);
2296        }
2297    }
2298
2299    /// Acquires a `ReadHold` for the identified compute collection.
2300    ///
2301    /// This mirrors the logic used by the controller-side `InstanceState::acquire_read_hold`,
2302    /// but executes on the instance task itself.
2303    pub(super) fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
2304        // Similarly to InstanceState::acquire_read_hold and StorageCollections::acquire_read_holds,
2305        // we acquire read holds at the earliest possible time rather than returning a copy
2306        // of the implied read hold. This is so that dependents can acquire read holds on
2307        // compute dependencies at frontiers that are held back by other read holds the caller
2308        // has previously taken.
2309        let collection = self.collection(id)?;
2310        let since = collection.shared.lock_read_capabilities(|caps| {
2311            let since = caps.frontier().to_owned();
2312            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
2313            since
2314        });
2315        let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
2316        Ok(hold)
2317    }
2318
    /// Process pending maintenance work.
    ///
    /// This method is invoked periodically by the global controller.
    /// It is a good place to perform maintenance work that arises from various controller state
    /// changes and that cannot conveniently be handled synchronously with those state changes.
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        self.rehydrate_failed_replicas();
        // Advance warmup capabilities to track dependency write frontiers
        // (see `downgrade_warmup_capabilities`).
        self.downgrade_warmup_capabilities();
        // On clusters without replicas, forward implied capabilities of indexes where safe
        // (see `forward_implied_capabilities`).
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
2335}
2336
/// State maintained about individual compute collections.
///
/// A compute collection is either an index, or a storage sink, or a subscribe, exported by a
/// compute dataflow.
#[derive(Debug)]
struct CollectionState<T: ComputeControllerTimestamp> {
    /// If set, this collection is only maintained by the specified replica.
    target_replica: Option<ReplicaId>,
    /// Whether this collection is a log collection.
    ///
    /// Log collections are special in that they are only maintained by a subset of all replicas.
    log_collection: bool,
    /// Whether this collection has been dropped by a controller client.
    ///
    /// The controller is allowed to remove the `CollectionState` for a collection only when
    /// `dropped == true`. Otherwise, clients might still expect to be able to query information
    /// about this collection.
    dropped: bool,
    /// Whether this collection has been scheduled, i.e., the controller has sent a `Schedule`
    /// command for it.
    scheduled: bool,

    /// Whether this collection is in read-only mode.
    ///
    /// When in read-only mode, the dataflow is not allowed to affect external state (largely persist).
    ///
    /// Initialized to `true` in [`CollectionState::new`]. NOTE(review): presumably cleared when
    /// the instance leaves read-only mode — the transition is not visible in this part of the
    /// file; confirm against the caller.
    read_only: bool,

    /// State shared with the `ComputeController`.
    ///
    /// See [`SharedCollectionState`] for the invariants on this state.
    shared: SharedCollectionState<T>,

    /// A read hold maintaining the implicit capability of the collection.
    ///
    /// This capability is kept to ensure that the collection remains readable according to its
    /// `read_policy`. It also ensures that read holds on the collection's dependencies are kept at
    /// some time not greater than the collection's `write_frontier`, guaranteeing that the
    /// collection's next outputs can always be computed without skipping times.
    implied_read_hold: ReadHold<T>,
    /// A read hold held to enable dataflow warmup.
    ///
    /// Dataflow warmup is an optimization that allows dataflows to immediately start hydrating
    /// even when their next output time (as implied by the `write_frontier`) is in the future.
    /// By installing a read capability derived from the write frontiers of the collection's
    /// inputs, we ensure that the as-of of new dataflows installed for the collection is at a time
    /// that is immediately available, so hydration can begin immediately too.
    warmup_read_hold: ReadHold<T>,
    /// The policy to use to downgrade `self.implied_read_hold`.
    ///
    /// If `None`, the collection is a write-only collection (i.e. a sink). For write-only
    /// collections, the `implied_read_hold` is only required for maintaining read holds on the
    /// inputs, so we can immediately downgrade it to the `write_frontier`.
    read_policy: Option<ReadPolicy<T>>,

    /// Storage identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
    /// Compute identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,

    /// Introspection state associated with this collection.
    introspection: CollectionIntrospection<T>,

    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
    /// introspection update.
    ///
    /// Keys are `(period, lag, labels)` triples, values are counts.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection. It is set to `None`
    /// for transient collections in [`CollectionState::new`].
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}
2416
2417impl<T: ComputeControllerTimestamp> CollectionState<T> {
2418    /// Creates a new collection state, with an initial read policy valid from `since`.
2419    fn new(
2420        collection_id: GlobalId,
2421        as_of: Antichain<T>,
2422        shared: SharedCollectionState<T>,
2423        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2424        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2425        read_hold_tx: read_holds::ChangeTx<T>,
2426        introspection: CollectionIntrospection<T>,
2427    ) -> Self {
2428        // A collection is not readable before the `as_of`.
2429        let since = as_of.clone();
2430        // A collection won't produce updates for times before the `as_of`.
2431        let upper = as_of;
2432
2433        // Ensure that the provided `shared` is valid for the given `as_of`.
2434        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
2435        assert!(shared.lock_write_frontier(|f| f == &upper));
2436
2437        // Initialize collection read holds.
2438        // Note that the implied read hold was already added to the `read_capabilities` when
2439        // `shared` was created, so we only need to add the warmup read hold here.
2440        let implied_read_hold =
2441            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
2442        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);
2443
2444        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
2445        shared.lock_read_capabilities(|c| {
2446            c.update_iter(updates);
2447        });
2448
2449        // In an effort to keep the produced wallclock lag introspection data small and
2450        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2451        // select indexes and subscribes.
2452        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
2453            true => None,
2454            false => Some(Default::default()),
2455        };
2456
2457        Self {
2458            target_replica: None,
2459            log_collection: false,
2460            dropped: false,
2461            scheduled: false,
2462            read_only: true,
2463            shared,
2464            implied_read_hold,
2465            warmup_read_hold,
2466            read_policy: Some(ReadPolicy::ValidFrom(since)),
2467            storage_dependencies,
2468            compute_dependencies,
2469            introspection,
2470            wallclock_lag_histogram_stash,
2471        }
2472    }
2473
2474    /// Creates a new collection state for a log collection.
2475    fn new_log_collection(
2476        id: GlobalId,
2477        shared: SharedCollectionState<T>,
2478        read_hold_tx: read_holds::ChangeTx<T>,
2479        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2480    ) -> Self {
2481        let since = Antichain::from_elem(T::minimum());
2482        let introspection = CollectionIntrospection::new(
2483            id,
2484            introspection_tx,
2485            since.clone(),
2486            false,
2487            None,
2488            None,
2489            Vec::new(),
2490        );
2491        let mut state = Self::new(
2492            id,
2493            since,
2494            shared,
2495            Default::default(),
2496            Default::default(),
2497            read_hold_tx,
2498            introspection,
2499        );
2500        state.log_collection = true;
2501        // Log collections are created and scheduled implicitly as part of replica initialization.
2502        state.scheduled = true;
2503        state
2504    }
2505
2506    /// Reports the current read frontier.
2507    fn read_frontier(&self) -> Antichain<T> {
2508        self.shared
2509            .lock_read_capabilities(|c| c.frontier().to_owned())
2510    }
2511
2512    /// Reports the current write frontier.
2513    fn write_frontier(&self) -> Antichain<T> {
2514        self.shared.lock_write_frontier(|f| f.clone())
2515    }
2516
2517    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2518        self.storage_dependencies.keys().copied()
2519    }
2520
2521    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2522        self.compute_dependencies.keys().copied()
2523    }
2524}
2525
/// Collection state shared with the `ComputeController`.
///
/// Having this allows certain controller APIs, such as `ComputeController::collection_frontiers`
/// and `ComputeController::acquire_read_hold` to be non-`async`. This comes at the cost of
/// complexity (by introducing shared mutable state) and performance (by introducing locking). We
/// should aim to reduce the amount of shared state over time, rather than expand it.
///
/// Note that [`SharedCollectionState`]s are initialized by the `ComputeController` prior to the
/// collection's creation in the [`Instance`]. This is to allow compute clients to query frontiers
/// and take new read holds immediately, without having to wait for the [`Instance`] to update.
///
/// Cloning is cheap: both fields are behind `Arc`s, so clones observe the same underlying state.
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState<T> {
    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulation contains the capabilities held by all [`ReadHold`]s given out for the
    /// collection, including `implied_read_hold` and `warmup_read_hold`.
    ///
    /// NOTE: This field may only be modified by [`Instance::apply_read_hold_change`],
    /// [`Instance::acquire_read_hold`], and `ComputeController::acquire_read_hold`.
    /// Nobody else should modify read capabilities directly. Instead, collection users should
    /// manage read holds through [`ReadHold`] objects acquired through
    /// `ComputeController::acquire_read_hold`.
    ///
    /// TODO(teskje): Restructure the code to enforce the above in the type system.
    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
    /// The write frontier of this collection.
    ///
    /// Shared so the controller can report frontiers without a round-trip to the instance task.
    write_frontier: Arc<Mutex<Antichain<T>>>,
}
2554
2555impl<T: Timestamp> SharedCollectionState<T> {
2556    pub fn new(as_of: Antichain<T>) -> Self {
2557        // A collection is not readable before the `as_of`.
2558        let since = as_of.clone();
2559        // A collection won't produce updates for times before the `as_of`.
2560        let upper = as_of;
2561
2562        // Initialize read capabilities to the `since`.
2563        // The is the implied read capability. The corresponding [`ReadHold`] is created in
2564        // [`CollectionState::new`].
2565        let mut read_capabilities = MutableAntichain::new();
2566        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2567
2568        Self {
2569            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2570            write_frontier: Arc::new(Mutex::new(upper)),
2571        }
2572    }
2573
2574    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2575    where
2576        F: FnOnce(&mut MutableAntichain<T>) -> R,
2577    {
2578        let mut caps = self.read_capabilities.lock().expect("poisoned");
2579        f(&mut *caps)
2580    }
2581
2582    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2583    where
2584        F: FnOnce(&mut Antichain<T>) -> R,
2585    {
2586        let mut frontier = self.write_frontier.lock().expect("poisoned");
2587        f(&mut *frontier)
2588    }
2589}
2590
/// Manages certain introspection relations associated with a collection. Upon creation, it adds
/// rows to introspection relations. When dropped, it retracts its managed rows.
///
/// Insertions happen in [`CollectionIntrospection::new`] (via `report_initial_state`); the
/// matching retractions happen in the `Drop` impl.
#[derive(Debug)]
struct CollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Introspection state for `IntrospectionType::Frontiers`.
    ///
    /// `Some` if the collection does _not_ sink into a storage collection (i.e. is not an MV). If
    /// the collection sinks into storage, the storage controller reports its frontiers instead.
    frontiers: Option<FrontiersIntrospectionState<T>>,
    /// Introspection state for `IntrospectionType::ComputeMaterializedViewRefreshes`.
    ///
    /// `Some` if the collection is a REFRESH MV.
    refresh: Option<RefreshIntrospectionState<T>>,
    /// The IDs of the collection's dependencies, for `IntrospectionType::ComputeDependencies`.
    ///
    /// One `(collection_id, dependency_id)` row is maintained per entry; see `dependency_rows`.
    dependency_ids: Vec<GlobalId>,
}
2611
2612impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
2613    fn new(
2614        collection_id: GlobalId,
2615        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2616        as_of: Antichain<T>,
2617        storage_sink: bool,
2618        initial_as_of: Option<Antichain<T>>,
2619        refresh_schedule: Option<RefreshSchedule>,
2620        dependency_ids: Vec<GlobalId>,
2621    ) -> Self {
2622        let refresh =
2623            match (refresh_schedule, initial_as_of) {
2624                (Some(refresh_schedule), Some(initial_as_of)) => Some(
2625                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2626                ),
2627                (refresh_schedule, _) => {
2628                    // If we have a `refresh_schedule`, then the collection is a MV, so we should also have
2629                    // an `initial_as_of`.
2630                    soft_assert_or_log!(
2631                        refresh_schedule.is_none(),
2632                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2633                    );
2634                    None
2635                }
2636            };
2637        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2638
2639        let self_ = Self {
2640            collection_id,
2641            introspection_tx,
2642            frontiers,
2643            refresh,
2644            dependency_ids,
2645        };
2646
2647        self_.report_initial_state();
2648        self_
2649    }
2650
2651    /// Reports the initial introspection state.
2652    fn report_initial_state(&self) {
2653        if let Some(frontiers) = &self.frontiers {
2654            let row = frontiers.row_for_collection(self.collection_id);
2655            let updates = vec![(row, Diff::ONE)];
2656            self.send(IntrospectionType::Frontiers, updates);
2657        }
2658
2659        if let Some(refresh) = &self.refresh {
2660            let row = refresh.row_for_collection(self.collection_id);
2661            let updates = vec![(row, Diff::ONE)];
2662            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2663        }
2664
2665        if !self.dependency_ids.is_empty() {
2666            let updates = self.dependency_rows(Diff::ONE);
2667            self.send(IntrospectionType::ComputeDependencies, updates);
2668        }
2669    }
2670
2671    /// Produces rows for the `ComputeDependencies` introspection relation.
2672    fn dependency_rows(&self, diff: Diff) -> Vec<(Row, Diff)> {
2673        self.dependency_ids
2674            .iter()
2675            .map(|dependency_id| {
2676                let row = Row::pack_slice(&[
2677                    Datum::String(&self.collection_id.to_string()),
2678                    Datum::String(&dependency_id.to_string()),
2679                ]);
2680                (row, diff)
2681            })
2682            .collect()
2683    }
2684
2685    /// Observe the given current collection frontiers and update the introspection state as
2686    /// necessary.
2687    fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2688        self.update_frontier_introspection(read_frontier, write_frontier);
2689        self.update_refresh_introspection(write_frontier);
2690    }
2691
2692    fn update_frontier_introspection(
2693        &mut self,
2694        read_frontier: &Antichain<T>,
2695        write_frontier: &Antichain<T>,
2696    ) {
2697        let Some(frontiers) = &mut self.frontiers else {
2698            return;
2699        };
2700
2701        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2702        {
2703            return; // no change
2704        };
2705
2706        let retraction = frontiers.row_for_collection(self.collection_id);
2707        frontiers.update(read_frontier, write_frontier);
2708        let insertion = frontiers.row_for_collection(self.collection_id);
2709        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2710        self.send(IntrospectionType::Frontiers, updates);
2711    }
2712
2713    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
2714        let Some(refresh) = &mut self.refresh else {
2715            return;
2716        };
2717
2718        let retraction = refresh.row_for_collection(self.collection_id);
2719        refresh.frontier_update(write_frontier);
2720        let insertion = refresh.row_for_collection(self.collection_id);
2721
2722        if retraction == insertion {
2723            return; // no change
2724        }
2725
2726        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2727        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2728    }
2729
2730    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2731        // Failure to send means the `ComputeController` has been dropped and doesn't care about
2732        // introspection updates anymore.
2733        let _ = self.introspection_tx.send((introspection_type, updates));
2734    }
2735}
2736
2737impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
2738    fn drop(&mut self) {
2739        // Retract collection frontiers.
2740        if let Some(frontiers) = &self.frontiers {
2741            let row = frontiers.row_for_collection(self.collection_id);
2742            let updates = vec![(row, Diff::MINUS_ONE)];
2743            self.send(IntrospectionType::Frontiers, updates);
2744        }
2745
2746        // Retract MV refresh state.
2747        if let Some(refresh) = &self.refresh {
2748            let retraction = refresh.row_for_collection(self.collection_id);
2749            let updates = vec![(retraction, Diff::MINUS_ONE)];
2750            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2751        }
2752
2753        // Retract collection dependencies.
2754        if !self.dependency_ids.is_empty() {
2755            let updates = self.dependency_rows(Diff::MINUS_ONE);
2756            self.send(IntrospectionType::ComputeDependencies, updates);
2757        }
2758    }
2759}
2760
/// Introspection state for `IntrospectionType::Frontiers`.
///
/// Holds the frontier values most recently reported to the `Frontiers` introspection relation,
/// so the corresponding row can be retracted when the frontiers change.
#[derive(Debug)]
struct FrontiersIntrospectionState<T> {
    /// The reported read frontier of the collection.
    read_frontier: Antichain<T>,
    /// The reported write frontier of the collection.
    write_frontier: Antichain<T>,
}
2766
2767impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
2768    fn new(as_of: Antichain<T>) -> Self {
2769        Self {
2770            read_frontier: as_of.clone(),
2771            write_frontier: as_of,
2772        }
2773    }
2774
2775    /// Return a `Row` reflecting the current collection frontiers.
2776    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2777        let read_frontier = self
2778            .read_frontier
2779            .as_option()
2780            .map_or(Datum::Null, |ts| ts.clone().into());
2781        let write_frontier = self
2782            .write_frontier
2783            .as_option()
2784            .map_or(Datum::Null, |ts| ts.clone().into());
2785        Row::pack_slice(&[
2786            Datum::String(&collection_id.to_string()),
2787            read_frontier,
2788            write_frontier,
2789        ])
2790    }
2791
2792    /// Update the introspection state with the given new frontiers.
2793    fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2794        if read_frontier != &self.read_frontier {
2795            self.read_frontier.clone_from(read_frontier);
2796        }
2797        if write_frontier != &self.write_frontier {
2798            self.write_frontier.clone_from(write_frontier);
2799        }
2800    }
2801}
2802
/// Information needed to compute introspection updates for a REFRESH materialized view when the
/// write frontier advances.
#[derive(Debug)]
struct RefreshIntrospectionState<T> {
    // Immutable properties of the MV
    /// The refresh schedule of the materialized view.
    refresh_schedule: RefreshSchedule,
    /// The initial as-of of the materialized view; expected to equal the time of the first
    /// refresh (soft-asserted in `frontier_update`).
    initial_as_of: Antichain<T>,
    // Refresh state
    next_refresh: Datum<'static>,           // Null or an MzTimestamp
    last_completed_refresh: Datum<'static>, // Null or an MzTimestamp
}
2814
2815impl<T> RefreshIntrospectionState<T> {
2816    /// Return a `Row` reflecting the current refresh introspection state.
2817    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2818        Row::pack_slice(&[
2819            Datum::String(&collection_id.to_string()),
2820            self.last_completed_refresh,
2821            self.next_refresh,
2822        ])
2823    }
2824}
2825
2826impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
2827    /// Construct a new [`RefreshIntrospectionState`], and apply an initial `frontier_update()` at
2828    /// the `upper`.
2829    fn new(
2830        refresh_schedule: RefreshSchedule,
2831        initial_as_of: Antichain<T>,
2832        upper: &Antichain<T>,
2833    ) -> Self {
2834        let mut self_ = Self {
2835            refresh_schedule: refresh_schedule.clone(),
2836            initial_as_of: initial_as_of.clone(),
2837            next_refresh: Datum::Null,
2838            last_completed_refresh: Datum::Null,
2839        };
2840        self_.frontier_update(upper);
2841        self_
2842    }
2843
2844    /// Should be called whenever the write frontier of the collection advances. It updates the
2845    /// state that should be recorded in introspection relations, but doesn't send the updates yet.
2846    fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
2847        if write_frontier.is_empty() {
2848            self.last_completed_refresh =
2849                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2850                    last_refresh.into()
2851                } else {
2852                    // If there is no last refresh, then we have a `REFRESH EVERY`, in which case
2853                    // the saturating roundup puts a refresh at the maximum possible timestamp.
2854                    T::maximum().into()
2855                };
2856            self.next_refresh = Datum::Null;
2857        } else {
2858            if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2859                // We are before the first refresh.
2860                self.last_completed_refresh = Datum::Null;
2861                let initial_as_of = self.initial_as_of.as_option().expect(
2862                    "initial_as_of can't be [], because then there would be no refreshes at all",
2863                );
2864                let first_refresh = initial_as_of
2865                    .round_up(&self.refresh_schedule)
2866                    .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2867                soft_assert_or_log!(
2868                    first_refresh == *initial_as_of,
2869                    "initial_as_of should be set to the first refresh"
2870                );
2871                self.next_refresh = first_refresh.into();
2872            } else {
2873                // The first refresh has already happened.
2874                let write_frontier = write_frontier.as_option().expect("checked above");
2875                self.last_completed_refresh = write_frontier
2876                    .round_down_minus_1(&self.refresh_schedule)
2877                    .map_or_else(
2878                        || {
2879                            soft_panic_or_log!(
2880                                "rounding down should have returned the first refresh or later"
2881                            );
2882                            Datum::Null
2883                        },
2884                        |last_completed_refresh| last_completed_refresh.into(),
2885                    );
2886                self.next_refresh = write_frontier.clone().into();
2887            }
2888        }
2889    }
2890}
2891
/// A note of an outstanding peek response.
#[derive(Debug)]
struct PendingPeek<T: Timestamp> {
    /// For replica-targeted peeks, this specifies the replica whose response we should pass on.
    ///
    /// If this value is `None`, we pass on the first response.
    target_replica: Option<ReplicaId>,
    /// The OpenTelemetry context for this peek.
    otel_ctx: OpenTelemetryContext,
    /// The time at which the peek was requested.
    ///
    /// Used to track peek durations.
    requested_at: Instant,
    /// The read hold installed to serve this peek.
    read_hold: ReadHold<T>,
    /// The channel to send peek results.
    ///
    /// A oneshot channel: exactly one response is ever passed on per peek.
    peek_response_tx: oneshot::Sender<PeekResponse>,
    /// An optional limit of the peek's result size.
    limit: Option<usize>,
    /// The offset into the peek's result.
    offset: usize,
}
2914
/// State maintained for an active subscribe.
#[derive(Debug, Clone)]
struct ActiveSubscribe<T> {
    /// Current upper frontier of this subscribe.
    frontier: Antichain<T>,
}
2920
2921impl<T: ComputeControllerTimestamp> Default for ActiveSubscribe<T> {
2922    fn default() -> Self {
2923        Self {
2924            frontier: Antichain::from_elem(T::minimum()),
2925        }
2926    }
2927}
2928
/// State maintained about individual replicas.
#[derive(Debug)]
struct ReplicaState<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    id: ReplicaId,
    /// Client for the running replica task.
    client: ReplicaClient<T>,
    /// The replica configuration.
    config: ReplicaConfig,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Per-replica collection state.
    ///
    /// Keyed by the ID of the compute collection installed on this replica.
    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
    /// The epoch of the replica.
    epoch: u64,
}
2947
2948impl<T: ComputeControllerTimestamp> ReplicaState<T> {
2949    fn new(
2950        id: ReplicaId,
2951        client: ReplicaClient<T>,
2952        config: ReplicaConfig,
2953        metrics: ReplicaMetrics,
2954        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2955        epoch: u64,
2956    ) -> Self {
2957        Self {
2958            id,
2959            client,
2960            config,
2961            metrics,
2962            introspection_tx,
2963            epoch,
2964            collections: Default::default(),
2965        }
2966    }
2967
2968    /// Add a collection to the replica state.
2969    ///
2970    /// # Panics
2971    ///
2972    /// Panics if a collection with the same ID exists already.
2973    fn add_collection(
2974        &mut self,
2975        id: GlobalId,
2976        as_of: Antichain<T>,
2977        input_read_holds: Vec<ReadHold<T>>,
2978    ) {
2979        let metrics = self.metrics.for_collection(id);
2980        let introspection = ReplicaCollectionIntrospection::new(
2981            self.id,
2982            id,
2983            self.introspection_tx.clone(),
2984            as_of.clone(),
2985        );
2986        let mut state =
2987            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
2988
2989        // In an effort to keep the produced wallclock lag introspection data small and
2990        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2991        // select indexes and subscribes.
2992        if id.is_transient() {
2993            state.wallclock_lag_max = None;
2994        }
2995
2996        if let Some(previous) = self.collections.insert(id, state) {
2997            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
2998        }
2999    }
3000
3001    /// Remove state for a collection.
3002    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
3003        self.collections.remove(&id)
3004    }
3005
3006    /// Returns whether all replica frontiers of the given collection are empty.
3007    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
3008        self.collections.get(&id).map_or(true, |c| {
3009            c.write_frontier.is_empty()
3010                && c.input_frontier.is_empty()
3011                && c.output_frontier.is_empty()
3012        })
3013    }
3014
3015    /// Returns the state of the [`ReplicaState`] formatted as JSON.
3016    ///
3017    /// The returned value is not guaranteed to be stable and may change at any point in time.
3018    #[mz_ore::instrument(level = "debug")]
3019    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3020        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
3021        // returned object as a tradeoff between usability and stability. `serde_json` will fail
3022        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
3023        // prevents a future unrelated change from silently breaking this method.
3024
3025        // Destructure `self` here so we don't forget to consider dumping newly added fields.
3026        let Self {
3027            id,
3028            client: _,
3029            config: _,
3030            metrics: _,
3031            introspection_tx: _,
3032            epoch,
3033            collections,
3034        } = self;
3035
3036        let collections: BTreeMap<_, _> = collections
3037            .iter()
3038            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
3039            .collect();
3040
3041        Ok(serde_json::json!({
3042            "id": id.to_string(),
3043            "collections": collections,
3044            "epoch": epoch,
3045        }))
3046    }
3047}
3048
/// Per-replica state tracked for a single compute collection.
#[derive(Debug)]
struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
    /// The replica write frontier of this collection.
    ///
    /// See [`FrontiersResponse::write_frontier`].
    write_frontier: Antichain<T>,
    /// The replica input frontier of this collection.
    ///
    /// See [`FrontiersResponse::input_frontier`].
    input_frontier: Antichain<T>,
    /// The replica output frontier of this collection.
    ///
    /// See [`FrontiersResponse::output_frontier`].
    output_frontier: Antichain<T>,

    /// Metrics tracked for this collection.
    ///
    /// If this is `None`, no metrics are collected.
    metrics: Option<ReplicaCollectionMetrics>,
    /// As-of frontier with which this collection was installed on the replica.
    as_of: Antichain<T>,
    /// Tracks introspection state for this collection.
    introspection: ReplicaCollectionIntrospection<T>,
    /// Read holds on storage inputs to this collection.
    ///
    /// These read holds are kept to ensure that the replica is able to read from storage inputs at
    /// all times it hasn't read yet. We only need to install read holds for storage inputs since
    /// compaction of compute inputs is implicitly held back by Timely/DD.
    input_read_holds: Vec<ReadHold<T>>,

    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection.
    wallclock_lag_max: Option<WallclockLag>,
}
3084
3085impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
3086    fn new(
3087        metrics: Option<ReplicaCollectionMetrics>,
3088        as_of: Antichain<T>,
3089        introspection: ReplicaCollectionIntrospection<T>,
3090        input_read_holds: Vec<ReadHold<T>>,
3091    ) -> Self {
3092        Self {
3093            write_frontier: as_of.clone(),
3094            input_frontier: as_of.clone(),
3095            output_frontier: as_of.clone(),
3096            metrics,
3097            as_of,
3098            introspection,
3099            input_read_holds,
3100            wallclock_lag_max: Some(WallclockLag::MIN),
3101        }
3102    }
3103
3104    /// Returns whether this collection is hydrated.
3105    fn hydrated(&self) -> bool {
3106        // If the observed frontier is greater than the collection's as-of, the collection has
3107        // produced some output and is therefore hydrated.
3108        //
3109        // We need to consider the edge case where the as-of is the empty frontier. Such an as-of
3110        // is not useful for indexes, because they wouldn't be readable. For write-only
3111        // collections, an empty as-of means that the collection has been fully written and no new
3112        // dataflow needs to be created for it. Consequently, no hydration will happen either.
3113        //
3114        // Based on this, we could respond in two ways:
3115        //  * `false`, as in "the dataflow was never created"
3116        //  * `true`, as in "the dataflow completed immediately"
3117        //
3118        // Since hydration is often used as a measure of dataflow progress and we don't want to
3119        // give the impression that certain dataflows are somehow stuck when they are not, we go
3120        // with the second interpretation here.
3121        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3122    }
3123
3124    /// Updates the replica write frontier of this collection.
3125    fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
3126        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3127            soft_panic_or_log!(
3128                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3129                self.write_frontier,
3130            );
3131            return;
3132        } else if new_frontier == self.write_frontier {
3133            return;
3134        }
3135
3136        self.write_frontier = new_frontier;
3137    }
3138
3139    /// Updates the replica input frontier of this collection.
3140    fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
3141        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3142            soft_panic_or_log!(
3143                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3144                self.input_frontier,
3145            );
3146            return;
3147        } else if new_frontier == self.input_frontier {
3148            return;
3149        }
3150
3151        self.input_frontier = new_frontier;
3152
3153        // Relax our read holds on collection inputs.
3154        for read_hold in &mut self.input_read_holds {
3155            let result = read_hold.try_downgrade(self.input_frontier.clone());
3156            soft_assert_or_log!(
3157                result.is_ok(),
3158                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3159                self.input_frontier,
3160            );
3161        }
3162    }
3163
3164    /// Updates the replica output frontier of this collection.
3165    fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
3166        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3167            soft_panic_or_log!(
3168                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3169                self.output_frontier,
3170            );
3171            return;
3172        } else if new_frontier == self.output_frontier {
3173            return;
3174        }
3175
3176        self.output_frontier = new_frontier;
3177    }
3178}
3179
/// Maintains the introspection state for a given replica and collection, and ensures that reported
/// introspection data is retracted when the collection is dropped.
#[derive(Debug)]
struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// The collection's reported replica write frontier.
    ///
    /// Kept so the previously reported value can be retracted when the frontier changes or the
    /// collection is dropped.
    write_frontier: Antichain<T>,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
}
3193
3194impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
3195    /// Create a new `HydrationState` and initialize introspection.
3196    fn new(
3197        replica_id: ReplicaId,
3198        collection_id: GlobalId,
3199        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3200        as_of: Antichain<T>,
3201    ) -> Self {
3202        let self_ = Self {
3203            replica_id,
3204            collection_id,
3205            write_frontier: as_of,
3206            introspection_tx,
3207        };
3208
3209        self_.report_initial_state();
3210        self_
3211    }
3212
3213    /// Reports the initial introspection state.
3214    fn report_initial_state(&self) {
3215        let row = self.write_frontier_row();
3216        let updates = vec![(row, Diff::ONE)];
3217        self.send(IntrospectionType::ReplicaFrontiers, updates);
3218    }
3219
3220    /// Observe the given current write frontier and update the introspection state as necessary.
3221    fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
3222        if self.write_frontier == *write_frontier {
3223            return; // no change
3224        }
3225
3226        let retraction = self.write_frontier_row();
3227        self.write_frontier.clone_from(write_frontier);
3228        let insertion = self.write_frontier_row();
3229
3230        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3231        self.send(IntrospectionType::ReplicaFrontiers, updates);
3232    }
3233
3234    /// Return a `Row` reflecting the current replica write frontier.
3235    fn write_frontier_row(&self) -> Row {
3236        let write_frontier = self
3237            .write_frontier
3238            .as_option()
3239            .map_or(Datum::Null, |ts| ts.clone().into());
3240        Row::pack_slice(&[
3241            Datum::String(&self.collection_id.to_string()),
3242            Datum::String(&self.replica_id.to_string()),
3243            write_frontier,
3244        ])
3245    }
3246
3247    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3248        // Failure to send means the `ComputeController` has been dropped and doesn't care about
3249        // introspection updates anymore.
3250        let _ = self.introspection_tx.send((introspection_type, updates));
3251    }
3252}
3253
3254impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
3255    fn drop(&mut self) {
3256        // Retract the write frontier.
3257        let row = self.write_frontier_row();
3258        let updates = vec![(row, Diff::MINUS_ONE)];
3259        self.send(IntrospectionType::ReplicaFrontiers, updates);
3260    }
3261}