mz_compute_client/controller/
instance.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller for a compute instance.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::ComputeInstanceId;
21use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
22use mz_compute_types::plan::render_plan::RenderPlan;
23use mz_compute_types::sinks::{
24    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
25};
26use mz_compute_types::sources::SourceInstanceDesc;
27use mz_controller_types::dyncfgs::{
28    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
29};
30use mz_dyncfg::ConfigSet;
31use mz_expr::RowSetFinishing;
32use mz_ore::cast::CastFrom;
33use mz_ore::channel::instrumented_unbounded_channel;
34use mz_ore::now::NowFn;
35use mz_ore::tracing::OpenTelemetryContext;
36use mz_ore::{soft_assert_or_log, soft_panic_or_log};
37use mz_persist_types::PersistLocation;
38use mz_repr::adt::timestamp::CheckedTimestamp;
39use mz_repr::refresh_schedule::RefreshSchedule;
40use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
41use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
42use mz_storage_types::read_holds::{self, ReadHold};
43use mz_storage_types::read_policy::ReadPolicy;
44use serde::Serialize;
45use thiserror::Error;
46use timely::PartialOrder;
47use timely::progress::frontier::MutableAntichain;
48use timely::progress::{Antichain, ChangeBatch, Timestamp};
49use tokio::sync::mpsc::error::SendError;
50use tokio::sync::{mpsc, oneshot};
51use uuid::Uuid;
52
53use crate::controller::error::{
54    CollectionLookupError, CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
55};
56use crate::controller::replica::{ReplicaClient, ReplicaConfig};
57use crate::controller::{
58    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
59    ReplicaId, StorageCollections,
60};
61use crate::logging::LogVariant;
62use crate::metrics::IntCounter;
63use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
64use crate::protocol::command::{
65    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
66};
67use crate::protocol::history::ComputeCommandHistory;
68use crate::protocol::response::{
69    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
70    SubscribeBatch, SubscribeResponse,
71};
72
/// Error indicating that a replica with the given ID already exists on the instance.
#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);

/// Error indicating that no replica with the given ID exists on the instance.
#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);
80
/// Errors that can occur when creating a dataflow on a compute instance.
#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    /// The identified collection does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The identified replica does not exist.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The dataflow definition has no `as_of` value.
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    /// A subscribe dataflow was given an empty `as_of` frontier.
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    /// A COPY TO dataflow was given an empty `as_of` frontier.
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    /// No read hold was provided for the identified dataflow import.
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    /// The read hold provided for the identified dataflow import is insufficient.
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}
98
99impl From<CollectionMissing> for DataflowCreationError {
100    fn from(error: CollectionMissing) -> Self {
101        Self::CollectionMissing(error.0)
102    }
103}
104
/// Errors that can occur when initiating a peek.
#[derive(Error, Debug)]
pub(super) enum PeekError {
    /// The identified target replica does not exist.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The provided read hold belongs to a collection other than the peeked one.
    #[error("read hold ID does not match peeked collection: {0}")]
    ReadHoldIdMismatch(GlobalId),
    /// The provided read hold is insufficient for the peek.
    #[error("insufficient read hold provided: {0}")]
    ReadHoldInsufficient(GlobalId),
}

/// Errors that can occur when changing a collection's read policy.
#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    /// The identified collection does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The identified collection is write-only and therefore has no read policy.
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}
122
123impl From<CollectionMissing> for ReadPolicyError {
124    fn from(error: CollectionMissing) -> Self {
125        Self::CollectionMissing(error.0)
126    }
127}
128
/// A command sent to an [`Instance`] task.
///
/// Commands are one-shot closures that are given mutable access to the [`Instance`] state.
pub type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;
131
/// A client for an [`Instance`] task.
///
/// Cloning a client clones its senders, yielding another handle to the same instance task.
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub(super) struct Client<T: ComputeControllerTimestamp> {
    /// A sender for commands for the instance.
    command_tx: mpsc::UnboundedSender<Command<T>>,
    /// A sender for read hold changes for collections installed on the instance.
    ///
    /// Excluded from the `Debug` output; it is a shared closure (built in [`Client::spawn`]).
    #[derivative(Debug = "ignore")]
    read_hold_tx: read_holds::ChangeTx<T>,
}
142
143impl<T: ComputeControllerTimestamp> Client<T> {
144    pub fn send(&self, command: Command<T>) -> Result<(), SendError<Command<T>>> {
145        self.command_tx.send(command)
146    }
147
148    pub fn read_hold_tx(&self) -> read_holds::ChangeTx<T> {
149        Arc::clone(&self.read_hold_tx)
150    }
151}
152
153impl<T> Client<T>
154where
155    T: ComputeControllerTimestamp,
156{
157    pub fn spawn(
158        id: ComputeInstanceId,
159        build_info: &'static BuildInfo,
160        storage: StorageCollections<T>,
161        peek_stash_persist_location: PersistLocation,
162        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
163        metrics: InstanceMetrics,
164        now: NowFn,
165        wallclock_lag: WallclockLagFn<T>,
166        dyncfg: Arc<ConfigSet>,
167        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
168        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
169    ) -> Self {
170        let (command_tx, command_rx) = mpsc::unbounded_channel();
171
172        let read_hold_tx: read_holds::ChangeTx<_> = {
173            let command_tx = command_tx.clone();
174            Arc::new(move |id, change: ChangeBatch<_>| {
175                let cmd: Command<_> = {
176                    let change = change.clone();
177                    Box::new(move |i| i.apply_read_hold_change(id, change.clone()))
178                };
179                command_tx.send(cmd).map_err(|_| SendError((id, change)))
180            })
181        };
182
183        mz_ore::task::spawn(
184            || format!("compute-instance-{id}"),
185            Instance::new(
186                build_info,
187                storage,
188                peek_stash_persist_location,
189                arranged_logs,
190                metrics,
191                now,
192                wallclock_lag,
193                dyncfg,
194                command_rx,
195                response_tx,
196                Arc::clone(&read_hold_tx),
197                introspection_tx,
198            )
199            .run(),
200        );
201
202        Self {
203            command_tx,
204            read_hold_tx,
205        }
206    }
207}
208
/// A response from a replica, composed of a replica ID, the replica's current epoch, and the
/// compute response itself.
///
/// Tuple layout: `(replica_id, epoch, response)`.
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);
212
/// The state we keep for a compute instance.
///
/// An `Instance` is constructed and run on its own task by [`Client::spawn`]; other components
/// interact with it by sending [`Command`]s through a [`Client`].
pub(super) struct Instance<T: ComputeControllerTimestamp> {
    /// Build info for spawning replicas.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections<T>,
    /// Whether instance initialization has been completed.
    initialized: bool,
    /// Whether or not this instance is in read-only mode.
    ///
    /// When in read-only mode, neither the controller nor the instances
    /// controlled by it are allowed to effect changes to external systems
    /// (largely persist). For example, wallclock lag introspection is not
    /// recorded while in read-only mode.
    read_only: bool,
    /// The workload class of this instance.
    ///
    /// This is used to annotate metrics, and as a label on wallclock lag histogram
    /// introspection updates.
    workload_class: Option<String>,
    /// The replicas of this compute instance.
    replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
    /// Currently installed compute collections.
    ///
    /// New entries are added for all collections exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// Entries are removed by [`Instance::cleanup_collections`]. See that method's documentation
    /// about the conditions for removing collection state.
    collections: BTreeMap<GlobalId, CollectionState<T>>,
    /// IDs of log sources maintained by this compute instance.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Currently outstanding peeks.
    ///
    /// New entries are added for all peeks initiated through [`Instance::peek`].
    ///
    /// The entry for a peek is only removed once all replicas have responded to the peek. This is
    /// currently required to ensure all replicas have stopped reading from the peeked collection's
    /// inputs before we allow them to compact. database-issues#4822 tracks changing this so we
    /// only have to wait for the first peek response.
    peeks: BTreeMap<Uuid, PendingPeek<T>>,
    /// Currently in-progress subscribes.
    ///
    /// New entries are added for all subscribes exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// The entry for a subscribe is removed once at least one replica has reported the subscribe
    /// to have advanced to the empty frontier or to have been dropped, implying that no further
    /// updates will be emitted for this subscribe.
    ///
    /// Note that subscribes are tracked both in `collections` and `subscribes`. `collections`
    /// keeps track of the subscribe's upper and since frontiers and ensures appropriate read holds
    /// on the subscribe's input. `subscribes` is only used to track which updates have been
    /// emitted, to decide if new ones should be emitted or suppressed.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
    /// Tracks all in-progress COPY TOs.
    ///
    /// New entries are added for all s3 oneshot sinks (corresponding to a COPY TO) exported from
    /// dataflows created through [`Instance::create_dataflow`].
    ///
    /// The entry for a copy to is removed once at least one replica has finished
    /// or the exporting collection is dropped.
    copy_tos: BTreeSet<GlobalId>,
    /// The command history, used when introducing new replicas or restarting existing replicas.
    history: ComputeCommandHistory<UIntGauge, T>,
    /// Receiver for commands to be executed.
    command_rx: mpsc::UnboundedReceiver<Command<T>>,
    /// Sender for responses to be delivered.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Sender for introspection updates to be recorded.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// The registry the controller uses to report metrics.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,

    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded, truncated to the recording
    /// interval.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for updates to collection read holds.
    ///
    /// Copies of this sender are given to [`ReadHold`]s that are created in
    /// [`CollectionState::new`].
    read_hold_tx: read_holds::ChangeTx<T>,
    /// A sender for responses from replicas.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// A receiver for responses from replicas.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
}
307
308impl<T: ComputeControllerTimestamp> Instance<T> {
309    /// Acquire a handle to the collection state associated with `id`.
310    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
311        self.collections.get(&id).ok_or(CollectionMissing(id))
312    }
313
314    /// Acquire a mutable handle to the collection state associated with `id`.
315    fn collection_mut(
316        &mut self,
317        id: GlobalId,
318    ) -> Result<&mut CollectionState<T>, CollectionMissing> {
319        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
320    }
321
322    /// Acquire a handle to the collection state associated with `id`.
323    ///
324    /// # Panics
325    ///
326    /// Panics if the identified collection does not exist.
327    fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
328        self.collections.get(&id).expect("collection must exist")
329    }
330
331    /// Acquire a mutable handle to the collection state associated with `id`.
332    ///
333    /// # Panics
334    ///
335    /// Panics if the identified collection does not exist.
336    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
337        self.collections
338            .get_mut(&id)
339            .expect("collection must exist")
340    }
341
342    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
343        self.collections.iter().map(|(id, coll)| (*id, coll))
344    }
345
346    /// Add a collection to the instance state.
347    ///
348    /// # Panics
349    ///
350    /// Panics if a collection with the same ID exists already.
351    fn add_collection(
352        &mut self,
353        id: GlobalId,
354        as_of: Antichain<T>,
355        shared: SharedCollectionState<T>,
356        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
357        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
358        replica_input_read_holds: Vec<ReadHold<T>>,
359        write_only: bool,
360        storage_sink: bool,
361        initial_as_of: Option<Antichain<T>>,
362        refresh_schedule: Option<RefreshSchedule>,
363    ) {
364        // Add global collection state.
365        let introspection = CollectionIntrospection::new(
366            id,
367            self.introspection_tx.clone(),
368            as_of.clone(),
369            storage_sink,
370            initial_as_of,
371            refresh_schedule,
372        );
373        let mut state = CollectionState::new(
374            id,
375            as_of.clone(),
376            shared,
377            storage_dependencies,
378            compute_dependencies,
379            Arc::clone(&self.read_hold_tx),
380            introspection,
381        );
382        // If the collection is write-only, clear its read policy to reflect that.
383        if write_only {
384            state.read_policy = None;
385        }
386
387        if let Some(previous) = self.collections.insert(id, state) {
388            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
389        }
390
391        // Add per-replica collection state.
392        for replica in self.replicas.values_mut() {
393            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
394        }
395
396        // Update introspection.
397        self.report_dependency_updates(id, Diff::ONE);
398    }
399
400    fn remove_collection(&mut self, id: GlobalId) {
401        // Update introspection.
402        self.report_dependency_updates(id, Diff::MINUS_ONE);
403
404        // Remove per-replica collection state.
405        for replica in self.replicas.values_mut() {
406            replica.remove_collection(id);
407        }
408
409        // Remove global collection state.
410        self.collections.remove(&id);
411    }
412
413    fn add_replica_state(
414        &mut self,
415        id: ReplicaId,
416        client: ReplicaClient<T>,
417        config: ReplicaConfig,
418        epoch: u64,
419    ) {
420        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();
421
422        let metrics = self.metrics.for_replica(id);
423        let mut replica = ReplicaState::new(
424            id,
425            client,
426            config,
427            metrics,
428            self.introspection_tx.clone(),
429            epoch,
430        );
431
432        // Add per-replica collection state.
433        for (collection_id, collection) in &self.collections {
434            // Skip log collections not maintained by this replica.
435            if collection.log_collection && !log_ids.contains(collection_id) {
436                continue;
437            }
438
439            let as_of = if collection.log_collection {
440                // For log collections, we don't send a `CreateDataflow` command to the replica, so
441                // it doesn't know which as-of the controler chose and defaults to the minimum
442                // frontier instead. We need to initialize the controller-side tracking with the
443                // same frontier, to avoid observing regressions in the reported frontiers.
444                Antichain::from_elem(T::minimum())
445            } else {
446                collection.read_frontier().to_owned()
447            };
448
449            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
450            replica.add_collection(*collection_id, as_of, input_read_holds);
451        }
452
453        self.replicas.insert(id, replica);
454    }
455
456    /// Enqueue the given response for delivery to the controller clients.
457    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
458        // Failure to send means the `ComputeController` has been dropped and doesn't care about
459        // responses anymore.
460        let _ = self.response_tx.send(response);
461    }
462
463    /// Enqueue the given introspection updates for recording.
464    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
465        // Failure to send means the `ComputeController` has been dropped and doesn't care about
466        // introspection updates anymore.
467        let _ = self.introspection_tx.send((type_, updates));
468    }
469
470    /// Returns whether the identified replica exists.
471    fn replica_exists(&self, id: ReplicaId) -> bool {
472        self.replicas.contains_key(&id)
473    }
474
475    /// Return the IDs of pending peeks targeting the specified replica.
476    fn peeks_targeting(
477        &self,
478        replica_id: ReplicaId,
479    ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
480        self.peeks.iter().filter_map(move |(uuid, peek)| {
481            if peek.target_replica == Some(replica_id) {
482                Some((*uuid, peek))
483            } else {
484                None
485            }
486        })
487    }
488
489    /// Return the IDs of in-progress subscribes targeting the specified replica.
490    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
491        self.subscribes.iter().filter_map(move |(id, subscribe)| {
492            let targeting = subscribe.target_replica == Some(replica_id);
493            targeting.then_some(*id)
494        })
495    }
496
497    /// Update introspection with the current collection frontiers.
498    ///
499    /// We could also do this directly in response to frontier changes, but doing it periodically
500    /// lets us avoid emitting some introspection updates that can be consolidated (e.g. a write
501    /// frontier updated immediately followed by a read frontier update).
502    ///
503    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
504    /// per second during normal operation.
505    fn update_frontier_introspection(&mut self) {
506        for collection in self.collections.values_mut() {
507            collection
508                .introspection
509                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
510        }
511
512        for replica in self.replicas.values_mut() {
513            for collection in replica.collections.values_mut() {
514                collection
515                    .introspection
516                    .observe_frontier(&collection.write_frontier);
517            }
518        }
519    }
520
521    /// Refresh the controller state metrics for this instance.
522    ///
523    /// We could also do state metric updates directly in response to state changes, but that would
524    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
525    /// a single method is less noisy.
526    ///
527    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
528    /// per second during normal operation.
529    fn refresh_state_metrics(&self) {
530        let unscheduled_collections_count =
531            self.collections.values().filter(|c| !c.scheduled).count();
532        let connected_replica_count = self
533            .replicas
534            .values()
535            .filter(|r| r.client.is_connected())
536            .count();
537
538        self.metrics
539            .replica_count
540            .set(u64::cast_from(self.replicas.len()));
541        self.metrics
542            .collection_count
543            .set(u64::cast_from(self.collections.len()));
544        self.metrics
545            .collection_unscheduled_count
546            .set(u64::cast_from(unscheduled_collections_count));
547        self.metrics
548            .peek_count
549            .set(u64::cast_from(self.peeks.len()));
550        self.metrics
551            .subscribe_count
552            .set(u64::cast_from(self.subscribes.len()));
553        self.metrics
554            .copy_to_count
555            .set(u64::cast_from(self.copy_tos.len()));
556        self.metrics
557            .connected_replica_count
558            .set(u64::cast_from(connected_replica_count));
559    }
560
    /// Refresh the wallclock lag introspection and metrics with the current lag values.
    ///
    /// This method produces wallclock lag metrics of two different shapes:
    ///
    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
    ///   over the last minute, together with the current time.
    /// * Histograms: For each collection, we measure the lag of the write frontier behind
    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
    ///   together with the current histogram period.
    ///
    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
    /// Prometheus.
    ///
    /// Measurements are only accumulated here; emitting them to introspection is delegated to
    /// `maybe_record_wallclock_lag`, which is called at the end.
    ///
    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
    /// per second during normal operation.
    fn refresh_wallclock_lag(&mut self) {
        // Lag of a frontier behind wallclock time. The empty frontier is reported as zero lag.
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        // Histogram entries carry a `workload_class` label if this instance has one.
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        // First, iterate over all collections and collect histogram measurements.
        // We keep a record of unreadable collections, so we can emit undefined lags for those here
        // and below when we collect history measurements.
        let mut unreadable_collections = BTreeSet::new();
        for (id, collection) in &mut self.collections {
            // We need to ask the storage controller for the read frontiers of storage collections.
            // If the lookup fails, fall back to the compute-side read frontier.
            let read_frontier = match self.storage_collections.collection_frontiers(*id) {
                Ok(f) => f.read_capabilities,
                Err(_) => collection.read_frontier(),
            };
            let write_frontier = collection.write_frontier();
            // Unreadable: the write frontier has not advanced beyond the read frontier.
            let collection_unreadable = PartialOrder::less_equal(&write_frontier, &read_frontier);
            if collection_unreadable {
                unreadable_collections.insert(id);
            }

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                // Histogram buckets round the lag in seconds up to the next power of two.
                let bucket = if collection_unreadable {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        // Second, iterate over all per-replica collections and collect history measurements.
        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                // Readability is judged on the global collection state computed above.
                let lag = if unreadable_collections.contains(&id) {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                };

                // Track the maximum lag seen since the last recording.
                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    // No way to specify values as undefined in Prometheus metrics, so we use the
                    // maximum value instead.
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                };
            }
        }

        // Record lags to persist, if it's time.
        self.maybe_record_wallclock_lag();
    }
648
649    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
650    /// last recording.
651    //
652    /// We emit new introspection updates if the system time has passed into a new multiple of the
653    /// recording interval (typically 1 minute) since the last refresh. The storage controller uses
654    /// the same approach, ensuring that both controllers commit their lags at roughly the same
655    /// time, avoiding confusion caused by inconsistencies.
656    fn maybe_record_wallclock_lag(&mut self) {
657        if self.read_only {
658            return;
659        }
660
661        let duration_trunc = |datetime: DateTime<_>, interval| {
662            let td = TimeDelta::from_std(interval).ok()?;
663            datetime.duration_trunc(td).ok()
664        };
665
666        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
667        let now_dt = mz_ore::now::to_datetime((self.now)());
668        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
669            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
670            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
671            duration_trunc(now_dt, *default).unwrap()
672        });
673        if now_trunc <= self.wallclock_lag_last_recorded {
674            return;
675        }
676
677        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");
678
679        let mut history_updates = Vec::new();
680        for (replica_id, replica) in &mut self.replicas {
681            for (collection_id, collection) in &mut replica.collections {
682                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
683                    continue;
684                };
685
686                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
687                let row = Row::pack_slice(&[
688                    Datum::String(&collection_id.to_string()),
689                    Datum::String(&replica_id.to_string()),
690                    max_lag.into_interval_datum(),
691                    Datum::TimestampTz(now_ts),
692                ]);
693                history_updates.push((row, Diff::ONE));
694            }
695        }
696        if !history_updates.is_empty() {
697            self.deliver_introspection_updates(
698                IntrospectionType::WallclockLagHistory,
699                history_updates,
700            );
701        }
702
703        let mut histogram_updates = Vec::new();
704        let mut row_buf = Row::default();
705        for (collection_id, collection) in &mut self.collections {
706            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
707                continue;
708            };
709
710            for ((period, lag, labels), count) in std::mem::take(stash) {
711                let mut packer = row_buf.packer();
712                packer.extend([
713                    Datum::TimestampTz(period.start),
714                    Datum::TimestampTz(period.end),
715                    Datum::String(&collection_id.to_string()),
716                    lag.into_uint64_datum(),
717                ]);
718                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
719                packer.push_dict(labels);
720
721                histogram_updates.push((row_buf.clone(), count));
722            }
723        }
724        if !histogram_updates.is_empty() {
725            self.deliver_introspection_updates(
726                IntrospectionType::WallclockLagHistogram,
727                histogram_updates,
728            );
729        }
730
731        self.wallclock_lag_last_recorded = now_trunc;
732    }
733
734    /// Report updates (inserts or retractions) to the identified collection's dependencies.
735    ///
736    /// # Panics
737    ///
738    /// Panics if the identified collection does not exist.
739    fn report_dependency_updates(&self, id: GlobalId, diff: Diff) {
740        let collection = self.expect_collection(id);
741        let dependencies = collection.dependency_ids();
742
743        let updates = dependencies
744            .map(|dependency_id| {
745                let row = Row::pack_slice(&[
746                    Datum::String(&id.to_string()),
747                    Datum::String(&dependency_id.to_string()),
748                ]);
749                (row, diff)
750            })
751            .collect();
752
753        self.deliver_introspection_updates(IntrospectionType::ComputeDependencies, updates);
754    }
755
756    /// Returns `true` if the given collection is hydrated on at least one
757    /// replica.
758    ///
759    /// This also returns `true` in case this cluster does not have any
760    /// replicas.
761    #[mz_ore::instrument(level = "debug")]
762    pub fn collection_hydrated(
763        &self,
764        collection_id: GlobalId,
765    ) -> Result<bool, CollectionLookupError> {
766        if self.replicas.is_empty() {
767            return Ok(true);
768        }
769
770        for replica_state in self.replicas.values() {
771            let collection_state = replica_state
772                .collections
773                .get(&collection_id)
774                .ok_or(CollectionLookupError::CollectionMissing(collection_id))?;
775
776            if collection_state.hydrated() {
777                return Ok(true);
778            }
779        }
780
781        Ok(false)
782    }
783
784    /// Returns `true` if each non-transient, non-excluded collection is hydrated on at
785    /// least one replica.
786    ///
787    /// This also returns `true` in case this cluster does not have any
788    /// replicas.
789    #[mz_ore::instrument(level = "debug")]
790    pub fn collections_hydrated_on_replicas(
791        &self,
792        target_replica_ids: Option<Vec<ReplicaId>>,
793        exclude_collections: &BTreeSet<GlobalId>,
794    ) -> Result<bool, HydrationCheckBadTarget> {
795        if self.replicas.is_empty() {
796            return Ok(true);
797        }
798        let mut all_hydrated = true;
799        let target_replicas: BTreeSet<ReplicaId> = self
800            .replicas
801            .keys()
802            .filter_map(|id| match target_replica_ids {
803                None => Some(id.clone()),
804                Some(ref ids) if ids.contains(id) => Some(id.clone()),
805                Some(_) => None,
806            })
807            .collect();
808        if let Some(targets) = target_replica_ids {
809            if target_replicas.is_empty() {
810                return Err(HydrationCheckBadTarget(targets));
811            }
812        }
813
814        for (id, _collection) in self.collections_iter() {
815            if id.is_transient() || exclude_collections.contains(&id) {
816                continue;
817            }
818
819            let mut collection_hydrated = false;
820            for replica_state in self.replicas.values() {
821                if !target_replicas.contains(&replica_state.id) {
822                    continue;
823                }
824                let collection_state = replica_state
825                    .collections
826                    .get(&id)
827                    .expect("missing collection state");
828
829                if collection_state.hydrated() {
830                    collection_hydrated = true;
831                    break;
832                }
833            }
834
835            if !collection_hydrated {
836                tracing::info!("collection {id} is not hydrated on any replica");
837                all_hydrated = false;
838                // We continue with our loop instead of breaking out early, so
839                // that we log all non-hydrated replicas.
840            }
841        }
842
843        Ok(all_hydrated)
844    }
845
846    /// Returns `true` if all non-transient, non-excluded collections are hydrated on at least one
847    /// replica.
848    ///
849    /// This also returns `true` in case this cluster does not have any
850    /// replicas.
851    #[mz_ore::instrument(level = "debug")]
852    pub fn collections_hydrated(&self, exclude_collections: &BTreeSet<GlobalId>) -> bool {
853        self.collections_hydrated_on_replicas(None, exclude_collections)
854            .expect("Cannot error if target_replica_ids is None")
855    }
856
857    /// Clean up collection state that is not needed anymore.
858    ///
859    /// Three conditions need to be true before we can remove state for a collection:
860    ///
861    ///  1. A client must have explicitly dropped the collection. If that is not the case, clients
862    ///     can still reasonably assume that the controller knows about the collection and can
863    ///     answer queries about it.
864    ///  2. There must be no outstanding read capabilities on the collection. As long as someone
865    ///     still holds read capabilities on a collection, we need to keep it around to be able
866    ///     to properly handle downgrading of said capabilities.
867    ///  3. All replica frontiers for the collection must have advanced to the empty frontier.
868    ///     Advancement to the empty frontiers signals that replicas are done computing the
869    ///     collection and that they won't send more `ComputeResponse`s for it. As long as we might
870    ///     receive responses for a collection we want to keep it around to be able to validate and
871    ///     handle these responses.
872    fn cleanup_collections(&mut self) {
873        let to_remove: Vec<_> = self
874            .collections_iter()
875            .filter(|(id, collection)| {
876                collection.dropped
877                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
878                    && self
879                        .replicas
880                        .values()
881                        .all(|r| r.collection_frontiers_empty(*id))
882            })
883            .map(|(id, _collection)| id)
884            .collect();
885
886        for id in to_remove {
887            self.remove_collection(id);
888        }
889    }
890
891    /// Returns the state of the [`Instance`] formatted as JSON.
892    ///
893    /// The returned value is not guaranteed to be stable and may change at any point in time.
894    #[mz_ore::instrument(level = "debug")]
895    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
896        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
897        // returned object as a tradeoff between usability and stability. `serde_json` will fail
898        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
899        // prevents a future unrelated change from silently breaking this method.
900
901        // Destructure `self` here so we don't forget to consider dumping newly added fields.
902        let Self {
903            build_info: _,
904            storage_collections: _,
905            peek_stash_persist_location: _,
906            initialized,
907            read_only,
908            workload_class,
909            replicas,
910            collections,
911            log_sources: _,
912            peeks,
913            subscribes,
914            copy_tos,
915            history: _,
916            command_rx: _,
917            response_tx: _,
918            introspection_tx: _,
919            metrics: _,
920            dyncfg: _,
921            now: _,
922            wallclock_lag: _,
923            wallclock_lag_last_recorded,
924            read_hold_tx: _,
925            replica_tx: _,
926            replica_rx: _,
927        } = self;
928
929        fn field(
930            key: &str,
931            value: impl Serialize,
932        ) -> Result<(String, serde_json::Value), anyhow::Error> {
933            let value = serde_json::to_value(value)?;
934            Ok((key.to_string(), value))
935        }
936
937        let replicas: BTreeMap<_, _> = replicas
938            .iter()
939            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
940            .collect::<Result<_, anyhow::Error>>()?;
941        let collections: BTreeMap<_, _> = collections
942            .iter()
943            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
944            .collect();
945        let peeks: BTreeMap<_, _> = peeks
946            .iter()
947            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
948            .collect();
949        let subscribes: BTreeMap<_, _> = subscribes
950            .iter()
951            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
952            .collect();
953        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
954        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");
955
956        let map = serde_json::Map::from_iter([
957            field("initialized", initialized)?,
958            field("read_only", read_only)?,
959            field("workload_class", workload_class)?,
960            field("replicas", replicas)?,
961            field("collections", collections)?,
962            field("peeks", peeks)?,
963            field("subscribes", subscribes)?,
964            field("copy_tos", copy_tos)?,
965            field("wallclock_lag_last_recorded", wallclock_lag_last_recorded)?,
966        ]);
967        Ok(serde_json::Value::Object(map))
968    }
969}
970
971impl<T> Instance<T>
972where
973    T: ComputeControllerTimestamp,
974{
975    fn new(
976        build_info: &'static BuildInfo,
977        storage: StorageCollections<T>,
978        peek_stash_persist_location: PersistLocation,
979        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
980        metrics: InstanceMetrics,
981        now: NowFn,
982        wallclock_lag: WallclockLagFn<T>,
983        dyncfg: Arc<ConfigSet>,
984        command_rx: mpsc::UnboundedReceiver<Command<T>>,
985        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
986        read_hold_tx: read_holds::ChangeTx<T>,
987        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
988    ) -> Self {
989        let mut collections = BTreeMap::new();
990        let mut log_sources = BTreeMap::new();
991        for (log, id, shared) in arranged_logs {
992            let collection = CollectionState::new_log_collection(
993                id,
994                shared,
995                Arc::clone(&read_hold_tx),
996                introspection_tx.clone(),
997            );
998            collections.insert(id, collection);
999            log_sources.insert(log, id);
1000        }
1001
1002        let history = ComputeCommandHistory::new(metrics.for_history());
1003
1004        let send_count = metrics.response_send_count.clone();
1005        let recv_count = metrics.response_recv_count.clone();
1006        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);
1007
1008        let now_dt = mz_ore::now::to_datetime(now());
1009
1010        Self {
1011            build_info,
1012            storage_collections: storage,
1013            peek_stash_persist_location,
1014            initialized: false,
1015            read_only: true,
1016            workload_class: None,
1017            replicas: Default::default(),
1018            collections,
1019            log_sources,
1020            peeks: Default::default(),
1021            subscribes: Default::default(),
1022            copy_tos: Default::default(),
1023            history,
1024            command_rx,
1025            response_tx,
1026            introspection_tx,
1027            metrics,
1028            dyncfg,
1029            now,
1030            wallclock_lag,
1031            wallclock_lag_last_recorded: now_dt,
1032            read_hold_tx,
1033            replica_tx,
1034            replica_rx,
1035        }
1036    }
1037
1038    async fn run(mut self) {
1039        self.send(ComputeCommand::Hello {
1040            // The nonce is protocol iteration-specific and will be set in
1041            // `ReplicaTask::specialize_command`.
1042            nonce: Uuid::default(),
1043        });
1044
1045        let instance_config = InstanceConfig {
1046            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
1047            // The remaining fields are replica-specific and will be set in
1048            // `ReplicaTask::specialize_command`.
1049            logging: Default::default(),
1050            expiration_offset: Default::default(),
1051        };
1052
1053        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));
1054
1055        loop {
1056            tokio::select! {
1057                command = self.command_rx.recv() => match command {
1058                    Some(cmd) => cmd(&mut self),
1059                    None => break,
1060                },
1061                response = self.replica_rx.recv() => match response {
1062                    Some(response) => self.handle_response(response),
1063                    None => unreachable!("self owns a sender side of the channel"),
1064                }
1065            }
1066        }
1067    }
1068
1069    /// Update instance configuration.
1070    #[mz_ore::instrument(level = "debug")]
1071    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
1072        if let Some(workload_class) = &config_params.workload_class {
1073            self.workload_class = workload_class.clone();
1074        }
1075
1076        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
1077        self.send(command);
1078    }
1079
1080    /// Marks the end of any initialization commands.
1081    ///
1082    /// Intended to be called by `Controller`, rather than by other code.
1083    /// Calling this method repeatedly has no effect.
1084    #[mz_ore::instrument(level = "debug")]
1085    pub fn initialization_complete(&mut self) {
1086        // The compute protocol requires that `InitializationComplete` is sent only once.
1087        if !self.initialized {
1088            self.send(ComputeCommand::InitializationComplete);
1089            self.initialized = true;
1090        }
1091    }
1092
1093    /// Allows this instance to affect writes to external systems (persist).
1094    ///
1095    /// Calling this method repeatedly has no effect.
1096    #[mz_ore::instrument(level = "debug")]
1097    pub fn allow_writes(&mut self) {
1098        if self.read_only {
1099            self.read_only = false;
1100            self.send(ComputeCommand::AllowWrites);
1101        }
1102    }
1103
1104    /// Shut down this instance.
1105    ///
1106    /// This method runs various assertions ensuring the instance state is empty. It exists to help
1107    /// us find bugs where the client drops a compute instance that still has replicas or
1108    /// collections installed, and later assumes that said replicas/collections still exists.
1109    ///
1110    /// # Panics
1111    ///
1112    /// Panics if the compute instance still has active replicas.
1113    /// Panics if the compute instance still has collections installed.
1114    #[mz_ore::instrument(level = "debug")]
1115    pub fn shutdown(&mut self) {
1116        // Taking the `command_rx` ensures that the [`Instance::run`] loop terminates.
1117        let (_tx, rx) = mpsc::unbounded_channel();
1118        let mut command_rx = std::mem::replace(&mut self.command_rx, rx);
1119
1120        // Apply all outstanding read hold changes. This might cause read hold downgrades to be
1121        // added to `command_tx`, so we need to apply those in a loop.
1122        //
1123        // TODO(teskje): Make `Command` an enum and assert that all received commands are read
1124        // hold downgrades.
1125        while let Ok(cmd) = command_rx.try_recv() {
1126            cmd(self);
1127        }
1128
1129        // Collections might have been dropped but not cleaned up yet.
1130        self.cleanup_collections();
1131
1132        let stray_replicas: Vec<_> = self.replicas.keys().collect();
1133        soft_assert_or_log!(
1134            stray_replicas.is_empty(),
1135            "dropped instance still has provisioned replicas: {stray_replicas:?}",
1136        );
1137
1138        let collections = self.collections.iter();
1139        let stray_collections: Vec<_> = collections
1140            .filter(|(_, c)| !c.log_collection)
1141            .map(|(id, _)| id)
1142            .collect();
1143        soft_assert_or_log!(
1144            stray_collections.is_empty(),
1145            "dropped instance still has installed collections: {stray_collections:?}",
1146        );
1147    }
1148
1149    /// Sends a command to all replicas of this instance.
1150    #[mz_ore::instrument(level = "debug")]
1151    fn send(&mut self, cmd: ComputeCommand<T>) {
1152        // Record the command so that new replicas can be brought up to speed.
1153        self.history.push(cmd.clone());
1154
1155        // Clone the command for each active replica.
1156        for replica in self.replicas.values_mut() {
1157            // Swallow error, we'll notice because the replica task has stopped.
1158            let _ = replica.client.send(cmd.clone());
1159        }
1160    }
1161
1162    /// Add a new instance replica, by ID.
1163    #[mz_ore::instrument(level = "debug")]
1164    pub fn add_replica(
1165        &mut self,
1166        id: ReplicaId,
1167        mut config: ReplicaConfig,
1168        epoch: Option<u64>,
1169    ) -> Result<(), ReplicaExists> {
1170        if self.replica_exists(id) {
1171            return Err(ReplicaExists(id));
1172        }
1173
1174        config.logging.index_logs = self.log_sources.clone();
1175
1176        let epoch = epoch.unwrap_or(1);
1177        let metrics = self.metrics.for_replica(id);
1178        let client = ReplicaClient::spawn(
1179            id,
1180            self.build_info,
1181            config.clone(),
1182            epoch,
1183            metrics.clone(),
1184            Arc::clone(&self.dyncfg),
1185            self.replica_tx.clone(),
1186        );
1187
1188        // Take this opportunity to clean up the history we should present.
1189        self.history.reduce();
1190
1191        // Advance the uppers of source imports
1192        self.history.update_source_uppers(&self.storage_collections);
1193
1194        // Replay the commands at the client, creating new dataflow identifiers.
1195        for command in self.history.iter() {
1196            if client.send(command.clone()).is_err() {
1197                // We swallow the error here. On the next send, we will fail again, and
1198                // restart the connection as well as this rehydration.
1199                tracing::warn!("Replica {:?} connection terminated during hydration", id);
1200                break;
1201            }
1202        }
1203
1204        // Add replica to tracked state.
1205        self.add_replica_state(id, client, config, epoch);
1206
1207        Ok(())
1208    }
1209
1210    /// Remove an existing instance replica, by ID.
1211    #[mz_ore::instrument(level = "debug")]
1212    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
1213        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;
1214
1215        // Subscribes targeting this replica either won't be served anymore (if the replica is
1216        // dropped) or might produce inconsistent output (if the target collection is an
1217        // introspection index). We produce an error to inform upstream.
1218        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
1219        for subscribe_id in to_drop {
1220            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
1221            let response = ComputeControllerResponse::SubscribeResponse(
1222                subscribe_id,
1223                SubscribeBatch {
1224                    lower: subscribe.frontier.clone(),
1225                    upper: subscribe.frontier,
1226                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
1227                },
1228            );
1229            self.deliver_response(response);
1230        }
1231
1232        // Peeks targeting this replica might not be served anymore (if the replica is dropped).
1233        // If the replica has failed it might come back and respond to the peek later, but it still
1234        // seems like a good idea to cancel the peek to inform the caller about the failure. This
1235        // is consistent with how we handle targeted subscribes above.
1236        let mut peek_responses = Vec::new();
1237        let mut to_drop = Vec::new();
1238        for (uuid, peek) in self.peeks_targeting(id) {
1239            peek_responses.push(ComputeControllerResponse::PeekNotification(
1240                uuid,
1241                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
1242                peek.otel_ctx.clone(),
1243            ));
1244            to_drop.push(uuid);
1245        }
1246        for response in peek_responses {
1247            self.deliver_response(response);
1248        }
1249        for uuid in to_drop {
1250            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
1251            self.finish_peek(uuid, response);
1252        }
1253
1254        // We might have a chance to forward implied capabilities and reduce the cost of bringing
1255        // up the next replica, if the dropped replica was the only one in the cluster.
1256        self.forward_implied_capabilities();
1257
1258        Ok(())
1259    }
1260
1261    /// Rehydrate the given instance replica.
1262    ///
1263    /// # Panics
1264    ///
1265    /// Panics if the specified replica does not exist.
1266    fn rehydrate_replica(&mut self, id: ReplicaId) {
1267        let config = self.replicas[&id].config.clone();
1268        let epoch = self.replicas[&id].epoch + 1;
1269
1270        self.remove_replica(id).expect("replica must exist");
1271        let result = self.add_replica(id, config, Some(epoch));
1272
1273        match result {
1274            Ok(()) => (),
1275            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1276        }
1277    }
1278
1279    /// Rehydrate any failed replicas of this instance.
1280    fn rehydrate_failed_replicas(&mut self) {
1281        let replicas = self.replicas.iter();
1282        let failed_replicas: Vec<_> = replicas
1283            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1284            .collect();
1285
1286        for replica_id in failed_replicas {
1287            self.rehydrate_replica(replica_id);
1288        }
1289    }
1290
1291    /// Creates the described dataflow and initializes state for its output.
1292    ///
1293    /// This method expects a `DataflowDescription` with an `as_of` frontier specified, as well as
1294    /// for each imported collection a read hold in `import_read_holds` at at least the `as_of`.
1295    ///
1296    /// If a `subscribe_target_replica` is given, any subscribes exported by the dataflow are
1297    /// configured to target that replica, i.e., only subscribe responses sent by that replica are
1298    /// considered.
1299    #[mz_ore::instrument(level = "debug")]
1300    pub fn create_dataflow(
1301        &mut self,
1302        dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
1303        import_read_holds: Vec<ReadHold<T>>,
1304        subscribe_target_replica: Option<ReplicaId>,
1305        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
1306    ) -> Result<(), DataflowCreationError> {
1307        use DataflowCreationError::*;
1308
1309        if let Some(replica_id) = subscribe_target_replica {
1310            if !self.replica_exists(replica_id) {
1311                return Err(ReplicaMissing(replica_id));
1312            }
1313        }
1314
1315        // Simple sanity checks around `as_of`
1316        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
1317        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
1318            return Err(EmptyAsOfForSubscribe);
1319        }
1320        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
1321            return Err(EmptyAsOfForCopyTo);
1322        }
1323
1324        // Collect all dependencies of the dataflow, and read holds on them at the `as_of`.
1325        let mut storage_dependencies = BTreeMap::new();
1326        let mut compute_dependencies = BTreeMap::new();
1327
1328        // When we install per-replica input read holds, we cannot use the `as_of` because of
1329        // reconciliation: Existing slow replicas might be reading from the inputs at times before
1330        // the `as_of` and we would rather not crash them by allowing their inputs to compact too
1331        // far. So instead we take read holds at the least time available.
1332        let mut replica_input_read_holds = Vec::new();
1333
1334        let mut import_read_holds: BTreeMap<_, _> =
1335            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();
1336
1337        for &id in dataflow.source_imports.keys() {
1338            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1339            replica_input_read_holds.push(read_hold.clone());
1340
1341            read_hold
1342                .try_downgrade(as_of.clone())
1343                .map_err(|_| ReadHoldInsufficient(id))?;
1344            storage_dependencies.insert(id, read_hold);
1345        }
1346
1347        for &id in dataflow.index_imports.keys() {
1348            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1349            read_hold
1350                .try_downgrade(as_of.clone())
1351                .map_err(|_| ReadHoldInsufficient(id))?;
1352            compute_dependencies.insert(id, read_hold);
1353        }
1354
1355        // If the `as_of` is empty, we are not going to create a dataflow, so replicas won't read
1356        // from the inputs.
1357        if as_of.is_empty() {
1358            replica_input_read_holds = Default::default();
1359        }
1360
1361        // Install collection state for each of the exports.
1362        for export_id in dataflow.export_ids() {
1363            let shared = shared_collection_state
1364                .remove(&export_id)
1365                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
1366            let write_only = dataflow.sink_exports.contains_key(&export_id);
1367            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);
1368
1369            self.add_collection(
1370                export_id,
1371                as_of.clone(),
1372                shared,
1373                storage_dependencies.clone(),
1374                compute_dependencies.clone(),
1375                replica_input_read_holds.clone(),
1376                write_only,
1377                storage_sink,
1378                dataflow.initial_storage_as_of.clone(),
1379                dataflow.refresh_schedule.clone(),
1380            );
1381
1382            // If the export is a storage sink, we can advance its write frontier to the write
1383            // frontier of the target storage collection.
1384            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
1385                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
1386            }
1387        }
1388
1389        // Initialize tracking of subscribes.
1390        for subscribe_id in dataflow.subscribe_ids() {
1391            self.subscribes
1392                .insert(subscribe_id, ActiveSubscribe::new(subscribe_target_replica));
1393        }
1394
1395        // Initialize tracking of copy tos.
1396        for copy_to_id in dataflow.copy_to_ids() {
1397            self.copy_tos.insert(copy_to_id);
1398        }
1399
1400        // Here we augment all imported sources and all exported sinks with the appropriate
1401        // storage metadata needed by the compute instance.
1402        let mut source_imports = BTreeMap::new();
1403        for (id, (si, monotonic, _upper)) in dataflow.source_imports {
1404            let frontiers = self
1405                .storage_collections
1406                .collection_frontiers(id)
1407                .expect("collection exists");
1408
1409            let collection_metadata = self
1410                .storage_collections
1411                .collection_metadata(id)
1412                .expect("we have a read hold on this collection");
1413
1414            let desc = SourceInstanceDesc {
1415                storage_metadata: collection_metadata.clone(),
1416                arguments: si.arguments,
1417                typ: si.typ.clone(),
1418            };
1419            source_imports.insert(id, (desc, monotonic, frontiers.write_frontier));
1420        }
1421
1422        let mut sink_exports = BTreeMap::new();
1423        for (id, se) in dataflow.sink_exports {
1424            let connection = match se.connection {
1425                ComputeSinkConnection::MaterializedView(conn) => {
1426                    let metadata = self
1427                        .storage_collections
1428                        .collection_metadata(id)
1429                        .map_err(|_| CollectionMissing(id))?
1430                        .clone();
1431                    let conn = MaterializedViewSinkConnection {
1432                        value_desc: conn.value_desc,
1433                        storage_metadata: metadata,
1434                    };
1435                    ComputeSinkConnection::MaterializedView(conn)
1436                }
1437                ComputeSinkConnection::ContinualTask(conn) => {
1438                    let metadata = self
1439                        .storage_collections
1440                        .collection_metadata(id)
1441                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
1442                        .clone();
1443                    let conn = ContinualTaskConnection {
1444                        input_id: conn.input_id,
1445                        storage_metadata: metadata,
1446                    };
1447                    ComputeSinkConnection::ContinualTask(conn)
1448                }
1449                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
1450                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1451                    ComputeSinkConnection::CopyToS3Oneshot(conn)
1452                }
1453            };
1454            let desc = ComputeSinkDesc {
1455                from: se.from,
1456                from_desc: se.from_desc,
1457                connection,
1458                with_snapshot: se.with_snapshot,
1459                up_to: se.up_to,
1460                non_null_assertions: se.non_null_assertions,
1461                refresh_schedule: se.refresh_schedule,
1462            };
1463            sink_exports.insert(id, desc);
1464        }
1465
1466        // Flatten the dataflow plans into the representation expected by replicas.
1467        let objects_to_build = dataflow
1468            .objects_to_build
1469            .into_iter()
1470            .map(|object| BuildDesc {
1471                id: object.id,
1472                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
1473            })
1474            .collect();
1475
1476        let augmented_dataflow = DataflowDescription {
1477            source_imports,
1478            sink_exports,
1479            objects_to_build,
1480            // The rest of the fields are identical
1481            index_imports: dataflow.index_imports,
1482            index_exports: dataflow.index_exports,
1483            as_of: dataflow.as_of.clone(),
1484            until: dataflow.until,
1485            initial_storage_as_of: dataflow.initial_storage_as_of,
1486            refresh_schedule: dataflow.refresh_schedule,
1487            debug_name: dataflow.debug_name,
1488            time_dependence: dataflow.time_dependence,
1489        };
1490
1491        if augmented_dataflow.is_transient() {
1492            tracing::debug!(
1493                name = %augmented_dataflow.debug_name,
1494                import_ids = %augmented_dataflow.display_import_ids(),
1495                export_ids = %augmented_dataflow.display_export_ids(),
1496                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1497                until = ?augmented_dataflow.until.elements(),
1498                "creating dataflow",
1499            );
1500        } else {
1501            tracing::info!(
1502                name = %augmented_dataflow.debug_name,
1503                import_ids = %augmented_dataflow.display_import_ids(),
1504                export_ids = %augmented_dataflow.display_export_ids(),
1505                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1506                until = ?augmented_dataflow.until.elements(),
1507                "creating dataflow",
1508            );
1509        }
1510
1511        // Skip the actual dataflow creation for an empty `as_of`. (Happens e.g. for the
1512        // bootstrapping of a REFRESH AT mat view that is past its last refresh.)
1513        if as_of.is_empty() {
1514            tracing::info!(
1515                name = %augmented_dataflow.debug_name,
1516                "not sending `CreateDataflow`, because of empty `as_of`",
1517            );
1518        } else {
1519            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
1520            let dataflow = Box::new(augmented_dataflow);
1521            self.send(ComputeCommand::CreateDataflow(dataflow));
1522
1523            for id in collections {
1524                self.maybe_schedule_collection(id);
1525            }
1526        }
1527
1528        Ok(())
1529    }
1530
1531    /// Schedule the identified collection if all its inputs are available.
1532    ///
1533    /// # Panics
1534    ///
1535    /// Panics if the identified collection does not exist.
1536    fn maybe_schedule_collection(&mut self, id: GlobalId) {
1537        let collection = self.expect_collection(id);
1538
1539        // Don't schedule collections twice.
1540        if collection.scheduled {
1541            return;
1542        }
1543
1544        let as_of = collection.read_frontier();
1545
1546        // If the collection has an empty `as_of`, it was either never installed on the replica or
1547        // has since been dropped. In either case the replica does not expect any commands for it.
1548        if as_of.is_empty() {
1549            return;
1550        }
1551
1552        let ready = if id.is_transient() {
1553            // Always schedule transient collections immediately. The assumption is that those are
1554            // created by interactive user commands and we want to schedule them as quickly as
1555            // possible. Inputs might not yet be available, but when they become available, we
1556            // don't need to wait for the controller to become aware and for the scheduling check
1557            // to run again.
1558            true
1559        } else {
1560            // Ignore self-dependencies. Any self-dependencies do not need to be
1561            // available at the as_of for the dataflow to make progress, so we
1562            // can ignore them here. At the moment, only continual tasks have
1563            // self-dependencies, but this logic is correct for any dataflow, so
1564            // we don't special case it to CTs.
1565            let not_self_dep = |x: &GlobalId| *x != id;
1566
1567            // Check dependency frontiers to determine if all inputs are
1568            // available. An input is available when its frontier is greater
1569            // than the `as_of`, i.e., all input data up to and including the
1570            // `as_of` has been sealed.
1571            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
1572            let compute_frontiers = compute_deps.map(|id| {
1573                let dep = &self.expect_collection(id);
1574                dep.write_frontier()
1575            });
1576
1577            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
1578            let storage_frontiers = self
1579                .storage_collections
1580                .collections_frontiers(storage_deps.collect())
1581                .expect("must exist");
1582            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);
1583
1584            let ready = compute_frontiers
1585                .chain(storage_frontiers)
1586                .all(|frontier| PartialOrder::less_than(&as_of, &frontier));
1587
1588            ready
1589        };
1590
1591        if ready {
1592            self.send(ComputeCommand::Schedule(id));
1593            let collection = self.expect_collection_mut(id);
1594            collection.scheduled = true;
1595        }
1596    }
1597
1598    /// Schedule any unscheduled collections that are ready.
1599    fn schedule_collections(&mut self) {
1600        let ids: Vec<_> = self.collections.keys().copied().collect();
1601        for id in ids {
1602            self.maybe_schedule_collection(id);
1603        }
1604    }
1605
1606    /// Drops the read capability for the given collections and allows their resources to be
1607    /// reclaimed.
1608    #[mz_ore::instrument(level = "debug")]
1609    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1610        for id in &ids {
1611            let collection = self.collection_mut(*id)?;
1612
1613            // Mark the collection as dropped to allow it to be removed from the controller state.
1614            collection.dropped = true;
1615
1616            // Drop the implied and warmup read holds to announce that clients are not
1617            // interested in the collection anymore.
1618            collection.implied_read_hold.release();
1619            collection.warmup_read_hold.release();
1620
1621            // If the collection is a subscribe, stop tracking it. This ensures that the controller
1622            // ceases to produce `SubscribeResponse`s for this subscribe.
1623            self.subscribes.remove(id);
1624            // If the collection is a copy to, stop tracking it. This ensures that the controller
1625            // ceases to produce `CopyToResponse`s` for this copy to.
1626            self.copy_tos.remove(id);
1627        }
1628
1629        Ok(())
1630    }
1631
1632    /// Initiate a peek request for the contents of `id` at `timestamp`.
1633    #[mz_ore::instrument(level = "debug")]
1634    pub fn peek(
1635        &mut self,
1636        peek_target: PeekTarget,
1637        literal_constraints: Option<Vec<Row>>,
1638        uuid: Uuid,
1639        timestamp: T,
1640        result_desc: RelationDesc,
1641        finishing: RowSetFinishing,
1642        map_filter_project: mz_expr::SafeMfpPlan,
1643        mut read_hold: ReadHold<T>,
1644        target_replica: Option<ReplicaId>,
1645        peek_response_tx: oneshot::Sender<PeekResponse>,
1646    ) -> Result<(), PeekError> {
1647        use PeekError::*;
1648
1649        // Downgrade the provided read hold to the peek time.
1650        let target_id = peek_target.id();
1651        if read_hold.id() != target_id {
1652            return Err(ReadHoldIdMismatch(read_hold.id()));
1653        }
1654        read_hold
1655            .try_downgrade(Antichain::from_elem(timestamp.clone()))
1656            .map_err(|_| ReadHoldInsufficient(target_id))?;
1657
1658        if let Some(target) = target_replica {
1659            if !self.replica_exists(target) {
1660                return Err(ReplicaMissing(target));
1661            }
1662        }
1663
1664        let otel_ctx = OpenTelemetryContext::obtain();
1665
1666        self.peeks.insert(
1667            uuid,
1668            PendingPeek {
1669                target_replica,
1670                // TODO(guswynn): can we just hold the `tracing::Span` here instead?
1671                otel_ctx: otel_ctx.clone(),
1672                requested_at: Instant::now(),
1673                read_hold,
1674                peek_response_tx,
1675                limit: finishing.limit.map(usize::cast_from),
1676                offset: finishing.offset,
1677            },
1678        );
1679
1680        let peek = Peek {
1681            literal_constraints,
1682            uuid,
1683            timestamp,
1684            finishing,
1685            map_filter_project,
1686            // Obtain an `OpenTelemetryContext` from the thread-local tracing
1687            // tree to forward it on to the compute worker.
1688            otel_ctx,
1689            target: peek_target,
1690            result_desc,
1691        };
1692        self.send(ComputeCommand::Peek(Box::new(peek)));
1693
1694        Ok(())
1695    }
1696
1697    /// Cancels an existing peek request.
1698    #[mz_ore::instrument(level = "debug")]
1699    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1700        let Some(peek) = self.peeks.get_mut(&uuid) else {
1701            tracing::warn!("did not find pending peek for {uuid}");
1702            return;
1703        };
1704
1705        let duration = peek.requested_at.elapsed();
1706        self.metrics
1707            .observe_peek_response(&PeekResponse::Canceled, duration);
1708
1709        // Enqueue a notification for the cancellation.
1710        let otel_ctx = peek.otel_ctx.clone();
1711        otel_ctx.attach_as_parent();
1712
1713        self.deliver_response(ComputeControllerResponse::PeekNotification(
1714            uuid,
1715            PeekNotification::Canceled,
1716            otel_ctx,
1717        ));
1718
1719        // Finish the peek.
1720        // This will also propagate the cancellation to the replicas.
1721        self.finish_peek(uuid, reason);
1722    }
1723
1724    /// Assigns a read policy to specific identifiers.
1725    ///
1726    /// The policies are assigned in the order presented, and repeated identifiers should
1727    /// conclude with the last policy. Changing a policy will immediately downgrade the read
1728    /// capability if appropriate, but it will not "recover" the read capability if the prior
1729    /// capability is already ahead of it.
1730    ///
1731    /// Identifiers not present in `policies` retain their existing read policies.
1732    ///
1733    /// It is an error to attempt to set a read policy for a collection that is not readable in the
1734    /// context of compute. At this time, only indexes are readable compute collections.
1735    #[mz_ore::instrument(level = "debug")]
1736    pub fn set_read_policy(
1737        &mut self,
1738        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1739    ) -> Result<(), ReadPolicyError> {
1740        // Do error checking upfront, to avoid introducing inconsistencies between a collection's
1741        // `implied_capability` and `read_capabilities`.
1742        for (id, _policy) in &policies {
1743            let collection = self.collection(*id)?;
1744            if collection.read_policy.is_none() {
1745                return Err(ReadPolicyError::WriteOnlyCollection(*id));
1746            }
1747        }
1748
1749        for (id, new_policy) in policies {
1750            let collection = self.expect_collection_mut(id);
1751            let new_since = new_policy.frontier(collection.write_frontier().borrow());
1752            let _ = collection.implied_read_hold.try_downgrade(new_since);
1753            collection.read_policy = Some(new_policy);
1754        }
1755
1756        Ok(())
1757    }
1758
1759    /// Advance the global write frontier of the given collection.
1760    ///
1761    /// Frontier regressions are gracefully ignored.
1762    ///
1763    /// # Panics
1764    ///
1765    /// Panics if the identified collection does not exist.
1766    #[mz_ore::instrument(level = "debug")]
1767    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
1768        let collection = self.expect_collection_mut(id);
1769
1770        let advanced = collection.shared.lock_write_frontier(|f| {
1771            let advanced = PartialOrder::less_than(f, &new_frontier);
1772            if advanced {
1773                f.clone_from(&new_frontier);
1774            }
1775            advanced
1776        });
1777
1778        if !advanced {
1779            return;
1780        }
1781
1782        // Relax the implied read hold according to the read policy.
1783        let new_since = match &collection.read_policy {
1784            Some(read_policy) => {
1785                // For readable collections the read frontier is determined by applying the
1786                // client-provided read policy to the write frontier.
1787                read_policy.frontier(new_frontier.borrow())
1788            }
1789            None => {
1790                // Write-only collections cannot be read within the context of the compute
1791                // controller, so their read frontier only controls the read holds taken on their
1792                // inputs. We can safely downgrade the input read holds to any time less than the
1793                // write frontier.
1794                //
1795                // Note that some write-only collections (continual tasks) need to observe changes
1796                // at their current write frontier during hydration. Thus, we cannot downgrade the
1797                // read frontier to the write frontier and instead step it back by one.
1798                Antichain::from_iter(
1799                    new_frontier
1800                        .iter()
1801                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
1802                )
1803            }
1804        };
1805        let _ = collection.implied_read_hold.try_downgrade(new_since);
1806
1807        // Report the frontier advancement.
1808        self.deliver_response(ComputeControllerResponse::FrontierUpper {
1809            id,
1810            upper: new_frontier,
1811        });
1812    }
1813
1814    /// Apply a collection read hold change.
1815    fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
1816        let Some(collection) = self.collections.get_mut(&id) else {
1817            soft_panic_or_log!(
1818                "read hold change for absent collection (id={id}, changes={update:?})"
1819            );
1820            return;
1821        };
1822
1823        let new_since = collection.shared.lock_read_capabilities(|caps| {
1824            // Sanity check to prevent corrupted `read_capabilities`, which can cause hard-to-debug
1825            // issues (usually stuck read frontiers).
1826            let read_frontier = caps.frontier();
1827            for (time, diff) in update.iter() {
1828                let count = caps.count_for(time) + diff;
1829                assert!(
1830                    count >= 0,
1831                    "invalid read capabilities update: negative capability \
1832             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1833                );
1834                assert!(
1835                    count == 0 || read_frontier.less_equal(time),
1836                    "invalid read capabilities update: frontier regression \
1837             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1838                );
1839            }
1840
1841            // Apply read capability updates and learn about resulting changes to the read
1842            // frontier.
1843            let changes = caps.update_iter(update.drain());
1844
1845            let changed = changes.count() > 0;
1846            changed.then(|| caps.frontier().to_owned())
1847        });
1848
1849        let Some(new_since) = new_since else {
1850            return; // read frontier did not change
1851        };
1852
1853        // Propagate read frontier update to dependencies.
1854        for read_hold in collection.compute_dependencies.values_mut() {
1855            read_hold
1856                .try_downgrade(new_since.clone())
1857                .expect("frontiers don't regress");
1858        }
1859        for read_hold in collection.storage_dependencies.values_mut() {
1860            read_hold
1861                .try_downgrade(new_since.clone())
1862                .expect("frontiers don't regress");
1863        }
1864
1865        // Produce `AllowCompaction` command.
1866        self.send(ComputeCommand::AllowCompaction {
1867            id,
1868            frontier: new_since,
1869        });
1870    }
1871
1872    /// Fulfills a registered peek and cleans up associated state.
1873    ///
1874    /// As part of this we:
1875    ///  * Send a `PeekResponse` through the peek's response channel.
1876    ///  * Emit a `CancelPeek` command to instruct replicas to stop spending resources on this
1877    ///    peek, and to allow the `ComputeCommandHistory` to reduce away the corresponding `Peek`
1878    ///    command.
1879    ///  * Remove the read hold for this peek, unblocking compaction that might have waited on it.
1880    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1881        let Some(peek) = self.peeks.remove(&uuid) else {
1882            return;
1883        };
1884
1885        // The recipient might not be interested in the peek response anymore, which is fine.
1886        let _ = peek.peek_response_tx.send(response);
1887
1888        // NOTE: We need to send the `CancelPeek` command _before_ we release the peek's read hold
1889        // (by dropping it), to avoid the edge case that caused database-issues#4812.
1890        self.send(ComputeCommand::CancelPeek { uuid });
1891
1892        drop(peek.read_hold);
1893    }
1894
1895    /// Handles a response from a replica. Replica IDs are re-used across replica restarts, so we
1896    /// use the replica epoch to drop stale responses.
1897    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
1898        // Filter responses from non-existing or stale replicas.
1899        if self
1900            .replicas
1901            .get(&replica_id)
1902            .filter(|replica| replica.epoch == epoch)
1903            .is_none()
1904        {
1905            return;
1906        }
1907
1908        // Invariant: the replica exists and has the expected epoch.
1909
1910        match response {
1911            ComputeResponse::Frontiers(id, frontiers) => {
1912                self.handle_frontiers_response(id, frontiers, replica_id);
1913            }
1914            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
1915                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
1916            }
1917            ComputeResponse::CopyToResponse(id, response) => {
1918                self.handle_copy_to_response(id, response, replica_id);
1919            }
1920            ComputeResponse::SubscribeResponse(id, response) => {
1921                self.handle_subscribe_response(id, response, replica_id);
1922            }
1923            ComputeResponse::Status(response) => {
1924                self.handle_status_response(response, replica_id);
1925            }
1926        }
1927    }
1928
1929    /// Handle new frontiers, returning any compute response that needs to
1930    /// be sent to the client.
1931    fn handle_frontiers_response(
1932        &mut self,
1933        id: GlobalId,
1934        frontiers: FrontiersResponse<T>,
1935        replica_id: ReplicaId,
1936    ) {
1937        if !self.collections.contains_key(&id) {
1938            soft_panic_or_log!(
1939                "frontiers update for an unknown collection \
1940                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1941            );
1942            return;
1943        }
1944        let Some(replica) = self.replicas.get_mut(&replica_id) else {
1945            soft_panic_or_log!(
1946                "frontiers update for an unknown replica \
1947                 (replica_id={replica_id}, frontiers={frontiers:?})"
1948            );
1949            return;
1950        };
1951        let Some(replica_collection) = replica.collections.get_mut(&id) else {
1952            soft_panic_or_log!(
1953                "frontiers update for an unknown replica collection \
1954                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1955            );
1956            return;
1957        };
1958
1959        if let Some(new_frontier) = frontiers.input_frontier {
1960            replica_collection.update_input_frontier(new_frontier.clone());
1961        }
1962        if let Some(new_frontier) = frontiers.output_frontier {
1963            replica_collection.update_output_frontier(new_frontier.clone());
1964        }
1965        if let Some(new_frontier) = frontiers.write_frontier {
1966            replica_collection.update_write_frontier(new_frontier.clone());
1967            self.maybe_update_global_write_frontier(id, new_frontier);
1968        }
1969    }
1970
1971    #[mz_ore::instrument(level = "debug")]
1972    fn handle_peek_response(
1973        &mut self,
1974        uuid: Uuid,
1975        response: PeekResponse,
1976        otel_ctx: OpenTelemetryContext,
1977        replica_id: ReplicaId,
1978    ) {
1979        otel_ctx.attach_as_parent();
1980
1981        // We might not be tracking this peek anymore, because we have served a response already or
1982        // because it was canceled. If this is the case, we ignore the response.
1983        let Some(peek) = self.peeks.get(&uuid) else {
1984            return;
1985        };
1986
1987        // If the peek is targeting a replica, ignore responses from other replicas.
1988        let target_replica = peek.target_replica.unwrap_or(replica_id);
1989        if target_replica != replica_id {
1990            return;
1991        }
1992
1993        let duration = peek.requested_at.elapsed();
1994        self.metrics.observe_peek_response(&response, duration);
1995
1996        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
1997        // NOTE: We use the `otel_ctx` from the response, not the pending peek, because we
1998        // currently want the parent to be whatever the compute worker did with this peek.
1999        self.deliver_response(ComputeControllerResponse::PeekNotification(
2000            uuid,
2001            notification,
2002            otel_ctx,
2003        ));
2004
2005        self.finish_peek(uuid, response)
2006    }
2007
2008    fn handle_copy_to_response(
2009        &mut self,
2010        sink_id: GlobalId,
2011        response: CopyToResponse,
2012        replica_id: ReplicaId,
2013    ) {
2014        if !self.collections.contains_key(&sink_id) {
2015            soft_panic_or_log!(
2016                "received response for an unknown copy-to \
2017                 (sink_id={sink_id}, replica_id={replica_id})",
2018            );
2019            return;
2020        }
2021        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2022            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
2023            return;
2024        };
2025        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
2026            soft_panic_or_log!(
2027                "copy-to response for an unknown replica collection \
2028                 (sink_id={sink_id}, replica_id={replica_id})"
2029            );
2030            return;
2031        };
2032
2033        // Downgrade the replica frontiers, to enable dropping of input read holds and clean up of
2034        // collection state.
2035        // TODO(database-issues#4701): report copy-to frontiers through `Frontiers` responses
2036        replica_collection.update_write_frontier(Antichain::new());
2037        replica_collection.update_input_frontier(Antichain::new());
2038        replica_collection.update_output_frontier(Antichain::new());
2039
2040        // We might not be tracking this COPY TO because we have already returned a response
2041        // from one of the replicas. In that case, we ignore the response.
2042        if !self.copy_tos.remove(&sink_id) {
2043            return;
2044        }
2045
2046        let result = match response {
2047            CopyToResponse::RowCount(count) => Ok(count),
2048            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
2049            // We should never get here: Replicas only drop copy to collections in response
2050            // to the controller allowing them to do so, and when the controller drops a
2051            // copy to it also removes it from the list of tracked copy_tos (see
2052            // [`Instance::drop_collections`]).
2053            CopyToResponse::Dropped => {
2054                tracing::error!(
2055                    %sink_id, %replica_id,
2056                    "received `Dropped` response for a tracked copy to",
2057                );
2058                return;
2059            }
2060        };
2061
2062        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
2063    }
2064
2065    fn handle_subscribe_response(
2066        &mut self,
2067        subscribe_id: GlobalId,
2068        response: SubscribeResponse<T>,
2069        replica_id: ReplicaId,
2070    ) {
2071        if !self.collections.contains_key(&subscribe_id) {
2072            soft_panic_or_log!(
2073                "received response for an unknown subscribe \
2074                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
2075            );
2076            return;
2077        }
2078        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2079            soft_panic_or_log!(
2080                "subscribe response for an unknown replica (replica_id={replica_id})"
2081            );
2082            return;
2083        };
2084        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
2085            soft_panic_or_log!(
2086                "subscribe response for an unknown replica collection \
2087                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
2088            );
2089            return;
2090        };
2091
2092        // Always apply replica write frontier updates. Even if the subscribe is not tracked
2093        // anymore, there might still be replicas reading from its inputs, so we need to track the
2094        // frontiers until all replicas have advanced to the empty one.
2095        let write_frontier = match &response {
2096            SubscribeResponse::Batch(batch) => batch.upper.clone(),
2097            SubscribeResponse::DroppedAt(_) => Antichain::new(),
2098        };
2099
2100        // For subscribes we downgrade all replica frontiers based on write frontiers. This should
2101        // be fine because the input and output frontier of a subscribe track its write frontier.
2102        // TODO(database-issues#4701): report subscribe frontiers through `Frontiers` responses
2103        replica_collection.update_write_frontier(write_frontier.clone());
2104        replica_collection.update_input_frontier(write_frontier.clone());
2105        replica_collection.update_output_frontier(write_frontier.clone());
2106
2107        // If the subscribe is not tracked, or targets a different replica, there is nothing to do.
2108        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
2109            return;
2110        };
2111        let replica_targeted = subscribe.target_replica.unwrap_or(replica_id) == replica_id;
2112        if !replica_targeted {
2113            return;
2114        }
2115
2116        // Apply a global frontier update.
2117        // If this is a replica-targeted subscribe, it is important that we advance the global
2118        // frontier only based on responses from the targeted replica. Otherwise, another replica
2119        // could advance to the empty frontier, making us drop the subscribe on the targeted
2120        // replica prematurely.
2121        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);
2122
2123        match response {
2124            SubscribeResponse::Batch(batch) => {
2125                let upper = batch.upper;
2126                let mut updates = batch.updates;
2127
2128                // If this batch advances the subscribe's frontier, we emit all updates at times
2129                // greater or equal to the last frontier (to avoid emitting duplicate updates).
2130                if PartialOrder::less_than(&subscribe.frontier, &upper) {
2131                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());
2132
2133                    if upper.is_empty() {
2134                        // This subscribe cannot produce more data. Stop tracking it.
2135                        self.subscribes.remove(&subscribe_id);
2136                    } else {
2137                        // This subscribe can produce more data. Update our tracking of it.
2138                        self.subscribes.insert(subscribe_id, subscribe);
2139                    }
2140
2141                    if let Ok(updates) = updates.as_mut() {
2142                        updates.retain(|(time, _data, _diff)| lower.less_equal(time));
2143                    }
2144                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
2145                        subscribe_id,
2146                        SubscribeBatch {
2147                            lower,
2148                            upper,
2149                            updates,
2150                        },
2151                    ));
2152                }
2153            }
2154            SubscribeResponse::DroppedAt(frontier) => {
2155                // We should never get here: Replicas only drop subscribe collections in response
2156                // to the controller allowing them to do so, and when the controller drops a
2157                // subscribe it also removes it from the list of tracked subscribes (see
2158                // [`Instance::drop_collections`]).
2159                tracing::error!(
2160                    %subscribe_id,
2161                    %replica_id,
2162                    frontier = ?frontier.elements(),
2163                    "received `DroppedAt` response for a tracked subscribe",
2164                );
2165                self.subscribes.remove(&subscribe_id);
2166            }
2167        }
2168    }
2169
2170    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
2171        match response {
2172            StatusResponse::Placeholder => {}
2173        }
2174    }
2175
2176    /// Return the write frontiers of the dependencies of the given collection.
2177    fn dependency_write_frontiers<'b>(
2178        &'b self,
2179        collection: &'b CollectionState<T>,
2180    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2181        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2182            let collection = self.collections.get(&dep_id);
2183            collection.map(|c| c.write_frontier())
2184        });
2185        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2186            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2187            frontiers.map(|f| f.write_frontier)
2188        });
2189
2190        compute_frontiers.chain(storage_frontiers)
2191    }
2192
2193    /// Return the write frontiers of transitive storage dependencies of the given collection.
2194    fn transitive_storage_dependency_write_frontiers<'b>(
2195        &'b self,
2196        collection: &'b CollectionState<T>,
2197    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2198        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2199        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2200        let mut done = BTreeSet::new();
2201
2202        while let Some(id) = todo.pop() {
2203            if done.contains(&id) {
2204                continue;
2205            }
2206            if let Some(dep) = self.collections.get(&id) {
2207                storage_ids.extend(dep.storage_dependency_ids());
2208                todo.extend(dep.compute_dependency_ids())
2209            }
2210            done.insert(id);
2211        }
2212
2213        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2214            let frontiers = self.storage_collections.collection_frontiers(id).ok();
2215            frontiers.map(|f| f.write_frontier)
2216        });
2217
2218        storage_frontiers
2219    }
2220
2221    /// Downgrade the warmup capabilities of collections as much as possible.
2222    ///
2223    /// The only requirement we have for a collection's warmup capability is that it is for a time
2224    /// that is available in all of the collection's inputs. For each input the latest time that is
2225    /// the case for is `write_frontier - 1`. So the farthest we can downgrade a collection's
2226    /// warmup capability is the minimum of `write_frontier - 1` of all its inputs.
2227    ///
2228    /// This method expects to be periodically called as part of instance maintenance work.
2229    /// We would like to instead update the warmup capabilities synchronously in response to
2230    /// frontier updates of dependency collections, but that is not generally possible because we
2231    /// don't learn about frontier updates of storage collections synchronously. We could do
2232    /// synchronous updates for compute dependencies, but we refrain from doing for simplicity.
2233    fn downgrade_warmup_capabilities(&mut self) {
2234        let mut new_capabilities = BTreeMap::new();
2235        for (id, collection) in &self.collections {
2236            // For write-only collections that have advanced to the empty frontier, we can drop the
2237            // warmup capability entirely. There is no reason why we would need to hydrate those
2238            // collections again, so being able to warm them up is not useful.
2239            if collection.read_policy.is_none()
2240                && collection.shared.lock_write_frontier(|f| f.is_empty())
2241            {
2242                new_capabilities.insert(*id, Antichain::new());
2243                continue;
2244            }
2245
2246            let mut new_capability = Antichain::new();
2247            for frontier in self.dependency_write_frontiers(collection) {
2248                for time in frontier {
2249                    new_capability.insert(time.step_back().unwrap_or(time));
2250                }
2251            }
2252
2253            new_capabilities.insert(*id, new_capability);
2254        }
2255
2256        for (id, new_capability) in new_capabilities {
2257            let collection = self.expect_collection_mut(id);
2258            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
2259        }
2260    }
2261
2262    /// Forward the implied capabilities of collections, if possible.
2263    ///
2264    /// The implied capability of a collection controls (a) which times are still readable (for
2265    /// indexes) and (b) with which as-of the collection gets installed on a new replica. We are
2266    /// usually not allowed to advance an implied capability beyond the frontier that follows from
2267    /// the collection's read policy applied to its write frontier:
2268    ///
2269    ///  * For sink collections, some external consumer might rely on seeing all distinct times in
2270    ///    the input reflected in the output. If we'd forward the implied capability of a sink,
2271    ///    we'd risk skipping times in the output across replica restarts.
2272    ///  * For index collections, we might make the index unreadable by advancing its read frontier
2273    ///    beyond its write frontier.
2274    ///
2275    /// There is one case where forwarding an implied capability is fine though: an index installed
2276    /// on a cluster that has no replicas. Such indexes are not readable anyway until a new replica
2277    /// is added, so advancing its read frontier can't make it unreadable. We can thus advance the
2278    /// implied capability as long as we make sure that when a new replica is added, the expected
2279    /// relationship between write frontier, read policy, and implied capability can be restored
2280    /// immediately (modulo computation time).
2281    ///
2282    /// Forwarding implied capabilities is not necessary for the correct functioning of the
2283    /// controller but an optimization that is beneficial in two ways:
2284    ///
2285    ///  * It relaxes read holds on inputs to forwarded collections, allowing their compaction.
2286    ///  * It reduces the amount of historical detail new replicas need to process when computing
2287    ///    forwarded collections, as forwarding the implied capability also forwards the corresponding
2288    ///    dataflow as-of.
2289    fn forward_implied_capabilities(&mut self) {
2290        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
2291            return;
2292        }
2293        if !self.replicas.is_empty() {
2294            return;
2295        }
2296
2297        let mut new_capabilities = BTreeMap::new();
2298        for (id, collection) in &self.collections {
2299            let Some(read_policy) = &collection.read_policy else {
2300                // Collection is write-only, i.e. a sink.
2301                continue;
2302            };
2303
2304            // When a new replica is started, it will immediately be able to compute all collection
2305            // output up to the write frontier of its transitive storage inputs. So the new implied
2306            // read capability should be the read policy applied to that frontier.
2307            let mut dep_frontier = Antichain::new();
2308            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
2309                dep_frontier.extend(frontier);
2310            }
2311
2312            let new_capability = read_policy.frontier(dep_frontier.borrow());
2313            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
2314                new_capabilities.insert(*id, new_capability);
2315            }
2316        }
2317
2318        for (id, new_capability) in new_capabilities {
2319            let collection = self.expect_collection_mut(id);
2320            let _ = collection.implied_read_hold.try_downgrade(new_capability);
2321        }
2322    }
2323
    /// Process pending maintenance work.
    ///
    /// This method is invoked periodically by the global controller.
    /// It is a good place to perform maintenance work that arises from various controller state
    /// changes and that cannot conveniently be handled synchronously with those state changes.
    ///
    /// The steps below run in a fixed order. Whether a step relies on the effects of an earlier
    /// one is not evident from this method alone. Check the callees before reordering.
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        self.rehydrate_failed_replicas();
        // Move warmup read holds to `write_frontier - 1` of each collection's direct inputs.
        self.downgrade_warmup_capabilities();
        // No-op unless the dyncfg is enabled and the instance has no replicas.
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
2340}
2341
/// State maintained about individual compute collections.
///
/// A compute collection is either an index, or a storage sink, or a subscribe, exported by a
/// compute dataflow. Log collections (see `log_collection`) are tracked with this state as well.
#[derive(Debug)]
struct CollectionState<T: ComputeControllerTimestamp> {
    /// Whether this collection is a log collection.
    ///
    /// Log collections are special in that they are only maintained by a subset of all replicas.
    /// Only `CollectionState::new_log_collection` sets this to `true`.
    log_collection: bool,
    /// Whether this collection has been dropped by a controller client.
    ///
    /// The controller is allowed to remove the `CollectionState` for a collection only when
    /// `dropped == true`. Otherwise, clients might still expect to be able to query information
    /// about this collection.
    dropped: bool,
    /// Whether this collection has been scheduled, i.e., the controller has sent a `Schedule`
    /// command for it.
    ///
    /// Starts out `false`, except for log collections. Those are scheduled implicitly as part of
    /// replica initialization.
    scheduled: bool,

    /// State shared with the `ComputeController`.
    shared: SharedCollectionState<T>,

    /// A read hold maintaining the implied capability of the collection.
    ///
    /// This capability is kept to ensure that the collection remains readable according to its
    /// `read_policy`. It also ensures that read holds on the collection's dependencies are kept at
    /// some time not greater than the collection's `write_frontier`, guaranteeing that the
    /// collection's next outputs can always be computed without skipping times.
    implied_read_hold: ReadHold<T>,
    /// A read hold held to enable dataflow warmup.
    ///
    /// Dataflow warmup is an optimization that allows dataflows to immediately start hydrating
    /// even when their next output time (as implied by the `write_frontier`) is in the future.
    /// By installing a read capability derived from the write frontiers of the collection's
    /// inputs, we ensure that the as-of of new dataflows installed for the collection is at a time
    /// that is immediately available, so hydration can begin immediately too.
    warmup_read_hold: ReadHold<T>,
    /// The policy to use to downgrade `self.implied_read_hold`.
    ///
    /// If `None`, the collection is a write-only collection (i.e. a sink). For write-only
    /// collections, the `implied_read_hold` is only required for maintaining read holds on the
    /// inputs, so we can immediately downgrade it to the `write_frontier`.
    ///
    /// Newly created collections start out with `ValidFrom(as_of)`.
    read_policy: Option<ReadPolicy<T>>,

    /// Storage identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
    /// Compute identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,

    /// Introspection state associated with this collection.
    introspection: CollectionIntrospection<T>,

    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
    /// introspection update.
    ///
    /// Keys are `(period, lag, labels)` triples, values are counts.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection. That is the case
    /// for transient collections.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}
2414
2415impl<T: ComputeControllerTimestamp> CollectionState<T> {
2416    /// Creates a new collection state, with an initial read policy valid from `since`.
2417    fn new(
2418        collection_id: GlobalId,
2419        as_of: Antichain<T>,
2420        shared: SharedCollectionState<T>,
2421        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2422        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2423        read_hold_tx: read_holds::ChangeTx<T>,
2424        introspection: CollectionIntrospection<T>,
2425    ) -> Self {
2426        // A collection is not readable before the `as_of`.
2427        let since = as_of.clone();
2428        // A collection won't produce updates for times before the `as_of`.
2429        let upper = as_of;
2430
2431        // Ensure that the provided `shared` is valid for the given `as_of`.
2432        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
2433        assert!(shared.lock_write_frontier(|f| f == &upper));
2434
2435        // Initialize collection read holds.
2436        // Note that the implied read hold was already added to the `read_capabilities` when
2437        // `shared` was created, so we only need to add the warmup read hold here.
2438        let implied_read_hold =
2439            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
2440        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);
2441
2442        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
2443        shared.lock_read_capabilities(|c| {
2444            c.update_iter(updates);
2445        });
2446
2447        // In an effort to keep the produced wallclock lag introspection data small and
2448        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2449        // select indexes and subscribes.
2450        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
2451            true => None,
2452            false => Some(Default::default()),
2453        };
2454
2455        Self {
2456            log_collection: false,
2457            dropped: false,
2458            scheduled: false,
2459            shared,
2460            implied_read_hold,
2461            warmup_read_hold,
2462            read_policy: Some(ReadPolicy::ValidFrom(since)),
2463            storage_dependencies,
2464            compute_dependencies,
2465            introspection,
2466            wallclock_lag_histogram_stash,
2467        }
2468    }
2469
2470    /// Creates a new collection state for a log collection.
2471    fn new_log_collection(
2472        id: GlobalId,
2473        shared: SharedCollectionState<T>,
2474        read_hold_tx: read_holds::ChangeTx<T>,
2475        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2476    ) -> Self {
2477        let since = Antichain::from_elem(T::minimum());
2478        let introspection =
2479            CollectionIntrospection::new(id, introspection_tx, since.clone(), false, None, None);
2480        let mut state = Self::new(
2481            id,
2482            since,
2483            shared,
2484            Default::default(),
2485            Default::default(),
2486            read_hold_tx,
2487            introspection,
2488        );
2489        state.log_collection = true;
2490        // Log collections are created and scheduled implicitly as part of replica initialization.
2491        state.scheduled = true;
2492        state
2493    }
2494
2495    /// Reports the current read frontier.
2496    fn read_frontier(&self) -> Antichain<T> {
2497        self.shared
2498            .lock_read_capabilities(|c| c.frontier().to_owned())
2499    }
2500
2501    /// Reports the current write frontier.
2502    fn write_frontier(&self) -> Antichain<T> {
2503        self.shared.lock_write_frontier(|f| f.clone())
2504    }
2505
2506    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2507        self.storage_dependencies.keys().copied()
2508    }
2509
2510    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2511        self.compute_dependencies.keys().copied()
2512    }
2513
2514    /// Reports the IDs of the dependencies of this collection.
2515    fn dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2516        self.compute_dependency_ids()
2517            .chain(self.storage_dependency_ids())
2518    }
2519}
2520
/// Collection state shared with the `ComputeController`.
///
/// Having this allows certain controller APIs, such as `ComputeController::collection_frontiers`
/// and `ComputeController::acquire_read_hold`, to be non-`async`. This comes at two costs:
/// complexity, by introducing shared mutable state, and performance, by introducing locking. We
/// should aim to reduce the amount of shared state over time, rather than expand it.
///
/// Note that [`SharedCollectionState`]s are initialized by the `ComputeController` prior to the
/// collection's creation in the [`Instance`]. This is to allow compute clients to query frontiers
/// and take new read holds immediately, without having to wait for the [`Instance`] to update.
///
/// Both fields live behind `Arc<Mutex<..>>`, so cloning a `SharedCollectionState` yields a handle
/// to the same underlying state rather than an independent copy.
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState<T> {
    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulation contains the capabilities held by all [`ReadHold`]s given out for the
    /// collection, including `implied_read_hold` and `warmup_read_hold`.
    ///
    /// NOTE: This field may only be modified by [`Instance::apply_read_hold_change`] and
    /// `ComputeController::acquire_read_hold`. Nobody else should modify read capabilities
    /// directly. Instead, collection users should manage read holds through [`ReadHold`] objects
    /// acquired through `ComputeController::acquire_read_hold`.
    ///
    /// TODO(teskje): Restructure the code to enforce the above in the type system.
    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
    /// The write frontier of this collection.
    write_frontier: Arc<Mutex<Antichain<T>>>,
}
2548
2549impl<T: Timestamp> SharedCollectionState<T> {
2550    pub fn new(as_of: Antichain<T>) -> Self {
2551        // A collection is not readable before the `as_of`.
2552        let since = as_of.clone();
2553        // A collection won't produce updates for times before the `as_of`.
2554        let upper = as_of;
2555
2556        // Initialize read capabilities to the `since`.
2557        // The is the implied read capability. The corresponding [`ReadHold`] is created in
2558        // [`CollectionState::new`].
2559        let mut read_capabilities = MutableAntichain::new();
2560        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2561
2562        Self {
2563            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2564            write_frontier: Arc::new(Mutex::new(upper)),
2565        }
2566    }
2567
2568    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2569    where
2570        F: FnOnce(&mut MutableAntichain<T>) -> R,
2571    {
2572        let mut caps = self.read_capabilities.lock().expect("poisoned");
2573        f(&mut *caps)
2574    }
2575
2576    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2577    where
2578        F: FnOnce(&mut Antichain<T>) -> R,
2579    {
2580        let mut frontier = self.write_frontier.lock().expect("poisoned");
2581        f(&mut *frontier)
2582    }
2583}
2584
/// Manages certain introspection relations associated with a collection. Upon creation, it adds
/// rows to introspection relations. When dropped, it retracts its managed rows.
///
/// Rows are emitted as `(Row, Diff)` updates over `introspection_tx`.
///
/// TODO: `ComputeDependencies` could be moved under this.
#[derive(Debug)]
struct CollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Introspection state for `IntrospectionType::Frontiers`.
    ///
    /// `Some` if the collection does _not_ sink into a storage collection (i.e. is not an MV). If
    /// the collection sinks into storage, the storage controller reports its frontiers instead.
    frontiers: Option<FrontiersIntrospectionState<T>>,
    /// Introspection state for `IntrospectionType::ComputeMaterializedViewRefreshes`.
    ///
    /// `Some` if the collection is a REFRESH MV, i.e. both a refresh schedule and an initial
    /// as-of were provided on creation.
    refresh: Option<RefreshIntrospectionState<T>>,
}
2605
2606impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
2607    fn new(
2608        collection_id: GlobalId,
2609        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2610        as_of: Antichain<T>,
2611        storage_sink: bool,
2612        initial_as_of: Option<Antichain<T>>,
2613        refresh_schedule: Option<RefreshSchedule>,
2614    ) -> Self {
2615        let refresh =
2616            match (refresh_schedule, initial_as_of) {
2617                (Some(refresh_schedule), Some(initial_as_of)) => Some(
2618                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2619                ),
2620                (refresh_schedule, _) => {
2621                    // If we have a `refresh_schedule`, then the collection is a MV, so we should also have
2622                    // an `initial_as_of`.
2623                    soft_assert_or_log!(
2624                        refresh_schedule.is_none(),
2625                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2626                    );
2627                    None
2628                }
2629            };
2630        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2631
2632        let self_ = Self {
2633            collection_id,
2634            introspection_tx,
2635            frontiers,
2636            refresh,
2637        };
2638
2639        self_.report_initial_state();
2640        self_
2641    }
2642
2643    /// Reports the initial introspection state.
2644    fn report_initial_state(&self) {
2645        if let Some(frontiers) = &self.frontiers {
2646            let row = frontiers.row_for_collection(self.collection_id);
2647            let updates = vec![(row, Diff::ONE)];
2648            self.send(IntrospectionType::Frontiers, updates);
2649        }
2650
2651        if let Some(refresh) = &self.refresh {
2652            let row = refresh.row_for_collection(self.collection_id);
2653            let updates = vec![(row, Diff::ONE)];
2654            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2655        }
2656    }
2657
2658    /// Observe the given current collection frontiers and update the introspection state as
2659    /// necessary.
2660    fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2661        self.update_frontier_introspection(read_frontier, write_frontier);
2662        self.update_refresh_introspection(write_frontier);
2663    }
2664
2665    fn update_frontier_introspection(
2666        &mut self,
2667        read_frontier: &Antichain<T>,
2668        write_frontier: &Antichain<T>,
2669    ) {
2670        let Some(frontiers) = &mut self.frontiers else {
2671            return;
2672        };
2673
2674        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2675        {
2676            return; // no change
2677        };
2678
2679        let retraction = frontiers.row_for_collection(self.collection_id);
2680        frontiers.update(read_frontier, write_frontier);
2681        let insertion = frontiers.row_for_collection(self.collection_id);
2682        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2683        self.send(IntrospectionType::Frontiers, updates);
2684    }
2685
2686    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
2687        let Some(refresh) = &mut self.refresh else {
2688            return;
2689        };
2690
2691        let retraction = refresh.row_for_collection(self.collection_id);
2692        refresh.frontier_update(write_frontier);
2693        let insertion = refresh.row_for_collection(self.collection_id);
2694
2695        if retraction == insertion {
2696            return; // no change
2697        }
2698
2699        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2700        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2701    }
2702
2703    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2704        // Failure to send means the `ComputeController` has been dropped and doesn't care about
2705        // introspection updates anymore.
2706        let _ = self.introspection_tx.send((introspection_type, updates));
2707    }
2708}
2709
2710impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
2711    fn drop(&mut self) {
2712        // Retract collection frontiers.
2713        if let Some(frontiers) = &self.frontiers {
2714            let row = frontiers.row_for_collection(self.collection_id);
2715            let updates = vec![(row, Diff::MINUS_ONE)];
2716            self.send(IntrospectionType::Frontiers, updates);
2717        }
2718
2719        // Retract MV refresh state.
2720        if let Some(refresh) = &self.refresh {
2721            let retraction = refresh.row_for_collection(self.collection_id);
2722            let updates = vec![(retraction, Diff::MINUS_ONE)];
2723            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2724        }
2725    }
2726}
2727
/// The collection frontiers reflected in a collection's `IntrospectionType::Frontiers` row.
#[derive(Debug)]
struct FrontiersIntrospectionState<T> {
    /// The read frontier currently reflected in the introspection row.
    read_frontier: Antichain<T>,
    /// The write frontier currently reflected in the introspection row.
    write_frontier: Antichain<T>,
}
2733
2734impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
2735    fn new(as_of: Antichain<T>) -> Self {
2736        Self {
2737            read_frontier: as_of.clone(),
2738            write_frontier: as_of,
2739        }
2740    }
2741
2742    /// Return a `Row` reflecting the current collection frontiers.
2743    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2744        let read_frontier = self
2745            .read_frontier
2746            .as_option()
2747            .map_or(Datum::Null, |ts| ts.clone().into());
2748        let write_frontier = self
2749            .write_frontier
2750            .as_option()
2751            .map_or(Datum::Null, |ts| ts.clone().into());
2752        Row::pack_slice(&[
2753            Datum::String(&collection_id.to_string()),
2754            read_frontier,
2755            write_frontier,
2756        ])
2757    }
2758
2759    /// Update the introspection state with the given new frontiers.
2760    fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2761        if read_frontier != &self.read_frontier {
2762            self.read_frontier.clone_from(read_frontier);
2763        }
2764        if write_frontier != &self.write_frontier {
2765            self.write_frontier.clone_from(write_frontier);
2766        }
2767    }
2768}
2769
/// Information needed to compute introspection updates for a REFRESH materialized view when the
/// write frontier advances.
///
/// The two refresh-state fields are the values that `row_for_collection` packs into the
/// `ComputeMaterializedViewRefreshes` introspection row.
#[derive(Debug)]
struct RefreshIntrospectionState<T> {
    // Immutable properties of the MV
    /// The MV's refresh schedule.
    refresh_schedule: RefreshSchedule,
    /// The as-of of the MV's first refresh.
    initial_as_of: Antichain<T>,
    // Refresh state
    /// The next refresh time, or `NULL` once all refreshes have completed.
    next_refresh: Datum<'static>,           // Null or an MzTimestamp
    /// The most recently completed refresh time, or `NULL` before the first refresh.
    last_completed_refresh: Datum<'static>, // Null or an MzTimestamp
}
2781
2782impl<T> RefreshIntrospectionState<T> {
2783    /// Return a `Row` reflecting the current refresh introspection state.
2784    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2785        Row::pack_slice(&[
2786            Datum::String(&collection_id.to_string()),
2787            self.last_completed_refresh,
2788            self.next_refresh,
2789        ])
2790    }
2791}
2792
2793impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
2794    /// Construct a new [`RefreshIntrospectionState`], and apply an initial `frontier_update()` at
2795    /// the `upper`.
2796    fn new(
2797        refresh_schedule: RefreshSchedule,
2798        initial_as_of: Antichain<T>,
2799        upper: &Antichain<T>,
2800    ) -> Self {
2801        let mut self_ = Self {
2802            refresh_schedule: refresh_schedule.clone(),
2803            initial_as_of: initial_as_of.clone(),
2804            next_refresh: Datum::Null,
2805            last_completed_refresh: Datum::Null,
2806        };
2807        self_.frontier_update(upper);
2808        self_
2809    }
2810
2811    /// Should be called whenever the write frontier of the collection advances. It updates the
2812    /// state that should be recorded in introspection relations, but doesn't send the updates yet.
2813    fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
2814        if write_frontier.is_empty() {
2815            self.last_completed_refresh =
2816                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2817                    last_refresh.into()
2818                } else {
2819                    // If there is no last refresh, then we have a `REFRESH EVERY`, in which case
2820                    // the saturating roundup puts a refresh at the maximum possible timestamp.
2821                    T::maximum().into()
2822                };
2823            self.next_refresh = Datum::Null;
2824        } else {
2825            if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2826                // We are before the first refresh.
2827                self.last_completed_refresh = Datum::Null;
2828                let initial_as_of = self.initial_as_of.as_option().expect(
2829                    "initial_as_of can't be [], because then there would be no refreshes at all",
2830                );
2831                let first_refresh = initial_as_of
2832                    .round_up(&self.refresh_schedule)
2833                    .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2834                soft_assert_or_log!(
2835                    first_refresh == *initial_as_of,
2836                    "initial_as_of should be set to the first refresh"
2837                );
2838                self.next_refresh = first_refresh.into();
2839            } else {
2840                // The first refresh has already happened.
2841                let write_frontier = write_frontier.as_option().expect("checked above");
2842                self.last_completed_refresh = write_frontier
2843                    .round_down_minus_1(&self.refresh_schedule)
2844                    .map_or_else(
2845                        || {
2846                            soft_panic_or_log!(
2847                                "rounding down should have returned the first refresh or later"
2848                            );
2849                            Datum::Null
2850                        },
2851                        |last_completed_refresh| last_completed_refresh.into(),
2852                    );
2853                self.next_refresh = write_frontier.clone().into();
2854            }
2855        }
2856    }
2857}
2858
/// A note of an outstanding peek response.
///
/// It bundles what is needed to deliver the peek's result to the requester, together with the
/// read hold installed to serve the peek.
#[derive(Debug)]
struct PendingPeek<T: Timestamp> {
    /// For replica-targeted peeks, this specifies the replica whose response we should pass on.
    ///
    /// If this value is `None`, we pass on the first response.
    target_replica: Option<ReplicaId>,
    /// The OpenTelemetry context for this peek.
    otel_ctx: OpenTelemetryContext,
    /// The time at which the peek was requested.
    ///
    /// Used to track peek durations.
    requested_at: Instant,
    /// The read hold installed to serve this peek.
    read_hold: ReadHold<T>,
    /// The channel to send peek results.
    peek_response_tx: oneshot::Sender<PeekResponse>,
    /// An optional limit of the peek's result size.
    limit: Option<usize>,
    /// The offset into the peek's result.
    offset: usize,
}
2881
/// Controller-side tracking state for a running subscribe.
#[derive(Debug, Clone)]
struct ActiveSubscribe<T> {
    /// Current upper frontier of this subscribe.
    ///
    /// Starts at the minimum timestamp when the subscribe is created.
    frontier: Antichain<T>,
    /// For replica-targeted subscribes, this specifies the replica whose responses we should pass on.
    ///
    /// If this value is `None`, we pass on the first response for each time slice.
    target_replica: Option<ReplicaId>,
}
2891
2892impl<T: ComputeControllerTimestamp> ActiveSubscribe<T> {
2893    fn new(target_replica: Option<ReplicaId>) -> Self {
2894        Self {
2895            frontier: Antichain::from_elem(T::minimum()),
2896            target_replica,
2897        }
2898    }
2899}
2900
/// State maintained about an individual replica.
#[derive(Debug)]
struct ReplicaState<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    id: ReplicaId,
    /// Client for the running replica task.
    client: ReplicaClient<T>,
    /// The replica configuration.
    config: ReplicaConfig,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// A channel through which introspection updates are delivered.
    ///
    /// A clone of this sender is handed to the introspection state of each added collection.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Per-replica collection state, keyed by collection ID.
    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
    /// The epoch of the replica.
    epoch: u64,
}
2919
2920impl<T: ComputeControllerTimestamp> ReplicaState<T> {
2921    fn new(
2922        id: ReplicaId,
2923        client: ReplicaClient<T>,
2924        config: ReplicaConfig,
2925        metrics: ReplicaMetrics,
2926        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2927        epoch: u64,
2928    ) -> Self {
2929        Self {
2930            id,
2931            client,
2932            config,
2933            metrics,
2934            introspection_tx,
2935            epoch,
2936            collections: Default::default(),
2937        }
2938    }
2939
2940    /// Add a collection to the replica state.
2941    ///
2942    /// # Panics
2943    ///
2944    /// Panics if a collection with the same ID exists already.
2945    fn add_collection(
2946        &mut self,
2947        id: GlobalId,
2948        as_of: Antichain<T>,
2949        input_read_holds: Vec<ReadHold<T>>,
2950    ) {
2951        let metrics = self.metrics.for_collection(id);
2952        let introspection = ReplicaCollectionIntrospection::new(
2953            self.id,
2954            id,
2955            self.introspection_tx.clone(),
2956            as_of.clone(),
2957        );
2958        let mut state =
2959            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
2960
2961        // In an effort to keep the produced wallclock lag introspection data small and
2962        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2963        // select indexes and subscribes.
2964        if id.is_transient() {
2965            state.wallclock_lag_max = None;
2966        }
2967
2968        if let Some(previous) = self.collections.insert(id, state) {
2969            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
2970        }
2971    }
2972
2973    /// Remove state for a collection.
2974    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
2975        self.collections.remove(&id)
2976    }
2977
2978    /// Returns whether all replica frontiers of the given collection are empty.
2979    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
2980        self.collections.get(&id).map_or(true, |c| {
2981            c.write_frontier.is_empty()
2982                && c.input_frontier.is_empty()
2983                && c.output_frontier.is_empty()
2984        })
2985    }
2986
2987    /// Returns the state of the [`ReplicaState`] formatted as JSON.
2988    ///
2989    /// The returned value is not guaranteed to be stable and may change at any point in time.
2990    #[mz_ore::instrument(level = "debug")]
2991    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
2992        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
2993        // returned object as a tradeoff between usability and stability. `serde_json` will fail
2994        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
2995        // prevents a future unrelated change from silently breaking this method.
2996
2997        // Destructure `self` here so we don't forget to consider dumping newly added fields.
2998        let Self {
2999            id,
3000            client: _,
3001            config: _,
3002            metrics: _,
3003            introspection_tx: _,
3004            epoch,
3005            collections,
3006        } = self;
3007
3008        fn field(
3009            key: &str,
3010            value: impl Serialize,
3011        ) -> Result<(String, serde_json::Value), anyhow::Error> {
3012            let value = serde_json::to_value(value)?;
3013            Ok((key.to_string(), value))
3014        }
3015
3016        let collections: BTreeMap<_, _> = collections
3017            .iter()
3018            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
3019            .collect();
3020
3021        let map = serde_json::Map::from_iter([
3022            field("id", id.to_string())?,
3023            field("collections", collections)?,
3024            field("epoch", epoch)?,
3025        ]);
3026        Ok(serde_json::Value::Object(map))
3027    }
3028}
3029
/// State maintained about a single collection installed on a replica.
#[derive(Debug)]
struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
    /// The replica write frontier of this collection.
    ///
    /// See [`FrontiersResponse::write_frontier`].
    write_frontier: Antichain<T>,
    /// The replica input frontier of this collection.
    ///
    /// See [`FrontiersResponse::input_frontier`].
    input_frontier: Antichain<T>,
    /// The replica output frontier of this collection.
    ///
    /// See [`FrontiersResponse::output_frontier`].
    output_frontier: Antichain<T>,

    /// Metrics tracked for this collection.
    ///
    /// If this is `None`, no metrics are collected.
    metrics: Option<ReplicaCollectionMetrics>,
    /// As-of frontier with which this collection was installed on the replica.
    as_of: Antichain<T>,
    /// Tracks introspection state for this collection.
    introspection: ReplicaCollectionIntrospection<T>,
    /// Read holds on storage inputs to this collection.
    ///
    /// These read holds are kept to ensure that the replica is able to read from storage inputs at
    /// all times it hasn't read yet. We only need to install read holds for storage inputs since
    /// compaction of compute inputs is implicitly held back by Timely/DD.
    input_read_holds: Vec<ReadHold<T>>,

    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection. This is the case for
    /// transient collections.
    wallclock_lag_max: Option<WallclockLag>,
}
3065
3066impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
3067    fn new(
3068        metrics: Option<ReplicaCollectionMetrics>,
3069        as_of: Antichain<T>,
3070        introspection: ReplicaCollectionIntrospection<T>,
3071        input_read_holds: Vec<ReadHold<T>>,
3072    ) -> Self {
3073        Self {
3074            write_frontier: as_of.clone(),
3075            input_frontier: as_of.clone(),
3076            output_frontier: as_of.clone(),
3077            metrics,
3078            as_of,
3079            introspection,
3080            input_read_holds,
3081            wallclock_lag_max: Some(WallclockLag::MIN),
3082        }
3083    }
3084
3085    /// Returns whether this collection is hydrated.
3086    fn hydrated(&self) -> bool {
3087        // If the observed frontier is greater than the collection's as-of, the collection has
3088        // produced some output and is therefore hydrated.
3089        //
3090        // We need to consider the edge case where the as-of is the empty frontier. Such an as-of
3091        // is not useful for indexes, because they wouldn't be readable. For write-only
3092        // collections, an empty as-of means that the collection has been fully written and no new
3093        // dataflow needs to be created for it. Consequently, no hydration will happen either.
3094        //
3095        // Based on this, we could respond in two ways:
3096        //  * `false`, as in "the dataflow was never created"
3097        //  * `true`, as in "the dataflow completed immediately"
3098        //
3099        // Since hydration is often used as a measure of dataflow progress and we don't want to
3100        // give the impression that certain dataflows are somehow stuck when they are not, we go
3101        // with the second interpretation here.
3102        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3103    }
3104
3105    /// Updates the replica write frontier of this collection.
3106    fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
3107        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3108            soft_panic_or_log!(
3109                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3110                self.write_frontier,
3111            );
3112            return;
3113        } else if new_frontier == self.write_frontier {
3114            return;
3115        }
3116
3117        self.write_frontier = new_frontier;
3118    }
3119
3120    /// Updates the replica input frontier of this collection.
3121    fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
3122        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3123            soft_panic_or_log!(
3124                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3125                self.input_frontier,
3126            );
3127            return;
3128        } else if new_frontier == self.input_frontier {
3129            return;
3130        }
3131
3132        self.input_frontier = new_frontier;
3133
3134        // Relax our read holds on collection inputs.
3135        for read_hold in &mut self.input_read_holds {
3136            let result = read_hold.try_downgrade(self.input_frontier.clone());
3137            soft_assert_or_log!(
3138                result.is_ok(),
3139                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3140                self.input_frontier,
3141            );
3142        }
3143    }
3144
3145    /// Updates the replica output frontier of this collection.
3146    fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
3147        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3148            soft_panic_or_log!(
3149                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3150                self.output_frontier,
3151            );
3152            return;
3153        } else if new_frontier == self.output_frontier {
3154            return;
3155        }
3156
3157        self.output_frontier = new_frontier;
3158    }
3159}
3160
/// Maintains the introspection state for a given replica and collection, and ensures that reported
/// introspection data is retracted when the collection is dropped.
#[derive(Debug)]
struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// The replica write frontier most recently reported to introspection.
    ///
    /// This is the value that must be retracted before a new frontier is reported.
    write_frontier: Antichain<T>,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
}
3174
3175impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
3176    /// Create a new `HydrationState` and initialize introspection.
3177    fn new(
3178        replica_id: ReplicaId,
3179        collection_id: GlobalId,
3180        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3181        as_of: Antichain<T>,
3182    ) -> Self {
3183        let self_ = Self {
3184            replica_id,
3185            collection_id,
3186            write_frontier: as_of,
3187            introspection_tx,
3188        };
3189
3190        self_.report_initial_state();
3191        self_
3192    }
3193
3194    /// Reports the initial introspection state.
3195    fn report_initial_state(&self) {
3196        let row = self.write_frontier_row();
3197        let updates = vec![(row, Diff::ONE)];
3198        self.send(IntrospectionType::ReplicaFrontiers, updates);
3199    }
3200
3201    /// Observe the given current write frontier and update the introspection state as necessary.
3202    fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
3203        if self.write_frontier == *write_frontier {
3204            return; // no change
3205        }
3206
3207        let retraction = self.write_frontier_row();
3208        self.write_frontier.clone_from(write_frontier);
3209        let insertion = self.write_frontier_row();
3210
3211        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3212        self.send(IntrospectionType::ReplicaFrontiers, updates);
3213    }
3214
3215    /// Return a `Row` reflecting the current replica write frontier.
3216    fn write_frontier_row(&self) -> Row {
3217        let write_frontier = self
3218            .write_frontier
3219            .as_option()
3220            .map_or(Datum::Null, |ts| ts.clone().into());
3221        Row::pack_slice(&[
3222            Datum::String(&self.collection_id.to_string()),
3223            Datum::String(&self.replica_id.to_string()),
3224            write_frontier,
3225        ])
3226    }
3227
3228    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3229        // Failure to send means the `ComputeController` has been dropped and doesn't care about
3230        // introspection updates anymore.
3231        let _ = self.introspection_tx.send((introspection_type, updates));
3232    }
3233}
3234
3235impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
3236    fn drop(&mut self) {
3237        // Retract the write frontier.
3238        let row = self.write_frontier_row();
3239        let updates = vec![(row, Diff::MINUS_ONE)];
3240        self.send(IntrospectionType::ReplicaFrontiers, updates);
3241    }
3242}