Skip to main content

mz_compute_client/controller/
instance.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller for a compute instance.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
21use mz_compute_types::plan::render_plan::RenderPlan;
22use mz_compute_types::sinks::{
23    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
24};
25use mz_compute_types::sources::SourceInstanceDesc;
26use mz_controller_types::dyncfgs::{
27    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
28};
29use mz_dyncfg::ConfigSet;
30use mz_expr::RowSetFinishing;
31use mz_ore::cast::CastFrom;
32use mz_ore::channel::instrumented_unbounded_channel;
33use mz_ore::now::NowFn;
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{soft_assert_or_log, soft_panic_or_log};
36use mz_persist_types::PersistLocation;
37use mz_repr::adt::timestamp::CheckedTimestamp;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
40use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
41use mz_storage_types::read_holds::{self, ReadHold};
42use mz_storage_types::read_policy::ReadPolicy;
43use thiserror::Error;
44use timely::PartialOrder;
45use timely::progress::frontier::MutableAntichain;
46use timely::progress::{Antichain, ChangeBatch, Timestamp};
47use tokio::sync::{mpsc, oneshot};
48use uuid::Uuid;
49
50use crate::controller::error::{
51    CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
52};
53use crate::controller::instance_client::PeekError;
54use crate::controller::replica::{ReplicaClient, ReplicaConfig};
55use crate::controller::{
56    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
57    ReplicaId, StorageCollections,
58};
59use crate::logging::LogVariant;
60use crate::metrics::IntCounter;
61use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
62use crate::protocol::command::{
63    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
64};
65use crate::protocol::history::ComputeCommandHistory;
66use crate::protocol::response::{
67    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
68    SubscribeBatch, SubscribeResponse,
69};
70
71#[derive(Error, Debug)]
72#[error("replica exists already: {0}")]
73pub(super) struct ReplicaExists(pub ReplicaId);
74
75#[derive(Error, Debug)]
76#[error("replica does not exist: {0}")]
77pub(super) struct ReplicaMissing(pub ReplicaId);
78
79#[derive(Error, Debug)]
80pub(super) enum DataflowCreationError {
81    #[error("collection does not exist: {0}")]
82    CollectionMissing(GlobalId),
83    #[error("replica does not exist: {0}")]
84    ReplicaMissing(ReplicaId),
85    #[error("dataflow definition lacks an as_of value")]
86    MissingAsOf,
87    #[error("subscribe dataflow has an empty as_of")]
88    EmptyAsOfForSubscribe,
89    #[error("copy to dataflow has an empty as_of")]
90    EmptyAsOfForCopyTo,
91    #[error("no read hold provided for dataflow import: {0}")]
92    ReadHoldMissing(GlobalId),
93    #[error("insufficient read hold provided for dataflow import: {0}")]
94    ReadHoldInsufficient(GlobalId),
95}
96
97impl From<CollectionMissing> for DataflowCreationError {
98    fn from(error: CollectionMissing) -> Self {
99        Self::CollectionMissing(error.0)
100    }
101}
102
103#[derive(Error, Debug)]
104pub(super) enum ReadPolicyError {
105    #[error("collection does not exist: {0}")]
106    CollectionMissing(GlobalId),
107    #[error("collection is write-only: {0}")]
108    WriteOnlyCollection(GlobalId),
109}
110
111impl From<CollectionMissing> for ReadPolicyError {
112    fn from(error: CollectionMissing) -> Self {
113        Self::CollectionMissing(error.0)
114    }
115}
116
117/// A command sent to an [`Instance`] task.
118pub(super) type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;
119
120/// A response from a replica, composed of a replica ID, the replica's current epoch, and the
121/// compute response itself.
122pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);
123
124/// The state we keep for a compute instance.
125pub(super) struct Instance<T: ComputeControllerTimestamp> {
126    /// Build info for spawning replicas
127    build_info: &'static BuildInfo,
128    /// A handle providing access to storage collections.
129    storage_collections: StorageCollections<T>,
130    /// Whether instance initialization has been completed.
131    initialized: bool,
132    /// Whether this instance is in read-only mode.
133    ///
134    /// When in read-only mode, this instance will not update persistent state, such as
135    /// wallclock lag introspection.
136    read_only: bool,
137    /// The workload class of this instance.
138    ///
139    /// This is currently only used to annotate metrics.
140    workload_class: Option<String>,
141    /// The replicas of this compute instance.
142    replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
143    /// Currently installed compute collections.
144    ///
145    /// New entries are added for all collections exported from dataflows created through
146    /// [`Instance::create_dataflow`].
147    ///
148    /// Entries are removed by [`Instance::cleanup_collections`]. See that method's documentation
149    /// about the conditions for removing collection state.
150    collections: BTreeMap<GlobalId, CollectionState<T>>,
151    /// IDs of log sources maintained by this compute instance.
152    log_sources: BTreeMap<LogVariant, GlobalId>,
153    /// Currently outstanding peeks.
154    ///
155    /// New entries are added for all peeks initiated through [`Instance::peek`].
156    ///
157    /// The entry for a peek is only removed once all replicas have responded to the peek. This is
158    /// currently required to ensure all replicas have stopped reading from the peeked collection's
159    /// inputs before we allow them to compact. database-issues#4822 tracks changing this so we only have to wait
160    /// for the first peek response.
161    peeks: BTreeMap<Uuid, PendingPeek<T>>,
162    /// Currently in-progress subscribes.
163    ///
164    /// New entries are added for all subscribes exported from dataflows created through
165    /// [`Instance::create_dataflow`].
166    ///
167    /// The entry for a subscribe is removed once at least one replica has reported the subscribe
168    /// to have advanced to the empty frontier or to have been dropped, implying that no further
169    /// updates will be emitted for this subscribe.
170    ///
171    /// Note that subscribes are tracked both in `collections` and `subscribes`. `collections`
172    /// keeps track of the subscribe's upper and since frontiers and ensures appropriate read holds
173    /// on the subscribe's input. `subscribes` is only used to track which updates have been
174    /// emitted, to decide if new ones should be emitted or suppressed.
175    subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
176    /// Tracks all in-progress COPY TOs.
177    ///
178    /// New entries are added for all s3 oneshot sinks (corresponding to a COPY TO) exported from
179    /// dataflows created through [`Instance::create_dataflow`].
180    ///
181    /// The entry for a copy to is removed once at least one replica has finished
182    /// or the exporting collection is dropped.
183    copy_tos: BTreeSet<GlobalId>,
184    /// The command history, used when introducing new replicas or restarting existing replicas.
185    history: ComputeCommandHistory<UIntGauge, T>,
186    /// Receiver for commands to be executed.
187    command_rx: mpsc::UnboundedReceiver<Command<T>>,
188    /// Sender for responses to be delivered.
189    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
190    /// Sender for introspection updates to be recorded.
191    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
192    /// The registry the controller uses to report metrics.
193    metrics: InstanceMetrics,
194    /// Dynamic system configuration.
195    dyncfg: Arc<ConfigSet>,
196
197    /// The persist location where we can stash large peek results.
198    peek_stash_persist_location: PersistLocation,
199
200    /// A function that produces the current wallclock time.
201    now: NowFn,
202    /// A function that computes the lag between the given time and wallclock time.
203    wallclock_lag: WallclockLagFn<T>,
204    /// The last time wallclock lag introspection was recorded.
205    wallclock_lag_last_recorded: DateTime<Utc>,
206
207    /// Sender for updates to collection read holds.
208    ///
209    /// Copies of this sender are given to [`ReadHold`]s that are created in
210    /// [`CollectionState::new`].
211    read_hold_tx: read_holds::ChangeTx<T>,
212    /// A sender for responses from replicas.
213    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
214    /// A receiver for responses from replicas.
215    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
216}
217
218impl<T: ComputeControllerTimestamp> Instance<T> {
219    /// Acquire a handle to the collection state associated with `id`.
220    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
221        self.collections.get(&id).ok_or(CollectionMissing(id))
222    }
223
224    /// Acquire a mutable handle to the collection state associated with `id`.
225    fn collection_mut(
226        &mut self,
227        id: GlobalId,
228    ) -> Result<&mut CollectionState<T>, CollectionMissing> {
229        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
230    }
231
232    /// Acquire a handle to the collection state associated with `id`.
233    ///
234    /// # Panics
235    ///
236    /// Panics if the identified collection does not exist.
237    fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
238        self.collections.get(&id).expect("collection must exist")
239    }
240
241    /// Acquire a mutable handle to the collection state associated with `id`.
242    ///
243    /// # Panics
244    ///
245    /// Panics if the identified collection does not exist.
246    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
247        self.collections
248            .get_mut(&id)
249            .expect("collection must exist")
250    }
251
252    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
253        self.collections.iter().map(|(id, coll)| (*id, coll))
254    }
255
256    /// Add a collection to the instance state.
257    ///
258    /// # Panics
259    ///
260    /// Panics if a collection with the same ID exists already.
261    fn add_collection(
262        &mut self,
263        id: GlobalId,
264        as_of: Antichain<T>,
265        shared: SharedCollectionState<T>,
266        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
267        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
268        replica_input_read_holds: Vec<ReadHold<T>>,
269        write_only: bool,
270        storage_sink: bool,
271        initial_as_of: Option<Antichain<T>>,
272        refresh_schedule: Option<RefreshSchedule>,
273    ) {
274        // Add global collection state.
275        let introspection = CollectionIntrospection::new(
276            id,
277            self.introspection_tx.clone(),
278            as_of.clone(),
279            storage_sink,
280            initial_as_of,
281            refresh_schedule,
282        );
283        let mut state = CollectionState::new(
284            id,
285            as_of.clone(),
286            shared,
287            storage_dependencies,
288            compute_dependencies,
289            Arc::clone(&self.read_hold_tx),
290            introspection,
291        );
292        // If the collection is write-only, clear its read policy to reflect that.
293        if write_only {
294            state.read_policy = None;
295        }
296
297        if let Some(previous) = self.collections.insert(id, state) {
298            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
299        }
300
301        // Add per-replica collection state.
302        for replica in self.replicas.values_mut() {
303            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
304        }
305
306        // Update introspection.
307        self.report_dependency_updates(id, Diff::ONE);
308    }
309
310    fn remove_collection(&mut self, id: GlobalId) {
311        // Update introspection.
312        self.report_dependency_updates(id, Diff::MINUS_ONE);
313
314        // Remove per-replica collection state.
315        for replica in self.replicas.values_mut() {
316            replica.remove_collection(id);
317        }
318
319        // Remove global collection state.
320        self.collections.remove(&id);
321    }
322
323    fn add_replica_state(
324        &mut self,
325        id: ReplicaId,
326        client: ReplicaClient<T>,
327        config: ReplicaConfig,
328        epoch: u64,
329    ) {
330        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();
331
332        let metrics = self.metrics.for_replica(id);
333        let mut replica = ReplicaState::new(
334            id,
335            client,
336            config,
337            metrics,
338            self.introspection_tx.clone(),
339            epoch,
340        );
341
342        // Add per-replica collection state.
343        for (collection_id, collection) in &self.collections {
344            // Skip log collections not maintained by this replica.
345            if collection.log_collection && !log_ids.contains(collection_id) {
346                continue;
347            }
348
349            let as_of = if collection.log_collection {
350                // For log collections, we don't send a `CreateDataflow` command to the replica, so
351                // it doesn't know which as-of the controler chose and defaults to the minimum
352                // frontier instead. We need to initialize the controller-side tracking with the
353                // same frontier, to avoid observing regressions in the reported frontiers.
354                Antichain::from_elem(T::minimum())
355            } else {
356                collection.read_frontier().to_owned()
357            };
358
359            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
360            replica.add_collection(*collection_id, as_of, input_read_holds);
361        }
362
363        self.replicas.insert(id, replica);
364    }
365
366    /// Enqueue the given response for delivery to the controller clients.
367    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
368        // Failure to send means the `ComputeController` has been dropped and doesn't care about
369        // responses anymore.
370        let _ = self.response_tx.send(response);
371    }
372
373    /// Enqueue the given introspection updates for recording.
374    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
375        // Failure to send means the `ComputeController` has been dropped and doesn't care about
376        // introspection updates anymore.
377        let _ = self.introspection_tx.send((type_, updates));
378    }
379
380    /// Returns whether the identified replica exists.
381    fn replica_exists(&self, id: ReplicaId) -> bool {
382        self.replicas.contains_key(&id)
383    }
384
385    /// Return the IDs of pending peeks targeting the specified replica.
386    fn peeks_targeting(
387        &self,
388        replica_id: ReplicaId,
389    ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
390        self.peeks.iter().filter_map(move |(uuid, peek)| {
391            if peek.target_replica == Some(replica_id) {
392                Some((*uuid, peek))
393            } else {
394                None
395            }
396        })
397    }
398
399    /// Return the IDs of in-progress subscribes targeting the specified replica.
400    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
401        self.subscribes.iter().filter_map(move |(id, subscribe)| {
402            let targeting = subscribe.target_replica == Some(replica_id);
403            targeting.then_some(*id)
404        })
405    }
406
407    /// Update introspection with the current collection frontiers.
408    ///
409    /// We could also do this directly in response to frontier changes, but doing it periodically
410    /// lets us avoid emitting some introspection updates that can be consolidated (e.g. a write
411    /// frontier updated immediately followed by a read frontier update).
412    ///
413    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
414    /// per second during normal operation.
415    fn update_frontier_introspection(&mut self) {
416        for collection in self.collections.values_mut() {
417            collection
418                .introspection
419                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
420        }
421
422        for replica in self.replicas.values_mut() {
423            for collection in replica.collections.values_mut() {
424                collection
425                    .introspection
426                    .observe_frontier(&collection.write_frontier);
427            }
428        }
429    }
430
431    /// Refresh the controller state metrics for this instance.
432    ///
433    /// We could also do state metric updates directly in response to state changes, but that would
434    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
435    /// a single method is less noisy.
436    ///
437    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
438    /// per second during normal operation.
439    fn refresh_state_metrics(&self) {
440        let unscheduled_collections_count =
441            self.collections.values().filter(|c| !c.scheduled).count();
442        let connected_replica_count = self
443            .replicas
444            .values()
445            .filter(|r| r.client.is_connected())
446            .count();
447
448        self.metrics
449            .replica_count
450            .set(u64::cast_from(self.replicas.len()));
451        self.metrics
452            .collection_count
453            .set(u64::cast_from(self.collections.len()));
454        self.metrics
455            .collection_unscheduled_count
456            .set(u64::cast_from(unscheduled_collections_count));
457        self.metrics
458            .peek_count
459            .set(u64::cast_from(self.peeks.len()));
460        self.metrics
461            .subscribe_count
462            .set(u64::cast_from(self.subscribes.len()));
463        self.metrics
464            .copy_to_count
465            .set(u64::cast_from(self.copy_tos.len()));
466        self.metrics
467            .connected_replica_count
468            .set(u64::cast_from(connected_replica_count));
469    }
470
471    /// Refresh the wallclock lag introspection and metrics with the current lag values.
472    ///
473    /// This method produces wallclock lag metrics of two different shapes:
474    ///
475    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
476    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
477    ///   over the last minute, together with the current time.
478    /// * Histograms: For each collection, we measure the lag of the write frontier behind
479    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
480    ///   together with the current histogram period.
481    ///
482    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
483    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
484    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
485    /// Prometheus.
486    ///
487    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
488    /// per second during normal operation.
489    fn refresh_wallclock_lag(&mut self) {
490        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
491            Some(ts) => (self.wallclock_lag)(ts.clone()),
492            None => Duration::ZERO,
493        };
494
495        let now_ms = (self.now)();
496        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
497        let histogram_labels = match &self.workload_class {
498            Some(wc) => [("workload_class", wc.clone())].into(),
499            None => BTreeMap::new(),
500        };
501
502        // First, iterate over all collections and collect histogram measurements.
503        // We keep a record of unreadable collections, so we can emit undefined lags for those here
504        // and below when we collect history measurements.
505        let mut unreadable_collections = BTreeSet::new();
506        for (id, collection) in &mut self.collections {
507            // We need to ask the storage controller for the read frontiers of storage collections.
508            let read_frontier = match self.storage_collections.collection_frontiers(*id) {
509                Ok(f) => f.read_capabilities,
510                Err(_) => collection.read_frontier(),
511            };
512            let write_frontier = collection.write_frontier();
513            let collection_unreadable = PartialOrder::less_equal(&write_frontier, &read_frontier);
514            if collection_unreadable {
515                unreadable_collections.insert(id);
516            }
517
518            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
519                let bucket = if collection_unreadable {
520                    WallclockLag::Undefined
521                } else {
522                    let lag = frontier_lag(&write_frontier);
523                    let lag = lag.as_secs().next_power_of_two();
524                    WallclockLag::Seconds(lag)
525                };
526
527                let key = (histogram_period, bucket, histogram_labels.clone());
528                *stash.entry(key).or_default() += Diff::ONE;
529            }
530        }
531
532        // Second, iterate over all per-replica collections and collect history measurements.
533        for replica in self.replicas.values_mut() {
534            for (id, collection) in &mut replica.collections {
535                let lag = if unreadable_collections.contains(&id) {
536                    WallclockLag::Undefined
537                } else {
538                    let lag = frontier_lag(&collection.write_frontier);
539                    WallclockLag::Seconds(lag.as_secs())
540                };
541
542                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
543                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
544                }
545
546                if let Some(metrics) = &mut collection.metrics {
547                    // No way to specify values as undefined in Prometheus metrics, so we use the
548                    // maximum value instead.
549                    let secs = lag.unwrap_seconds_or(u64::MAX);
550                    metrics.wallclock_lag.observe(secs);
551                };
552            }
553        }
554
555        // Record lags to persist, if it's time.
556        self.maybe_record_wallclock_lag();
557    }
558
559    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
560    /// last recording.
561    //
562    /// We emit new introspection updates if the system time has passed into a new multiple of the
563    /// recording interval (typically 1 minute) since the last refresh. The storage controller uses
564    /// the same approach, ensuring that both controllers commit their lags at roughly the same
565    /// time, avoiding confusion caused by inconsistencies.
566    fn maybe_record_wallclock_lag(&mut self) {
567        if self.read_only {
568            return;
569        }
570
571        let duration_trunc = |datetime: DateTime<_>, interval| {
572            let td = TimeDelta::from_std(interval).ok()?;
573            datetime.duration_trunc(td).ok()
574        };
575
576        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
577        let now_dt = mz_ore::now::to_datetime((self.now)());
578        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
579            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
580            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
581            duration_trunc(now_dt, *default).unwrap()
582        });
583        if now_trunc <= self.wallclock_lag_last_recorded {
584            return;
585        }
586
587        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");
588
589        let mut history_updates = Vec::new();
590        for (replica_id, replica) in &mut self.replicas {
591            for (collection_id, collection) in &mut replica.collections {
592                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
593                    continue;
594                };
595
596                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
597                let row = Row::pack_slice(&[
598                    Datum::String(&collection_id.to_string()),
599                    Datum::String(&replica_id.to_string()),
600                    max_lag.into_interval_datum(),
601                    Datum::TimestampTz(now_ts),
602                ]);
603                history_updates.push((row, Diff::ONE));
604            }
605        }
606        if !history_updates.is_empty() {
607            self.deliver_introspection_updates(
608                IntrospectionType::WallclockLagHistory,
609                history_updates,
610            );
611        }
612
613        let mut histogram_updates = Vec::new();
614        let mut row_buf = Row::default();
615        for (collection_id, collection) in &mut self.collections {
616            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
617                continue;
618            };
619
620            for ((period, lag, labels), count) in std::mem::take(stash) {
621                let mut packer = row_buf.packer();
622                packer.extend([
623                    Datum::TimestampTz(period.start),
624                    Datum::TimestampTz(period.end),
625                    Datum::String(&collection_id.to_string()),
626                    lag.into_uint64_datum(),
627                ]);
628                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
629                packer.push_dict(labels);
630
631                histogram_updates.push((row_buf.clone(), count));
632            }
633        }
634        if !histogram_updates.is_empty() {
635            self.deliver_introspection_updates(
636                IntrospectionType::WallclockLagHistogram,
637                histogram_updates,
638            );
639        }
640
641        self.wallclock_lag_last_recorded = now_trunc;
642    }
643
644    /// Report updates (inserts or retractions) to the identified collection's dependencies.
645    ///
646    /// # Panics
647    ///
648    /// Panics if the identified collection does not exist.
649    fn report_dependency_updates(&self, id: GlobalId, diff: Diff) {
650        let collection = self.expect_collection(id);
651        let dependencies = collection.dependency_ids();
652
653        let updates = dependencies
654            .map(|dependency_id| {
655                let row = Row::pack_slice(&[
656                    Datum::String(&id.to_string()),
657                    Datum::String(&dependency_id.to_string()),
658                ]);
659                (row, diff)
660            })
661            .collect();
662
663        self.deliver_introspection_updates(IntrospectionType::ComputeDependencies, updates);
664    }
665
666    /// Returns `true` if the given collection is hydrated on at least one
667    /// replica.
668    ///
669    /// This also returns `true` in case this cluster does not have any
670    /// replicas.
671    #[mz_ore::instrument(level = "debug")]
672    pub fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, CollectionMissing> {
673        if self.replicas.is_empty() {
674            return Ok(true);
675        }
676        for replica_state in self.replicas.values() {
677            let collection_state = replica_state
678                .collections
679                .get(&collection_id)
680                .ok_or(CollectionMissing(collection_id))?;
681
682            if collection_state.hydrated() {
683                return Ok(true);
684            }
685        }
686
687        Ok(false)
688    }
689
690    /// Returns `true` if each non-transient, non-excluded collection is hydrated on at
691    /// least one replica.
692    ///
693    /// This also returns `true` in case this cluster does not have any
694    /// replicas.
695    #[mz_ore::instrument(level = "debug")]
696    pub fn collections_hydrated_on_replicas(
697        &self,
698        target_replica_ids: Option<Vec<ReplicaId>>,
699        exclude_collections: &BTreeSet<GlobalId>,
700    ) -> Result<bool, HydrationCheckBadTarget> {
701        if self.replicas.is_empty() {
702            return Ok(true);
703        }
704        let mut all_hydrated = true;
705        let target_replicas: BTreeSet<ReplicaId> = self
706            .replicas
707            .keys()
708            .filter_map(|id| match target_replica_ids {
709                None => Some(id.clone()),
710                Some(ref ids) if ids.contains(id) => Some(id.clone()),
711                Some(_) => None,
712            })
713            .collect();
714        if let Some(targets) = target_replica_ids {
715            if target_replicas.is_empty() {
716                return Err(HydrationCheckBadTarget(targets));
717            }
718        }
719
720        for (id, _collection) in self.collections_iter() {
721            if id.is_transient() || exclude_collections.contains(&id) {
722                continue;
723            }
724
725            let mut collection_hydrated = false;
726            for replica_state in self.replicas.values() {
727                if !target_replicas.contains(&replica_state.id) {
728                    continue;
729                }
730                let collection_state = replica_state
731                    .collections
732                    .get(&id)
733                    .expect("missing collection state");
734
735                if collection_state.hydrated() {
736                    collection_hydrated = true;
737                    break;
738                }
739            }
740
741            if !collection_hydrated {
742                tracing::info!("collection {id} is not hydrated on any replica");
743                all_hydrated = false;
744                // We continue with our loop instead of breaking out early, so
745                // that we log all non-hydrated replicas.
746            }
747        }
748
749        Ok(all_hydrated)
750    }
751
752    /// Clean up collection state that is not needed anymore.
753    ///
754    /// Three conditions need to be true before we can remove state for a collection:
755    ///
756    ///  1. A client must have explicitly dropped the collection. If that is not the case, clients
757    ///     can still reasonably assume that the controller knows about the collection and can
758    ///     answer queries about it.
759    ///  2. There must be no outstanding read capabilities on the collection. As long as someone
760    ///     still holds read capabilities on a collection, we need to keep it around to be able
761    ///     to properly handle downgrading of said capabilities.
762    ///  3. All replica frontiers for the collection must have advanced to the empty frontier.
763    ///     Advancement to the empty frontiers signals that replicas are done computing the
764    ///     collection and that they won't send more `ComputeResponse`s for it. As long as we might
765    ///     receive responses for a collection we want to keep it around to be able to validate and
766    ///     handle these responses.
767    fn cleanup_collections(&mut self) {
768        let to_remove: Vec<_> = self
769            .collections_iter()
770            .filter(|(id, collection)| {
771                collection.dropped
772                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
773                    && self
774                        .replicas
775                        .values()
776                        .all(|r| r.collection_frontiers_empty(*id))
777            })
778            .map(|(id, _collection)| id)
779            .collect();
780
781        for id in to_remove {
782            self.remove_collection(id);
783        }
784    }
785
786    /// Returns the state of the [`Instance`] formatted as JSON.
787    ///
788    /// The returned value is not guaranteed to be stable and may change at any point in time.
789    #[mz_ore::instrument(level = "debug")]
790    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
791        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
792        // returned object as a tradeoff between usability and stability. `serde_json` will fail
793        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
794        // prevents a future unrelated change from silently breaking this method.
795
796        // Destructure `self` here so we don't forget to consider dumping newly added fields.
797        let Self {
798            build_info: _,
799            storage_collections: _,
800            peek_stash_persist_location: _,
801            initialized,
802            read_only,
803            workload_class,
804            replicas,
805            collections,
806            log_sources: _,
807            peeks,
808            subscribes,
809            copy_tos,
810            history: _,
811            command_rx: _,
812            response_tx: _,
813            introspection_tx: _,
814            metrics: _,
815            dyncfg: _,
816            now: _,
817            wallclock_lag: _,
818            wallclock_lag_last_recorded,
819            read_hold_tx: _,
820            replica_tx: _,
821            replica_rx: _,
822        } = self;
823
824        let replicas: BTreeMap<_, _> = replicas
825            .iter()
826            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
827            .collect::<Result<_, anyhow::Error>>()?;
828        let collections: BTreeMap<_, _> = collections
829            .iter()
830            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
831            .collect();
832        let peeks: BTreeMap<_, _> = peeks
833            .iter()
834            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
835            .collect();
836        let subscribes: BTreeMap<_, _> = subscribes
837            .iter()
838            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
839            .collect();
840        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
841        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");
842
843        Ok(serde_json::json!({
844            "initialized": initialized,
845            "read_only": read_only,
846            "workload_class": workload_class,
847            "replicas": replicas,
848            "collections": collections,
849            "peeks": peeks,
850            "subscribes": subscribes,
851            "copy_tos": copy_tos,
852            "wallclock_lag_last_recorded": wallclock_lag_last_recorded,
853        }))
854    }
855
856    /// Reports the current write frontier for the identified compute collection.
857    pub(super) fn collection_write_frontier(
858        &self,
859        id: GlobalId,
860    ) -> Result<Antichain<T>, CollectionMissing> {
861        Ok(self.collection(id)?.write_frontier())
862    }
863}
864
865impl<T> Instance<T>
866where
867    T: ComputeControllerTimestamp,
868{
869    pub(super) fn new(
870        build_info: &'static BuildInfo,
871        storage: StorageCollections<T>,
872        peek_stash_persist_location: PersistLocation,
873        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
874        metrics: InstanceMetrics,
875        now: NowFn,
876        wallclock_lag: WallclockLagFn<T>,
877        dyncfg: Arc<ConfigSet>,
878        command_rx: mpsc::UnboundedReceiver<Command<T>>,
879        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
880        read_hold_tx: read_holds::ChangeTx<T>,
881        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
882        read_only: bool,
883    ) -> Self {
884        let mut collections = BTreeMap::new();
885        let mut log_sources = BTreeMap::new();
886        for (log, id, shared) in arranged_logs {
887            let collection = CollectionState::new_log_collection(
888                id,
889                shared,
890                Arc::clone(&read_hold_tx),
891                introspection_tx.clone(),
892            );
893            collections.insert(id, collection);
894            log_sources.insert(log, id);
895        }
896
897        let history = ComputeCommandHistory::new(metrics.for_history());
898
899        let send_count = metrics.response_send_count.clone();
900        let recv_count = metrics.response_recv_count.clone();
901        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);
902
903        let now_dt = mz_ore::now::to_datetime(now());
904
905        Self {
906            build_info,
907            storage_collections: storage,
908            peek_stash_persist_location,
909            initialized: false,
910            read_only,
911            workload_class: None,
912            replicas: Default::default(),
913            collections,
914            log_sources,
915            peeks: Default::default(),
916            subscribes: Default::default(),
917            copy_tos: Default::default(),
918            history,
919            command_rx,
920            response_tx,
921            introspection_tx,
922            metrics,
923            dyncfg,
924            now,
925            wallclock_lag,
926            wallclock_lag_last_recorded: now_dt,
927            read_hold_tx,
928            replica_tx,
929            replica_rx,
930        }
931    }
932
933    pub(super) async fn run(mut self) {
934        self.send(ComputeCommand::Hello {
935            // The nonce is protocol iteration-specific and will be set in
936            // `ReplicaTask::specialize_command`.
937            nonce: Uuid::default(),
938        });
939
940        let instance_config = InstanceConfig {
941            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
942            // The remaining fields are replica-specific and will be set in
943            // `ReplicaTask::specialize_command`.
944            logging: Default::default(),
945            expiration_offset: Default::default(),
946        };
947
948        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));
949
950        loop {
951            tokio::select! {
952                command = self.command_rx.recv() => match command {
953                    Some(cmd) => cmd(&mut self),
954                    None => break,
955                },
956                response = self.replica_rx.recv() => match response {
957                    Some(response) => self.handle_response(response),
958                    None => unreachable!("self owns a sender side of the channel"),
959                }
960            }
961        }
962    }
963
964    /// Update instance configuration.
965    #[mz_ore::instrument(level = "debug")]
966    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
967        if let Some(workload_class) = &config_params.workload_class {
968            self.workload_class = workload_class.clone();
969        }
970
971        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
972        self.send(command);
973    }
974
975    /// Marks the end of any initialization commands.
976    ///
977    /// Intended to be called by `Controller`, rather than by other code.
978    /// Calling this method repeatedly has no effect.
979    #[mz_ore::instrument(level = "debug")]
980    pub fn initialization_complete(&mut self) {
981        // The compute protocol requires that `InitializationComplete` is sent only once.
982        if !self.initialized {
983            self.send(ComputeCommand::InitializationComplete);
984            self.initialized = true;
985        }
986    }
987
988    /// Allows collections to affect writes to external systems (persist).
989    ///
990    /// Calling this method repeatedly has no effect.
991    #[mz_ore::instrument(level = "debug")]
992    pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
993        let collection = self.collection_mut(collection_id)?;
994
995        // Do not send redundant allow-writes commands.
996        if !collection.read_only {
997            return Ok(());
998        }
999
1000        // Don't send allow-writes for collections that are not installed.
1001        let as_of = collection.read_frontier();
1002
1003        // If the collection has an empty `as_of`, it was either never installed on the replica or
1004        // has since been dropped. In either case the replica does not expect any commands for it.
1005        if as_of.is_empty() {
1006            return Ok(());
1007        }
1008
1009        collection.read_only = false;
1010        self.send(ComputeCommand::AllowWrites(collection_id));
1011
1012        Ok(())
1013    }
1014
1015    /// Shut down this instance.
1016    ///
1017    /// This method runs various assertions ensuring the instance state is empty. It exists to help
1018    /// us find bugs where the client drops a compute instance that still has replicas or
1019    /// collections installed, and later assumes that said replicas/collections still exists.
1020    ///
1021    /// # Panics
1022    ///
1023    /// Panics if the compute instance still has active replicas.
1024    /// Panics if the compute instance still has collections installed.
1025    #[mz_ore::instrument(level = "debug")]
1026    pub fn shutdown(&mut self) {
1027        // Taking the `command_rx` ensures that the [`Instance::run`] loop terminates.
1028        let (_tx, rx) = mpsc::unbounded_channel();
1029        let mut command_rx = std::mem::replace(&mut self.command_rx, rx);
1030
1031        // Apply all outstanding read hold changes. This might cause read hold downgrades to be
1032        // added to `command_tx`, so we need to apply those in a loop.
1033        //
1034        // TODO(teskje): Make `Command` an enum and assert that all received commands are read
1035        // hold downgrades.
1036        while let Ok(cmd) = command_rx.try_recv() {
1037            cmd(self);
1038        }
1039
1040        // Collections might have been dropped but not cleaned up yet.
1041        self.cleanup_collections();
1042
1043        let stray_replicas: Vec<_> = self.replicas.keys().collect();
1044        soft_assert_or_log!(
1045            stray_replicas.is_empty(),
1046            "dropped instance still has provisioned replicas: {stray_replicas:?}",
1047        );
1048
1049        let collections = self.collections.iter();
1050        let stray_collections: Vec<_> = collections
1051            .filter(|(_, c)| !c.log_collection)
1052            .map(|(id, _)| id)
1053            .collect();
1054        soft_assert_or_log!(
1055            stray_collections.is_empty(),
1056            "dropped instance still has installed collections: {stray_collections:?}",
1057        );
1058    }
1059
1060    /// Sends a command to all replicas of this instance.
1061    #[mz_ore::instrument(level = "debug")]
1062    fn send(&mut self, cmd: ComputeCommand<T>) {
1063        // Record the command so that new replicas can be brought up to speed.
1064        self.history.push(cmd.clone());
1065
1066        // Clone the command for each active replica.
1067        for replica in self.replicas.values_mut() {
1068            // Swallow error, we'll notice because the replica task has stopped.
1069            let _ = replica.client.send(cmd.clone());
1070        }
1071    }
1072
1073    /// Add a new instance replica, by ID.
1074    #[mz_ore::instrument(level = "debug")]
1075    pub fn add_replica(
1076        &mut self,
1077        id: ReplicaId,
1078        mut config: ReplicaConfig,
1079        epoch: Option<u64>,
1080    ) -> Result<(), ReplicaExists> {
1081        if self.replica_exists(id) {
1082            return Err(ReplicaExists(id));
1083        }
1084
1085        config.logging.index_logs = self.log_sources.clone();
1086
1087        let epoch = epoch.unwrap_or(1);
1088        let metrics = self.metrics.for_replica(id);
1089        let client = ReplicaClient::spawn(
1090            id,
1091            self.build_info,
1092            config.clone(),
1093            epoch,
1094            metrics.clone(),
1095            Arc::clone(&self.dyncfg),
1096            self.replica_tx.clone(),
1097        );
1098
1099        // Take this opportunity to clean up the history we should present.
1100        self.history.reduce();
1101
1102        // Advance the uppers of source imports
1103        self.history.update_source_uppers(&self.storage_collections);
1104
1105        // Replay the commands at the client, creating new dataflow identifiers.
1106        for command in self.history.iter() {
1107            if client.send(command.clone()).is_err() {
1108                // We swallow the error here. On the next send, we will fail again, and
1109                // restart the connection as well as this rehydration.
1110                tracing::warn!("Replica {:?} connection terminated during hydration", id);
1111                break;
1112            }
1113        }
1114
1115        // Add replica to tracked state.
1116        self.add_replica_state(id, client, config, epoch);
1117
1118        Ok(())
1119    }
1120
1121    /// Remove an existing instance replica, by ID.
1122    #[mz_ore::instrument(level = "debug")]
1123    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
1124        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;
1125
1126        // Subscribes targeting this replica either won't be served anymore (if the replica is
1127        // dropped) or might produce inconsistent output (if the target collection is an
1128        // introspection index). We produce an error to inform upstream.
1129        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
1130        for subscribe_id in to_drop {
1131            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
1132            let response = ComputeControllerResponse::SubscribeResponse(
1133                subscribe_id,
1134                SubscribeBatch {
1135                    lower: subscribe.frontier.clone(),
1136                    upper: subscribe.frontier,
1137                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
1138                },
1139            );
1140            self.deliver_response(response);
1141        }
1142
1143        // Peeks targeting this replica might not be served anymore (if the replica is dropped).
1144        // If the replica has failed it might come back and respond to the peek later, but it still
1145        // seems like a good idea to cancel the peek to inform the caller about the failure. This
1146        // is consistent with how we handle targeted subscribes above.
1147        let mut peek_responses = Vec::new();
1148        let mut to_drop = Vec::new();
1149        for (uuid, peek) in self.peeks_targeting(id) {
1150            peek_responses.push(ComputeControllerResponse::PeekNotification(
1151                uuid,
1152                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
1153                peek.otel_ctx.clone(),
1154            ));
1155            to_drop.push(uuid);
1156        }
1157        for response in peek_responses {
1158            self.deliver_response(response);
1159        }
1160        for uuid in to_drop {
1161            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
1162            self.finish_peek(uuid, response);
1163        }
1164
1165        // We might have a chance to forward implied capabilities and reduce the cost of bringing
1166        // up the next replica, if the dropped replica was the only one in the cluster.
1167        self.forward_implied_capabilities();
1168
1169        Ok(())
1170    }
1171
1172    /// Rehydrate the given instance replica.
1173    ///
1174    /// # Panics
1175    ///
1176    /// Panics if the specified replica does not exist.
1177    fn rehydrate_replica(&mut self, id: ReplicaId) {
1178        let config = self.replicas[&id].config.clone();
1179        let epoch = self.replicas[&id].epoch + 1;
1180
1181        self.remove_replica(id).expect("replica must exist");
1182        let result = self.add_replica(id, config, Some(epoch));
1183
1184        match result {
1185            Ok(()) => (),
1186            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1187        }
1188    }
1189
1190    /// Rehydrate any failed replicas of this instance.
1191    fn rehydrate_failed_replicas(&mut self) {
1192        let replicas = self.replicas.iter();
1193        let failed_replicas: Vec<_> = replicas
1194            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1195            .collect();
1196
1197        for replica_id in failed_replicas {
1198            self.rehydrate_replica(replica_id);
1199        }
1200    }
1201
1202    /// Creates the described dataflow and initializes state for its output.
1203    ///
1204    /// This method expects a `DataflowDescription` with an `as_of` frontier specified, as well as
1205    /// for each imported collection a read hold in `import_read_holds` at at least the `as_of`.
1206    ///
1207    /// If a `subscribe_target_replica` is given, any subscribes exported by the dataflow are
1208    /// configured to target that replica, i.e., only subscribe responses sent by that replica are
1209    /// considered.
1210    #[mz_ore::instrument(level = "debug")]
1211    pub fn create_dataflow(
1212        &mut self,
1213        dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
1214        import_read_holds: Vec<ReadHold<T>>,
1215        subscribe_target_replica: Option<ReplicaId>,
1216        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
1217    ) -> Result<(), DataflowCreationError> {
1218        use DataflowCreationError::*;
1219
1220        if let Some(replica_id) = subscribe_target_replica {
1221            if !self.replica_exists(replica_id) {
1222                return Err(ReplicaMissing(replica_id));
1223            }
1224        }
1225
1226        // Simple sanity checks around `as_of`
1227        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
1228        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
1229            return Err(EmptyAsOfForSubscribe);
1230        }
1231        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
1232            return Err(EmptyAsOfForCopyTo);
1233        }
1234
1235        // Collect all dependencies of the dataflow, and read holds on them at the `as_of`.
1236        let mut storage_dependencies = BTreeMap::new();
1237        let mut compute_dependencies = BTreeMap::new();
1238
1239        // When we install per-replica input read holds, we cannot use the `as_of` because of
1240        // reconciliation: Existing slow replicas might be reading from the inputs at times before
1241        // the `as_of` and we would rather not crash them by allowing their inputs to compact too
1242        // far. So instead we take read holds at the least time available.
1243        let mut replica_input_read_holds = Vec::new();
1244
1245        let mut import_read_holds: BTreeMap<_, _> =
1246            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();
1247
1248        for &id in dataflow.source_imports.keys() {
1249            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1250            replica_input_read_holds.push(read_hold.clone());
1251
1252            read_hold
1253                .try_downgrade(as_of.clone())
1254                .map_err(|_| ReadHoldInsufficient(id))?;
1255            storage_dependencies.insert(id, read_hold);
1256        }
1257
1258        for &id in dataflow.index_imports.keys() {
1259            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1260            read_hold
1261                .try_downgrade(as_of.clone())
1262                .map_err(|_| ReadHoldInsufficient(id))?;
1263            compute_dependencies.insert(id, read_hold);
1264        }
1265
1266        // If the `as_of` is empty, we are not going to create a dataflow, so replicas won't read
1267        // from the inputs.
1268        if as_of.is_empty() {
1269            replica_input_read_holds = Default::default();
1270        }
1271
1272        // Install collection state for each of the exports.
1273        for export_id in dataflow.export_ids() {
1274            let shared = shared_collection_state
1275                .remove(&export_id)
1276                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
1277            let write_only = dataflow.sink_exports.contains_key(&export_id);
1278            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);
1279
1280            self.add_collection(
1281                export_id,
1282                as_of.clone(),
1283                shared,
1284                storage_dependencies.clone(),
1285                compute_dependencies.clone(),
1286                replica_input_read_holds.clone(),
1287                write_only,
1288                storage_sink,
1289                dataflow.initial_storage_as_of.clone(),
1290                dataflow.refresh_schedule.clone(),
1291            );
1292
1293            // If the export is a storage sink, we can advance its write frontier to the write
1294            // frontier of the target storage collection.
1295            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
1296                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
1297            }
1298        }
1299
1300        // Initialize tracking of subscribes.
1301        for subscribe_id in dataflow.subscribe_ids() {
1302            self.subscribes
1303                .insert(subscribe_id, ActiveSubscribe::new(subscribe_target_replica));
1304        }
1305
1306        // Initialize tracking of copy tos.
1307        for copy_to_id in dataflow.copy_to_ids() {
1308            self.copy_tos.insert(copy_to_id);
1309        }
1310
1311        // Here we augment all imported sources and all exported sinks with the appropriate
1312        // storage metadata needed by the compute instance.
1313        let mut source_imports = BTreeMap::new();
1314        for (id, import) in dataflow.source_imports {
1315            let frontiers = self
1316                .storage_collections
1317                .collection_frontiers(id)
1318                .expect("collection exists");
1319
1320            let collection_metadata = self
1321                .storage_collections
1322                .collection_metadata(id)
1323                .expect("we have a read hold on this collection");
1324
1325            let desc = SourceInstanceDesc {
1326                storage_metadata: collection_metadata.clone(),
1327                arguments: import.desc.arguments,
1328                typ: import.desc.typ.clone(),
1329            };
1330            source_imports.insert(
1331                id,
1332                mz_compute_types::dataflows::SourceImport {
1333                    desc,
1334                    monotonic: import.monotonic,
1335                    with_snapshot: import.with_snapshot,
1336                    upper: frontiers.write_frontier,
1337                },
1338            );
1339        }
1340
1341        let mut sink_exports = BTreeMap::new();
1342        for (id, se) in dataflow.sink_exports {
1343            let connection = match se.connection {
1344                ComputeSinkConnection::MaterializedView(conn) => {
1345                    let metadata = self
1346                        .storage_collections
1347                        .collection_metadata(id)
1348                        .map_err(|_| CollectionMissing(id))?
1349                        .clone();
1350                    let conn = MaterializedViewSinkConnection {
1351                        value_desc: conn.value_desc,
1352                        storage_metadata: metadata,
1353                    };
1354                    ComputeSinkConnection::MaterializedView(conn)
1355                }
1356                ComputeSinkConnection::ContinualTask(conn) => {
1357                    let metadata = self
1358                        .storage_collections
1359                        .collection_metadata(id)
1360                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
1361                        .clone();
1362                    let conn = ContinualTaskConnection {
1363                        input_id: conn.input_id,
1364                        storage_metadata: metadata,
1365                    };
1366                    ComputeSinkConnection::ContinualTask(conn)
1367                }
1368                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
1369                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1370                    ComputeSinkConnection::CopyToS3Oneshot(conn)
1371                }
1372            };
1373            let desc = ComputeSinkDesc {
1374                from: se.from,
1375                from_desc: se.from_desc,
1376                connection,
1377                with_snapshot: se.with_snapshot,
1378                up_to: se.up_to,
1379                non_null_assertions: se.non_null_assertions,
1380                refresh_schedule: se.refresh_schedule,
1381            };
1382            sink_exports.insert(id, desc);
1383        }
1384
1385        // Flatten the dataflow plans into the representation expected by replicas.
1386        let objects_to_build = dataflow
1387            .objects_to_build
1388            .into_iter()
1389            .map(|object| BuildDesc {
1390                id: object.id,
1391                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
1392            })
1393            .collect();
1394
1395        let augmented_dataflow = DataflowDescription {
1396            source_imports,
1397            sink_exports,
1398            objects_to_build,
1399            // The rest of the fields are identical
1400            index_imports: dataflow.index_imports,
1401            index_exports: dataflow.index_exports,
1402            as_of: dataflow.as_of.clone(),
1403            until: dataflow.until,
1404            initial_storage_as_of: dataflow.initial_storage_as_of,
1405            refresh_schedule: dataflow.refresh_schedule,
1406            debug_name: dataflow.debug_name,
1407            time_dependence: dataflow.time_dependence,
1408        };
1409
1410        if augmented_dataflow.is_transient() {
1411            tracing::debug!(
1412                name = %augmented_dataflow.debug_name,
1413                import_ids = %augmented_dataflow.display_import_ids(),
1414                export_ids = %augmented_dataflow.display_export_ids(),
1415                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1416                until = ?augmented_dataflow.until.elements(),
1417                "creating dataflow",
1418            );
1419        } else {
1420            tracing::info!(
1421                name = %augmented_dataflow.debug_name,
1422                import_ids = %augmented_dataflow.display_import_ids(),
1423                export_ids = %augmented_dataflow.display_export_ids(),
1424                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1425                until = ?augmented_dataflow.until.elements(),
1426                "creating dataflow",
1427            );
1428        }
1429
1430        // Skip the actual dataflow creation for an empty `as_of`. (Happens e.g. for the
1431        // bootstrapping of a REFRESH AT mat view that is past its last refresh.)
1432        if as_of.is_empty() {
1433            tracing::info!(
1434                name = %augmented_dataflow.debug_name,
1435                "not sending `CreateDataflow`, because of empty `as_of`",
1436            );
1437        } else {
1438            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
1439            let dataflow = Box::new(augmented_dataflow);
1440            self.send(ComputeCommand::CreateDataflow(dataflow));
1441
1442            for id in collections {
1443                self.maybe_schedule_collection(id);
1444            }
1445        }
1446
1447        Ok(())
1448    }
1449
1450    /// Schedule the identified collection if all its inputs are available.
1451    ///
1452    /// # Panics
1453    ///
1454    /// Panics if the identified collection does not exist.
1455    fn maybe_schedule_collection(&mut self, id: GlobalId) {
1456        let collection = self.expect_collection(id);
1457
1458        // Don't schedule collections twice.
1459        if collection.scheduled {
1460            return;
1461        }
1462
1463        let as_of = collection.read_frontier();
1464
1465        // If the collection has an empty `as_of`, it was either never installed on the replica or
1466        // has since been dropped. In either case the replica does not expect any commands for it.
1467        if as_of.is_empty() {
1468            return;
1469        }
1470
1471        let ready = if id.is_transient() {
1472            // Always schedule transient collections immediately. The assumption is that those are
1473            // created by interactive user commands and we want to schedule them as quickly as
1474            // possible. Inputs might not yet be available, but when they become available, we
1475            // don't need to wait for the controller to become aware and for the scheduling check
1476            // to run again.
1477            true
1478        } else {
1479            // Ignore self-dependencies. Any self-dependencies do not need to be
1480            // available at the as_of for the dataflow to make progress, so we
1481            // can ignore them here. At the moment, only continual tasks have
1482            // self-dependencies, but this logic is correct for any dataflow, so
1483            // we don't special case it to CTs.
1484            let not_self_dep = |x: &GlobalId| *x != id;
1485
1486            // Make sure we never schedule a collection before its input compute collections have
1487            // been scheduled. Scheduling in the wrong order can lead to deadlocks.
1488            let mut deps_scheduled = true;
1489
1490            // Check dependency frontiers to determine if all inputs are
1491            // available. An input is available when its frontier is greater
1492            // than the `as_of`, i.e., all input data up to and including the
1493            // `as_of` has been sealed.
1494            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
1495            let mut compute_frontiers = Vec::new();
1496            for id in compute_deps {
1497                let dep = &self.expect_collection(id);
1498                deps_scheduled &= dep.scheduled;
1499                compute_frontiers.push(dep.write_frontier());
1500            }
1501
1502            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
1503            let storage_frontiers = self
1504                .storage_collections
1505                .collections_frontiers(storage_deps.collect())
1506                .expect("must exist");
1507            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);
1508
1509            let mut frontiers = compute_frontiers.into_iter().chain(storage_frontiers);
1510            let frontiers_ready =
1511                frontiers.all(|frontier| PartialOrder::less_than(&as_of, &frontier));
1512
1513            deps_scheduled && frontiers_ready
1514        };
1515
1516        if ready {
1517            self.send(ComputeCommand::Schedule(id));
1518            let collection = self.expect_collection_mut(id);
1519            collection.scheduled = true;
1520        }
1521    }
1522
1523    /// Schedule any unscheduled collections that are ready.
1524    fn schedule_collections(&mut self) {
1525        let ids: Vec<_> = self.collections.keys().copied().collect();
1526        for id in ids {
1527            self.maybe_schedule_collection(id);
1528        }
1529    }
1530
1531    /// Drops the read capability for the given collections and allows their resources to be
1532    /// reclaimed.
1533    #[mz_ore::instrument(level = "debug")]
1534    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1535        for id in &ids {
1536            let collection = self.collection_mut(*id)?;
1537
1538            // Mark the collection as dropped to allow it to be removed from the controller state.
1539            collection.dropped = true;
1540
1541            // Drop the implied and warmup read holds to announce that clients are not
1542            // interested in the collection anymore.
1543            collection.implied_read_hold.release();
1544            collection.warmup_read_hold.release();
1545
1546            // If the collection is a subscribe, stop tracking it. This ensures that the controller
1547            // ceases to produce `SubscribeResponse`s for this subscribe.
1548            self.subscribes.remove(id);
1549            // If the collection is a copy to, stop tracking it. This ensures that the controller
1550            // ceases to produce `CopyToResponse`s` for this copy to.
1551            self.copy_tos.remove(id);
1552        }
1553
1554        Ok(())
1555    }
1556
1557    /// Initiate a peek request for the contents of `id` at `timestamp`.
1558    ///
1559    /// If this returns an error, then it didn't modify any `Instance` state.
1560    #[mz_ore::instrument(level = "debug")]
1561    pub fn peek(
1562        &mut self,
1563        peek_target: PeekTarget,
1564        literal_constraints: Option<Vec<Row>>,
1565        uuid: Uuid,
1566        timestamp: T,
1567        result_desc: RelationDesc,
1568        finishing: RowSetFinishing,
1569        map_filter_project: mz_expr::SafeMfpPlan,
1570        mut read_hold: ReadHold<T>,
1571        target_replica: Option<ReplicaId>,
1572        peek_response_tx: oneshot::Sender<PeekResponse>,
1573    ) -> Result<(), PeekError> {
1574        use PeekError::*;
1575
1576        let target_id = peek_target.id();
1577
1578        // Downgrade the provided read hold to the peek time.
1579        if read_hold.id() != target_id {
1580            return Err(ReadHoldIdMismatch(read_hold.id()));
1581        }
1582        read_hold
1583            .try_downgrade(Antichain::from_elem(timestamp.clone()))
1584            .map_err(|_| ReadHoldInsufficient(target_id))?;
1585
1586        if let Some(target) = target_replica {
1587            if !self.replica_exists(target) {
1588                return Err(ReplicaMissing(target));
1589            }
1590        }
1591
1592        let otel_ctx = OpenTelemetryContext::obtain();
1593
1594        self.peeks.insert(
1595            uuid,
1596            PendingPeek {
1597                target_replica,
1598                // TODO(guswynn): can we just hold the `tracing::Span` here instead?
1599                otel_ctx: otel_ctx.clone(),
1600                requested_at: Instant::now(),
1601                read_hold,
1602                peek_response_tx,
1603                limit: finishing.limit.map(usize::cast_from),
1604                offset: finishing.offset,
1605            },
1606        );
1607
1608        let peek = Peek {
1609            literal_constraints,
1610            uuid,
1611            timestamp,
1612            finishing,
1613            map_filter_project,
1614            // Obtain an `OpenTelemetryContext` from the thread-local tracing
1615            // tree to forward it on to the compute worker.
1616            otel_ctx,
1617            target: peek_target,
1618            result_desc,
1619        };
1620        self.send(ComputeCommand::Peek(Box::new(peek)));
1621
1622        Ok(())
1623    }
1624
1625    /// Cancels an existing peek request.
1626    #[mz_ore::instrument(level = "debug")]
1627    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1628        let Some(peek) = self.peeks.get_mut(&uuid) else {
1629            tracing::warn!("did not find pending peek for {uuid}");
1630            return;
1631        };
1632
1633        let duration = peek.requested_at.elapsed();
1634        self.metrics
1635            .observe_peek_response(&PeekResponse::Canceled, duration);
1636
1637        // Enqueue a notification for the cancellation.
1638        let otel_ctx = peek.otel_ctx.clone();
1639        otel_ctx.attach_as_parent();
1640
1641        self.deliver_response(ComputeControllerResponse::PeekNotification(
1642            uuid,
1643            PeekNotification::Canceled,
1644            otel_ctx,
1645        ));
1646
1647        // Finish the peek.
1648        // This will also propagate the cancellation to the replicas.
1649        self.finish_peek(uuid, reason);
1650    }
1651
1652    /// Assigns a read policy to specific identifiers.
1653    ///
1654    /// The policies are assigned in the order presented, and repeated identifiers should
1655    /// conclude with the last policy. Changing a policy will immediately downgrade the read
1656    /// capability if appropriate, but it will not "recover" the read capability if the prior
1657    /// capability is already ahead of it.
1658    ///
1659    /// Identifiers not present in `policies` retain their existing read policies.
1660    ///
1661    /// It is an error to attempt to set a read policy for a collection that is not readable in the
1662    /// context of compute. At this time, only indexes are readable compute collections.
1663    #[mz_ore::instrument(level = "debug")]
1664    pub fn set_read_policy(
1665        &mut self,
1666        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1667    ) -> Result<(), ReadPolicyError> {
1668        // Do error checking upfront, to avoid introducing inconsistencies between a collection's
1669        // `implied_capability` and `read_capabilities`.
1670        for (id, _policy) in &policies {
1671            let collection = self.collection(*id)?;
1672            if collection.read_policy.is_none() {
1673                return Err(ReadPolicyError::WriteOnlyCollection(*id));
1674            }
1675        }
1676
1677        for (id, new_policy) in policies {
1678            let collection = self.expect_collection_mut(id);
1679            let new_since = new_policy.frontier(collection.write_frontier().borrow());
1680            let _ = collection.implied_read_hold.try_downgrade(new_since);
1681            collection.read_policy = Some(new_policy);
1682        }
1683
1684        Ok(())
1685    }
1686
1687    /// Advance the global write frontier of the given collection.
1688    ///
1689    /// Frontier regressions are gracefully ignored.
1690    ///
1691    /// # Panics
1692    ///
1693    /// Panics if the identified collection does not exist.
1694    #[mz_ore::instrument(level = "debug")]
1695    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
1696        let collection = self.expect_collection_mut(id);
1697
1698        let advanced = collection.shared.lock_write_frontier(|f| {
1699            let advanced = PartialOrder::less_than(f, &new_frontier);
1700            if advanced {
1701                f.clone_from(&new_frontier);
1702            }
1703            advanced
1704        });
1705
1706        if !advanced {
1707            return;
1708        }
1709
1710        // Relax the implied read hold according to the read policy.
1711        let new_since = match &collection.read_policy {
1712            Some(read_policy) => {
1713                // For readable collections the read frontier is determined by applying the
1714                // client-provided read policy to the write frontier.
1715                read_policy.frontier(new_frontier.borrow())
1716            }
1717            None => {
1718                // Write-only collections cannot be read within the context of the compute
1719                // controller, so their read frontier only controls the read holds taken on their
1720                // inputs. We can safely downgrade the input read holds to any time less than the
1721                // write frontier.
1722                //
1723                // Note that some write-only collections (continual tasks) need to observe changes
1724                // at their current write frontier during hydration. Thus, we cannot downgrade the
1725                // read frontier to the write frontier and instead step it back by one.
1726                Antichain::from_iter(
1727                    new_frontier
1728                        .iter()
1729                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
1730                )
1731            }
1732        };
1733        let _ = collection.implied_read_hold.try_downgrade(new_since);
1734
1735        // Report the frontier advancement.
1736        self.deliver_response(ComputeControllerResponse::FrontierUpper {
1737            id,
1738            upper: new_frontier,
1739        });
1740    }
1741
1742    /// Apply a collection read hold change.
1743    pub(super) fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
1744        let Some(collection) = self.collections.get_mut(&id) else {
1745            soft_panic_or_log!(
1746                "read hold change for absent collection (id={id}, changes={update:?})"
1747            );
1748            return;
1749        };
1750
1751        let new_since = collection.shared.lock_read_capabilities(|caps| {
1752            // Sanity check to prevent corrupted `read_capabilities`, which can cause hard-to-debug
1753            // issues (usually stuck read frontiers).
1754            let read_frontier = caps.frontier();
1755            for (time, diff) in update.iter() {
1756                let count = caps.count_for(time) + diff;
1757                assert!(
1758                    count >= 0,
1759                    "invalid read capabilities update: negative capability \
1760             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1761                );
1762                assert!(
1763                    count == 0 || read_frontier.less_equal(time),
1764                    "invalid read capabilities update: frontier regression \
1765             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1766                );
1767            }
1768
1769            // Apply read capability updates and learn about resulting changes to the read
1770            // frontier.
1771            let changes = caps.update_iter(update.drain());
1772
1773            let changed = changes.count() > 0;
1774            changed.then(|| caps.frontier().to_owned())
1775        });
1776
1777        let Some(new_since) = new_since else {
1778            return; // read frontier did not change
1779        };
1780
1781        // Propagate read frontier update to dependencies.
1782        for read_hold in collection.compute_dependencies.values_mut() {
1783            read_hold
1784                .try_downgrade(new_since.clone())
1785                .expect("frontiers don't regress");
1786        }
1787        for read_hold in collection.storage_dependencies.values_mut() {
1788            read_hold
1789                .try_downgrade(new_since.clone())
1790                .expect("frontiers don't regress");
1791        }
1792
1793        // Produce `AllowCompaction` command.
1794        self.send(ComputeCommand::AllowCompaction {
1795            id,
1796            frontier: new_since,
1797        });
1798    }
1799
1800    /// Fulfills a registered peek and cleans up associated state.
1801    ///
1802    /// As part of this we:
1803    ///  * Send a `PeekResponse` through the peek's response channel.
1804    ///  * Emit a `CancelPeek` command to instruct replicas to stop spending resources on this
1805    ///    peek, and to allow the `ComputeCommandHistory` to reduce away the corresponding `Peek`
1806    ///    command.
1807    ///  * Remove the read hold for this peek, unblocking compaction that might have waited on it.
1808    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1809        let Some(peek) = self.peeks.remove(&uuid) else {
1810            return;
1811        };
1812
1813        // The recipient might not be interested in the peek response anymore, which is fine.
1814        let _ = peek.peek_response_tx.send(response);
1815
1816        // NOTE: We need to send the `CancelPeek` command _before_ we release the peek's read hold
1817        // (by dropping it), to avoid the edge case that caused database-issues#4812.
1818        self.send(ComputeCommand::CancelPeek { uuid });
1819
1820        drop(peek.read_hold);
1821    }
1822
1823    /// Handles a response from a replica. Replica IDs are re-used across replica restarts, so we
1824    /// use the replica epoch to drop stale responses.
1825    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
1826        // Filter responses from non-existing or stale replicas.
1827        if self
1828            .replicas
1829            .get(&replica_id)
1830            .filter(|replica| replica.epoch == epoch)
1831            .is_none()
1832        {
1833            return;
1834        }
1835
1836        // Invariant: the replica exists and has the expected epoch.
1837
1838        match response {
1839            ComputeResponse::Frontiers(id, frontiers) => {
1840                self.handle_frontiers_response(id, frontiers, replica_id);
1841            }
1842            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
1843                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
1844            }
1845            ComputeResponse::CopyToResponse(id, response) => {
1846                self.handle_copy_to_response(id, response, replica_id);
1847            }
1848            ComputeResponse::SubscribeResponse(id, response) => {
1849                self.handle_subscribe_response(id, response, replica_id);
1850            }
1851            ComputeResponse::Status(response) => {
1852                self.handle_status_response(response, replica_id);
1853            }
1854        }
1855    }
1856
1857    /// Handle new frontiers, returning any compute response that needs to
1858    /// be sent to the client.
1859    fn handle_frontiers_response(
1860        &mut self,
1861        id: GlobalId,
1862        frontiers: FrontiersResponse<T>,
1863        replica_id: ReplicaId,
1864    ) {
1865        if !self.collections.contains_key(&id) {
1866            soft_panic_or_log!(
1867                "frontiers update for an unknown collection \
1868                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1869            );
1870            return;
1871        }
1872        let Some(replica) = self.replicas.get_mut(&replica_id) else {
1873            soft_panic_or_log!(
1874                "frontiers update for an unknown replica \
1875                 (replica_id={replica_id}, frontiers={frontiers:?})"
1876            );
1877            return;
1878        };
1879        let Some(replica_collection) = replica.collections.get_mut(&id) else {
1880            soft_panic_or_log!(
1881                "frontiers update for an unknown replica collection \
1882                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1883            );
1884            return;
1885        };
1886
1887        if let Some(new_frontier) = frontiers.input_frontier {
1888            replica_collection.update_input_frontier(new_frontier.clone());
1889        }
1890        if let Some(new_frontier) = frontiers.output_frontier {
1891            replica_collection.update_output_frontier(new_frontier.clone());
1892        }
1893        if let Some(new_frontier) = frontiers.write_frontier {
1894            replica_collection.update_write_frontier(new_frontier.clone());
1895            self.maybe_update_global_write_frontier(id, new_frontier);
1896        }
1897    }
1898
1899    #[mz_ore::instrument(level = "debug")]
1900    fn handle_peek_response(
1901        &mut self,
1902        uuid: Uuid,
1903        response: PeekResponse,
1904        otel_ctx: OpenTelemetryContext,
1905        replica_id: ReplicaId,
1906    ) {
1907        otel_ctx.attach_as_parent();
1908
1909        // We might not be tracking this peek anymore, because we have served a response already or
1910        // because it was canceled. If this is the case, we ignore the response.
1911        let Some(peek) = self.peeks.get(&uuid) else {
1912            return;
1913        };
1914
1915        // If the peek is targeting a replica, ignore responses from other replicas.
1916        let target_replica = peek.target_replica.unwrap_or(replica_id);
1917        if target_replica != replica_id {
1918            return;
1919        }
1920
1921        let duration = peek.requested_at.elapsed();
1922        self.metrics.observe_peek_response(&response, duration);
1923
1924        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
1925        // NOTE: We use the `otel_ctx` from the response, not the pending peek, because we
1926        // currently want the parent to be whatever the compute worker did with this peek.
1927        self.deliver_response(ComputeControllerResponse::PeekNotification(
1928            uuid,
1929            notification,
1930            otel_ctx,
1931        ));
1932
1933        self.finish_peek(uuid, response)
1934    }
1935
1936    fn handle_copy_to_response(
1937        &mut self,
1938        sink_id: GlobalId,
1939        response: CopyToResponse,
1940        replica_id: ReplicaId,
1941    ) {
1942        if !self.collections.contains_key(&sink_id) {
1943            soft_panic_or_log!(
1944                "received response for an unknown copy-to \
1945                 (sink_id={sink_id}, replica_id={replica_id})",
1946            );
1947            return;
1948        }
1949        let Some(replica) = self.replicas.get_mut(&replica_id) else {
1950            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
1951            return;
1952        };
1953        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
1954            soft_panic_or_log!(
1955                "copy-to response for an unknown replica collection \
1956                 (sink_id={sink_id}, replica_id={replica_id})"
1957            );
1958            return;
1959        };
1960
1961        // Downgrade the replica frontiers, to enable dropping of input read holds and clean up of
1962        // collection state.
1963        // TODO(database-issues#4701): report copy-to frontiers through `Frontiers` responses
1964        replica_collection.update_write_frontier(Antichain::new());
1965        replica_collection.update_input_frontier(Antichain::new());
1966        replica_collection.update_output_frontier(Antichain::new());
1967
1968        // We might not be tracking this COPY TO because we have already returned a response
1969        // from one of the replicas. In that case, we ignore the response.
1970        if !self.copy_tos.remove(&sink_id) {
1971            return;
1972        }
1973
1974        let result = match response {
1975            CopyToResponse::RowCount(count) => Ok(count),
1976            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
1977            // We should never get here: Replicas only drop copy to collections in response
1978            // to the controller allowing them to do so, and when the controller drops a
1979            // copy to it also removes it from the list of tracked copy_tos (see
1980            // [`Instance::drop_collections`]).
1981            CopyToResponse::Dropped => {
1982                tracing::error!(
1983                    %sink_id, %replica_id,
1984                    "received `Dropped` response for a tracked copy to",
1985                );
1986                return;
1987            }
1988        };
1989
1990        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
1991    }
1992
1993    fn handle_subscribe_response(
1994        &mut self,
1995        subscribe_id: GlobalId,
1996        response: SubscribeResponse<T>,
1997        replica_id: ReplicaId,
1998    ) {
1999        if !self.collections.contains_key(&subscribe_id) {
2000            soft_panic_or_log!(
2001                "received response for an unknown subscribe \
2002                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
2003            );
2004            return;
2005        }
2006        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2007            soft_panic_or_log!(
2008                "subscribe response for an unknown replica (replica_id={replica_id})"
2009            );
2010            return;
2011        };
2012        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
2013            soft_panic_or_log!(
2014                "subscribe response for an unknown replica collection \
2015                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
2016            );
2017            return;
2018        };
2019
2020        // Always apply replica write frontier updates. Even if the subscribe is not tracked
2021        // anymore, there might still be replicas reading from its inputs, so we need to track the
2022        // frontiers until all replicas have advanced to the empty one.
2023        let write_frontier = match &response {
2024            SubscribeResponse::Batch(batch) => batch.upper.clone(),
2025            SubscribeResponse::DroppedAt(_) => Antichain::new(),
2026        };
2027
2028        // For subscribes we downgrade all replica frontiers based on write frontiers. This should
2029        // be fine because the input and output frontier of a subscribe track its write frontier.
2030        // TODO(database-issues#4701): report subscribe frontiers through `Frontiers` responses
2031        replica_collection.update_write_frontier(write_frontier.clone());
2032        replica_collection.update_input_frontier(write_frontier.clone());
2033        replica_collection.update_output_frontier(write_frontier.clone());
2034
2035        // If the subscribe is not tracked, or targets a different replica, there is nothing to do.
2036        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
2037            return;
2038        };
2039        let replica_targeted = subscribe.target_replica.unwrap_or(replica_id) == replica_id;
2040        if !replica_targeted {
2041            return;
2042        }
2043
2044        // Apply a global frontier update.
2045        // If this is a replica-targeted subscribe, it is important that we advance the global
2046        // frontier only based on responses from the targeted replica. Otherwise, another replica
2047        // could advance to the empty frontier, making us drop the subscribe on the targeted
2048        // replica prematurely.
2049        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);
2050
2051        match response {
2052            SubscribeResponse::Batch(batch) => {
2053                let upper = batch.upper;
2054                let mut updates = batch.updates;
2055
2056                // If this batch advances the subscribe's frontier, we emit all updates at times
2057                // greater or equal to the last frontier (to avoid emitting duplicate updates).
2058                if PartialOrder::less_than(&subscribe.frontier, &upper) {
2059                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());
2060
2061                    if upper.is_empty() {
2062                        // This subscribe cannot produce more data. Stop tracking it.
2063                        self.subscribes.remove(&subscribe_id);
2064                    } else {
2065                        // This subscribe can produce more data. Update our tracking of it.
2066                        self.subscribes.insert(subscribe_id, subscribe);
2067                    }
2068
2069                    if let Ok(updates) = updates.as_mut() {
2070                        updates.retain(|(time, _data, _diff)| lower.less_equal(time));
2071                    }
2072                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
2073                        subscribe_id,
2074                        SubscribeBatch {
2075                            lower,
2076                            upper,
2077                            updates,
2078                        },
2079                    ));
2080                }
2081            }
2082            SubscribeResponse::DroppedAt(frontier) => {
2083                // We should never get here: Replicas only drop subscribe collections in response
2084                // to the controller allowing them to do so, and when the controller drops a
2085                // subscribe it also removes it from the list of tracked subscribes (see
2086                // [`Instance::drop_collections`]).
2087                tracing::error!(
2088                    %subscribe_id,
2089                    %replica_id,
2090                    frontier = ?frontier.elements(),
2091                    "received `DroppedAt` response for a tracked subscribe",
2092                );
2093                self.subscribes.remove(&subscribe_id);
2094            }
2095        }
2096    }
2097
2098    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
2099        match response {
2100            StatusResponse::Placeholder => {}
2101        }
2102    }
2103
2104    /// Return the write frontiers of the dependencies of the given collection.
2105    fn dependency_write_frontiers<'b>(
2106        &'b self,
2107        collection: &'b CollectionState<T>,
2108    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2109        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2110            let collection = self.collections.get(&dep_id);
2111            collection.map(|c| c.write_frontier())
2112        });
2113        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2114            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2115            frontiers.map(|f| f.write_frontier)
2116        });
2117
2118        compute_frontiers.chain(storage_frontiers)
2119    }
2120
2121    /// Return the write frontiers of transitive storage dependencies of the given collection.
2122    fn transitive_storage_dependency_write_frontiers<'b>(
2123        &'b self,
2124        collection: &'b CollectionState<T>,
2125    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2126        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2127        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2128        let mut done = BTreeSet::new();
2129
2130        while let Some(id) = todo.pop() {
2131            if done.contains(&id) {
2132                continue;
2133            }
2134            if let Some(dep) = self.collections.get(&id) {
2135                storage_ids.extend(dep.storage_dependency_ids());
2136                todo.extend(dep.compute_dependency_ids())
2137            }
2138            done.insert(id);
2139        }
2140
2141        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2142            let frontiers = self.storage_collections.collection_frontiers(id).ok();
2143            frontiers.map(|f| f.write_frontier)
2144        });
2145
2146        storage_frontiers
2147    }
2148
2149    /// Downgrade the warmup capabilities of collections as much as possible.
2150    ///
2151    /// The only requirement we have for a collection's warmup capability is that it is for a time
2152    /// that is available in all of the collection's inputs. For each input the latest time that is
2153    /// the case for is `write_frontier - 1`. So the farthest we can downgrade a collection's
2154    /// warmup capability is the minimum of `write_frontier - 1` of all its inputs.
2155    ///
2156    /// This method expects to be periodically called as part of instance maintenance work.
2157    /// We would like to instead update the warmup capabilities synchronously in response to
2158    /// frontier updates of dependency collections, but that is not generally possible because we
2159    /// don't learn about frontier updates of storage collections synchronously. We could do
2160    /// synchronous updates for compute dependencies, but we refrain from doing for simplicity.
2161    fn downgrade_warmup_capabilities(&mut self) {
2162        let mut new_capabilities = BTreeMap::new();
2163        for (id, collection) in &self.collections {
2164            // For write-only collections that have advanced to the empty frontier, we can drop the
2165            // warmup capability entirely. There is no reason why we would need to hydrate those
2166            // collections again, so being able to warm them up is not useful.
2167            if collection.read_policy.is_none()
2168                && collection.shared.lock_write_frontier(|f| f.is_empty())
2169            {
2170                new_capabilities.insert(*id, Antichain::new());
2171                continue;
2172            }
2173
2174            let mut new_capability = Antichain::new();
2175            for frontier in self.dependency_write_frontiers(collection) {
2176                for time in frontier {
2177                    new_capability.insert(time.step_back().unwrap_or(time));
2178                }
2179            }
2180
2181            new_capabilities.insert(*id, new_capability);
2182        }
2183
2184        for (id, new_capability) in new_capabilities {
2185            let collection = self.expect_collection_mut(id);
2186            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
2187        }
2188    }
2189
2190    /// Forward the implied capabilities of collections, if possible.
2191    ///
2192    /// The implied capability of a collection controls (a) which times are still readable (for
2193    /// indexes) and (b) with which as-of the collection gets installed on a new replica. We are
2194    /// usually not allowed to advance an implied capability beyond the frontier that follows from
2195    /// the collection's read policy applied to its write frontier:
2196    ///
2197    ///  * For sink collections, some external consumer might rely on seeing all distinct times in
2198    ///    the input reflected in the output. If we'd forward the implied capability of a sink,
2199    ///    we'd risk skipping times in the output across replica restarts.
2200    ///  * For index collections, we might make the index unreadable by advancing its read frontier
2201    ///    beyond its write frontier.
2202    ///
2203    /// There is one case where forwarding an implied capability is fine though: an index installed
2204    /// on a cluster that has no replicas. Such indexes are not readable anyway until a new replica
2205    /// is added, so advancing its read frontier can't make it unreadable. We can thus advance the
2206    /// implied capability as long as we make sure that when a new replica is added, the expected
2207    /// relationship between write frontier, read policy, and implied capability can be restored
2208    /// immediately (modulo computation time).
2209    ///
2210    /// Forwarding implied capabilities is not necessary for the correct functioning of the
2211    /// controller but an optimization that is beneficial in two ways:
2212    ///
2213    ///  * It relaxes read holds on inputs to forwarded collections, allowing their compaction.
2214    ///  * It reduces the amount of historical detail new replicas need to process when computing
2215    ///    forwarded collections, as forwarding the implied capability also forwards the corresponding
2216    ///    dataflow as-of.
2217    fn forward_implied_capabilities(&mut self) {
2218        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
2219            return;
2220        }
2221        if !self.replicas.is_empty() {
2222            return;
2223        }
2224
2225        let mut new_capabilities = BTreeMap::new();
2226        for (id, collection) in &self.collections {
2227            let Some(read_policy) = &collection.read_policy else {
2228                // Collection is write-only, i.e. a sink.
2229                continue;
2230            };
2231
2232            // When a new replica is started, it will immediately be able to compute all collection
2233            // output up to the write frontier of its transitive storage inputs. So the new implied
2234            // read capability should be the read policy applied to that frontier.
2235            let mut dep_frontier = Antichain::new();
2236            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
2237                dep_frontier.extend(frontier);
2238            }
2239
2240            let new_capability = read_policy.frontier(dep_frontier.borrow());
2241            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
2242                new_capabilities.insert(*id, new_capability);
2243            }
2244        }
2245
2246        for (id, new_capability) in new_capabilities {
2247            let collection = self.expect_collection_mut(id);
2248            let _ = collection.implied_read_hold.try_downgrade(new_capability);
2249        }
2250    }
2251
2252    /// Acquires a `ReadHold` for the identified compute collection.
2253    ///
2254    /// This mirrors the logic used by the controller-side `InstanceState::acquire_read_hold`,
2255    /// but executes on the instance task itself.
2256    pub(super) fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
2257        // Similarly to InstanceState::acquire_read_hold and StorageCollections::acquire_read_holds,
2258        // we acquire read holds at the earliest possible time rather than returning a copy
2259        // of the implied read hold. This is so that dependents can acquire read holds on
2260        // compute dependencies at frontiers that are held back by other read holds the caller
2261        // has previously taken.
2262        let collection = self.collection(id)?;
2263        let since = collection.shared.lock_read_capabilities(|caps| {
2264            let since = caps.frontier().to_owned();
2265            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
2266            since
2267        });
2268        let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
2269        Ok(hold)
2270    }
2271
2272    /// Process pending maintenance work.
2273    ///
2274    /// This method is invoked periodically by the global controller.
2275    /// It is a good place to perform maintenance work that arises from various controller state
2276    /// changes and that cannot conveniently be handled synchronously with those state changes.
2277    #[mz_ore::instrument(level = "debug")]
2278    pub fn maintain(&mut self) {
2279        self.rehydrate_failed_replicas();
2280        self.downgrade_warmup_capabilities();
2281        self.forward_implied_capabilities();
2282        self.schedule_collections();
2283        self.cleanup_collections();
2284        self.update_frontier_introspection();
2285        self.refresh_state_metrics();
2286        self.refresh_wallclock_lag();
2287    }
2288}
2289
2290/// State maintained about individual compute collections.
2291///
2292/// A compute collection is either an index, or a storage sink, or a subscribe, exported by a
2293/// compute dataflow.
2294#[derive(Debug)]
2295struct CollectionState<T: ComputeControllerTimestamp> {
2296    /// Whether this collection is a log collection.
2297    ///
2298    /// Log collections are special in that they are only maintained by a subset of all replicas.
2299    log_collection: bool,
2300    /// Whether this collection has been dropped by a controller client.
2301    ///
2302    /// The controller is allowed to remove the `CollectionState` for a collection only when
2303    /// `dropped == true`. Otherwise, clients might still expect to be able to query information
2304    /// about this collection.
2305    dropped: bool,
2306    /// Whether this collection has been scheduled, i.e., the controller has sent a `Schedule`
2307    /// command for it.
2308    scheduled: bool,
2309
2310    /// Whether this collection is in read-only mode.
2311    ///
2312    /// When in read-only mode, the dataflow is not allowed to affect external state (largely persist).
2313    read_only: bool,
2314
2315    /// State shared with the `ComputeController`.
2316    shared: SharedCollectionState<T>,
2317
2318    /// A read hold maintaining the implicit capability of the collection.
2319    ///
2320    /// This capability is kept to ensure that the collection remains readable according to its
2321    /// `read_policy`. It also ensures that read holds on the collection's dependencies are kept at
2322    /// some time not greater than the collection's `write_frontier`, guaranteeing that the
2323    /// collection's next outputs can always be computed without skipping times.
2324    implied_read_hold: ReadHold<T>,
2325    /// A read hold held to enable dataflow warmup.
2326    ///
2327    /// Dataflow warmup is an optimization that allows dataflows to immediately start hydrating
2328    /// even when their next output time (as implied by the `write_frontier`) is in the future.
2329    /// By installing a read capability derived from the write frontiers of the collection's
2330    /// inputs, we ensure that the as-of of new dataflows installed for the collection is at a time
2331    /// that is immediately available, so hydration can begin immediately too.
2332    warmup_read_hold: ReadHold<T>,
2333    /// The policy to use to downgrade `self.implied_read_hold`.
2334    ///
2335    /// If `None`, the collection is a write-only collection (i.e. a sink). For write-only
2336    /// collections, the `implied_read_hold` is only required for maintaining read holds on the
2337    /// inputs, so we can immediately downgrade it to the `write_frontier`.
2338    read_policy: Option<ReadPolicy<T>>,
2339
2340    /// Storage identifiers on which this collection depends, and read holds this collection
2341    /// requires on them.
2342    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2343    /// Compute identifiers on which this collection depends, and read holds this collection
2344    /// requires on them.
2345    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2346
2347    /// Introspection state associated with this collection.
2348    introspection: CollectionIntrospection<T>,
2349
2350    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
2351    /// introspection update.
2352    ///
2353    /// Keys are `(period, lag, labels)` triples, values are counts.
2354    ///
2355    /// If this is `None`, wallclock lag is not tracked for this collection.
2356    wallclock_lag_histogram_stash: Option<
2357        BTreeMap<
2358            (
2359                WallclockLagHistogramPeriod,
2360                WallclockLag,
2361                BTreeMap<&'static str, String>,
2362            ),
2363            Diff,
2364        >,
2365    >,
2366}
2367
2368impl<T: ComputeControllerTimestamp> CollectionState<T> {
2369    /// Creates a new collection state, with an initial read policy valid from `since`.
2370    fn new(
2371        collection_id: GlobalId,
2372        as_of: Antichain<T>,
2373        shared: SharedCollectionState<T>,
2374        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2375        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2376        read_hold_tx: read_holds::ChangeTx<T>,
2377        introspection: CollectionIntrospection<T>,
2378    ) -> Self {
2379        // A collection is not readable before the `as_of`.
2380        let since = as_of.clone();
2381        // A collection won't produce updates for times before the `as_of`.
2382        let upper = as_of;
2383
2384        // Ensure that the provided `shared` is valid for the given `as_of`.
2385        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
2386        assert!(shared.lock_write_frontier(|f| f == &upper));
2387
2388        // Initialize collection read holds.
2389        // Note that the implied read hold was already added to the `read_capabilities` when
2390        // `shared` was created, so we only need to add the warmup read hold here.
2391        let implied_read_hold =
2392            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
2393        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);
2394
2395        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
2396        shared.lock_read_capabilities(|c| {
2397            c.update_iter(updates);
2398        });
2399
2400        // In an effort to keep the produced wallclock lag introspection data small and
2401        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2402        // select indexes and subscribes.
2403        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
2404            true => None,
2405            false => Some(Default::default()),
2406        };
2407
2408        Self {
2409            log_collection: false,
2410            dropped: false,
2411            scheduled: false,
2412            read_only: true,
2413            shared,
2414            implied_read_hold,
2415            warmup_read_hold,
2416            read_policy: Some(ReadPolicy::ValidFrom(since)),
2417            storage_dependencies,
2418            compute_dependencies,
2419            introspection,
2420            wallclock_lag_histogram_stash,
2421        }
2422    }
2423
2424    /// Creates a new collection state for a log collection.
2425    fn new_log_collection(
2426        id: GlobalId,
2427        shared: SharedCollectionState<T>,
2428        read_hold_tx: read_holds::ChangeTx<T>,
2429        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2430    ) -> Self {
2431        let since = Antichain::from_elem(T::minimum());
2432        let introspection =
2433            CollectionIntrospection::new(id, introspection_tx, since.clone(), false, None, None);
2434        let mut state = Self::new(
2435            id,
2436            since,
2437            shared,
2438            Default::default(),
2439            Default::default(),
2440            read_hold_tx,
2441            introspection,
2442        );
2443        state.log_collection = true;
2444        // Log collections are created and scheduled implicitly as part of replica initialization.
2445        state.scheduled = true;
2446        state
2447    }
2448
2449    /// Reports the current read frontier.
2450    fn read_frontier(&self) -> Antichain<T> {
2451        self.shared
2452            .lock_read_capabilities(|c| c.frontier().to_owned())
2453    }
2454
2455    /// Reports the current write frontier.
2456    fn write_frontier(&self) -> Antichain<T> {
2457        self.shared.lock_write_frontier(|f| f.clone())
2458    }
2459
2460    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2461        self.storage_dependencies.keys().copied()
2462    }
2463
2464    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2465        self.compute_dependencies.keys().copied()
2466    }
2467
2468    /// Reports the IDs of the dependencies of this collection.
2469    fn dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2470        self.compute_dependency_ids()
2471            .chain(self.storage_dependency_ids())
2472    }
2473}
2474
2475/// Collection state shared with the `ComputeController`.
2476///
2477/// Having this allows certain controller APIs, such as `ComputeController::collection_frontiers`
2478/// and `ComputeController::acquire_read_hold` to be non-`async`. This comes at the cost of
2479/// complexity (by introducing shared mutable state) and performance (by introducing locking). We
2480/// should aim to reduce the amount of shared state over time, rather than expand it.
2481///
2482/// Note that [`SharedCollectionState`]s are initialized by the `ComputeController` prior to the
2483/// collection's creation in the [`Instance`]. This is to allow compute clients to query frontiers
2484/// and take new read holds immediately, without having to wait for the [`Instance`] to update.
2485#[derive(Clone, Debug)]
2486pub(super) struct SharedCollectionState<T> {
2487    /// Accumulation of read capabilities for the collection.
2488    ///
2489    /// This accumulation contains the capabilities held by all [`ReadHold`]s given out for the
2490    /// collection, including `implied_read_hold` and `warmup_read_hold`.
2491    ///
2492    /// NOTE: This field may only be modified by [`Instance::apply_read_hold_change`],
2493    /// [`Instance::acquire_read_hold`], and `ComputeController::acquire_read_hold`.
2494    /// Nobody else should modify read capabilities directly. Instead, collection users should
2495    /// manage read holds through [`ReadHold`] objects acquired through
2496    /// `ComputeController::acquire_read_hold`.
2497    ///
2498    /// TODO(teskje): Restructure the code to enforce the above in the type system.
2499    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
2500    /// The write frontier of this collection.
2501    write_frontier: Arc<Mutex<Antichain<T>>>,
2502}
2503
2504impl<T: Timestamp> SharedCollectionState<T> {
2505    pub fn new(as_of: Antichain<T>) -> Self {
2506        // A collection is not readable before the `as_of`.
2507        let since = as_of.clone();
2508        // A collection won't produce updates for times before the `as_of`.
2509        let upper = as_of;
2510
2511        // Initialize read capabilities to the `since`.
2512        // The is the implied read capability. The corresponding [`ReadHold`] is created in
2513        // [`CollectionState::new`].
2514        let mut read_capabilities = MutableAntichain::new();
2515        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2516
2517        Self {
2518            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2519            write_frontier: Arc::new(Mutex::new(upper)),
2520        }
2521    }
2522
2523    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2524    where
2525        F: FnOnce(&mut MutableAntichain<T>) -> R,
2526    {
2527        let mut caps = self.read_capabilities.lock().expect("poisoned");
2528        f(&mut *caps)
2529    }
2530
2531    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2532    where
2533        F: FnOnce(&mut Antichain<T>) -> R,
2534    {
2535        let mut frontier = self.write_frontier.lock().expect("poisoned");
2536        f(&mut *frontier)
2537    }
2538}
2539
2540/// Manages certain introspection relations associated with a collection. Upon creation, it adds
2541/// rows to introspection relations. When dropped, it retracts its managed rows.
2542///
2543/// TODO: `ComputeDependencies` could be moved under this.
2544#[derive(Debug)]
2545struct CollectionIntrospection<T: ComputeControllerTimestamp> {
2546    /// The ID of the compute collection.
2547    collection_id: GlobalId,
2548    /// A channel through which introspection updates are delivered.
2549    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2550    /// Introspection state for `IntrospectionType::Frontiers`.
2551    ///
2552    /// `Some` if the collection does _not_ sink into a storage collection (i.e. is not an MV). If
2553    /// the collection sinks into storage, the storage controller reports its frontiers instead.
2554    frontiers: Option<FrontiersIntrospectionState<T>>,
2555    /// Introspection state for `IntrospectionType::ComputeMaterializedViewRefreshes`.
2556    ///
2557    /// `Some` if the collection is a REFRESH MV.
2558    refresh: Option<RefreshIntrospectionState<T>>,
2559}
2560
2561impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
2562    fn new(
2563        collection_id: GlobalId,
2564        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2565        as_of: Antichain<T>,
2566        storage_sink: bool,
2567        initial_as_of: Option<Antichain<T>>,
2568        refresh_schedule: Option<RefreshSchedule>,
2569    ) -> Self {
2570        let refresh =
2571            match (refresh_schedule, initial_as_of) {
2572                (Some(refresh_schedule), Some(initial_as_of)) => Some(
2573                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2574                ),
2575                (refresh_schedule, _) => {
2576                    // If we have a `refresh_schedule`, then the collection is a MV, so we should also have
2577                    // an `initial_as_of`.
2578                    soft_assert_or_log!(
2579                        refresh_schedule.is_none(),
2580                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2581                    );
2582                    None
2583                }
2584            };
2585        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2586
2587        let self_ = Self {
2588            collection_id,
2589            introspection_tx,
2590            frontiers,
2591            refresh,
2592        };
2593
2594        self_.report_initial_state();
2595        self_
2596    }
2597
2598    /// Reports the initial introspection state.
2599    fn report_initial_state(&self) {
2600        if let Some(frontiers) = &self.frontiers {
2601            let row = frontiers.row_for_collection(self.collection_id);
2602            let updates = vec![(row, Diff::ONE)];
2603            self.send(IntrospectionType::Frontiers, updates);
2604        }
2605
2606        if let Some(refresh) = &self.refresh {
2607            let row = refresh.row_for_collection(self.collection_id);
2608            let updates = vec![(row, Diff::ONE)];
2609            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2610        }
2611    }
2612
2613    /// Observe the given current collection frontiers and update the introspection state as
2614    /// necessary.
2615    fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2616        self.update_frontier_introspection(read_frontier, write_frontier);
2617        self.update_refresh_introspection(write_frontier);
2618    }
2619
2620    fn update_frontier_introspection(
2621        &mut self,
2622        read_frontier: &Antichain<T>,
2623        write_frontier: &Antichain<T>,
2624    ) {
2625        let Some(frontiers) = &mut self.frontiers else {
2626            return;
2627        };
2628
2629        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2630        {
2631            return; // no change
2632        };
2633
2634        let retraction = frontiers.row_for_collection(self.collection_id);
2635        frontiers.update(read_frontier, write_frontier);
2636        let insertion = frontiers.row_for_collection(self.collection_id);
2637        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2638        self.send(IntrospectionType::Frontiers, updates);
2639    }
2640
2641    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
2642        let Some(refresh) = &mut self.refresh else {
2643            return;
2644        };
2645
2646        let retraction = refresh.row_for_collection(self.collection_id);
2647        refresh.frontier_update(write_frontier);
2648        let insertion = refresh.row_for_collection(self.collection_id);
2649
2650        if retraction == insertion {
2651            return; // no change
2652        }
2653
2654        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2655        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2656    }
2657
2658    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2659        // Failure to send means the `ComputeController` has been dropped and doesn't care about
2660        // introspection updates anymore.
2661        let _ = self.introspection_tx.send((introspection_type, updates));
2662    }
2663}
2664
2665impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
2666    fn drop(&mut self) {
2667        // Retract collection frontiers.
2668        if let Some(frontiers) = &self.frontiers {
2669            let row = frontiers.row_for_collection(self.collection_id);
2670            let updates = vec![(row, Diff::MINUS_ONE)];
2671            self.send(IntrospectionType::Frontiers, updates);
2672        }
2673
2674        // Retract MV refresh state.
2675        if let Some(refresh) = &self.refresh {
2676            let retraction = refresh.row_for_collection(self.collection_id);
2677            let updates = vec![(retraction, Diff::MINUS_ONE)];
2678            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2679        }
2680    }
2681}
2682
2683#[derive(Debug)]
2684struct FrontiersIntrospectionState<T> {
2685    read_frontier: Antichain<T>,
2686    write_frontier: Antichain<T>,
2687}
2688
2689impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
2690    fn new(as_of: Antichain<T>) -> Self {
2691        Self {
2692            read_frontier: as_of.clone(),
2693            write_frontier: as_of,
2694        }
2695    }
2696
2697    /// Return a `Row` reflecting the current collection frontiers.
2698    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2699        let read_frontier = self
2700            .read_frontier
2701            .as_option()
2702            .map_or(Datum::Null, |ts| ts.clone().into());
2703        let write_frontier = self
2704            .write_frontier
2705            .as_option()
2706            .map_or(Datum::Null, |ts| ts.clone().into());
2707        Row::pack_slice(&[
2708            Datum::String(&collection_id.to_string()),
2709            read_frontier,
2710            write_frontier,
2711        ])
2712    }
2713
2714    /// Update the introspection state with the given new frontiers.
2715    fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2716        if read_frontier != &self.read_frontier {
2717            self.read_frontier.clone_from(read_frontier);
2718        }
2719        if write_frontier != &self.write_frontier {
2720            self.write_frontier.clone_from(write_frontier);
2721        }
2722    }
2723}
2724
2725/// Information needed to compute introspection updates for a REFRESH materialized view when the
2726/// write frontier advances.
2727#[derive(Debug)]
2728struct RefreshIntrospectionState<T> {
2729    // Immutable properties of the MV
2730    refresh_schedule: RefreshSchedule,
2731    initial_as_of: Antichain<T>,
2732    // Refresh state
2733    next_refresh: Datum<'static>,           // Null or an MzTimestamp
2734    last_completed_refresh: Datum<'static>, // Null or an MzTimestamp
2735}
2736
2737impl<T> RefreshIntrospectionState<T> {
2738    /// Return a `Row` reflecting the current refresh introspection state.
2739    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2740        Row::pack_slice(&[
2741            Datum::String(&collection_id.to_string()),
2742            self.last_completed_refresh,
2743            self.next_refresh,
2744        ])
2745    }
2746}
2747
2748impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
2749    /// Construct a new [`RefreshIntrospectionState`], and apply an initial `frontier_update()` at
2750    /// the `upper`.
2751    fn new(
2752        refresh_schedule: RefreshSchedule,
2753        initial_as_of: Antichain<T>,
2754        upper: &Antichain<T>,
2755    ) -> Self {
2756        let mut self_ = Self {
2757            refresh_schedule: refresh_schedule.clone(),
2758            initial_as_of: initial_as_of.clone(),
2759            next_refresh: Datum::Null,
2760            last_completed_refresh: Datum::Null,
2761        };
2762        self_.frontier_update(upper);
2763        self_
2764    }
2765
2766    /// Should be called whenever the write frontier of the collection advances. It updates the
2767    /// state that should be recorded in introspection relations, but doesn't send the updates yet.
2768    fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
2769        if write_frontier.is_empty() {
2770            self.last_completed_refresh =
2771                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2772                    last_refresh.into()
2773                } else {
2774                    // If there is no last refresh, then we have a `REFRESH EVERY`, in which case
2775                    // the saturating roundup puts a refresh at the maximum possible timestamp.
2776                    T::maximum().into()
2777                };
2778            self.next_refresh = Datum::Null;
2779        } else {
2780            if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2781                // We are before the first refresh.
2782                self.last_completed_refresh = Datum::Null;
2783                let initial_as_of = self.initial_as_of.as_option().expect(
2784                    "initial_as_of can't be [], because then there would be no refreshes at all",
2785                );
2786                let first_refresh = initial_as_of
2787                    .round_up(&self.refresh_schedule)
2788                    .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2789                soft_assert_or_log!(
2790                    first_refresh == *initial_as_of,
2791                    "initial_as_of should be set to the first refresh"
2792                );
2793                self.next_refresh = first_refresh.into();
2794            } else {
2795                // The first refresh has already happened.
2796                let write_frontier = write_frontier.as_option().expect("checked above");
2797                self.last_completed_refresh = write_frontier
2798                    .round_down_minus_1(&self.refresh_schedule)
2799                    .map_or_else(
2800                        || {
2801                            soft_panic_or_log!(
2802                                "rounding down should have returned the first refresh or later"
2803                            );
2804                            Datum::Null
2805                        },
2806                        |last_completed_refresh| last_completed_refresh.into(),
2807                    );
2808                self.next_refresh = write_frontier.clone().into();
2809            }
2810        }
2811    }
2812}
2813
2814/// A note of an outstanding peek response.
2815#[derive(Debug)]
2816struct PendingPeek<T: Timestamp> {
2817    /// For replica-targeted peeks, this specifies the replica whose response we should pass on.
2818    ///
2819    /// If this value is `None`, we pass on the first response.
2820    target_replica: Option<ReplicaId>,
2821    /// The OpenTelemetry context for this peek.
2822    otel_ctx: OpenTelemetryContext,
2823    /// The time at which the peek was requested.
2824    ///
2825    /// Used to track peek durations.
2826    requested_at: Instant,
2827    /// The read hold installed to serve this peek.
2828    read_hold: ReadHold<T>,
2829    /// The channel to send peek results.
2830    peek_response_tx: oneshot::Sender<PeekResponse>,
2831    /// An optional limit of the peek's result size.
2832    limit: Option<usize>,
2833    /// The offset into the peek's result.
2834    offset: usize,
2835}
2836
2837#[derive(Debug, Clone)]
2838struct ActiveSubscribe<T> {
2839    /// Current upper frontier of this subscribe.
2840    frontier: Antichain<T>,
2841    /// For replica-targeted subscribes, this specifies the replica whose responses we should pass on.
2842    ///
2843    /// If this value is `None`, we pass on the first response for each time slice.
2844    target_replica: Option<ReplicaId>,
2845}
2846
2847impl<T: ComputeControllerTimestamp> ActiveSubscribe<T> {
2848    fn new(target_replica: Option<ReplicaId>) -> Self {
2849        Self {
2850            frontier: Antichain::from_elem(T::minimum()),
2851            target_replica,
2852        }
2853    }
2854}
2855
2856/// State maintained about individual replicas.
2857#[derive(Debug)]
2858struct ReplicaState<T: ComputeControllerTimestamp> {
2859    /// The ID of the replica.
2860    id: ReplicaId,
2861    /// Client for the running replica task.
2862    client: ReplicaClient<T>,
2863    /// The replica configuration.
2864    config: ReplicaConfig,
2865    /// Replica metrics.
2866    metrics: ReplicaMetrics,
2867    /// A channel through which introspection updates are delivered.
2868    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2869    /// Per-replica collection state.
2870    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
2871    /// The epoch of the replica.
2872    epoch: u64,
2873}
2874
2875impl<T: ComputeControllerTimestamp> ReplicaState<T> {
2876    fn new(
2877        id: ReplicaId,
2878        client: ReplicaClient<T>,
2879        config: ReplicaConfig,
2880        metrics: ReplicaMetrics,
2881        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2882        epoch: u64,
2883    ) -> Self {
2884        Self {
2885            id,
2886            client,
2887            config,
2888            metrics,
2889            introspection_tx,
2890            epoch,
2891            collections: Default::default(),
2892        }
2893    }
2894
2895    /// Add a collection to the replica state.
2896    ///
2897    /// # Panics
2898    ///
2899    /// Panics if a collection with the same ID exists already.
2900    fn add_collection(
2901        &mut self,
2902        id: GlobalId,
2903        as_of: Antichain<T>,
2904        input_read_holds: Vec<ReadHold<T>>,
2905    ) {
2906        let metrics = self.metrics.for_collection(id);
2907        let introspection = ReplicaCollectionIntrospection::new(
2908            self.id,
2909            id,
2910            self.introspection_tx.clone(),
2911            as_of.clone(),
2912        );
2913        let mut state =
2914            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
2915
2916        // In an effort to keep the produced wallclock lag introspection data small and
2917        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2918        // select indexes and subscribes.
2919        if id.is_transient() {
2920            state.wallclock_lag_max = None;
2921        }
2922
2923        if let Some(previous) = self.collections.insert(id, state) {
2924            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
2925        }
2926    }
2927
2928    /// Remove state for a collection.
2929    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
2930        self.collections.remove(&id)
2931    }
2932
2933    /// Returns whether all replica frontiers of the given collection are empty.
2934    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
2935        self.collections.get(&id).map_or(true, |c| {
2936            c.write_frontier.is_empty()
2937                && c.input_frontier.is_empty()
2938                && c.output_frontier.is_empty()
2939        })
2940    }
2941
2942    /// Returns the state of the [`ReplicaState`] formatted as JSON.
2943    ///
2944    /// The returned value is not guaranteed to be stable and may change at any point in time.
2945    #[mz_ore::instrument(level = "debug")]
2946    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
2947        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
2948        // returned object as a tradeoff between usability and stability. `serde_json` will fail
2949        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
2950        // prevents a future unrelated change from silently breaking this method.
2951
2952        // Destructure `self` here so we don't forget to consider dumping newly added fields.
2953        let Self {
2954            id,
2955            client: _,
2956            config: _,
2957            metrics: _,
2958            introspection_tx: _,
2959            epoch,
2960            collections,
2961        } = self;
2962
2963        let collections: BTreeMap<_, _> = collections
2964            .iter()
2965            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
2966            .collect();
2967
2968        Ok(serde_json::json!({
2969            "id": id.to_string(),
2970            "collections": collections,
2971            "epoch": epoch,
2972        }))
2973    }
2974}
2975
2976#[derive(Debug)]
2977struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
2978    /// The replica write frontier of this collection.
2979    ///
2980    /// See [`FrontiersResponse::write_frontier`].
2981    write_frontier: Antichain<T>,
2982    /// The replica input frontier of this collection.
2983    ///
2984    /// See [`FrontiersResponse::input_frontier`].
2985    input_frontier: Antichain<T>,
2986    /// The replica output frontier of this collection.
2987    ///
2988    /// See [`FrontiersResponse::output_frontier`].
2989    output_frontier: Antichain<T>,
2990
2991    /// Metrics tracked for this collection.
2992    ///
2993    /// If this is `None`, no metrics are collected.
2994    metrics: Option<ReplicaCollectionMetrics>,
2995    /// As-of frontier with which this collection was installed on the replica.
2996    as_of: Antichain<T>,
2997    /// Tracks introspection state for this collection.
2998    introspection: ReplicaCollectionIntrospection<T>,
2999    /// Read holds on storage inputs to this collection.
3000    ///
3001    /// These read holds are kept to ensure that the replica is able to read from storage inputs at
3002    /// all times it hasn't read yet. We only need to install read holds for storage inputs since
3003    /// compaction of compute inputs is implicitly held back by Timely/DD.
3004    input_read_holds: Vec<ReadHold<T>>,
3005
3006    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
3007    ///
3008    /// If this is `None`, wallclock lag is not tracked for this collection.
3009    wallclock_lag_max: Option<WallclockLag>,
3010}
3011
3012impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
3013    fn new(
3014        metrics: Option<ReplicaCollectionMetrics>,
3015        as_of: Antichain<T>,
3016        introspection: ReplicaCollectionIntrospection<T>,
3017        input_read_holds: Vec<ReadHold<T>>,
3018    ) -> Self {
3019        Self {
3020            write_frontier: as_of.clone(),
3021            input_frontier: as_of.clone(),
3022            output_frontier: as_of.clone(),
3023            metrics,
3024            as_of,
3025            introspection,
3026            input_read_holds,
3027            wallclock_lag_max: Some(WallclockLag::MIN),
3028        }
3029    }
3030
3031    /// Returns whether this collection is hydrated.
3032    fn hydrated(&self) -> bool {
3033        // If the observed frontier is greater than the collection's as-of, the collection has
3034        // produced some output and is therefore hydrated.
3035        //
3036        // We need to consider the edge case where the as-of is the empty frontier. Such an as-of
3037        // is not useful for indexes, because they wouldn't be readable. For write-only
3038        // collections, an empty as-of means that the collection has been fully written and no new
3039        // dataflow needs to be created for it. Consequently, no hydration will happen either.
3040        //
3041        // Based on this, we could respond in two ways:
3042        //  * `false`, as in "the dataflow was never created"
3043        //  * `true`, as in "the dataflow completed immediately"
3044        //
3045        // Since hydration is often used as a measure of dataflow progress and we don't want to
3046        // give the impression that certain dataflows are somehow stuck when they are not, we go
3047        // with the second interpretation here.
3048        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3049    }
3050
3051    /// Updates the replica write frontier of this collection.
3052    fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
3053        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3054            soft_panic_or_log!(
3055                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3056                self.write_frontier,
3057            );
3058            return;
3059        } else if new_frontier == self.write_frontier {
3060            return;
3061        }
3062
3063        self.write_frontier = new_frontier;
3064    }
3065
3066    /// Updates the replica input frontier of this collection.
3067    fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
3068        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3069            soft_panic_or_log!(
3070                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3071                self.input_frontier,
3072            );
3073            return;
3074        } else if new_frontier == self.input_frontier {
3075            return;
3076        }
3077
3078        self.input_frontier = new_frontier;
3079
3080        // Relax our read holds on collection inputs.
3081        for read_hold in &mut self.input_read_holds {
3082            let result = read_hold.try_downgrade(self.input_frontier.clone());
3083            soft_assert_or_log!(
3084                result.is_ok(),
3085                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3086                self.input_frontier,
3087            );
3088        }
3089    }
3090
3091    /// Updates the replica output frontier of this collection.
3092    fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
3093        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3094            soft_panic_or_log!(
3095                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3096                self.output_frontier,
3097            );
3098            return;
3099        } else if new_frontier == self.output_frontier {
3100            return;
3101        }
3102
3103        self.output_frontier = new_frontier;
3104    }
3105}
3106
3107/// Maintains the introspection state for a given replica and collection, and ensures that reported
3108/// introspection data is retracted when the collection is dropped.
3109#[derive(Debug)]
3110struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
3111    /// The ID of the replica.
3112    replica_id: ReplicaId,
3113    /// The ID of the compute collection.
3114    collection_id: GlobalId,
3115    /// The collection's reported replica write frontier.
3116    write_frontier: Antichain<T>,
3117    /// A channel through which introspection updates are delivered.
3118    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3119}
3120
3121impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
3122    /// Create a new `HydrationState` and initialize introspection.
3123    fn new(
3124        replica_id: ReplicaId,
3125        collection_id: GlobalId,
3126        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3127        as_of: Antichain<T>,
3128    ) -> Self {
3129        let self_ = Self {
3130            replica_id,
3131            collection_id,
3132            write_frontier: as_of,
3133            introspection_tx,
3134        };
3135
3136        self_.report_initial_state();
3137        self_
3138    }
3139
3140    /// Reports the initial introspection state.
3141    fn report_initial_state(&self) {
3142        let row = self.write_frontier_row();
3143        let updates = vec![(row, Diff::ONE)];
3144        self.send(IntrospectionType::ReplicaFrontiers, updates);
3145    }
3146
3147    /// Observe the given current write frontier and update the introspection state as necessary.
3148    fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
3149        if self.write_frontier == *write_frontier {
3150            return; // no change
3151        }
3152
3153        let retraction = self.write_frontier_row();
3154        self.write_frontier.clone_from(write_frontier);
3155        let insertion = self.write_frontier_row();
3156
3157        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3158        self.send(IntrospectionType::ReplicaFrontiers, updates);
3159    }
3160
3161    /// Return a `Row` reflecting the current replica write frontier.
3162    fn write_frontier_row(&self) -> Row {
3163        let write_frontier = self
3164            .write_frontier
3165            .as_option()
3166            .map_or(Datum::Null, |ts| ts.clone().into());
3167        Row::pack_slice(&[
3168            Datum::String(&self.collection_id.to_string()),
3169            Datum::String(&self.replica_id.to_string()),
3170            write_frontier,
3171        ])
3172    }
3173
3174    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3175        // Failure to send means the `ComputeController` has been dropped and doesn't care about
3176        // introspection updates anymore.
3177        let _ = self.introspection_tx.send((introspection_type, updates));
3178    }
3179}
3180
3181impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
3182    fn drop(&mut self) {
3183        // Retract the write frontier.
3184        let row = self.write_frontier_row();
3185        let updates = vec![(row, Diff::MINUS_ONE)];
3186        self.send(IntrospectionType::ReplicaFrontiers, updates);
3187    }
3188}