// mz_compute_client/controller/instance.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller for a compute instance.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
21use mz_compute_types::plan::render_plan::RenderPlan;
22use mz_compute_types::sinks::{
23    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
24};
25use mz_compute_types::sources::SourceInstanceDesc;
26use mz_controller_types::dyncfgs::{
27    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
28};
29use mz_dyncfg::ConfigSet;
30use mz_expr::RowSetFinishing;
31use mz_ore::cast::CastFrom;
32use mz_ore::channel::instrumented_unbounded_channel;
33use mz_ore::now::NowFn;
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{soft_assert_or_log, soft_panic_or_log};
36use mz_persist_types::PersistLocation;
37use mz_repr::adt::timestamp::CheckedTimestamp;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
40use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
41use mz_storage_types::read_holds::{self, ReadHold};
42use mz_storage_types::read_policy::ReadPolicy;
43use thiserror::Error;
44use timely::PartialOrder;
45use timely::progress::frontier::MutableAntichain;
46use timely::progress::{Antichain, ChangeBatch, Timestamp};
47use tokio::sync::{mpsc, oneshot};
48use uuid::Uuid;
49
50use crate::controller::error::{
51    CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
52};
53use crate::controller::instance_client::PeekError;
54use crate::controller::replica::{ReplicaClient, ReplicaConfig};
55use crate::controller::{
56    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
57    ReplicaId, StorageCollections,
58};
59use crate::logging::LogVariant;
60use crate::metrics::IntCounter;
61use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
62use crate::protocol::command::{
63    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
64};
65use crate::protocol::history::ComputeCommandHistory;
66use crate::protocol::response::{
67    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
68    SubscribeBatch, SubscribeResponse,
69};
70
/// Error returned when attempting to add a replica whose ID is already registered.
#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);
74
/// Error returned when an operation references a replica that is not registered.
#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);
78
/// Errors that can occur during dataflow creation.
#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    /// A collection referenced by the dataflow does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The targeted replica does not exist.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The dataflow description has no as-of frontier set.
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    /// Subscribe dataflows must have a non-empty as-of.
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    /// Copy-to dataflows must have a non-empty as-of.
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    /// The caller did not provide a read hold for one of the dataflow's imports.
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    /// A provided read hold was not sufficient (e.g. not held back far enough) for an import.
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}
96
97impl From<CollectionMissing> for DataflowCreationError {
98    fn from(error: CollectionMissing) -> Self {
99        Self::CollectionMissing(error.0)
100    }
101}
102
/// Errors that can occur when installing a read policy on a collection.
#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    /// The targeted collection does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The targeted collection is write-only, so read policies don't apply to it.
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}
110
111impl From<CollectionMissing> for ReadPolicyError {
112    fn from(error: CollectionMissing) -> Self {
113        Self::CollectionMissing(error.0)
114    }
115}
116
/// A command sent to an [`Instance`] task.
///
/// Commands are boxed closures that the instance task executes against its own mutable state.
pub(super) type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;

/// A response from a replica, composed of a replica ID, the replica's current epoch, and the
/// compute response itself.
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);
123
/// The state we keep for a compute instance.
///
/// The type parameter `T` is the timestamp type used by the instance's collections.
pub(super) struct Instance<T: ComputeControllerTimestamp> {
    /// Build info for spawning replicas.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections<T>,
    /// Whether instance initialization has been completed.
    initialized: bool,
    /// Whether this instance is in read-only mode.
    ///
    /// When in read-only mode, this instance will not update persistent state, such as
    /// wallclock lag introspection.
    read_only: bool,
    /// The workload class of this instance.
    ///
    /// This is currently only used to annotate metrics.
    workload_class: Option<String>,
    /// The replicas of this compute instance.
    replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
    /// Currently installed compute collections.
    ///
    /// New entries are added for all collections exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// Entries are removed by [`Instance::cleanup_collections`]. See that method's documentation
    /// about the conditions for removing collection state.
    collections: BTreeMap<GlobalId, CollectionState<T>>,
    /// IDs of log sources maintained by this compute instance.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Currently outstanding peeks.
    ///
    /// New entries are added for all peeks initiated through [`Instance::peek`].
    ///
    /// The entry for a peek is only removed once all replicas have responded to the peek. This is
    /// currently required to ensure all replicas have stopped reading from the peeked collection's
    /// inputs before we allow them to compact. database-issues#4822 tracks changing this so we only have to wait
    /// for the first peek response.
    peeks: BTreeMap<Uuid, PendingPeek<T>>,
    /// Currently in-progress subscribes.
    ///
    /// New entries are added for all subscribes exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// The entry for a subscribe is removed once at least one replica has reported the subscribe
    /// to have advanced to the empty frontier or to have been dropped, implying that no further
    /// updates will be emitted for this subscribe.
    ///
    /// Note that subscribes are tracked both in `collections` and `subscribes`. `collections`
    /// keeps track of the subscribe's upper and since frontiers and ensures appropriate read holds
    /// on the subscribe's input. `subscribes` is only used to track which updates have been
    /// emitted, to decide if new ones should be emitted or suppressed.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
    /// Tracks all in-progress COPY TOs.
    ///
    /// New entries are added for all s3 oneshot sinks (corresponding to a COPY TO) exported from
    /// dataflows created through [`Instance::create_dataflow`].
    ///
    /// The entry for a copy to is removed once at least one replica has finished
    /// or the exporting collection is dropped.
    copy_tos: BTreeSet<GlobalId>,
    /// The command history, used when introducing new replicas or restarting existing replicas.
    history: ComputeCommandHistory<UIntGauge, T>,
    /// Receiver for commands to be executed.
    command_rx: mpsc::UnboundedReceiver<Command<T>>,
    /// Sender for responses to be delivered.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Sender for introspection updates to be recorded.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// The registry the controller uses to report metrics.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,

    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for updates to collection read holds.
    ///
    /// Copies of this sender are given to [`ReadHold`]s that are created in
    /// [`CollectionState::new`].
    read_hold_tx: read_holds::ChangeTx<T>,
    /// A sender for responses from replicas.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// A receiver for responses from replicas.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
}
217
218impl<T: ComputeControllerTimestamp> Instance<T> {
219    /// Acquire a handle to the collection state associated with `id`.
220    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
221        self.collections.get(&id).ok_or(CollectionMissing(id))
222    }
223
224    /// Acquire a mutable handle to the collection state associated with `id`.
225    fn collection_mut(
226        &mut self,
227        id: GlobalId,
228    ) -> Result<&mut CollectionState<T>, CollectionMissing> {
229        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
230    }
231
232    /// Acquire a handle to the collection state associated with `id`.
233    ///
234    /// # Panics
235    ///
236    /// Panics if the identified collection does not exist.
237    fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
238        self.collections.get(&id).expect("collection must exist")
239    }
240
241    /// Acquire a mutable handle to the collection state associated with `id`.
242    ///
243    /// # Panics
244    ///
245    /// Panics if the identified collection does not exist.
246    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
247        self.collections
248            .get_mut(&id)
249            .expect("collection must exist")
250    }
251
252    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
253        self.collections.iter().map(|(id, coll)| (*id, coll))
254    }
255
256    /// Add a collection to the instance state.
257    ///
258    /// # Panics
259    ///
260    /// Panics if a collection with the same ID exists already.
261    fn add_collection(
262        &mut self,
263        id: GlobalId,
264        as_of: Antichain<T>,
265        shared: SharedCollectionState<T>,
266        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
267        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
268        replica_input_read_holds: Vec<ReadHold<T>>,
269        write_only: bool,
270        storage_sink: bool,
271        initial_as_of: Option<Antichain<T>>,
272        refresh_schedule: Option<RefreshSchedule>,
273        target_replica: Option<ReplicaId>,
274    ) {
275        // Add global collection state.
276        let dependency_ids: Vec<GlobalId> = compute_dependencies
277            .keys()
278            .chain(storage_dependencies.keys())
279            .copied()
280            .collect();
281        let introspection = CollectionIntrospection::new(
282            id,
283            self.introspection_tx.clone(),
284            as_of.clone(),
285            storage_sink,
286            initial_as_of,
287            refresh_schedule,
288            dependency_ids,
289        );
290        let mut state = CollectionState::new(
291            id,
292            as_of.clone(),
293            shared,
294            storage_dependencies,
295            compute_dependencies,
296            Arc::clone(&self.read_hold_tx),
297            introspection,
298        );
299        state.target_replica = target_replica;
300        // If the collection is write-only, clear its read policy to reflect that.
301        if write_only {
302            state.read_policy = None;
303        }
304
305        if let Some(previous) = self.collections.insert(id, state) {
306            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
307        }
308
309        // Add per-replica collection state.
310        for replica in self.replicas.values_mut() {
311            if target_replica.is_some_and(|id| id != replica.id) {
312                continue;
313            }
314            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
315        }
316    }
317
318    fn remove_collection(&mut self, id: GlobalId) {
319        // Remove per-replica collection state.
320        for replica in self.replicas.values_mut() {
321            replica.remove_collection(id);
322        }
323
324        // Remove global collection state.
325        self.collections.remove(&id);
326    }
327
328    fn add_replica_state(
329        &mut self,
330        id: ReplicaId,
331        client: ReplicaClient<T>,
332        config: ReplicaConfig,
333        epoch: u64,
334    ) {
335        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();
336
337        let metrics = self.metrics.for_replica(id);
338        let mut replica = ReplicaState::new(
339            id,
340            client,
341            config,
342            metrics,
343            self.introspection_tx.clone(),
344            epoch,
345        );
346
347        // Add per-replica collection state.
348        for (collection_id, collection) in &self.collections {
349            // Skip log collections not maintained by this replica,
350            // and collections targeted at a different replica.
351            if (collection.log_collection && !log_ids.contains(collection_id))
352                || collection.target_replica.is_some_and(|rid| rid != id)
353            {
354                continue;
355            }
356
357            let as_of = if collection.log_collection {
358                // For log collections, we don't send a `CreateDataflow` command to the replica, so
359                // it doesn't know which as-of the controler chose and defaults to the minimum
360                // frontier instead. We need to initialize the controller-side tracking with the
361                // same frontier, to avoid observing regressions in the reported frontiers.
362                Antichain::from_elem(T::minimum())
363            } else {
364                collection.read_frontier().to_owned()
365            };
366
367            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
368            replica.add_collection(*collection_id, as_of, input_read_holds);
369        }
370
371        self.replicas.insert(id, replica);
372    }
373
374    /// Enqueue the given response for delivery to the controller clients.
375    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
376        // Failure to send means the `ComputeController` has been dropped and doesn't care about
377        // responses anymore.
378        let _ = self.response_tx.send(response);
379    }
380
381    /// Enqueue the given introspection updates for recording.
382    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
383        // Failure to send means the `ComputeController` has been dropped and doesn't care about
384        // introspection updates anymore.
385        let _ = self.introspection_tx.send((type_, updates));
386    }
387
388    /// Returns whether the identified replica exists.
389    fn replica_exists(&self, id: ReplicaId) -> bool {
390        self.replicas.contains_key(&id)
391    }
392
393    /// Return the IDs of pending peeks targeting the specified replica.
394    fn peeks_targeting(
395        &self,
396        replica_id: ReplicaId,
397    ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
398        self.peeks.iter().filter_map(move |(uuid, peek)| {
399            if peek.target_replica == Some(replica_id) {
400                Some((*uuid, peek))
401            } else {
402                None
403            }
404        })
405    }
406
407    /// Return the IDs of in-progress subscribes targeting the specified replica.
408    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
409        self.subscribes.keys().copied().filter(move |id| {
410            let collection = self.expect_collection(*id);
411            collection.target_replica == Some(replica_id)
412        })
413    }
414
    /// Update introspection with the current collection frontiers.
    ///
    /// We could also do this directly in response to frontier changes, but doing it periodically
    /// lets us avoid emitting some introspection updates that can be consolidated (e.g. a write
    /// frontier updated immediately followed by a read frontier update).
    ///
    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
    /// per second during normal operation.
    fn update_frontier_introspection(&mut self) {
        // Instance-global collections report both read and write frontiers.
        for collection in self.collections.values_mut() {
            collection
                .introspection
                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
        }

        // Per-replica collections report only their write frontier.
        for replica in self.replicas.values_mut() {
            for collection in replica.collections.values_mut() {
                collection
                    .introspection
                    .observe_frontier(&collection.write_frontier);
            }
        }
    }
438
439    /// Refresh the controller state metrics for this instance.
440    ///
441    /// We could also do state metric updates directly in response to state changes, but that would
442    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
443    /// a single method is less noisy.
444    ///
445    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
446    /// per second during normal operation.
447    fn refresh_state_metrics(&self) {
448        let unscheduled_collections_count =
449            self.collections.values().filter(|c| !c.scheduled).count();
450        let connected_replica_count = self
451            .replicas
452            .values()
453            .filter(|r| r.client.is_connected())
454            .count();
455
456        self.metrics
457            .replica_count
458            .set(u64::cast_from(self.replicas.len()));
459        self.metrics
460            .collection_count
461            .set(u64::cast_from(self.collections.len()));
462        self.metrics
463            .collection_unscheduled_count
464            .set(u64::cast_from(unscheduled_collections_count));
465        self.metrics
466            .peek_count
467            .set(u64::cast_from(self.peeks.len()));
468        self.metrics
469            .subscribe_count
470            .set(u64::cast_from(self.subscribes.len()));
471        self.metrics
472            .copy_to_count
473            .set(u64::cast_from(self.copy_tos.len()));
474        self.metrics
475            .connected_replica_count
476            .set(u64::cast_from(connected_replica_count));
477    }
478
    /// Refresh the wallclock lag introspection and metrics with the current lag values.
    ///
    /// This method produces wallclock lag metrics of two different shapes:
    ///
    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
    ///   over the last minute, together with the current time.
    /// * Histograms: For each collection, we measure the lag of the write frontier behind
    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
    ///   together with the current histogram period.
    ///
    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
    /// Prometheus.
    ///
    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
    /// per second during normal operation.
    fn refresh_wallclock_lag(&mut self) {
        // An empty frontier (completed collection) is reported as zero lag.
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        // Histogram entries are labeled with the workload class, when one is set.
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        // For collections that sink into storage, we need to ask the storage controller to know
        // whether they're currently readable.
        let readable_storage_collections: BTreeSet<_> = self
            .collections
            .keys()
            .filter_map(|id| {
                let frontiers = self.storage_collections.collection_frontiers(*id).ok()?;
                PartialOrder::less_than(&frontiers.read_capabilities, &frontiers.write_frontier)
                    .then_some(*id)
            })
            .collect();

        // First, iterate over all collections and collect histogram measurements.
        for (id, collection) in &mut self.collections {
            let write_frontier = collection.write_frontier();
            // Storage-backed collections use the storage controller's readability verdict;
            // compute-only collections are readable when their read frontier is strictly less
            // than their write frontier.
            let readable = if self.storage_collections.check_exists(*id).is_ok() {
                readable_storage_collections.contains(id)
            } else {
                PartialOrder::less_than(&collection.read_frontier(), &write_frontier)
            };

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                // Bucket lags by rounding seconds up to the next power of two.
                let bucket = if readable {
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                } else {
                    WallclockLag::Undefined
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        // Second, iterate over all per-replica collections and collect history measurements.
        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                // A per-replica collection is considered readable in the context of lag
                // measurement if either:
                //  (a) it sinks into a storage collection that is readable
                //  (b) it is hydrated
                let readable = readable_storage_collections.contains(id) || collection.hydrated();

                let lag = if readable {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                } else {
                    WallclockLag::Undefined
                };

                // Track the maximum lag seen since the last history recording.
                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    // No way to specify values as undefined in Prometheus metrics, so we use the
                    // maximum value instead.
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                };
            }
        }

        // Record lags to persist, if it's time.
        self.maybe_record_wallclock_lag();
    }
577
    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
    /// last recording.
    ///
    /// We emit new introspection updates if the system time has passed into a new multiple of the
    /// recording interval (typically 1 minute) since the last refresh. The storage controller uses
    /// the same approach, ensuring that both controllers commit their lags at roughly the same
    /// time, avoiding confusion caused by inconsistencies.
    fn maybe_record_wallclock_lag(&mut self) {
        // In read-only mode we must not write persistent introspection state.
        if self.read_only {
            return;
        }

        // Truncate `datetime` down to a multiple of `interval`; `None` if the interval cannot be
        // represented or truncation fails.
        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        // Fall back to the default interval if the configured one is unusable.
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        // Record at most once per interval boundary.
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        // Emit one history row per (collection, replica) with the maximum lag observed since the
        // last recording, resetting the accumulator as we go.
        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        // Emit histogram rows per collection, draining the stashed per-period bucket counts.
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }
662
663    /// Returns `true` if the given collection is hydrated on at least one
664    /// replica.
665    ///
666    /// This also returns `true` in case this cluster does not have any
667    /// replicas.
668    #[mz_ore::instrument(level = "debug")]
669    pub fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, CollectionMissing> {
670        if self.replicas.is_empty() {
671            return Ok(true);
672        }
673        for replica_state in self.replicas.values() {
674            let collection_state = replica_state
675                .collections
676                .get(&collection_id)
677                .ok_or(CollectionMissing(collection_id))?;
678
679            if collection_state.hydrated() {
680                return Ok(true);
681            }
682        }
683
684        Ok(false)
685    }
686
687    /// Returns `true` if each non-transient, non-excluded collection is hydrated on at
688    /// least one replica.
689    ///
690    /// This also returns `true` in case this cluster does not have any
691    /// replicas.
692    #[mz_ore::instrument(level = "debug")]
693    pub fn collections_hydrated_on_replicas(
694        &self,
695        target_replica_ids: Option<Vec<ReplicaId>>,
696        exclude_collections: &BTreeSet<GlobalId>,
697    ) -> Result<bool, HydrationCheckBadTarget> {
698        if self.replicas.is_empty() {
699            return Ok(true);
700        }
701        let mut all_hydrated = true;
702        let target_replicas: BTreeSet<ReplicaId> = self
703            .replicas
704            .keys()
705            .filter_map(|id| match target_replica_ids {
706                None => Some(id.clone()),
707                Some(ref ids) if ids.contains(id) => Some(id.clone()),
708                Some(_) => None,
709            })
710            .collect();
711        if let Some(targets) = target_replica_ids {
712            if target_replicas.is_empty() {
713                return Err(HydrationCheckBadTarget(targets));
714            }
715        }
716
717        for (id, _collection) in self.collections_iter() {
718            if id.is_transient() || exclude_collections.contains(&id) {
719                continue;
720            }
721
722            let mut collection_hydrated = false;
723            for replica_state in self.replicas.values() {
724                if !target_replicas.contains(&replica_state.id) {
725                    continue;
726                }
727                let collection_state = replica_state
728                    .collections
729                    .get(&id)
730                    .expect("missing collection state");
731
732                if collection_state.hydrated() {
733                    collection_hydrated = true;
734                    break;
735                }
736            }
737
738            if !collection_hydrated {
739                tracing::info!("collection {id} is not hydrated on any replica");
740                all_hydrated = false;
741                // We continue with our loop instead of breaking out early, so
742                // that we log all non-hydrated replicas.
743            }
744        }
745
746        Ok(all_hydrated)
747    }
748
749    /// Clean up collection state that is not needed anymore.
750    ///
751    /// Three conditions need to be true before we can remove state for a collection:
752    ///
753    ///  1. A client must have explicitly dropped the collection. If that is not the case, clients
754    ///     can still reasonably assume that the controller knows about the collection and can
755    ///     answer queries about it.
756    ///  2. There must be no outstanding read capabilities on the collection. As long as someone
757    ///     still holds read capabilities on a collection, we need to keep it around to be able
758    ///     to properly handle downgrading of said capabilities.
759    ///  3. All replica frontiers for the collection must have advanced to the empty frontier.
760    ///     Advancement to the empty frontiers signals that replicas are done computing the
761    ///     collection and that they won't send more `ComputeResponse`s for it. As long as we might
762    ///     receive responses for a collection we want to keep it around to be able to validate and
763    ///     handle these responses.
764    fn cleanup_collections(&mut self) {
765        let to_remove: Vec<_> = self
766            .collections_iter()
767            .filter(|(id, collection)| {
768                collection.dropped
769                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
770                    && self
771                        .replicas
772                        .values()
773                        .all(|r| r.collection_frontiers_empty(*id))
774            })
775            .map(|(id, _collection)| id)
776            .collect();
777
778        for id in to_remove {
779            self.remove_collection(id);
780        }
781    }
782
783    /// Returns the state of the [`Instance`] formatted as JSON.
784    ///
785    /// The returned value is not guaranteed to be stable and may change at any point in time.
786    #[mz_ore::instrument(level = "debug")]
787    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
788        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
789        // returned object as a tradeoff between usability and stability. `serde_json` will fail
790        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
791        // prevents a future unrelated change from silently breaking this method.
792
793        // Destructure `self` here so we don't forget to consider dumping newly added fields.
794        let Self {
795            build_info: _,
796            storage_collections: _,
797            peek_stash_persist_location: _,
798            initialized,
799            read_only,
800            workload_class,
801            replicas,
802            collections,
803            log_sources: _,
804            peeks,
805            subscribes,
806            copy_tos,
807            history: _,
808            command_rx: _,
809            response_tx: _,
810            introspection_tx: _,
811            metrics: _,
812            dyncfg: _,
813            now: _,
814            wallclock_lag: _,
815            wallclock_lag_last_recorded,
816            read_hold_tx: _,
817            replica_tx: _,
818            replica_rx: _,
819        } = self;
820
821        let replicas: BTreeMap<_, _> = replicas
822            .iter()
823            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
824            .collect::<Result<_, anyhow::Error>>()?;
825        let collections: BTreeMap<_, _> = collections
826            .iter()
827            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
828            .collect();
829        let peeks: BTreeMap<_, _> = peeks
830            .iter()
831            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
832            .collect();
833        let subscribes: BTreeMap<_, _> = subscribes
834            .iter()
835            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
836            .collect();
837        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
838        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");
839
840        Ok(serde_json::json!({
841            "initialized": initialized,
842            "read_only": read_only,
843            "workload_class": workload_class,
844            "replicas": replicas,
845            "collections": collections,
846            "peeks": peeks,
847            "subscribes": subscribes,
848            "copy_tos": copy_tos,
849            "wallclock_lag_last_recorded": wallclock_lag_last_recorded,
850        }))
851    }
852
853    /// Reports the current write frontier for the identified compute collection.
854    pub(super) fn collection_write_frontier(
855        &self,
856        id: GlobalId,
857    ) -> Result<Antichain<T>, CollectionMissing> {
858        Ok(self.collection(id)?.write_frontier())
859    }
860}
861
862impl<T> Instance<T>
863where
864    T: ComputeControllerTimestamp,
865{
866    pub(super) fn new(
867        build_info: &'static BuildInfo,
868        storage: StorageCollections<T>,
869        peek_stash_persist_location: PersistLocation,
870        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
871        metrics: InstanceMetrics,
872        now: NowFn,
873        wallclock_lag: WallclockLagFn<T>,
874        dyncfg: Arc<ConfigSet>,
875        command_rx: mpsc::UnboundedReceiver<Command<T>>,
876        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
877        read_hold_tx: read_holds::ChangeTx<T>,
878        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
879        read_only: bool,
880    ) -> Self {
881        let mut collections = BTreeMap::new();
882        let mut log_sources = BTreeMap::new();
883        for (log, id, shared) in arranged_logs {
884            let collection = CollectionState::new_log_collection(
885                id,
886                shared,
887                Arc::clone(&read_hold_tx),
888                introspection_tx.clone(),
889            );
890            collections.insert(id, collection);
891            log_sources.insert(log, id);
892        }
893
894        let history = ComputeCommandHistory::new(metrics.for_history());
895
896        let send_count = metrics.response_send_count.clone();
897        let recv_count = metrics.response_recv_count.clone();
898        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);
899
900        let now_dt = mz_ore::now::to_datetime(now());
901
902        Self {
903            build_info,
904            storage_collections: storage,
905            peek_stash_persist_location,
906            initialized: false,
907            read_only,
908            workload_class: None,
909            replicas: Default::default(),
910            collections,
911            log_sources,
912            peeks: Default::default(),
913            subscribes: Default::default(),
914            copy_tos: Default::default(),
915            history,
916            command_rx,
917            response_tx,
918            introspection_tx,
919            metrics,
920            dyncfg,
921            now,
922            wallclock_lag,
923            wallclock_lag_last_recorded: now_dt,
924            read_hold_tx,
925            replica_tx,
926            replica_rx,
927        }
928    }
929
    /// Runs the instance's command/response event loop.
    ///
    /// First sends the `Hello` and `CreateInstance` protocol commands, then processes
    /// client commands and replica responses until the command channel is closed
    /// (which [`Instance::shutdown`] forces by replacing the receiver).
    pub(super) async fn run(mut self) {
        self.send(ComputeCommand::Hello {
            // The nonce is protocol iteration-specific and will be set in
            // `ReplicaTask::specialize_command`.
            nonce: Uuid::default(),
        });

        let instance_config = InstanceConfig {
            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
            // The remaining fields are replica-specific and will be set in
            // `ReplicaTask::specialize_command`.
            logging: Default::default(),
            expiration_offset: Default::default(),
        };

        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));

        loop {
            tokio::select! {
                command = self.command_rx.recv() => match command {
                    // Commands are closures that get invoked with mutable access
                    // to the instance state.
                    Some(cmd) => cmd(&mut self),
                    // Command channel closed: the instance was shut down; exit the loop.
                    None => break,
                },
                response = self.replica_rx.recv() => match response {
                    Some(response) => self.handle_response(response),
                    None => unreachable!("self owns a sender side of the channel"),
                }
            }
        }
    }
960
961    /// Update instance configuration.
962    #[mz_ore::instrument(level = "debug")]
963    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
964        if let Some(workload_class) = &config_params.workload_class {
965            self.workload_class = workload_class.clone();
966        }
967
968        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
969        self.send(command);
970    }
971
972    /// Marks the end of any initialization commands.
973    ///
974    /// Intended to be called by `Controller`, rather than by other code.
975    /// Calling this method repeatedly has no effect.
976    #[mz_ore::instrument(level = "debug")]
977    pub fn initialization_complete(&mut self) {
978        // The compute protocol requires that `InitializationComplete` is sent only once.
979        if !self.initialized {
980            self.send(ComputeCommand::InitializationComplete);
981            self.initialized = true;
982        }
983    }
984
985    /// Allows collections to affect writes to external systems (persist).
986    ///
987    /// Calling this method repeatedly has no effect.
988    #[mz_ore::instrument(level = "debug")]
989    pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
990        let collection = self.collection_mut(collection_id)?;
991
992        // Do not send redundant allow-writes commands.
993        if !collection.read_only {
994            return Ok(());
995        }
996
997        // Don't send allow-writes for collections that are not installed.
998        let as_of = collection.read_frontier();
999
1000        // If the collection has an empty `as_of`, it was either never installed on the replica or
1001        // has since been dropped. In either case the replica does not expect any commands for it.
1002        if as_of.is_empty() {
1003            return Ok(());
1004        }
1005
1006        collection.read_only = false;
1007        self.send(ComputeCommand::AllowWrites(collection_id));
1008
1009        Ok(())
1010    }
1011
1012    /// Shut down this instance.
1013    ///
1014    /// This method asserts that the instance has no replicas left. It exists to help
1015    /// us find bugs where the client drops a compute instance that still has replicas
1016    /// installed, and later assumes that said replicas still exist.
1017    ///
1018    /// # Panics
1019    ///
1020    /// Soft-panics if the compute instance still has active replicas.
1021    #[mz_ore::instrument(level = "debug")]
1022    pub fn shutdown(&mut self) {
1023        // Taking the `command_rx` ensures that the [`Instance::run`] loop terminates.
1024        let (_tx, rx) = mpsc::unbounded_channel();
1025        self.command_rx = rx;
1026
1027        let stray_replicas: Vec<_> = self.replicas.keys().collect();
1028        soft_assert_or_log!(
1029            stray_replicas.is_empty(),
1030            "dropped instance still has provisioned replicas: {stray_replicas:?}",
1031        );
1032    }
1033
1034    /// Sends a command to replicas of this instance.
1035    #[mz_ore::instrument(level = "debug")]
1036    fn send(&mut self, cmd: ComputeCommand<T>) {
1037        // Record the command so that new replicas can be brought up to speed.
1038        self.history.push(cmd.clone());
1039
1040        let target_replica = self.target_replica(&cmd);
1041
1042        if let Some(rid) = target_replica {
1043            if let Some(replica) = self.replicas.get_mut(&rid) {
1044                let _ = replica.client.send(cmd);
1045            }
1046        } else {
1047            for replica in self.replicas.values_mut() {
1048                let _ = replica.client.send(cmd.clone());
1049            }
1050        }
1051    }
1052
1053    /// Determine the target replica for a compute command. Retrieves the
1054    /// collection named by the command, and returns the target replica if
1055    /// it is set, and None if not set, or the command doesn't name a collection.
1056    ///
1057    /// Panics if a create-dataflow command names collections that have different
1058    /// target replicas. It is an error to construct such an object and would
1059    /// indicate a bug in [`Self::create_dataflow`].
1060    fn target_replica(&self, cmd: &ComputeCommand<T>) -> Option<ReplicaId> {
1061        match &cmd {
1062            ComputeCommand::Schedule(id)
1063            | ComputeCommand::AllowWrites(id)
1064            | ComputeCommand::AllowCompaction { id, .. } => {
1065                self.expect_collection(*id).target_replica
1066            }
1067            ComputeCommand::CreateDataflow(desc) => {
1068                let mut target_replica = None;
1069                for id in desc.export_ids() {
1070                    if let Some(replica) = self.expect_collection(id).target_replica {
1071                        if target_replica.is_some() {
1072                            assert_eq!(target_replica, Some(replica));
1073                        }
1074                        target_replica = Some(replica);
1075                    }
1076                }
1077                target_replica
1078            }
1079            // Skip Peek as we don't allow replica-targeted indexes.
1080            ComputeCommand::Peek(_)
1081            | ComputeCommand::Hello { .. }
1082            | ComputeCommand::CreateInstance(_)
1083            | ComputeCommand::InitializationComplete
1084            | ComputeCommand::UpdateConfiguration(_)
1085            | ComputeCommand::CancelPeek { .. } => None,
1086        }
1087    }
1088
1089    /// Add a new instance replica, by ID.
1090    #[mz_ore::instrument(level = "debug")]
1091    pub fn add_replica(
1092        &mut self,
1093        id: ReplicaId,
1094        mut config: ReplicaConfig,
1095        epoch: Option<u64>,
1096    ) -> Result<(), ReplicaExists> {
1097        if self.replica_exists(id) {
1098            return Err(ReplicaExists(id));
1099        }
1100
1101        config.logging.index_logs = self.log_sources.clone();
1102
1103        let epoch = epoch.unwrap_or(1);
1104        let metrics = self.metrics.for_replica(id);
1105        let client = ReplicaClient::spawn(
1106            id,
1107            self.build_info,
1108            config.clone(),
1109            epoch,
1110            metrics.clone(),
1111            Arc::clone(&self.dyncfg),
1112            self.replica_tx.clone(),
1113        );
1114
1115        // Take this opportunity to clean up the history we should present.
1116        self.history.reduce();
1117
1118        // Advance the uppers of source imports
1119        self.history.update_source_uppers(&self.storage_collections);
1120
1121        // Replay the commands at the client, creating new dataflow identifiers.
1122        for command in self.history.iter() {
1123            // Skip `CreateDataflow` commands targeted at different replicas.
1124            if let Some(target_replica) = self.target_replica(command)
1125                && target_replica != id
1126            {
1127                continue;
1128            }
1129
1130            if client.send(command.clone()).is_err() {
1131                // We swallow the error here. On the next send, we will fail again, and
1132                // restart the connection as well as this rehydration.
1133                tracing::warn!("Replica {:?} connection terminated during hydration", id);
1134                break;
1135            }
1136        }
1137
1138        // Add replica to tracked state.
1139        self.add_replica_state(id, client, config, epoch);
1140
1141        Ok(())
1142    }
1143
    /// Remove an existing instance replica, by ID.
    ///
    /// Subscribes and peeks targeting the removed replica are failed with an error
    /// response, since they can no longer be (reliably) served.
    ///
    /// Returns an error if no replica with the given ID exists.
    #[mz_ore::instrument(level = "debug")]
    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;

        // Subscribes targeting this replica either won't be served anymore (if the replica is
        // dropped) or might produce inconsistent output (if the target collection is an
        // introspection index). We produce an error to inform upstream.
        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
        for subscribe_id in to_drop {
            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
            let response = ComputeControllerResponse::SubscribeResponse(
                subscribe_id,
                SubscribeBatch {
                    // `lower == upper` yields an empty batch; together with the
                    // error it informs the client the subscribe has failed.
                    lower: subscribe.frontier.clone(),
                    upper: subscribe.frontier,
                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
                },
            );
            self.deliver_response(response);
        }

        // Peeks targeting this replica might not be served anymore (if the replica is dropped).
        // If the replica has failed it might come back and respond to the peek later, but it still
        // seems like a good idea to cancel the peek to inform the caller about the failure. This
        // is consistent with how we handle targeted subscribes above.
        //
        // NOTE: Responses are buffered first so that the iteration over
        // `peeks_targeting` (which borrows `self`) completes before
        // `deliver_response`/`finish_peek` mutate instance state.
        let mut peek_responses = Vec::new();
        let mut to_drop = Vec::new();
        for (uuid, peek) in self.peeks_targeting(id) {
            peek_responses.push(ComputeControllerResponse::PeekNotification(
                uuid,
                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
                peek.otel_ctx.clone(),
            ));
            to_drop.push(uuid);
        }
        for response in peek_responses {
            self.deliver_response(response);
        }
        for uuid in to_drop {
            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
            self.finish_peek(uuid, response);
        }

        // We might have a chance to forward implied capabilities and reduce the cost of bringing
        // up the next replica, if the dropped replica was the only one in the cluster.
        self.forward_implied_capabilities();

        Ok(())
    }
1194
1195    /// Rehydrate the given instance replica.
1196    ///
1197    /// # Panics
1198    ///
1199    /// Panics if the specified replica does not exist.
1200    fn rehydrate_replica(&mut self, id: ReplicaId) {
1201        let config = self.replicas[&id].config.clone();
1202        let epoch = self.replicas[&id].epoch + 1;
1203
1204        self.remove_replica(id).expect("replica must exist");
1205        let result = self.add_replica(id, config, Some(epoch));
1206
1207        match result {
1208            Ok(()) => (),
1209            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1210        }
1211    }
1212
1213    /// Rehydrate any failed replicas of this instance.
1214    fn rehydrate_failed_replicas(&mut self) {
1215        let replicas = self.replicas.iter();
1216        let failed_replicas: Vec<_> = replicas
1217            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1218            .collect();
1219
1220        for replica_id in failed_replicas {
1221            self.rehydrate_replica(replica_id);
1222        }
1223    }
1224
    /// Creates the described dataflow and initializes state for its output.
    ///
    /// This method expects a `DataflowDescription` with an `as_of` frontier specified, as well as
    /// for each imported collection a read hold in `import_read_holds` at at least the `as_of`.
    ///
    /// # Errors
    ///
    /// Returns a [`DataflowCreationError`] if the target replica is missing, the `as_of`
    /// is absent (or empty where a non-empty one is required), a read hold for an import
    /// is missing or insufficient, or a sink's target storage collection is unknown.
    #[mz_ore::instrument(level = "debug")]
    pub fn create_dataflow(
        &mut self,
        dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        import_read_holds: Vec<ReadHold<T>>,
        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
        target_replica: Option<ReplicaId>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        // Validate that the target replica, if specified, exists.
        // A targeted dataflow is only installed on a single replica; if that
        // replica doesn't exist, we can't create the dataflow.
        if let Some(replica_id) = target_replica {
            if !self.replica_exists(replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // Simple sanity checks around `as_of`. Subscribes and copy-tos need a
        // non-empty `as_of` to produce output.
        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        // Collect all dependencies of the dataflow, and read holds on them at the `as_of`.
        let mut storage_dependencies = BTreeMap::new();
        let mut compute_dependencies = BTreeMap::new();

        // When we install per-replica input read holds, we cannot use the `as_of` because of
        // reconciliation: Existing slow replicas might be reading from the inputs at times before
        // the `as_of` and we would rather not crash them by allowing their inputs to compact too
        // far. So instead we take read holds at the least time available.
        let mut replica_input_read_holds = Vec::new();

        let mut import_read_holds: BTreeMap<_, _> =
            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();

        // Storage imports: keep an undowngraded copy for replicas (see above), then
        // downgrade the dependency hold to the `as_of`.
        for &id in dataflow.source_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            replica_input_read_holds.push(read_hold.clone());

            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            storage_dependencies.insert(id, read_hold);
        }

        // Compute (index) imports: only a dependency hold at the `as_of` is needed.
        for &id in dataflow.index_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            compute_dependencies.insert(id, read_hold);
        }

        // If the `as_of` is empty, we are not going to create a dataflow, so replicas won't read
        // from the inputs.
        if as_of.is_empty() {
            replica_input_read_holds = Default::default();
        }

        // Install collection state for each of the exports.
        for export_id in dataflow.export_ids() {
            let shared = shared_collection_state
                .remove(&export_id)
                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
            let write_only = dataflow.sink_exports.contains_key(&export_id);
            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);

            self.add_collection(
                export_id,
                as_of.clone(),
                shared,
                storage_dependencies.clone(),
                compute_dependencies.clone(),
                replica_input_read_holds.clone(),
                write_only,
                storage_sink,
                dataflow.initial_storage_as_of.clone(),
                dataflow.refresh_schedule.clone(),
                target_replica,
            );

            // If the export is a storage sink, we can advance its write frontier to the write
            // frontier of the target storage collection.
            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
            }
        }

        // Initialize tracking of subscribes.
        for subscribe_id in dataflow.subscribe_ids() {
            self.subscribes
                .insert(subscribe_id, ActiveSubscribe::default());
        }

        // Initialize tracking of copy tos.
        for copy_to_id in dataflow.copy_to_ids() {
            self.copy_tos.insert(copy_to_id);
        }

        // Here we augment all imported sources and all exported sinks with the appropriate
        // storage metadata needed by the compute instance.
        let mut source_imports = BTreeMap::new();
        for (id, import) in dataflow.source_imports {
            let frontiers = self
                .storage_collections
                .collection_frontiers(id)
                .expect("collection exists");

            let collection_metadata = self
                .storage_collections
                .collection_metadata(id)
                .expect("we have a read hold on this collection");

            let desc = SourceInstanceDesc {
                storage_metadata: collection_metadata.clone(),
                arguments: import.desc.arguments,
                typ: import.desc.typ.clone(),
            };
            source_imports.insert(
                id,
                mz_compute_types::dataflows::SourceImport {
                    desc,
                    monotonic: import.monotonic,
                    with_snapshot: import.with_snapshot,
                    upper: frontiers.write_frontier,
                },
            );
        }

        // Resolve each sink's connection to the storage metadata it needs at runtime.
        let mut sink_exports = BTreeMap::new();
        for (id, se) in dataflow.sink_exports {
            let connection = match se.connection {
                ComputeSinkConnection::MaterializedView(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| CollectionMissing(id))?
                        .clone();
                    let conn = MaterializedViewSinkConnection {
                        value_desc: conn.value_desc,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::MaterializedView(conn)
                }
                ComputeSinkConnection::ContinualTask(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
                        .clone();
                    let conn = ContinualTaskConnection {
                        input_id: conn.input_id,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::ContinualTask(conn)
                }
                // These connection types need no storage metadata.
                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                    ComputeSinkConnection::CopyToS3Oneshot(conn)
                }
            };
            let desc = ComputeSinkDesc {
                from: se.from,
                from_desc: se.from_desc,
                connection,
                with_snapshot: se.with_snapshot,
                up_to: se.up_to,
                non_null_assertions: se.non_null_assertions,
                refresh_schedule: se.refresh_schedule,
            };
            sink_exports.insert(id, desc);
        }

        // Flatten the dataflow plans into the representation expected by replicas.
        let objects_to_build = dataflow
            .objects_to_build
            .into_iter()
            .map(|object| BuildDesc {
                id: object.id,
                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
            })
            .collect();

        let augmented_dataflow = DataflowDescription {
            source_imports,
            sink_exports,
            objects_to_build,
            // The rest of the fields are identical
            index_imports: dataflow.index_imports,
            index_exports: dataflow.index_exports,
            as_of: dataflow.as_of.clone(),
            until: dataflow.until,
            initial_storage_as_of: dataflow.initial_storage_as_of,
            refresh_schedule: dataflow.refresh_schedule,
            debug_name: dataflow.debug_name,
            time_dependence: dataflow.time_dependence,
        };

        // Log at `debug` for transient dataflows (e.g. ad-hoc queries) and at `info`
        // for durable ones.
        if augmented_dataflow.is_transient() {
            tracing::debug!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        } else {
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        }

        // Skip the actual dataflow creation for an empty `as_of`. (Happens e.g. for the
        // bootstrapping of a REFRESH AT mat view that is past its last refresh.)
        if as_of.is_empty() {
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                "not sending `CreateDataflow`, because of empty `as_of`",
            );
        } else {
            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
            self.send(ComputeCommand::CreateDataflow(Box::new(augmented_dataflow)));

            for id in collections {
                self.maybe_schedule_collection(id);
            }
        }

        Ok(())
    }
1471
    /// Schedule the identified collection if all its inputs are available.
    ///
    /// Scheduling sends a `ComputeCommand::Schedule` for the collection and marks it as
    /// scheduled. This is a no-op for collections that are already scheduled, as well as for
    /// collections whose read frontier (`as_of`) is empty.
    ///
    /// # Panics
    ///
    /// Panics if the identified collection does not exist.
    fn maybe_schedule_collection(&mut self, id: GlobalId) {
        let collection = self.expect_collection(id);

        // Don't schedule collections twice.
        if collection.scheduled {
            return;
        }

        let as_of = collection.read_frontier();

        // If the collection has an empty `as_of`, it was either never installed on the replica or
        // has since been dropped. In either case the replica does not expect any commands for it.
        if as_of.is_empty() {
            return;
        }

        let ready = if id.is_transient() {
            // Always schedule transient collections immediately. The assumption is that those are
            // created by interactive user commands and we want to schedule them as quickly as
            // possible. Inputs might not yet be available, but when they become available, we
            // don't need to wait for the controller to become aware and for the scheduling check
            // to run again.
            true
        } else {
            // Ignore self-dependencies. Any self-dependencies do not need to be
            // available at the as_of for the dataflow to make progress, so we
            // can ignore them here. At the moment, only continual tasks have
            // self-dependencies, but this logic is correct for any dataflow, so
            // we don't special case it to CTs.
            let not_self_dep = |x: &GlobalId| *x != id;

            // Make sure we never schedule a collection before its input compute collections have
            // been scheduled. Scheduling in the wrong order can lead to deadlocks.
            let mut deps_scheduled = true;

            // Check dependency frontiers to determine if all inputs are
            // available. An input is available when its frontier is greater
            // than the `as_of`, i.e., all input data up to and including the
            // `as_of` has been sealed.
            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
            let mut compute_frontiers = Vec::new();
            for id in compute_deps {
                let dep = &self.expect_collection(id);
                deps_scheduled &= dep.scheduled;
                compute_frontiers.push(dep.write_frontier());
            }

            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(storage_deps.collect())
                .expect("must exist");
            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);

            // The strict `less_than` comparison ensures each input has sealed all data up to
            // and including the `as_of`.
            let mut frontiers = compute_frontiers.into_iter().chain(storage_frontiers);
            let frontiers_ready =
                frontiers.all(|frontier| PartialOrder::less_than(&as_of, &frontier));

            deps_scheduled && frontiers_ready
        };

        if ready {
            // Send the `Schedule` command and remember that we did, so we never schedule the
            // same collection twice.
            self.send(ComputeCommand::Schedule(id));
            let collection = self.expect_collection_mut(id);
            collection.scheduled = true;
        }
    }
1544
1545    /// Schedule any unscheduled collections that are ready.
1546    fn schedule_collections(&mut self) {
1547        let ids: Vec<_> = self.collections.keys().copied().collect();
1548        for id in ids {
1549            self.maybe_schedule_collection(id);
1550        }
1551    }
1552
1553    /// Drops the read capability for the given collections and allows their resources to be
1554    /// reclaimed.
1555    #[mz_ore::instrument(level = "debug")]
1556    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1557        for id in &ids {
1558            let collection = self.collection_mut(*id)?;
1559
1560            // Mark the collection as dropped to allow it to be removed from the controller state.
1561            collection.dropped = true;
1562
1563            // Drop the implied and warmup read holds to announce that clients are not
1564            // interested in the collection anymore.
1565            collection.implied_read_hold.release();
1566            collection.warmup_read_hold.release();
1567
1568            // If the collection is a subscribe, stop tracking it. This ensures that the controller
1569            // ceases to produce `SubscribeResponse`s for this subscribe.
1570            self.subscribes.remove(id);
1571            // If the collection is a copy to, stop tracking it. This ensures that the controller
1572            // ceases to produce `CopyToResponse`s` for this copy to.
1573            self.copy_tos.remove(id);
1574        }
1575
1576        Ok(())
1577    }
1578
    /// Initiate a peek request for the contents of `id` at `timestamp`.
    ///
    /// If this returns an error, then it didn't modify any `Instance` state.
    ///
    /// `read_hold` must be a hold on the peeked collection (`peek_target.id()`); it is
    /// downgraded to `timestamp` and retained with the pending peek until the peek is
    /// finished. If `target_replica` is `Some`, only responses from that replica will serve
    /// the peek (see `handle_peek_response`). The peek result is eventually delivered
    /// through `peek_response_tx`.
    ///
    /// # Errors
    ///
    ///  * `ReadHoldIdMismatch` if `read_hold` is not a hold on the peeked collection.
    ///  * `ReadHoldInsufficient` if `read_hold` cannot be downgraded to `timestamp`.
    ///  * `ReplicaMissing` if the given `target_replica` does not exist.
    #[mz_ore::instrument(level = "debug")]
    pub fn peek(
        &mut self,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        mut read_hold: ReadHold<T>,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let target_id = peek_target.id();

        // All fallible checks must happen before we start modifying `Instance` state below,
        // to uphold the error contract documented above.

        // Downgrade the provided read hold to the peek time.
        if read_hold.id() != target_id {
            return Err(ReadHoldIdMismatch(read_hold.id()));
        }
        read_hold
            .try_downgrade(Antichain::from_elem(timestamp.clone()))
            .map_err(|_| ReadHoldInsufficient(target_id))?;

        if let Some(target) = target_replica {
            if !self.replica_exists(target) {
                return Err(ReplicaMissing(target));
            }
        }

        let otel_ctx = OpenTelemetryContext::obtain();

        // Register the pending peek, so a replica response carrying this `uuid` can be
        // matched up with it later.
        self.peeks.insert(
            uuid,
            PendingPeek {
                target_replica,
                // TODO(guswynn): can we just hold the `tracing::Span` here instead?
                otel_ctx: otel_ctx.clone(),
                requested_at: Instant::now(),
                read_hold,
                peek_response_tx,
                limit: finishing.limit.map(usize::cast_from),
                offset: finishing.offset,
            },
        );

        let peek = Peek {
            literal_constraints,
            uuid,
            timestamp,
            finishing,
            map_filter_project,
            // Obtain an `OpenTelemetryContext` from the thread-local tracing
            // tree to forward it on to the compute worker.
            otel_ctx,
            target: peek_target,
            result_desc,
        };
        self.send(ComputeCommand::Peek(Box::new(peek)));

        Ok(())
    }
1646
1647    /// Cancels an existing peek request.
1648    #[mz_ore::instrument(level = "debug")]
1649    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1650        let Some(peek) = self.peeks.get_mut(&uuid) else {
1651            tracing::warn!("did not find pending peek for {uuid}");
1652            return;
1653        };
1654
1655        let duration = peek.requested_at.elapsed();
1656        self.metrics
1657            .observe_peek_response(&PeekResponse::Canceled, duration);
1658
1659        // Enqueue a notification for the cancellation.
1660        let otel_ctx = peek.otel_ctx.clone();
1661        otel_ctx.attach_as_parent();
1662
1663        self.deliver_response(ComputeControllerResponse::PeekNotification(
1664            uuid,
1665            PeekNotification::Canceled,
1666            otel_ctx,
1667        ));
1668
1669        // Finish the peek.
1670        // This will also propagate the cancellation to the replicas.
1671        self.finish_peek(uuid, reason);
1672    }
1673
1674    /// Assigns a read policy to specific identifiers.
1675    ///
1676    /// The policies are assigned in the order presented, and repeated identifiers should
1677    /// conclude with the last policy. Changing a policy will immediately downgrade the read
1678    /// capability if appropriate, but it will not "recover" the read capability if the prior
1679    /// capability is already ahead of it.
1680    ///
1681    /// Identifiers not present in `policies` retain their existing read policies.
1682    ///
1683    /// It is an error to attempt to set a read policy for a collection that is not readable in the
1684    /// context of compute. At this time, only indexes are readable compute collections.
1685    #[mz_ore::instrument(level = "debug")]
1686    pub fn set_read_policy(
1687        &mut self,
1688        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1689    ) -> Result<(), ReadPolicyError> {
1690        // Do error checking upfront, to avoid introducing inconsistencies between a collection's
1691        // `implied_capability` and `read_capabilities`.
1692        for (id, _policy) in &policies {
1693            let collection = self.collection(*id)?;
1694            if collection.read_policy.is_none() {
1695                return Err(ReadPolicyError::WriteOnlyCollection(*id));
1696            }
1697        }
1698
1699        for (id, new_policy) in policies {
1700            let collection = self.expect_collection_mut(id);
1701            let new_since = new_policy.frontier(collection.write_frontier().borrow());
1702            let _ = collection.implied_read_hold.try_downgrade(new_since);
1703            collection.read_policy = Some(new_policy);
1704        }
1705
1706        Ok(())
1707    }
1708
    /// Advance the global write frontier of the given collection.
    ///
    /// Frontier regressions are gracefully ignored.
    ///
    /// On advancement, the collection's implied read hold is downgraded (according to its
    /// read policy, if any) and a `FrontierUpper` response is delivered.
    ///
    /// # Panics
    ///
    /// Panics if the identified collection does not exist.
    #[mz_ore::instrument(level = "debug")]
    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
        let collection = self.expect_collection_mut(id);

        // Update the shared write frontier, but only if `new_frontier` is strictly greater.
        // Anything else is a no-op or a regression and leaves the frontier untouched.
        let advanced = collection.shared.lock_write_frontier(|f| {
            let advanced = PartialOrder::less_than(f, &new_frontier);
            if advanced {
                f.clone_from(&new_frontier);
            }
            advanced
        });

        if !advanced {
            return;
        }

        // Relax the implied read hold according to the read policy.
        let new_since = match &collection.read_policy {
            Some(read_policy) => {
                // For readable collections the read frontier is determined by applying the
                // client-provided read policy to the write frontier.
                read_policy.frontier(new_frontier.borrow())
            }
            None => {
                // Write-only collections cannot be read within the context of the compute
                // controller, so their read frontier only controls the read holds taken on their
                // inputs. We can safely downgrade the input read holds to any time less than the
                // write frontier.
                //
                // Note that some write-only collections (continual tasks) need to observe changes
                // at their current write frontier during hydration. Thus, we cannot downgrade the
                // read frontier to the write frontier and instead step it back by one.
                Antichain::from_iter(
                    new_frontier
                        .iter()
                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                )
            }
        };
        // Best-effort downgrade; failures are deliberately ignored.
        let _ = collection.implied_read_hold.try_downgrade(new_since);

        // Report the frontier advancement.
        self.deliver_response(ComputeControllerResponse::FrontierUpper {
            id,
            upper: new_frontier,
        });
    }
1763
    /// Apply a collection read hold change.
    ///
    /// Applies the given capability changes to the collection's tracked read capabilities,
    /// downgrades the read holds on the collection's compute and storage dependencies to the
    /// resulting read frontier, and emits an `AllowCompaction` command for the collection.
    /// If the read frontier does not change, only the capability counts are updated.
    pub(super) fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
        let Some(collection) = self.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "read hold change for absent collection (id={id}, changes={update:?})"
            );
            return;
        };

        let new_since = collection.shared.lock_read_capabilities(|caps| {
            // Sanity check to prevent corrupted `read_capabilities`, which can cause hard-to-debug
            // issues (usually stuck read frontiers).
            let read_frontier = caps.frontier();
            for (time, diff) in update.iter() {
                let count = caps.count_for(time) + diff;
                assert!(
                    count >= 0,
                    "invalid read capabilities update: negative capability \
             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
                assert!(
                    count == 0 || read_frontier.less_equal(time),
                    "invalid read capabilities update: frontier regression \
             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
            }

            // Apply read capability updates and learn about resulting changes to the read
            // frontier.
            let changes = caps.update_iter(update.drain());

            // Return the new read frontier only if it actually changed.
            let changed = changes.count() > 0;
            changed.then(|| caps.frontier().to_owned())
        });

        let Some(new_since) = new_since else {
            return; // read frontier did not change
        };

        // Propagate read frontier update to dependencies.
        for read_hold in collection.compute_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }
        for read_hold in collection.storage_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }

        // Produce `AllowCompaction` command.
        self.send(ComputeCommand::AllowCompaction {
            id,
            frontier: new_since,
        });
    }
1821
1822    /// Fulfills a registered peek and cleans up associated state.
1823    ///
1824    /// As part of this we:
1825    ///  * Send a `PeekResponse` through the peek's response channel.
1826    ///  * Emit a `CancelPeek` command to instruct replicas to stop spending resources on this
1827    ///    peek, and to allow the `ComputeCommandHistory` to reduce away the corresponding `Peek`
1828    ///    command.
1829    ///  * Remove the read hold for this peek, unblocking compaction that might have waited on it.
1830    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1831        let Some(peek) = self.peeks.remove(&uuid) else {
1832            return;
1833        };
1834
1835        // The recipient might not be interested in the peek response anymore, which is fine.
1836        let _ = peek.peek_response_tx.send(response);
1837
1838        // NOTE: We need to send the `CancelPeek` command _before_ we release the peek's read hold
1839        // (by dropping it), to avoid the edge case that caused database-issues#4812.
1840        self.send(ComputeCommand::CancelPeek { uuid });
1841
1842        drop(peek.read_hold);
1843    }
1844
1845    /// Handles a response from a replica. Replica IDs are re-used across replica restarts, so we
1846    /// use the replica epoch to drop stale responses.
1847    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
1848        // Filter responses from non-existing or stale replicas.
1849        if self
1850            .replicas
1851            .get(&replica_id)
1852            .filter(|replica| replica.epoch == epoch)
1853            .is_none()
1854        {
1855            return;
1856        }
1857
1858        // Invariant: the replica exists and has the expected epoch.
1859
1860        match response {
1861            ComputeResponse::Frontiers(id, frontiers) => {
1862                self.handle_frontiers_response(id, frontiers, replica_id);
1863            }
1864            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
1865                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
1866            }
1867            ComputeResponse::CopyToResponse(id, response) => {
1868                self.handle_copy_to_response(id, response, replica_id);
1869            }
1870            ComputeResponse::SubscribeResponse(id, response) => {
1871                self.handle_subscribe_response(id, response, replica_id);
1872            }
1873            ComputeResponse::Status(response) => {
1874                self.handle_status_response(response, replica_id);
1875            }
1876        }
1877    }
1878
1879    /// Handle new frontiers, returning any compute response that needs to
1880    /// be sent to the client.
1881    fn handle_frontiers_response(
1882        &mut self,
1883        id: GlobalId,
1884        frontiers: FrontiersResponse<T>,
1885        replica_id: ReplicaId,
1886    ) {
1887        if !self.collections.contains_key(&id) {
1888            soft_panic_or_log!(
1889                "frontiers update for an unknown collection \
1890                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1891            );
1892            return;
1893        }
1894        let Some(replica) = self.replicas.get_mut(&replica_id) else {
1895            soft_panic_or_log!(
1896                "frontiers update for an unknown replica \
1897                 (replica_id={replica_id}, frontiers={frontiers:?})"
1898            );
1899            return;
1900        };
1901        let Some(replica_collection) = replica.collections.get_mut(&id) else {
1902            soft_panic_or_log!(
1903                "frontiers update for an unknown replica collection \
1904                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1905            );
1906            return;
1907        };
1908
1909        if let Some(new_frontier) = frontiers.input_frontier {
1910            replica_collection.update_input_frontier(new_frontier.clone());
1911        }
1912        if let Some(new_frontier) = frontiers.output_frontier {
1913            replica_collection.update_output_frontier(new_frontier.clone());
1914        }
1915        if let Some(new_frontier) = frontiers.write_frontier {
1916            replica_collection.update_write_frontier(new_frontier.clone());
1917            self.maybe_update_global_write_frontier(id, new_frontier);
1918        }
1919    }
1920
1921    #[mz_ore::instrument(level = "debug")]
1922    fn handle_peek_response(
1923        &mut self,
1924        uuid: Uuid,
1925        response: PeekResponse,
1926        otel_ctx: OpenTelemetryContext,
1927        replica_id: ReplicaId,
1928    ) {
1929        otel_ctx.attach_as_parent();
1930
1931        // We might not be tracking this peek anymore, because we have served a response already or
1932        // because it was canceled. If this is the case, we ignore the response.
1933        let Some(peek) = self.peeks.get(&uuid) else {
1934            return;
1935        };
1936
1937        // If the peek is targeting a replica, ignore responses from other replicas.
1938        let target_replica = peek.target_replica.unwrap_or(replica_id);
1939        if target_replica != replica_id {
1940            return;
1941        }
1942
1943        let duration = peek.requested_at.elapsed();
1944        self.metrics.observe_peek_response(&response, duration);
1945
1946        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
1947        // NOTE: We use the `otel_ctx` from the response, not the pending peek, because we
1948        // currently want the parent to be whatever the compute worker did with this peek.
1949        self.deliver_response(ComputeControllerResponse::PeekNotification(
1950            uuid,
1951            notification,
1952            otel_ctx,
1953        ));
1954
1955        self.finish_peek(uuid, response)
1956    }
1957
1958    fn handle_copy_to_response(
1959        &mut self,
1960        sink_id: GlobalId,
1961        response: CopyToResponse,
1962        replica_id: ReplicaId,
1963    ) {
1964        if !self.collections.contains_key(&sink_id) {
1965            soft_panic_or_log!(
1966                "received response for an unknown copy-to \
1967                 (sink_id={sink_id}, replica_id={replica_id})",
1968            );
1969            return;
1970        }
1971        let Some(replica) = self.replicas.get_mut(&replica_id) else {
1972            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
1973            return;
1974        };
1975        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
1976            soft_panic_or_log!(
1977                "copy-to response for an unknown replica collection \
1978                 (sink_id={sink_id}, replica_id={replica_id})"
1979            );
1980            return;
1981        };
1982
1983        // Downgrade the replica frontiers, to enable dropping of input read holds and clean up of
1984        // collection state.
1985        // TODO(database-issues#4701): report copy-to frontiers through `Frontiers` responses
1986        replica_collection.update_write_frontier(Antichain::new());
1987        replica_collection.update_input_frontier(Antichain::new());
1988        replica_collection.update_output_frontier(Antichain::new());
1989
1990        // We might not be tracking this COPY TO because we have already returned a response
1991        // from one of the replicas. In that case, we ignore the response.
1992        if !self.copy_tos.remove(&sink_id) {
1993            return;
1994        }
1995
1996        let result = match response {
1997            CopyToResponse::RowCount(count) => Ok(count),
1998            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
1999            // We should never get here: Replicas only drop copy to collections in response
2000            // to the controller allowing them to do so, and when the controller drops a
2001            // copy to it also removes it from the list of tracked copy_tos (see
2002            // [`Instance::drop_collections`]).
2003            CopyToResponse::Dropped => {
2004                tracing::error!(
2005                    %sink_id, %replica_id,
2006                    "received `Dropped` response for a tracked copy to",
2007                );
2008                return;
2009            }
2010        };
2011
2012        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
2013    }
2014
    /// Handles a `SubscribeResponse` from the given replica.
    ///
    /// Updates the per-replica frontier tracking for the subscribe collection, possibly
    /// advances its global write frontier, and — for subscribes that are still tracked —
    /// forwards new batches to the client via a
    /// `ComputeControllerResponse::SubscribeResponse`.
    fn handle_subscribe_response(
        &mut self,
        subscribe_id: GlobalId,
        response: SubscribeResponse<T>,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&subscribe_id) {
            soft_panic_or_log!(
                "received response for an unknown subscribe \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica (replica_id={replica_id})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica collection \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
            );
            return;
        };

        // Always apply replica write frontier updates. Even if the subscribe is not tracked
        // anymore, there might still be replicas reading from its inputs, so we need to track the
        // frontiers until all replicas have advanced to the empty one.
        let write_frontier = match &response {
            SubscribeResponse::Batch(batch) => batch.upper.clone(),
            SubscribeResponse::DroppedAt(_) => Antichain::new(),
        };

        // For subscribes we downgrade all replica frontiers based on write frontiers. This should
        // be fine because the input and output frontier of a subscribe track its write frontier.
        // TODO(database-issues#4701): report subscribe frontiers through `Frontiers` responses
        replica_collection.update_write_frontier(write_frontier.clone());
        replica_collection.update_input_frontier(write_frontier.clone());
        replica_collection.update_output_frontier(write_frontier.clone());

        // If the subscribe is not tracked, or targets a different replica, there is nothing to do.
        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
            return;
        };

        // Apply a global frontier update.
        // If this is a replica-targeted subscribe, it is important that we advance the global
        // frontier only based on responses from the targeted replica. Otherwise, another replica
        // could advance to the empty frontier, making us drop the subscribe on the targeted
        // replica prematurely.
        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);

        match response {
            SubscribeResponse::Batch(batch) => {
                let upper = batch.upper;
                let mut updates = batch.updates;

                // If this batch advances the subscribe's frontier, we emit all updates at times
                // greater or equal to the last frontier (to avoid emitting duplicate updates).
                if PartialOrder::less_than(&subscribe.frontier, &upper) {
                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());

                    if upper.is_empty() {
                        // This subscribe cannot produce more data. Stop tracking it.
                        self.subscribes.remove(&subscribe_id);
                    } else {
                        // This subscribe can produce more data. Update our tracking of it.
                        self.subscribes.insert(subscribe_id, subscribe);
                    }

                    if let Ok(updates) = updates.as_mut() {
                        updates.retain(|(time, _data, _diff)| lower.less_equal(time));
                    }
                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeBatch {
                            lower,
                            upper,
                            updates,
                        },
                    ));
                }
            }
            SubscribeResponse::DroppedAt(frontier) => {
                // We should never get here: Replicas only drop subscribe collections in response
                // to the controller allowing them to do so, and when the controller drops a
                // subscribe it also removes it from the list of tracked subscribes (see
                // [`Instance::drop_collections`]).
                tracing::error!(
                    %subscribe_id,
                    %replica_id,
                    frontier = ?frontier.elements(),
                    "received `DroppedAt` response for a tracked subscribe",
                );
                self.subscribes.remove(&subscribe_id);
            }
        }
    }
2115
2116    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
2117        match response {
2118            StatusResponse::Placeholder => {}
2119        }
2120    }
2121
2122    /// Return the write frontiers of the dependencies of the given collection.
2123    fn dependency_write_frontiers<'b>(
2124        &'b self,
2125        collection: &'b CollectionState<T>,
2126    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2127        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2128            let collection = self.collections.get(&dep_id);
2129            collection.map(|c| c.write_frontier())
2130        });
2131        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2132            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2133            frontiers.map(|f| f.write_frontier)
2134        });
2135
2136        compute_frontiers.chain(storage_frontiers)
2137    }
2138
2139    /// Return the write frontiers of transitive storage dependencies of the given collection.
2140    fn transitive_storage_dependency_write_frontiers<'b>(
2141        &'b self,
2142        collection: &'b CollectionState<T>,
2143    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2144        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2145        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2146        let mut done = BTreeSet::new();
2147
2148        while let Some(id) = todo.pop() {
2149            if done.contains(&id) {
2150                continue;
2151            }
2152            if let Some(dep) = self.collections.get(&id) {
2153                storage_ids.extend(dep.storage_dependency_ids());
2154                todo.extend(dep.compute_dependency_ids())
2155            }
2156            done.insert(id);
2157        }
2158
2159        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2160            let frontiers = self.storage_collections.collection_frontiers(id).ok();
2161            frontiers.map(|f| f.write_frontier)
2162        });
2163
2164        storage_frontiers
2165    }
2166
2167    /// Downgrade the warmup capabilities of collections as much as possible.
2168    ///
2169    /// The only requirement we have for a collection's warmup capability is that it is for a time
2170    /// that is available in all of the collection's inputs. For each input the latest time that is
2171    /// the case for is `write_frontier - 1`. So the farthest we can downgrade a collection's
2172    /// warmup capability is the minimum of `write_frontier - 1` of all its inputs.
2173    ///
2174    /// This method expects to be periodically called as part of instance maintenance work.
2175    /// We would like to instead update the warmup capabilities synchronously in response to
2176    /// frontier updates of dependency collections, but that is not generally possible because we
2177    /// don't learn about frontier updates of storage collections synchronously. We could do
2178    /// synchronous updates for compute dependencies, but we refrain from doing for simplicity.
2179    fn downgrade_warmup_capabilities(&mut self) {
2180        let mut new_capabilities = BTreeMap::new();
2181        for (id, collection) in &self.collections {
2182            // For write-only collections that have advanced to the empty frontier, we can drop the
2183            // warmup capability entirely. There is no reason why we would need to hydrate those
2184            // collections again, so being able to warm them up is not useful.
2185            if collection.read_policy.is_none()
2186                && collection.shared.lock_write_frontier(|f| f.is_empty())
2187            {
2188                new_capabilities.insert(*id, Antichain::new());
2189                continue;
2190            }
2191
2192            let mut new_capability = Antichain::new();
2193            for frontier in self.dependency_write_frontiers(collection) {
2194                for time in frontier {
2195                    new_capability.insert(time.step_back().unwrap_or(time));
2196                }
2197            }
2198
2199            new_capabilities.insert(*id, new_capability);
2200        }
2201
2202        for (id, new_capability) in new_capabilities {
2203            let collection = self.expect_collection_mut(id);
2204            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
2205        }
2206    }
2207
2208    /// Forward the implied capabilities of collections, if possible.
2209    ///
2210    /// The implied capability of a collection controls (a) which times are still readable (for
2211    /// indexes) and (b) with which as-of the collection gets installed on a new replica. We are
2212    /// usually not allowed to advance an implied capability beyond the frontier that follows from
2213    /// the collection's read policy applied to its write frontier:
2214    ///
2215    ///  * For sink collections, some external consumer might rely on seeing all distinct times in
2216    ///    the input reflected in the output. If we'd forward the implied capability of a sink,
2217    ///    we'd risk skipping times in the output across replica restarts.
2218    ///  * For index collections, we might make the index unreadable by advancing its read frontier
2219    ///    beyond its write frontier.
2220    ///
2221    /// There is one case where forwarding an implied capability is fine though: an index installed
2222    /// on a cluster that has no replicas. Such indexes are not readable anyway until a new replica
2223    /// is added, so advancing its read frontier can't make it unreadable. We can thus advance the
2224    /// implied capability as long as we make sure that when a new replica is added, the expected
2225    /// relationship between write frontier, read policy, and implied capability can be restored
2226    /// immediately (modulo computation time).
2227    ///
2228    /// Forwarding implied capabilities is not necessary for the correct functioning of the
2229    /// controller but an optimization that is beneficial in two ways:
2230    ///
2231    ///  * It relaxes read holds on inputs to forwarded collections, allowing their compaction.
2232    ///  * It reduces the amount of historical detail new replicas need to process when computing
2233    ///    forwarded collections, as forwarding the implied capability also forwards the corresponding
2234    ///    dataflow as-of.
2235    fn forward_implied_capabilities(&mut self) {
2236        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
2237            return;
2238        }
2239        if !self.replicas.is_empty() {
2240            return;
2241        }
2242
2243        let mut new_capabilities = BTreeMap::new();
2244        for (id, collection) in &self.collections {
2245            let Some(read_policy) = &collection.read_policy else {
2246                // Collection is write-only, i.e. a sink.
2247                continue;
2248            };
2249
2250            // When a new replica is started, it will immediately be able to compute all collection
2251            // output up to the write frontier of its transitive storage inputs. So the new implied
2252            // read capability should be the read policy applied to that frontier.
2253            let mut dep_frontier = Antichain::new();
2254            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
2255                dep_frontier.extend(frontier);
2256            }
2257
2258            let new_capability = read_policy.frontier(dep_frontier.borrow());
2259            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
2260                new_capabilities.insert(*id, new_capability);
2261            }
2262        }
2263
2264        for (id, new_capability) in new_capabilities {
2265            let collection = self.expect_collection_mut(id);
2266            let _ = collection.implied_read_hold.try_downgrade(new_capability);
2267        }
2268    }
2269
2270    /// Acquires a `ReadHold` for the identified compute collection.
2271    ///
2272    /// This mirrors the logic used by the controller-side `InstanceState::acquire_read_hold`,
2273    /// but executes on the instance task itself.
2274    pub(super) fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
2275        // Similarly to InstanceState::acquire_read_hold and StorageCollections::acquire_read_holds,
2276        // we acquire read holds at the earliest possible time rather than returning a copy
2277        // of the implied read hold. This is so that dependents can acquire read holds on
2278        // compute dependencies at frontiers that are held back by other read holds the caller
2279        // has previously taken.
2280        let collection = self.collection(id)?;
2281        let since = collection.shared.lock_read_capabilities(|caps| {
2282            let since = caps.frontier().to_owned();
2283            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
2284            since
2285        });
2286        let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
2287        Ok(hold)
2288    }
2289
    /// Process pending maintenance work.
    ///
    /// This method is invoked periodically by the global controller.
    /// It is a good place to perform maintenance work that arises from various controller state
    /// changes and that cannot conveniently be handled synchronously with those state changes.
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        // NOTE(review): capability downgrades run before the introspection/metrics refreshes so
        // the latter observe the updated state — confirm before reordering these calls.
        self.rehydrate_failed_replicas();
        self.downgrade_warmup_capabilities();
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
2306}
2307
/// State maintained about individual compute collections.
///
/// A compute collection is either an index, or a storage sink, or a subscribe, exported by a
/// compute dataflow.
#[derive(Debug)]
struct CollectionState<T: ComputeControllerTimestamp> {
    /// If set, this collection is only maintained by the specified replica.
    target_replica: Option<ReplicaId>,
    /// Whether this collection is a log collection.
    ///
    /// Log collections are special in that they are only maintained by a subset of all replicas.
    log_collection: bool,
    /// Whether this collection has been dropped by a controller client.
    ///
    /// The controller is allowed to remove the `CollectionState` for a collection only when
    /// `dropped == true`. Otherwise, clients might still expect to be able to query information
    /// about this collection.
    dropped: bool,
    /// Whether this collection has been scheduled, i.e., the controller has sent a `Schedule`
    /// command for it.
    scheduled: bool,

    /// Whether this collection is in read-only mode.
    ///
    /// When in read-only mode, the dataflow is not allowed to affect external state (largely persist).
    ///
    /// Initialized to `true` by [`CollectionState::new`].
    read_only: bool,

    /// State shared with the `ComputeController`.
    shared: SharedCollectionState<T>,

    /// A read hold maintaining the implicit capability of the collection.
    ///
    /// This capability is kept to ensure that the collection remains readable according to its
    /// `read_policy`. It also ensures that read holds on the collection's dependencies are kept at
    /// some time not greater than the collection's `write_frontier`, guaranteeing that the
    /// collection's next outputs can always be computed without skipping times.
    implied_read_hold: ReadHold<T>,
    /// A read hold held to enable dataflow warmup.
    ///
    /// Dataflow warmup is an optimization that allows dataflows to immediately start hydrating
    /// even when their next output time (as implied by the `write_frontier`) is in the future.
    /// By installing a read capability derived from the write frontiers of the collection's
    /// inputs, we ensure that the as-of of new dataflows installed for the collection is at a time
    /// that is immediately available, so hydration can begin immediately too.
    warmup_read_hold: ReadHold<T>,
    /// The policy to use to downgrade `self.implied_read_hold`.
    ///
    /// If `None`, the collection is a write-only collection (i.e. a sink). For write-only
    /// collections, the `implied_read_hold` is only required for maintaining read holds on the
    /// inputs, so we can immediately downgrade it to the `write_frontier`.
    read_policy: Option<ReadPolicy<T>>,

    /// Storage identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
    /// Compute identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,

    /// Introspection state associated with this collection.
    introspection: CollectionIntrospection<T>,

    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
    /// introspection update.
    ///
    /// Keys are `(period, lag, labels)` triples, values are counts.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection.
    /// Transient collections get `None` (see [`CollectionState::new`]).
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}
2387
2388impl<T: ComputeControllerTimestamp> CollectionState<T> {
2389    /// Creates a new collection state, with an initial read policy valid from `since`.
2390    fn new(
2391        collection_id: GlobalId,
2392        as_of: Antichain<T>,
2393        shared: SharedCollectionState<T>,
2394        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2395        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2396        read_hold_tx: read_holds::ChangeTx<T>,
2397        introspection: CollectionIntrospection<T>,
2398    ) -> Self {
2399        // A collection is not readable before the `as_of`.
2400        let since = as_of.clone();
2401        // A collection won't produce updates for times before the `as_of`.
2402        let upper = as_of;
2403
2404        // Ensure that the provided `shared` is valid for the given `as_of`.
2405        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
2406        assert!(shared.lock_write_frontier(|f| f == &upper));
2407
2408        // Initialize collection read holds.
2409        // Note that the implied read hold was already added to the `read_capabilities` when
2410        // `shared` was created, so we only need to add the warmup read hold here.
2411        let implied_read_hold =
2412            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
2413        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);
2414
2415        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
2416        shared.lock_read_capabilities(|c| {
2417            c.update_iter(updates);
2418        });
2419
2420        // In an effort to keep the produced wallclock lag introspection data small and
2421        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2422        // select indexes and subscribes.
2423        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
2424            true => None,
2425            false => Some(Default::default()),
2426        };
2427
2428        Self {
2429            target_replica: None,
2430            log_collection: false,
2431            dropped: false,
2432            scheduled: false,
2433            read_only: true,
2434            shared,
2435            implied_read_hold,
2436            warmup_read_hold,
2437            read_policy: Some(ReadPolicy::ValidFrom(since)),
2438            storage_dependencies,
2439            compute_dependencies,
2440            introspection,
2441            wallclock_lag_histogram_stash,
2442        }
2443    }
2444
2445    /// Creates a new collection state for a log collection.
2446    fn new_log_collection(
2447        id: GlobalId,
2448        shared: SharedCollectionState<T>,
2449        read_hold_tx: read_holds::ChangeTx<T>,
2450        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2451    ) -> Self {
2452        let since = Antichain::from_elem(T::minimum());
2453        let introspection = CollectionIntrospection::new(
2454            id,
2455            introspection_tx,
2456            since.clone(),
2457            false,
2458            None,
2459            None,
2460            Vec::new(),
2461        );
2462        let mut state = Self::new(
2463            id,
2464            since,
2465            shared,
2466            Default::default(),
2467            Default::default(),
2468            read_hold_tx,
2469            introspection,
2470        );
2471        state.log_collection = true;
2472        // Log collections are created and scheduled implicitly as part of replica initialization.
2473        state.scheduled = true;
2474        state
2475    }
2476
2477    /// Reports the current read frontier.
2478    fn read_frontier(&self) -> Antichain<T> {
2479        self.shared
2480            .lock_read_capabilities(|c| c.frontier().to_owned())
2481    }
2482
2483    /// Reports the current write frontier.
2484    fn write_frontier(&self) -> Antichain<T> {
2485        self.shared.lock_write_frontier(|f| f.clone())
2486    }
2487
2488    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2489        self.storage_dependencies.keys().copied()
2490    }
2491
2492    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2493        self.compute_dependencies.keys().copied()
2494    }
2495}
2496
/// Collection state shared with the `ComputeController`.
///
/// Having this allows certain controller APIs, such as `ComputeController::collection_frontiers`
/// and `ComputeController::acquire_read_hold` to be non-`async`. This comes at the cost of
/// complexity (by introducing shared mutable state) and performance (by introducing locking). We
/// should aim to reduce the amount of shared state over time, rather than expand it.
///
/// Note that [`SharedCollectionState`]s are initialized by the `ComputeController` prior to the
/// collection's creation in the [`Instance`]. This is to allow compute clients to query frontiers
/// and take new read holds immediately, without having to wait for the [`Instance`] to update.
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState<T> {
    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulation contains the capabilities held by all [`ReadHold`]s given out for the
    /// collection, including `implied_read_hold` and `warmup_read_hold`.
    ///
    /// NOTE: This field may only be modified by [`Instance::apply_read_hold_change`],
    /// [`Instance::acquire_read_hold`], and `ComputeController::acquire_read_hold`.
    /// Nobody else should modify read capabilities directly. Instead, collection users should
    /// manage read holds through [`ReadHold`] objects acquired through
    /// `ComputeController::acquire_read_hold`.
    ///
    /// TODO(teskje): Restructure the code to enforce the above in the type system.
    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
    /// The write frontier of this collection.
    ///
    /// Guarded by its own lock; access it through
    /// [`SharedCollectionState::lock_write_frontier`].
    write_frontier: Arc<Mutex<Antichain<T>>>,
}
2525
2526impl<T: Timestamp> SharedCollectionState<T> {
2527    pub fn new(as_of: Antichain<T>) -> Self {
2528        // A collection is not readable before the `as_of`.
2529        let since = as_of.clone();
2530        // A collection won't produce updates for times before the `as_of`.
2531        let upper = as_of;
2532
2533        // Initialize read capabilities to the `since`.
2534        // The is the implied read capability. The corresponding [`ReadHold`] is created in
2535        // [`CollectionState::new`].
2536        let mut read_capabilities = MutableAntichain::new();
2537        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2538
2539        Self {
2540            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2541            write_frontier: Arc::new(Mutex::new(upper)),
2542        }
2543    }
2544
2545    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2546    where
2547        F: FnOnce(&mut MutableAntichain<T>) -> R,
2548    {
2549        let mut caps = self.read_capabilities.lock().expect("poisoned");
2550        f(&mut *caps)
2551    }
2552
2553    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2554    where
2555        F: FnOnce(&mut Antichain<T>) -> R,
2556    {
2557        let mut frontier = self.write_frontier.lock().expect("poisoned");
2558        f(&mut *frontier)
2559    }
2560}
2561
/// Manages certain introspection relations associated with a collection. Upon creation, it adds
/// rows to introspection relations. When dropped, it retracts its managed rows.
#[derive(Debug)]
struct CollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// A channel through which introspection updates are delivered.
    ///
    /// Send errors are ignored: they mean the receiving `ComputeController` was dropped and is
    /// no longer interested in introspection updates.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Introspection state for `IntrospectionType::Frontiers`.
    ///
    /// `Some` if the collection does _not_ sink into a storage collection (i.e. is not an MV). If
    /// the collection sinks into storage, the storage controller reports its frontiers instead.
    frontiers: Option<FrontiersIntrospectionState<T>>,
    /// Introspection state for `IntrospectionType::ComputeMaterializedViewRefreshes`.
    ///
    /// `Some` if the collection is a REFRESH MV.
    refresh: Option<RefreshIntrospectionState<T>>,
    /// The IDs of the collection's dependencies, for `IntrospectionType::ComputeDependencies`.
    dependency_ids: Vec<GlobalId>,
}
2582
2583impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
2584    fn new(
2585        collection_id: GlobalId,
2586        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2587        as_of: Antichain<T>,
2588        storage_sink: bool,
2589        initial_as_of: Option<Antichain<T>>,
2590        refresh_schedule: Option<RefreshSchedule>,
2591        dependency_ids: Vec<GlobalId>,
2592    ) -> Self {
2593        let refresh =
2594            match (refresh_schedule, initial_as_of) {
2595                (Some(refresh_schedule), Some(initial_as_of)) => Some(
2596                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2597                ),
2598                (refresh_schedule, _) => {
2599                    // If we have a `refresh_schedule`, then the collection is a MV, so we should also have
2600                    // an `initial_as_of`.
2601                    soft_assert_or_log!(
2602                        refresh_schedule.is_none(),
2603                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2604                    );
2605                    None
2606                }
2607            };
2608        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2609
2610        let self_ = Self {
2611            collection_id,
2612            introspection_tx,
2613            frontiers,
2614            refresh,
2615            dependency_ids,
2616        };
2617
2618        self_.report_initial_state();
2619        self_
2620    }
2621
2622    /// Reports the initial introspection state.
2623    fn report_initial_state(&self) {
2624        if let Some(frontiers) = &self.frontiers {
2625            let row = frontiers.row_for_collection(self.collection_id);
2626            let updates = vec![(row, Diff::ONE)];
2627            self.send(IntrospectionType::Frontiers, updates);
2628        }
2629
2630        if let Some(refresh) = &self.refresh {
2631            let row = refresh.row_for_collection(self.collection_id);
2632            let updates = vec![(row, Diff::ONE)];
2633            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2634        }
2635
2636        if !self.dependency_ids.is_empty() {
2637            let updates = self.dependency_rows(Diff::ONE);
2638            self.send(IntrospectionType::ComputeDependencies, updates);
2639        }
2640    }
2641
2642    /// Produces rows for the `ComputeDependencies` introspection relation.
2643    fn dependency_rows(&self, diff: Diff) -> Vec<(Row, Diff)> {
2644        self.dependency_ids
2645            .iter()
2646            .map(|dependency_id| {
2647                let row = Row::pack_slice(&[
2648                    Datum::String(&self.collection_id.to_string()),
2649                    Datum::String(&dependency_id.to_string()),
2650                ]);
2651                (row, diff)
2652            })
2653            .collect()
2654    }
2655
2656    /// Observe the given current collection frontiers and update the introspection state as
2657    /// necessary.
2658    fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2659        self.update_frontier_introspection(read_frontier, write_frontier);
2660        self.update_refresh_introspection(write_frontier);
2661    }
2662
2663    fn update_frontier_introspection(
2664        &mut self,
2665        read_frontier: &Antichain<T>,
2666        write_frontier: &Antichain<T>,
2667    ) {
2668        let Some(frontiers) = &mut self.frontiers else {
2669            return;
2670        };
2671
2672        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2673        {
2674            return; // no change
2675        };
2676
2677        let retraction = frontiers.row_for_collection(self.collection_id);
2678        frontiers.update(read_frontier, write_frontier);
2679        let insertion = frontiers.row_for_collection(self.collection_id);
2680        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2681        self.send(IntrospectionType::Frontiers, updates);
2682    }
2683
2684    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
2685        let Some(refresh) = &mut self.refresh else {
2686            return;
2687        };
2688
2689        let retraction = refresh.row_for_collection(self.collection_id);
2690        refresh.frontier_update(write_frontier);
2691        let insertion = refresh.row_for_collection(self.collection_id);
2692
2693        if retraction == insertion {
2694            return; // no change
2695        }
2696
2697        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2698        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2699    }
2700
2701    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2702        // Failure to send means the `ComputeController` has been dropped and doesn't care about
2703        // introspection updates anymore.
2704        let _ = self.introspection_tx.send((introspection_type, updates));
2705    }
2706}
2707
2708impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
2709    fn drop(&mut self) {
2710        // Retract collection frontiers.
2711        if let Some(frontiers) = &self.frontiers {
2712            let row = frontiers.row_for_collection(self.collection_id);
2713            let updates = vec![(row, Diff::MINUS_ONE)];
2714            self.send(IntrospectionType::Frontiers, updates);
2715        }
2716
2717        // Retract MV refresh state.
2718        if let Some(refresh) = &self.refresh {
2719            let retraction = refresh.row_for_collection(self.collection_id);
2720            let updates = vec![(retraction, Diff::MINUS_ONE)];
2721            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2722        }
2723
2724        // Retract collection dependencies.
2725        if !self.dependency_ids.is_empty() {
2726            let updates = self.dependency_rows(Diff::MINUS_ONE);
2727            self.send(IntrospectionType::ComputeDependencies, updates);
2728        }
2729    }
2730}
2731
/// Introspection state for `IntrospectionType::Frontiers`: the collection frontiers as last
/// recorded for introspection.
#[derive(Debug)]
struct FrontiersIntrospectionState<T> {
    /// The read frontier last recorded for introspection.
    read_frontier: Antichain<T>,
    /// The write frontier last recorded for introspection.
    write_frontier: Antichain<T>,
}
2737
2738impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
2739    fn new(as_of: Antichain<T>) -> Self {
2740        Self {
2741            read_frontier: as_of.clone(),
2742            write_frontier: as_of,
2743        }
2744    }
2745
2746    /// Return a `Row` reflecting the current collection frontiers.
2747    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2748        let read_frontier = self
2749            .read_frontier
2750            .as_option()
2751            .map_or(Datum::Null, |ts| ts.clone().into());
2752        let write_frontier = self
2753            .write_frontier
2754            .as_option()
2755            .map_or(Datum::Null, |ts| ts.clone().into());
2756        Row::pack_slice(&[
2757            Datum::String(&collection_id.to_string()),
2758            read_frontier,
2759            write_frontier,
2760        ])
2761    }
2762
2763    /// Update the introspection state with the given new frontiers.
2764    fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2765        if read_frontier != &self.read_frontier {
2766            self.read_frontier.clone_from(read_frontier);
2767        }
2768        if write_frontier != &self.write_frontier {
2769            self.write_frontier.clone_from(write_frontier);
2770        }
2771    }
2772}
2773
/// Information needed to compute introspection updates for a REFRESH materialized view when the
/// write frontier advances.
#[derive(Debug)]
struct RefreshIntrospectionState<T> {
    // Immutable properties of the MV
    /// The MV's refresh schedule.
    refresh_schedule: RefreshSchedule,
    /// The MV's initial as-of, which is set to its first refresh time.
    initial_as_of: Antichain<T>,
    // Refresh state
    next_refresh: Datum<'static>,           // Null or an MzTimestamp
    last_completed_refresh: Datum<'static>, // Null or an MzTimestamp
}
2785
2786impl<T> RefreshIntrospectionState<T> {
2787    /// Return a `Row` reflecting the current refresh introspection state.
2788    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2789        Row::pack_slice(&[
2790            Datum::String(&collection_id.to_string()),
2791            self.last_completed_refresh,
2792            self.next_refresh,
2793        ])
2794    }
2795}
2796
2797impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
2798    /// Construct a new [`RefreshIntrospectionState`], and apply an initial `frontier_update()` at
2799    /// the `upper`.
2800    fn new(
2801        refresh_schedule: RefreshSchedule,
2802        initial_as_of: Antichain<T>,
2803        upper: &Antichain<T>,
2804    ) -> Self {
2805        let mut self_ = Self {
2806            refresh_schedule: refresh_schedule.clone(),
2807            initial_as_of: initial_as_of.clone(),
2808            next_refresh: Datum::Null,
2809            last_completed_refresh: Datum::Null,
2810        };
2811        self_.frontier_update(upper);
2812        self_
2813    }
2814
2815    /// Should be called whenever the write frontier of the collection advances. It updates the
2816    /// state that should be recorded in introspection relations, but doesn't send the updates yet.
2817    fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
2818        if write_frontier.is_empty() {
2819            self.last_completed_refresh =
2820                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2821                    last_refresh.into()
2822                } else {
2823                    // If there is no last refresh, then we have a `REFRESH EVERY`, in which case
2824                    // the saturating roundup puts a refresh at the maximum possible timestamp.
2825                    T::maximum().into()
2826                };
2827            self.next_refresh = Datum::Null;
2828        } else {
2829            if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2830                // We are before the first refresh.
2831                self.last_completed_refresh = Datum::Null;
2832                let initial_as_of = self.initial_as_of.as_option().expect(
2833                    "initial_as_of can't be [], because then there would be no refreshes at all",
2834                );
2835                let first_refresh = initial_as_of
2836                    .round_up(&self.refresh_schedule)
2837                    .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2838                soft_assert_or_log!(
2839                    first_refresh == *initial_as_of,
2840                    "initial_as_of should be set to the first refresh"
2841                );
2842                self.next_refresh = first_refresh.into();
2843            } else {
2844                // The first refresh has already happened.
2845                let write_frontier = write_frontier.as_option().expect("checked above");
2846                self.last_completed_refresh = write_frontier
2847                    .round_down_minus_1(&self.refresh_schedule)
2848                    .map_or_else(
2849                        || {
2850                            soft_panic_or_log!(
2851                                "rounding down should have returned the first refresh or later"
2852                            );
2853                            Datum::Null
2854                        },
2855                        |last_completed_refresh| last_completed_refresh.into(),
2856                    );
2857                self.next_refresh = write_frontier.clone().into();
2858            }
2859        }
2860    }
2861}
2862
/// A note of an outstanding peek response.
#[derive(Debug)]
struct PendingPeek<T: Timestamp> {
    /// For replica-targeted peeks, this specifies the replica whose response we should pass on.
    ///
    /// If this value is `None`, we pass on the first response.
    target_replica: Option<ReplicaId>,
    /// The OpenTelemetry context for this peek.
    otel_ctx: OpenTelemetryContext,
    /// The time at which the peek was requested.
    ///
    /// Used to track peek durations.
    requested_at: Instant,
    /// The read hold installed to serve this peek.
    ///
    /// NOTE(review): presumably held for the lifetime of the peek so the peeked collection
    /// remains readable at the peek time — confirm against the peek issuing/cleanup paths.
    read_hold: ReadHold<T>,
    /// The channel to send peek results.
    peek_response_tx: oneshot::Sender<PeekResponse>,
    /// An optional limit of the peek's result size.
    limit: Option<usize>,
    /// The offset into the peek's result.
    offset: usize,
}
2885
/// State tracked for an active subscribe operation.
#[derive(Debug, Clone)]
struct ActiveSubscribe<T> {
    /// Current upper frontier of this subscribe.
    ///
    /// Starts at the minimum timestamp (see the `Default` impl).
    frontier: Antichain<T>,
}
2891
2892impl<T: ComputeControllerTimestamp> Default for ActiveSubscribe<T> {
2893    fn default() -> Self {
2894        Self {
2895            frontier: Antichain::from_elem(T::minimum()),
2896        }
2897    }
2898}
2899
/// State maintained about individual replicas.
#[derive(Debug)]
struct ReplicaState<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    id: ReplicaId,
    /// Client for the running replica task.
    client: ReplicaClient<T>,
    /// The replica configuration.
    config: ReplicaConfig,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Per-replica collection state.
    ///
    /// Entries are added via `add_collection` and removed via `remove_collection`.
    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
    /// The epoch of the replica.
    epoch: u64,
}
2918
2919impl<T: ComputeControllerTimestamp> ReplicaState<T> {
2920    fn new(
2921        id: ReplicaId,
2922        client: ReplicaClient<T>,
2923        config: ReplicaConfig,
2924        metrics: ReplicaMetrics,
2925        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2926        epoch: u64,
2927    ) -> Self {
2928        Self {
2929            id,
2930            client,
2931            config,
2932            metrics,
2933            introspection_tx,
2934            epoch,
2935            collections: Default::default(),
2936        }
2937    }
2938
2939    /// Add a collection to the replica state.
2940    ///
2941    /// # Panics
2942    ///
2943    /// Panics if a collection with the same ID exists already.
2944    fn add_collection(
2945        &mut self,
2946        id: GlobalId,
2947        as_of: Antichain<T>,
2948        input_read_holds: Vec<ReadHold<T>>,
2949    ) {
2950        let metrics = self.metrics.for_collection(id);
2951        let introspection = ReplicaCollectionIntrospection::new(
2952            self.id,
2953            id,
2954            self.introspection_tx.clone(),
2955            as_of.clone(),
2956        );
2957        let mut state =
2958            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
2959
2960        // In an effort to keep the produced wallclock lag introspection data small and
2961        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2962        // select indexes and subscribes.
2963        if id.is_transient() {
2964            state.wallclock_lag_max = None;
2965        }
2966
2967        if let Some(previous) = self.collections.insert(id, state) {
2968            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
2969        }
2970    }
2971
2972    /// Remove state for a collection.
2973    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
2974        self.collections.remove(&id)
2975    }
2976
2977    /// Returns whether all replica frontiers of the given collection are empty.
2978    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
2979        self.collections.get(&id).map_or(true, |c| {
2980            c.write_frontier.is_empty()
2981                && c.input_frontier.is_empty()
2982                && c.output_frontier.is_empty()
2983        })
2984    }
2985
2986    /// Returns the state of the [`ReplicaState`] formatted as JSON.
2987    ///
2988    /// The returned value is not guaranteed to be stable and may change at any point in time.
2989    #[mz_ore::instrument(level = "debug")]
2990    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
2991        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
2992        // returned object as a tradeoff between usability and stability. `serde_json` will fail
2993        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
2994        // prevents a future unrelated change from silently breaking this method.
2995
2996        // Destructure `self` here so we don't forget to consider dumping newly added fields.
2997        let Self {
2998            id,
2999            client: _,
3000            config: _,
3001            metrics: _,
3002            introspection_tx: _,
3003            epoch,
3004            collections,
3005        } = self;
3006
3007        let collections: BTreeMap<_, _> = collections
3008            .iter()
3009            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
3010            .collect();
3011
3012        Ok(serde_json::json!({
3013            "id": id.to_string(),
3014            "collections": collections,
3015            "epoch": epoch,
3016        }))
3017    }
3018}
3019
/// Per-replica state tracked for a single compute collection.
#[derive(Debug)]
struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
    /// The replica write frontier of this collection.
    ///
    /// See [`FrontiersResponse::write_frontier`].
    write_frontier: Antichain<T>,
    /// The replica input frontier of this collection.
    ///
    /// See [`FrontiersResponse::input_frontier`].
    input_frontier: Antichain<T>,
    /// The replica output frontier of this collection.
    ///
    /// See [`FrontiersResponse::output_frontier`].
    output_frontier: Antichain<T>,

    /// Metrics tracked for this collection.
    ///
    /// If this is `None`, no metrics are collected.
    metrics: Option<ReplicaCollectionMetrics>,
    /// As-of frontier with which this collection was installed on the replica.
    as_of: Antichain<T>,
    /// Tracks introspection state for this collection.
    introspection: ReplicaCollectionIntrospection<T>,
    /// Read holds on storage inputs to this collection.
    ///
    /// These read holds are kept to ensure that the replica is able to read from storage inputs at
    /// all times it hasn't read yet. We only need to install read holds for storage inputs since
    /// compaction of compute inputs is implicitly held back by Timely/DD.
    input_read_holds: Vec<ReadHold<T>>,

    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection.
    /// (It is set to `None` for transient collections; see `ReplicaState::add_collection`.)
    wallclock_lag_max: Option<WallclockLag>,
}
3055
3056impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
3057    fn new(
3058        metrics: Option<ReplicaCollectionMetrics>,
3059        as_of: Antichain<T>,
3060        introspection: ReplicaCollectionIntrospection<T>,
3061        input_read_holds: Vec<ReadHold<T>>,
3062    ) -> Self {
3063        Self {
3064            write_frontier: as_of.clone(),
3065            input_frontier: as_of.clone(),
3066            output_frontier: as_of.clone(),
3067            metrics,
3068            as_of,
3069            introspection,
3070            input_read_holds,
3071            wallclock_lag_max: Some(WallclockLag::MIN),
3072        }
3073    }
3074
3075    /// Returns whether this collection is hydrated.
3076    fn hydrated(&self) -> bool {
3077        // If the observed frontier is greater than the collection's as-of, the collection has
3078        // produced some output and is therefore hydrated.
3079        //
3080        // We need to consider the edge case where the as-of is the empty frontier. Such an as-of
3081        // is not useful for indexes, because they wouldn't be readable. For write-only
3082        // collections, an empty as-of means that the collection has been fully written and no new
3083        // dataflow needs to be created for it. Consequently, no hydration will happen either.
3084        //
3085        // Based on this, we could respond in two ways:
3086        //  * `false`, as in "the dataflow was never created"
3087        //  * `true`, as in "the dataflow completed immediately"
3088        //
3089        // Since hydration is often used as a measure of dataflow progress and we don't want to
3090        // give the impression that certain dataflows are somehow stuck when they are not, we go
3091        // with the second interpretation here.
3092        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3093    }
3094
3095    /// Updates the replica write frontier of this collection.
3096    fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
3097        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3098            soft_panic_or_log!(
3099                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3100                self.write_frontier,
3101            );
3102            return;
3103        } else if new_frontier == self.write_frontier {
3104            return;
3105        }
3106
3107        self.write_frontier = new_frontier;
3108    }
3109
3110    /// Updates the replica input frontier of this collection.
3111    fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
3112        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3113            soft_panic_or_log!(
3114                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3115                self.input_frontier,
3116            );
3117            return;
3118        } else if new_frontier == self.input_frontier {
3119            return;
3120        }
3121
3122        self.input_frontier = new_frontier;
3123
3124        // Relax our read holds on collection inputs.
3125        for read_hold in &mut self.input_read_holds {
3126            let result = read_hold.try_downgrade(self.input_frontier.clone());
3127            soft_assert_or_log!(
3128                result.is_ok(),
3129                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3130                self.input_frontier,
3131            );
3132        }
3133    }
3134
3135    /// Updates the replica output frontier of this collection.
3136    fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
3137        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3138            soft_panic_or_log!(
3139                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3140                self.output_frontier,
3141            );
3142            return;
3143        } else if new_frontier == self.output_frontier {
3144            return;
3145        }
3146
3147        self.output_frontier = new_frontier;
3148    }
3149}
3150
/// Maintains the introspection state for a given replica and collection, and ensures that reported
/// introspection data is retracted when the collection is dropped.
///
/// The `Drop` impl retracts the reported write frontier row.
#[derive(Debug)]
struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// The collection's reported replica write frontier.
    write_frontier: Antichain<T>,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
}
3164
3165impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
3166    /// Create a new `HydrationState` and initialize introspection.
3167    fn new(
3168        replica_id: ReplicaId,
3169        collection_id: GlobalId,
3170        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3171        as_of: Antichain<T>,
3172    ) -> Self {
3173        let self_ = Self {
3174            replica_id,
3175            collection_id,
3176            write_frontier: as_of,
3177            introspection_tx,
3178        };
3179
3180        self_.report_initial_state();
3181        self_
3182    }
3183
3184    /// Reports the initial introspection state.
3185    fn report_initial_state(&self) {
3186        let row = self.write_frontier_row();
3187        let updates = vec![(row, Diff::ONE)];
3188        self.send(IntrospectionType::ReplicaFrontiers, updates);
3189    }
3190
3191    /// Observe the given current write frontier and update the introspection state as necessary.
3192    fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
3193        if self.write_frontier == *write_frontier {
3194            return; // no change
3195        }
3196
3197        let retraction = self.write_frontier_row();
3198        self.write_frontier.clone_from(write_frontier);
3199        let insertion = self.write_frontier_row();
3200
3201        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3202        self.send(IntrospectionType::ReplicaFrontiers, updates);
3203    }
3204
3205    /// Return a `Row` reflecting the current replica write frontier.
3206    fn write_frontier_row(&self) -> Row {
3207        let write_frontier = self
3208            .write_frontier
3209            .as_option()
3210            .map_or(Datum::Null, |ts| ts.clone().into());
3211        Row::pack_slice(&[
3212            Datum::String(&self.collection_id.to_string()),
3213            Datum::String(&self.replica_id.to_string()),
3214            write_frontier,
3215        ])
3216    }
3217
3218    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3219        // Failure to send means the `ComputeController` has been dropped and doesn't care about
3220        // introspection updates anymore.
3221        let _ = self.introspection_tx.send((introspection_type, updates));
3222    }
3223}
3224
3225impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
3226    fn drop(&mut self) {
3227        // Retract the write frontier.
3228        let row = self.write_frontier_row();
3229        let updates = vec![(row, Diff::MINUS_ONE)];
3230        self.send(IntrospectionType::ReplicaFrontiers, updates);
3231    }
3232}