mz_compute_client/controller/
instance.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller for a compute instance.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::ComputeInstanceId;
21use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
22use mz_compute_types::plan::LirId;
23use mz_compute_types::plan::render_plan::RenderPlan;
24use mz_compute_types::sinks::{
25    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
26};
27use mz_compute_types::sources::SourceInstanceDesc;
28use mz_controller_types::dyncfgs::{
29    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION,
30    WALLCLOCK_LAG_RECORDING_INTERVAL,
31};
32use mz_dyncfg::ConfigSet;
33use mz_expr::RowSetFinishing;
34use mz_ore::cast::CastFrom;
35use mz_ore::channel::instrumented_unbounded_channel;
36use mz_ore::now::NowFn;
37use mz_ore::tracing::OpenTelemetryContext;
38use mz_ore::{soft_assert_or_log, soft_panic_or_log};
39use mz_persist_types::PersistLocation;
40use mz_repr::adt::timestamp::CheckedTimestamp;
41use mz_repr::refresh_schedule::RefreshSchedule;
42use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
43use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
44use mz_storage_types::read_holds::{self, ReadHold};
45use mz_storage_types::read_policy::ReadPolicy;
46use serde::Serialize;
47use thiserror::Error;
48use timely::PartialOrder;
49use timely::progress::frontier::MutableAntichain;
50use timely::progress::{Antichain, ChangeBatch, Timestamp};
51use tokio::sync::mpsc::error::SendError;
52use tokio::sync::{mpsc, oneshot};
53use uuid::Uuid;
54
55use crate::controller::error::{
56    CollectionLookupError, CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
57};
58use crate::controller::replica::{ReplicaClient, ReplicaConfig};
59use crate::controller::{
60    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
61    ReplicaId, StorageCollections,
62};
63use crate::logging::LogVariant;
64use crate::metrics::IntCounter;
65use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
66use crate::protocol::command::{
67    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
68};
69use crate::protocol::history::ComputeCommandHistory;
70use crate::protocol::response::{
71    ComputeResponse, CopyToResponse, FrontiersResponse, OperatorHydrationStatus, PeekResponse,
72    StatusResponse, SubscribeBatch, SubscribeResponse,
73};
74use crate::service::{ComputeClient, ComputeGrpcClient};
75
/// Error reporting that a replica with the given ID exists already.
///
/// The wrapped [`ReplicaId`] identifies the offending replica and is included in the error
/// message.
#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);
79
/// Error reporting that no replica with the given ID exists.
///
/// The wrapped [`ReplicaId`] identifies the missing replica and is included in the error
/// message.
#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);
83
/// Errors describing why a dataflow could not be created.
#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    /// A referenced collection does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// A referenced replica does not exist.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The dataflow definition does not specify an `as_of`.
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    /// A subscribe dataflow was given an empty `as_of` frontier.
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    /// A copy-to dataflow was given an empty `as_of` frontier.
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    /// No read hold was provided for the identified dataflow import.
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    /// The read hold provided for the identified dataflow import is insufficient.
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}
101
102impl From<CollectionMissing> for DataflowCreationError {
103    fn from(error: CollectionMissing) -> Self {
104        Self::CollectionMissing(error.0)
105    }
106}
107
/// Errors describing why a peek could not be performed.
#[derive(Error, Debug)]
pub(super) enum PeekError {
    /// The identified replica does not exist.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The ID of the provided read hold does not match the peeked collection.
    #[error("read hold ID does not match peeked collection: {0}")]
    ReadHoldIdMismatch(GlobalId),
    /// The provided read hold is insufficient for the peek.
    #[error("insufficient read hold provided: {0}")]
    ReadHoldInsufficient(GlobalId),
}
117
/// Errors describing why a read policy could not be applied to a collection.
#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    /// The identified collection does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The identified collection is write-only.
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}
125
126impl From<CollectionMissing> for ReadPolicyError {
127    fn from(error: CollectionMissing) -> Self {
128        Self::CollectionMissing(error.0)
129    }
130}
131
/// A command sent to an [`Instance`] task.
///
/// Commands are boxed one-shot closures that receive mutable access to the instance state when
/// they are invoked.
pub type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;
134
/// A client for an [`Instance`] task.
///
/// The client bundles the two senders used to communicate with the task. Cloning the client
/// produces another handle holding clones of both senders.
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub(super) struct Client<T: ComputeControllerTimestamp> {
    /// A sender for commands for the instance.
    command_tx: mpsc::UnboundedSender<Command<T>>,
    /// A sender for read hold changes for collections installed on the instance.
    ///
    /// Excluded from the `Debug` output.
    #[derivative(Debug = "ignore")]
    read_hold_tx: read_holds::ChangeTx<T>,
}
145
146impl<T: ComputeControllerTimestamp> Client<T> {
147    pub fn send(&self, command: Command<T>) -> Result<(), SendError<Command<T>>> {
148        self.command_tx.send(command)
149    }
150
151    pub fn read_hold_tx(&self) -> read_holds::ChangeTx<T> {
152        Arc::clone(&self.read_hold_tx)
153    }
154}
155
156impl<T> Client<T>
157where
158    T: ComputeControllerTimestamp,
159    ComputeGrpcClient: ComputeClient<T>,
160{
161    pub fn spawn(
162        id: ComputeInstanceId,
163        build_info: &'static BuildInfo,
164        storage: StorageCollections<T>,
165        peek_stash_persist_location: PersistLocation,
166        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
167        metrics: InstanceMetrics,
168        now: NowFn,
169        wallclock_lag: WallclockLagFn<T>,
170        dyncfg: Arc<ConfigSet>,
171        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
172        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
173    ) -> Self {
174        let (command_tx, command_rx) = mpsc::unbounded_channel();
175
176        let read_hold_tx: read_holds::ChangeTx<_> = {
177            let command_tx = command_tx.clone();
178            Arc::new(move |id, change: ChangeBatch<_>| {
179                let cmd: Command<_> = {
180                    let change = change.clone();
181                    Box::new(move |i| i.apply_read_hold_change(id, change.clone()))
182                };
183                command_tx.send(cmd).map_err(|_| SendError((id, change)))
184            })
185        };
186
187        mz_ore::task::spawn(
188            || format!("compute-instance-{id}"),
189            Instance::new(
190                build_info,
191                storage,
192                peek_stash_persist_location,
193                arranged_logs,
194                metrics,
195                now,
196                wallclock_lag,
197                dyncfg,
198                command_rx,
199                response_tx,
200                Arc::clone(&read_hold_tx),
201                introspection_tx,
202            )
203            .run(),
204        );
205
206        Self {
207            command_tx,
208            read_hold_tx,
209        }
210    }
211}
212
/// A response from a replica.
///
/// The tuple consists of, in order: the ID of the responding replica, the replica's current
/// epoch, and the compute response itself.
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);
216
/// The state we keep for a compute instance.
pub(super) struct Instance<T: ComputeControllerTimestamp> {
    /// Build info for spawning replicas.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections<T>,
    /// Whether instance initialization has been completed.
    initialized: bool,
    /// Whether or not this instance is in read-only mode.
    ///
    /// When in read-only mode, neither the controller nor the instances
    /// controlled by it are allowed to effect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// The workload class of this instance.
    ///
    /// This is currently only used to annotate metrics.
    workload_class: Option<String>,
    /// The replicas of this compute instance.
    replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
    /// Currently installed compute collections.
    ///
    /// New entries are added for all collections exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// Entries are removed by [`Instance::cleanup_collections`]. See that method's documentation
    /// about the conditions for removing collection state.
    collections: BTreeMap<GlobalId, CollectionState<T>>,
    /// IDs of log sources maintained by this compute instance.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Currently outstanding peeks.
    ///
    /// New entries are added for all peeks initiated through [`Instance::peek`].
    ///
    /// The entry for a peek is only removed once all replicas have responded to the peek. This is
    /// currently required to ensure all replicas have stopped reading from the peeked collection's
    /// inputs before we allow them to compact. database-issues#4822 tracks changing this so we
    /// only have to wait for the first peek response.
    peeks: BTreeMap<Uuid, PendingPeek<T>>,
    /// Currently in-progress subscribes.
    ///
    /// New entries are added for all subscribes exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// The entry for a subscribe is removed once at least one replica has reported the subscribe
    /// to have advanced to the empty frontier or to have been dropped, implying that no further
    /// updates will be emitted for this subscribe.
    ///
    /// Note that subscribes are tracked both in `collections` and `subscribes`. `collections`
    /// keeps track of the subscribe's upper and since frontiers and ensures appropriate read holds
    /// on the subscribe's input. `subscribes` is only used to track which updates have been
    /// emitted, to decide if new ones should be emitted or suppressed.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
    /// Tracks all in-progress COPY TOs.
    ///
    /// New entries are added for all s3 oneshot sinks (corresponding to a COPY TO) exported from
    /// dataflows created through [`Instance::create_dataflow`].
    ///
    /// The entry for a copy to is removed once at least one replica has finished
    /// or the exporting collection is dropped.
    copy_tos: BTreeSet<GlobalId>,
    /// The command history, used when introducing new replicas or restarting existing replicas.
    history: ComputeCommandHistory<UIntGauge, T>,
    /// Receiver for commands to be executed.
    command_rx: mpsc::UnboundedReceiver<Command<T>>,
    /// Sender for responses to be delivered.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Sender for introspection updates to be recorded.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// The registry the controller uses to report metrics.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,

    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for updates to collection read holds.
    ///
    /// Copies of this sender are given to [`ReadHold`]s that are created in
    /// [`CollectionState::new`].
    read_hold_tx: read_holds::ChangeTx<T>,
    /// A sender for responses from replicas.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// A receiver for responses from replicas.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
}
311
312impl<T: ComputeControllerTimestamp> Instance<T> {
313    /// Acquire a handle to the collection state associated with `id`.
314    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
315        self.collections.get(&id).ok_or(CollectionMissing(id))
316    }
317
318    /// Acquire a mutable handle to the collection state associated with `id`.
319    fn collection_mut(
320        &mut self,
321        id: GlobalId,
322    ) -> Result<&mut CollectionState<T>, CollectionMissing> {
323        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
324    }
325
326    /// Acquire a handle to the collection state associated with `id`.
327    ///
328    /// # Panics
329    ///
330    /// Panics if the identified collection does not exist.
331    fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
332        self.collections.get(&id).expect("collection must exist")
333    }
334
335    /// Acquire a mutable handle to the collection state associated with `id`.
336    ///
337    /// # Panics
338    ///
339    /// Panics if the identified collection does not exist.
340    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
341        self.collections
342            .get_mut(&id)
343            .expect("collection must exist")
344    }
345
346    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
347        self.collections.iter().map(|(id, coll)| (*id, coll))
348    }
349
350    /// Add a collection to the instance state.
351    ///
352    /// # Panics
353    ///
354    /// Panics if a collection with the same ID exists already.
355    fn add_collection(
356        &mut self,
357        id: GlobalId,
358        as_of: Antichain<T>,
359        shared: SharedCollectionState<T>,
360        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
361        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
362        replica_input_read_holds: Vec<ReadHold<T>>,
363        write_only: bool,
364        storage_sink: bool,
365        initial_as_of: Option<Antichain<T>>,
366        refresh_schedule: Option<RefreshSchedule>,
367    ) {
368        // Add global collection state.
369        let introspection = CollectionIntrospection::new(
370            id,
371            self.introspection_tx.clone(),
372            as_of.clone(),
373            storage_sink,
374            initial_as_of,
375            refresh_schedule,
376        );
377        let mut state = CollectionState::new(
378            id,
379            as_of.clone(),
380            shared,
381            storage_dependencies,
382            compute_dependencies,
383            Arc::clone(&self.read_hold_tx),
384            introspection,
385        );
386        // If the collection is write-only, clear its read policy to reflect that.
387        if write_only {
388            state.read_policy = None;
389        }
390
391        if let Some(previous) = self.collections.insert(id, state) {
392            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
393        }
394
395        // Add per-replica collection state.
396        for replica in self.replicas.values_mut() {
397            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
398        }
399
400        // Update introspection.
401        self.report_dependency_updates(id, Diff::ONE);
402    }
403
404    fn remove_collection(&mut self, id: GlobalId) {
405        // Update introspection.
406        self.report_dependency_updates(id, Diff::MINUS_ONE);
407
408        // Remove per-replica collection state.
409        for replica in self.replicas.values_mut() {
410            replica.remove_collection(id);
411        }
412
413        // Remove global collection state.
414        self.collections.remove(&id);
415    }
416
417    fn add_replica_state(
418        &mut self,
419        id: ReplicaId,
420        client: ReplicaClient<T>,
421        config: ReplicaConfig,
422        epoch: u64,
423    ) {
424        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();
425
426        let metrics = self.metrics.for_replica(id);
427        let mut replica = ReplicaState::new(
428            id,
429            client,
430            config,
431            metrics,
432            self.introspection_tx.clone(),
433            epoch,
434        );
435
436        // Add per-replica collection state.
437        for (collection_id, collection) in &self.collections {
438            // Skip log collections not maintained by this replica.
439            if collection.log_collection && !log_ids.contains(collection_id) {
440                continue;
441            }
442
443            let as_of = if collection.log_collection {
444                // For log collections, we don't send a `CreateDataflow` command to the replica, so
445                // it doesn't know which as-of the controler chose and defaults to the minimum
446                // frontier instead. We need to initialize the controller-side tracking with the
447                // same frontier, to avoid observing regressions in the reported frontiers.
448                Antichain::from_elem(T::minimum())
449            } else {
450                collection.read_frontier().to_owned()
451            };
452
453            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
454            replica.add_collection(*collection_id, as_of, input_read_holds);
455        }
456
457        self.replicas.insert(id, replica);
458    }
459
460    /// Enqueue the given response for delivery to the controller clients.
461    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
462        // Failure to send means the `ComputeController` has been dropped and doesn't care about
463        // responses anymore.
464        let _ = self.response_tx.send(response);
465    }
466
467    /// Enqueue the given introspection updates for recording.
468    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
469        // Failure to send means the `ComputeController` has been dropped and doesn't care about
470        // introspection updates anymore.
471        let _ = self.introspection_tx.send((type_, updates));
472    }
473
474    /// Returns whether the identified replica exists.
475    fn replica_exists(&self, id: ReplicaId) -> bool {
476        self.replicas.contains_key(&id)
477    }
478
479    /// Return the IDs of pending peeks targeting the specified replica.
480    fn peeks_targeting(
481        &self,
482        replica_id: ReplicaId,
483    ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
484        self.peeks.iter().filter_map(move |(uuid, peek)| {
485            if peek.target_replica == Some(replica_id) {
486                Some((*uuid, peek))
487            } else {
488                None
489            }
490        })
491    }
492
493    /// Return the IDs of in-progress subscribes targeting the specified replica.
494    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
495        self.subscribes.iter().filter_map(move |(id, subscribe)| {
496            let targeting = subscribe.target_replica == Some(replica_id);
497            targeting.then_some(*id)
498        })
499    }
500
501    /// Update introspection with the current collection frontiers.
502    ///
503    /// We could also do this directly in response to frontier changes, but doing it periodically
504    /// lets us avoid emitting some introspection updates that can be consolidated (e.g. a write
505    /// frontier updated immediately followed by a read frontier update).
506    ///
507    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
508    /// per second during normal operation.
509    fn update_frontier_introspection(&mut self) {
510        for collection in self.collections.values_mut() {
511            collection
512                .introspection
513                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
514        }
515
516        for replica in self.replicas.values_mut() {
517            for collection in replica.collections.values_mut() {
518                collection
519                    .introspection
520                    .observe_frontier(&collection.write_frontier);
521            }
522        }
523    }
524
525    /// Refresh the controller state metrics for this instance.
526    ///
527    /// We could also do state metric updates directly in response to state changes, but that would
528    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
529    /// a single method is less noisy.
530    ///
531    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
532    /// per second during normal operation.
533    fn refresh_state_metrics(&self) {
534        let unscheduled_collections_count =
535            self.collections.values().filter(|c| !c.scheduled).count();
536        let connected_replica_count = self
537            .replicas
538            .values()
539            .filter(|r| r.client.is_connected())
540            .count();
541
542        self.metrics
543            .replica_count
544            .set(u64::cast_from(self.replicas.len()));
545        self.metrics
546            .collection_count
547            .set(u64::cast_from(self.collections.len()));
548        self.metrics
549            .collection_unscheduled_count
550            .set(u64::cast_from(unscheduled_collections_count));
551        self.metrics
552            .peek_count
553            .set(u64::cast_from(self.peeks.len()));
554        self.metrics
555            .subscribe_count
556            .set(u64::cast_from(self.subscribes.len()));
557        self.metrics
558            .copy_to_count
559            .set(u64::cast_from(self.copy_tos.len()));
560        self.metrics
561            .connected_replica_count
562            .set(u64::cast_from(connected_replica_count));
563    }
564
565    /// Refresh the wallclock lag introspection and metrics with the current lag values.
566    ///
567    /// This method produces wallclock lag metrics of two different shapes:
568    ///
569    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
570    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
571    ///   over the last minute, together with the current time.
572    /// * Histograms: For each collection, we measure the lag of the write frontier behind
573    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
574    ///   together with the current histogram period.
575    ///
576    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
577    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
578    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
579    /// Prometheus.
580    ///
581    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
582    /// per second during normal operation.
583    fn refresh_wallclock_lag(&mut self) {
584        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
585            Some(ts) => (self.wallclock_lag)(ts.clone()),
586            None => Duration::ZERO,
587        };
588
589        let now_ms = (self.now)();
590        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
591        let histogram_labels = match &self.workload_class {
592            Some(wc) => [("workload_class", wc.clone())].into(),
593            None => BTreeMap::new(),
594        };
595
596        // First, iterate over all collections and collect histogram measurements.
597        // We keep a record of unreadable collections, so we can emit undefined lags for those here
598        // and below when we collect history measurements.
599        let mut unreadable_collections = BTreeSet::new();
600        for (id, collection) in &mut self.collections {
601            // We need to ask the storage controller for the read frontiers of storage collections.
602            let read_frontier = match self.storage_collections.collection_frontiers(*id) {
603                Ok(f) => f.read_capabilities,
604                Err(_) => collection.read_frontier(),
605            };
606            let write_frontier = collection.write_frontier();
607            let collection_unreadable = PartialOrder::less_equal(&write_frontier, &read_frontier);
608            if collection_unreadable {
609                unreadable_collections.insert(id);
610            }
611
612            if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(&self.dyncfg) {
613                continue;
614            }
615
616            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
617                let bucket = if collection_unreadable {
618                    WallclockLag::Undefined
619                } else {
620                    let lag = frontier_lag(&write_frontier);
621                    let lag = lag.as_secs().next_power_of_two();
622                    WallclockLag::Seconds(lag)
623                };
624
625                let key = (histogram_period, bucket, histogram_labels.clone());
626                *stash.entry(key).or_default() += Diff::ONE;
627            }
628        }
629
630        // Second, iterate over all per-replica collections and collect history measurements.
631        for replica in self.replicas.values_mut() {
632            for (id, collection) in &mut replica.collections {
633                let lag = if unreadable_collections.contains(&id) {
634                    WallclockLag::Undefined
635                } else {
636                    let lag = frontier_lag(&collection.write_frontier);
637                    WallclockLag::Seconds(lag.as_secs())
638                };
639
640                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
641                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
642                }
643
644                if let Some(metrics) = &mut collection.metrics {
645                    // No way to specify values as undefined in Prometheus metrics, so we use the
646                    // maximum value instead.
647                    let secs = lag.unwrap_seconds_or(u64::MAX);
648                    metrics.wallclock_lag.observe(secs);
649                };
650            }
651        }
652
653        // Record lags to persist, if it's time.
654        self.maybe_record_wallclock_lag();
655    }
656
657    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
658    /// last recording.
659    //
660    /// We emit new introspection updates if the system time has passed into a new multiple of the
661    /// recording interval (typically 1 minute) since the last refresh. The storage controller uses
662    /// the same approach, ensuring that both controllers commit their lags at roughly the same
663    /// time, avoiding confusion caused by inconsistencies.
664    fn maybe_record_wallclock_lag(&mut self) {
665        if self.read_only {
666            return;
667        }
668
669        let duration_trunc = |datetime: DateTime<_>, interval| {
670            let td = TimeDelta::from_std(interval).ok()?;
671            datetime.duration_trunc(td).ok()
672        };
673
674        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
675        let now_dt = mz_ore::now::to_datetime((self.now)());
676        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
677            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
678            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
679            duration_trunc(now_dt, *default).unwrap()
680        });
681        if now_trunc <= self.wallclock_lag_last_recorded {
682            return;
683        }
684
685        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");
686
687        let mut history_updates = Vec::new();
688        for (replica_id, replica) in &mut self.replicas {
689            for (collection_id, collection) in &mut replica.collections {
690                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
691                    continue;
692                };
693
694                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
695                let row = Row::pack_slice(&[
696                    Datum::String(&collection_id.to_string()),
697                    Datum::String(&replica_id.to_string()),
698                    max_lag.into_interval_datum(),
699                    Datum::TimestampTz(now_ts),
700                ]);
701                history_updates.push((row, Diff::ONE));
702            }
703        }
704        if !history_updates.is_empty() {
705            self.deliver_introspection_updates(
706                IntrospectionType::WallclockLagHistory,
707                history_updates,
708            );
709        }
710
711        let mut histogram_updates = Vec::new();
712        let mut row_buf = Row::default();
713        for (collection_id, collection) in &mut self.collections {
714            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
715                continue;
716            };
717
718            for ((period, lag, labels), count) in std::mem::take(stash) {
719                let mut packer = row_buf.packer();
720                packer.extend([
721                    Datum::TimestampTz(period.start),
722                    Datum::TimestampTz(period.end),
723                    Datum::String(&collection_id.to_string()),
724                    lag.into_uint64_datum(),
725                ]);
726                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
727                packer.push_dict(labels);
728
729                histogram_updates.push((row_buf.clone(), count));
730            }
731        }
732        if !histogram_updates.is_empty() {
733            self.deliver_introspection_updates(
734                IntrospectionType::WallclockLagHistogram,
735                histogram_updates,
736            );
737        }
738
739        self.wallclock_lag_last_recorded = now_trunc;
740    }
741
742    /// Report updates (inserts or retractions) to the identified collection's dependencies.
743    ///
744    /// # Panics
745    ///
746    /// Panics if the identified collection does not exist.
747    fn report_dependency_updates(&self, id: GlobalId, diff: Diff) {
748        let collection = self.expect_collection(id);
749        let dependencies = collection.dependency_ids();
750
751        let updates = dependencies
752            .map(|dependency_id| {
753                let row = Row::pack_slice(&[
754                    Datum::String(&id.to_string()),
755                    Datum::String(&dependency_id.to_string()),
756                ]);
757                (row, diff)
758            })
759            .collect();
760
761        self.deliver_introspection_updates(IntrospectionType::ComputeDependencies, updates);
762    }
763
764    /// Update the tracked hydration status for an operator according to a received status update.
765    fn update_operator_hydration_status(
766        &mut self,
767        replica_id: ReplicaId,
768        status: OperatorHydrationStatus,
769    ) {
770        let Some(replica) = self.replicas.get_mut(&replica_id) else {
771            tracing::error!(
772                %replica_id, ?status,
773                "status update for an unknown replica"
774            );
775            return;
776        };
777        let Some(collection) = replica.collections.get_mut(&status.collection_id) else {
778            tracing::error!(
779                %replica_id, ?status,
780                "status update for an unknown collection"
781            );
782            return;
783        };
784
785        collection.introspection.operator_hydrated(
786            status.lir_id,
787            status.worker_id,
788            status.hydrated,
789        );
790    }
791
792    /// Returns `true` if the given collection is hydrated on at least one
793    /// replica.
794    ///
795    /// This also returns `true` in case this cluster does not have any
796    /// replicas.
797    #[mz_ore::instrument(level = "debug")]
798    pub fn collection_hydrated(
799        &self,
800        collection_id: GlobalId,
801    ) -> Result<bool, CollectionLookupError> {
802        if self.replicas.is_empty() {
803            return Ok(true);
804        }
805
806        for replica_state in self.replicas.values() {
807            let collection_state = replica_state
808                .collections
809                .get(&collection_id)
810                .ok_or(CollectionLookupError::CollectionMissing(collection_id))?;
811
812            if collection_state.hydrated() {
813                return Ok(true);
814            }
815        }
816
817        Ok(false)
818    }
819
820    /// Returns `true` if each non-transient, non-excluded collection is hydrated on at
821    /// least one replica.
822    ///
823    /// This also returns `true` in case this cluster does not have any
824    /// replicas.
825    #[mz_ore::instrument(level = "debug")]
826    pub fn collections_hydrated_on_replicas(
827        &self,
828        target_replica_ids: Option<Vec<ReplicaId>>,
829        exclude_collections: &BTreeSet<GlobalId>,
830    ) -> Result<bool, HydrationCheckBadTarget> {
831        if self.replicas.is_empty() {
832            return Ok(true);
833        }
834        let mut all_hydrated = true;
835        let target_replicas: BTreeSet<ReplicaId> = self
836            .replicas
837            .keys()
838            .filter_map(|id| match target_replica_ids {
839                None => Some(id.clone()),
840                Some(ref ids) if ids.contains(id) => Some(id.clone()),
841                Some(_) => None,
842            })
843            .collect();
844        if let Some(targets) = target_replica_ids {
845            if target_replicas.is_empty() {
846                return Err(HydrationCheckBadTarget(targets));
847            }
848        }
849
850        for (id, _collection) in self.collections_iter() {
851            if id.is_transient() || exclude_collections.contains(&id) {
852                continue;
853            }
854
855            let mut collection_hydrated = false;
856            for replica_state in self.replicas.values() {
857                if !target_replicas.contains(&replica_state.id) {
858                    continue;
859                }
860                let collection_state = replica_state
861                    .collections
862                    .get(&id)
863                    .expect("missing collection state");
864
865                if collection_state.hydrated() {
866                    collection_hydrated = true;
867                    break;
868                }
869            }
870
871            if !collection_hydrated {
872                tracing::info!("collection {id} is not hydrated on any replica");
873                all_hydrated = false;
874                // We continue with our loop instead of breaking out early, so
875                // that we log all non-hydrated replicas.
876            }
877        }
878
879        Ok(all_hydrated)
880    }
881
882    /// Returns `true` if all non-transient, non-excluded collections are hydrated on at least one
883    /// replica.
884    ///
885    /// This also returns `true` in case this cluster does not have any
886    /// replicas.
887    #[mz_ore::instrument(level = "debug")]
888    pub fn collections_hydrated(&self, exclude_collections: &BTreeSet<GlobalId>) -> bool {
889        self.collections_hydrated_on_replicas(None, exclude_collections)
890            .expect("Cannot error if target_replica_ids is None")
891    }
892
893    /// Clean up collection state that is not needed anymore.
894    ///
895    /// Three conditions need to be true before we can remove state for a collection:
896    ///
897    ///  1. A client must have explicitly dropped the collection. If that is not the case, clients
898    ///     can still reasonably assume that the controller knows about the collection and can
899    ///     answer queries about it.
900    ///  2. There must be no outstanding read capabilities on the collection. As long as someone
901    ///     still holds read capabilities on a collection, we need to keep it around to be able
902    ///     to properly handle downgrading of said capabilities.
903    ///  3. All replica frontiers for the collection must have advanced to the empty frontier.
904    ///     Advancement to the empty frontiers signals that replicas are done computing the
905    ///     collection and that they won't send more `ComputeResponse`s for it. As long as we might
906    ///     receive responses for a collection we want to keep it around to be able to validate and
907    ///     handle these responses.
908    fn cleanup_collections(&mut self) {
909        let to_remove: Vec<_> = self
910            .collections_iter()
911            .filter(|(id, collection)| {
912                collection.dropped
913                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
914                    && self
915                        .replicas
916                        .values()
917                        .all(|r| r.collection_frontiers_empty(*id))
918            })
919            .map(|(id, _collection)| id)
920            .collect();
921
922        for id in to_remove {
923            self.remove_collection(id);
924        }
925    }
926
927    /// Returns the state of the [`Instance`] formatted as JSON.
928    ///
929    /// The returned value is not guaranteed to be stable and may change at any point in time.
930    #[mz_ore::instrument(level = "debug")]
931    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
932        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
933        // returned object as a tradeoff between usability and stability. `serde_json` will fail
934        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
935        // prevents a future unrelated change from silently breaking this method.
936
937        // Destructure `self` here so we don't forget to consider dumping newly added fields.
938        let Self {
939            build_info: _,
940            storage_collections: _,
941            peek_stash_persist_location: _,
942            initialized,
943            read_only,
944            workload_class,
945            replicas,
946            collections,
947            log_sources: _,
948            peeks,
949            subscribes,
950            copy_tos,
951            history: _,
952            command_rx: _,
953            response_tx: _,
954            introspection_tx: _,
955            metrics: _,
956            dyncfg: _,
957            now: _,
958            wallclock_lag: _,
959            wallclock_lag_last_recorded,
960            read_hold_tx: _,
961            replica_tx: _,
962            replica_rx: _,
963        } = self;
964
965        fn field(
966            key: &str,
967            value: impl Serialize,
968        ) -> Result<(String, serde_json::Value), anyhow::Error> {
969            let value = serde_json::to_value(value)?;
970            Ok((key.to_string(), value))
971        }
972
973        let replicas: BTreeMap<_, _> = replicas
974            .iter()
975            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
976            .collect::<Result<_, anyhow::Error>>()?;
977        let collections: BTreeMap<_, _> = collections
978            .iter()
979            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
980            .collect();
981        let peeks: BTreeMap<_, _> = peeks
982            .iter()
983            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
984            .collect();
985        let subscribes: BTreeMap<_, _> = subscribes
986            .iter()
987            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
988            .collect();
989        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
990        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");
991
992        let map = serde_json::Map::from_iter([
993            field("initialized", initialized)?,
994            field("read_only", read_only)?,
995            field("workload_class", workload_class)?,
996            field("replicas", replicas)?,
997            field("collections", collections)?,
998            field("peeks", peeks)?,
999            field("subscribes", subscribes)?,
1000            field("copy_tos", copy_tos)?,
1001            field("wallclock_lag_last_recorded", wallclock_lag_last_recorded)?,
1002        ]);
1003        Ok(serde_json::Value::Object(map))
1004    }
1005}
1006
1007impl<T> Instance<T>
1008where
1009    T: ComputeControllerTimestamp,
1010    ComputeGrpcClient: ComputeClient<T>,
1011{
1012    fn new(
1013        build_info: &'static BuildInfo,
1014        storage: StorageCollections<T>,
1015        peek_stash_persist_location: PersistLocation,
1016        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
1017        metrics: InstanceMetrics,
1018        now: NowFn,
1019        wallclock_lag: WallclockLagFn<T>,
1020        dyncfg: Arc<ConfigSet>,
1021        command_rx: mpsc::UnboundedReceiver<Command<T>>,
1022        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
1023        read_hold_tx: read_holds::ChangeTx<T>,
1024        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
1025    ) -> Self {
1026        let mut collections = BTreeMap::new();
1027        let mut log_sources = BTreeMap::new();
1028        for (log, id, shared) in arranged_logs {
1029            let collection = CollectionState::new_log_collection(
1030                id,
1031                shared,
1032                Arc::clone(&read_hold_tx),
1033                introspection_tx.clone(),
1034            );
1035            collections.insert(id, collection);
1036            log_sources.insert(log, id);
1037        }
1038
1039        let history = ComputeCommandHistory::new(metrics.for_history());
1040
1041        let send_count = metrics.response_send_count.clone();
1042        let recv_count = metrics.response_recv_count.clone();
1043        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);
1044
1045        let now_dt = mz_ore::now::to_datetime(now());
1046
1047        Self {
1048            build_info,
1049            storage_collections: storage,
1050            peek_stash_persist_location,
1051            initialized: false,
1052            read_only: true,
1053            workload_class: None,
1054            replicas: Default::default(),
1055            collections,
1056            log_sources,
1057            peeks: Default::default(),
1058            subscribes: Default::default(),
1059            copy_tos: Default::default(),
1060            history,
1061            command_rx,
1062            response_tx,
1063            introspection_tx,
1064            metrics,
1065            dyncfg,
1066            now,
1067            wallclock_lag,
1068            wallclock_lag_last_recorded: now_dt,
1069            read_hold_tx,
1070            replica_tx,
1071            replica_rx,
1072        }
1073    }
1074
1075    async fn run(mut self) {
1076        self.send(ComputeCommand::Hello {
1077            // The nonce is protocol iteration-specific and will be set in
1078            // `ReplicaTask::specialize_command`.
1079            nonce: Uuid::default(),
1080        });
1081
1082        let instance_config = InstanceConfig {
1083            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
1084            // The remaining fields are replica-specific and will be set in
1085            // `ReplicaTask::specialize_command`.
1086            logging: Default::default(),
1087            expiration_offset: Default::default(),
1088        };
1089
1090        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));
1091
1092        loop {
1093            tokio::select! {
1094                command = self.command_rx.recv() => match command {
1095                    Some(cmd) => cmd(&mut self),
1096                    None => break,
1097                },
1098                response = self.replica_rx.recv() => match response {
1099                    Some(response) => self.handle_response(response),
1100                    None => unreachable!("self owns a sender side of the channel"),
1101                }
1102            }
1103        }
1104    }
1105
1106    /// Update instance configuration.
1107    #[mz_ore::instrument(level = "debug")]
1108    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
1109        if let Some(workload_class) = &config_params.workload_class {
1110            self.workload_class = workload_class.clone();
1111        }
1112
1113        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
1114        self.send(command);
1115    }
1116
1117    /// Marks the end of any initialization commands.
1118    ///
1119    /// Intended to be called by `Controller`, rather than by other code.
1120    /// Calling this method repeatedly has no effect.
1121    #[mz_ore::instrument(level = "debug")]
1122    pub fn initialization_complete(&mut self) {
1123        // The compute protocol requires that `InitializationComplete` is sent only once.
1124        if !self.initialized {
1125            self.send(ComputeCommand::InitializationComplete);
1126            self.initialized = true;
1127        }
1128    }
1129
1130    /// Allows this instance to affect writes to external systems (persist).
1131    ///
1132    /// Calling this method repeatedly has no effect.
1133    #[mz_ore::instrument(level = "debug")]
1134    pub fn allow_writes(&mut self) {
1135        if self.read_only {
1136            self.read_only = false;
1137            self.send(ComputeCommand::AllowWrites);
1138        }
1139    }
1140
1141    /// Shut down this instance.
1142    ///
1143    /// This method runs various assertions ensuring the instance state is empty. It exists to help
1144    /// us find bugs where the client drops a compute instance that still has replicas or
1145    /// collections installed, and later assumes that said replicas/collections still exists.
1146    ///
1147    /// # Panics
1148    ///
1149    /// Panics if the compute instance still has active replicas.
1150    /// Panics if the compute instance still has collections installed.
1151    #[mz_ore::instrument(level = "debug")]
1152    pub fn shutdown(&mut self) {
1153        // Taking the `command_rx` ensures that the [`Instance::run`] loop terminates.
1154        let (_tx, rx) = mpsc::unbounded_channel();
1155        let mut command_rx = std::mem::replace(&mut self.command_rx, rx);
1156
1157        // Apply all outstanding read hold changes. This might cause read hold downgrades to be
1158        // added to `command_tx`, so we need to apply those in a loop.
1159        //
1160        // TODO(teskje): Make `Command` an enum and assert that all received commands are read
1161        // hold downgrades.
1162        while let Ok(cmd) = command_rx.try_recv() {
1163            cmd(self);
1164        }
1165
1166        // Collections might have been dropped but not cleaned up yet.
1167        self.cleanup_collections();
1168
1169        let stray_replicas: Vec<_> = self.replicas.keys().collect();
1170        soft_assert_or_log!(
1171            stray_replicas.is_empty(),
1172            "dropped instance still has provisioned replicas: {stray_replicas:?}",
1173        );
1174
1175        let collections = self.collections.iter();
1176        let stray_collections: Vec<_> = collections
1177            .filter(|(_, c)| !c.log_collection)
1178            .map(|(id, _)| id)
1179            .collect();
1180        soft_assert_or_log!(
1181            stray_collections.is_empty(),
1182            "dropped instance still has installed collections: {stray_collections:?}",
1183        );
1184    }
1185
1186    /// Sends a command to all replicas of this instance.
1187    #[mz_ore::instrument(level = "debug")]
1188    fn send(&mut self, cmd: ComputeCommand<T>) {
1189        // Record the command so that new replicas can be brought up to speed.
1190        self.history.push(cmd.clone());
1191
1192        // Clone the command for each active replica.
1193        for replica in self.replicas.values_mut() {
1194            // Swallow error, we'll notice because the replica task has stopped.
1195            let _ = replica.client.send(cmd.clone());
1196        }
1197    }
1198
1199    /// Add a new instance replica, by ID.
1200    #[mz_ore::instrument(level = "debug")]
1201    pub fn add_replica(
1202        &mut self,
1203        id: ReplicaId,
1204        mut config: ReplicaConfig,
1205        epoch: Option<u64>,
1206    ) -> Result<(), ReplicaExists> {
1207        if self.replica_exists(id) {
1208            return Err(ReplicaExists(id));
1209        }
1210
1211        config.logging.index_logs = self.log_sources.clone();
1212
1213        let epoch = epoch.unwrap_or(1);
1214        let metrics = self.metrics.for_replica(id);
1215        let client = ReplicaClient::spawn(
1216            id,
1217            self.build_info,
1218            config.clone(),
1219            epoch,
1220            metrics.clone(),
1221            Arc::clone(&self.dyncfg),
1222            self.replica_tx.clone(),
1223        );
1224
1225        // Take this opportunity to clean up the history we should present.
1226        self.history.reduce();
1227
1228        // Advance the uppers of source imports
1229        self.history.update_source_uppers(&self.storage_collections);
1230
1231        // Replay the commands at the client, creating new dataflow identifiers.
1232        for command in self.history.iter() {
1233            if client.send(command.clone()).is_err() {
1234                // We swallow the error here. On the next send, we will fail again, and
1235                // restart the connection as well as this rehydration.
1236                tracing::warn!("Replica {:?} connection terminated during hydration", id);
1237                break;
1238            }
1239        }
1240
1241        // Add replica to tracked state.
1242        self.add_replica_state(id, client, config, epoch);
1243
1244        Ok(())
1245    }
1246
1247    /// Remove an existing instance replica, by ID.
1248    #[mz_ore::instrument(level = "debug")]
1249    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
1250        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;
1251
1252        // Subscribes targeting this replica either won't be served anymore (if the replica is
1253        // dropped) or might produce inconsistent output (if the target collection is an
1254        // introspection index). We produce an error to inform upstream.
1255        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
1256        for subscribe_id in to_drop {
1257            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
1258            let response = ComputeControllerResponse::SubscribeResponse(
1259                subscribe_id,
1260                SubscribeBatch {
1261                    lower: subscribe.frontier.clone(),
1262                    upper: subscribe.frontier,
1263                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
1264                },
1265            );
1266            self.deliver_response(response);
1267        }
1268
1269        // Peeks targeting this replica might not be served anymore (if the replica is dropped).
1270        // If the replica has failed it might come back and respond to the peek later, but it still
1271        // seems like a good idea to cancel the peek to inform the caller about the failure. This
1272        // is consistent with how we handle targeted subscribes above.
1273        let mut peek_responses = Vec::new();
1274        let mut to_drop = Vec::new();
1275        for (uuid, peek) in self.peeks_targeting(id) {
1276            peek_responses.push(ComputeControllerResponse::PeekNotification(
1277                uuid,
1278                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
1279                peek.otel_ctx.clone(),
1280            ));
1281            to_drop.push(uuid);
1282        }
1283        for response in peek_responses {
1284            self.deliver_response(response);
1285        }
1286        for uuid in to_drop {
1287            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
1288            self.finish_peek(uuid, response);
1289        }
1290
1291        // We might have a chance to forward implied capabilities and reduce the cost of bringing
1292        // up the next replica, if the dropped replica was the only one in the cluster.
1293        self.forward_implied_capabilities();
1294
1295        Ok(())
1296    }
1297
1298    /// Rehydrate the given instance replica.
1299    ///
1300    /// # Panics
1301    ///
1302    /// Panics if the specified replica does not exist.
1303    fn rehydrate_replica(&mut self, id: ReplicaId) {
1304        let config = self.replicas[&id].config.clone();
1305        let epoch = self.replicas[&id].epoch + 1;
1306
1307        self.remove_replica(id).expect("replica must exist");
1308        let result = self.add_replica(id, config, Some(epoch));
1309
1310        match result {
1311            Ok(()) => (),
1312            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1313        }
1314    }
1315
1316    /// Rehydrate any failed replicas of this instance.
1317    fn rehydrate_failed_replicas(&mut self) {
1318        let replicas = self.replicas.iter();
1319        let failed_replicas: Vec<_> = replicas
1320            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1321            .collect();
1322
1323        for replica_id in failed_replicas {
1324            self.rehydrate_replica(replica_id);
1325        }
1326    }
1327
1328    /// Creates the described dataflow and initializes state for its output.
1329    ///
1330    /// This method expects a `DataflowDescription` with an `as_of` frontier specified, as well as
1331    /// for each imported collection a read hold in `import_read_holds` at at least the `as_of`.
1332    ///
1333    /// If a `subscribe_target_replica` is given, any subscribes exported by the dataflow are
1334    /// configured to target that replica, i.e., only subscribe responses sent by that replica are
1335    /// considered.
1336    #[mz_ore::instrument(level = "debug")]
1337    pub fn create_dataflow(
1338        &mut self,
1339        dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
1340        import_read_holds: Vec<ReadHold<T>>,
1341        subscribe_target_replica: Option<ReplicaId>,
1342        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
1343    ) -> Result<(), DataflowCreationError> {
1344        use DataflowCreationError::*;
1345
1346        if let Some(replica_id) = subscribe_target_replica {
1347            if !self.replica_exists(replica_id) {
1348                return Err(ReplicaMissing(replica_id));
1349            }
1350        }
1351
1352        // Simple sanity checks around `as_of`
1353        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
1354        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
1355            return Err(EmptyAsOfForSubscribe);
1356        }
1357        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
1358            return Err(EmptyAsOfForCopyTo);
1359        }
1360
1361        // Collect all dependencies of the dataflow, and read holds on them at the `as_of`.
1362        let mut storage_dependencies = BTreeMap::new();
1363        let mut compute_dependencies = BTreeMap::new();
1364
1365        // When we install per-replica input read holds, we cannot use the `as_of` because of
1366        // reconciliation: Existing slow replicas might be reading from the inputs at times before
1367        // the `as_of` and we would rather not crash them by allowing their inputs to compact too
1368        // far. So instead we take read holds at the least time available.
1369        let mut replica_input_read_holds = Vec::new();
1370
1371        let mut import_read_holds: BTreeMap<_, _> =
1372            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();
1373
1374        for &id in dataflow.source_imports.keys() {
1375            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1376            replica_input_read_holds.push(read_hold.clone());
1377
1378            read_hold
1379                .try_downgrade(as_of.clone())
1380                .map_err(|_| ReadHoldInsufficient(id))?;
1381            storage_dependencies.insert(id, read_hold);
1382        }
1383
1384        for &id in dataflow.index_imports.keys() {
1385            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1386            read_hold
1387                .try_downgrade(as_of.clone())
1388                .map_err(|_| ReadHoldInsufficient(id))?;
1389            compute_dependencies.insert(id, read_hold);
1390        }
1391
1392        // If the `as_of` is empty, we are not going to create a dataflow, so replicas won't read
1393        // from the inputs.
1394        if as_of.is_empty() {
1395            replica_input_read_holds = Default::default();
1396        }
1397
1398        // Install collection state for each of the exports.
1399        for export_id in dataflow.export_ids() {
1400            let shared = shared_collection_state
1401                .remove(&export_id)
1402                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
1403            let write_only = dataflow.sink_exports.contains_key(&export_id);
1404            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);
1405
1406            self.add_collection(
1407                export_id,
1408                as_of.clone(),
1409                shared,
1410                storage_dependencies.clone(),
1411                compute_dependencies.clone(),
1412                replica_input_read_holds.clone(),
1413                write_only,
1414                storage_sink,
1415                dataflow.initial_storage_as_of.clone(),
1416                dataflow.refresh_schedule.clone(),
1417            );
1418
1419            // If the export is a storage sink, we can advance its write frontier to the write
1420            // frontier of the target storage collection.
1421            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
1422                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
1423            }
1424        }
1425
1426        // Initialize tracking of subscribes.
1427        for subscribe_id in dataflow.subscribe_ids() {
1428            self.subscribes
1429                .insert(subscribe_id, ActiveSubscribe::new(subscribe_target_replica));
1430        }
1431
1432        // Initialize tracking of copy tos.
1433        for copy_to_id in dataflow.copy_to_ids() {
1434            self.copy_tos.insert(copy_to_id);
1435        }
1436
1437        // Here we augment all imported sources and all exported sinks with the appropriate
1438        // storage metadata needed by the compute instance.
1439        let mut source_imports = BTreeMap::new();
1440        for (id, (si, monotonic, _upper)) in dataflow.source_imports {
1441            let frontiers = self
1442                .storage_collections
1443                .collection_frontiers(id)
1444                .expect("collection exists");
1445
1446            let collection_metadata = self
1447                .storage_collections
1448                .collection_metadata(id)
1449                .expect("we have a read hold on this collection");
1450
1451            let desc = SourceInstanceDesc {
1452                storage_metadata: collection_metadata.clone(),
1453                arguments: si.arguments,
1454                typ: si.typ.clone(),
1455            };
1456            source_imports.insert(id, (desc, monotonic, frontiers.write_frontier));
1457        }
1458
1459        let mut sink_exports = BTreeMap::new();
1460        for (id, se) in dataflow.sink_exports {
1461            let connection = match se.connection {
1462                ComputeSinkConnection::MaterializedView(conn) => {
1463                    let metadata = self
1464                        .storage_collections
1465                        .collection_metadata(id)
1466                        .map_err(|_| CollectionMissing(id))?
1467                        .clone();
1468                    let conn = MaterializedViewSinkConnection {
1469                        value_desc: conn.value_desc,
1470                        storage_metadata: metadata,
1471                    };
1472                    ComputeSinkConnection::MaterializedView(conn)
1473                }
1474                ComputeSinkConnection::ContinualTask(conn) => {
1475                    let metadata = self
1476                        .storage_collections
1477                        .collection_metadata(id)
1478                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
1479                        .clone();
1480                    let conn = ContinualTaskConnection {
1481                        input_id: conn.input_id,
1482                        storage_metadata: metadata,
1483                    };
1484                    ComputeSinkConnection::ContinualTask(conn)
1485                }
1486                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
1487                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1488                    ComputeSinkConnection::CopyToS3Oneshot(conn)
1489                }
1490            };
1491            let desc = ComputeSinkDesc {
1492                from: se.from,
1493                from_desc: se.from_desc,
1494                connection,
1495                with_snapshot: se.with_snapshot,
1496                up_to: se.up_to,
1497                non_null_assertions: se.non_null_assertions,
1498                refresh_schedule: se.refresh_schedule,
1499            };
1500            sink_exports.insert(id, desc);
1501        }
1502
1503        // Flatten the dataflow plans into the representation expected by replicas.
1504        let objects_to_build = dataflow
1505            .objects_to_build
1506            .into_iter()
1507            .map(|object| BuildDesc {
1508                id: object.id,
1509                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
1510            })
1511            .collect();
1512
1513        let augmented_dataflow = DataflowDescription {
1514            source_imports,
1515            sink_exports,
1516            objects_to_build,
1517            // The rest of the fields are identical
1518            index_imports: dataflow.index_imports,
1519            index_exports: dataflow.index_exports,
1520            as_of: dataflow.as_of.clone(),
1521            until: dataflow.until,
1522            initial_storage_as_of: dataflow.initial_storage_as_of,
1523            refresh_schedule: dataflow.refresh_schedule,
1524            debug_name: dataflow.debug_name,
1525            time_dependence: dataflow.time_dependence,
1526        };
1527
1528        if augmented_dataflow.is_transient() {
1529            tracing::debug!(
1530                name = %augmented_dataflow.debug_name,
1531                import_ids = %augmented_dataflow.display_import_ids(),
1532                export_ids = %augmented_dataflow.display_export_ids(),
1533                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1534                until = ?augmented_dataflow.until.elements(),
1535                "creating dataflow",
1536            );
1537        } else {
1538            tracing::info!(
1539                name = %augmented_dataflow.debug_name,
1540                import_ids = %augmented_dataflow.display_import_ids(),
1541                export_ids = %augmented_dataflow.display_export_ids(),
1542                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1543                until = ?augmented_dataflow.until.elements(),
1544                "creating dataflow",
1545            );
1546        }
1547
1548        // Skip the actual dataflow creation for an empty `as_of`. (Happens e.g. for the
1549        // bootstrapping of a REFRESH AT mat view that is past its last refresh.)
1550        if as_of.is_empty() {
1551            tracing::info!(
1552                name = %augmented_dataflow.debug_name,
1553                "not sending `CreateDataflow`, because of empty `as_of`",
1554            );
1555        } else {
1556            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
1557            let dataflow = Box::new(augmented_dataflow);
1558            self.send(ComputeCommand::CreateDataflow(dataflow));
1559
1560            for id in collections {
1561                self.maybe_schedule_collection(id);
1562            }
1563        }
1564
1565        Ok(())
1566    }
1567
1568    /// Schedule the identified collection if all its inputs are available.
1569    ///
1570    /// # Panics
1571    ///
1572    /// Panics if the identified collection does not exist.
1573    fn maybe_schedule_collection(&mut self, id: GlobalId) {
1574        let collection = self.expect_collection(id);
1575
1576        // Don't schedule collections twice.
1577        if collection.scheduled {
1578            return;
1579        }
1580
1581        let as_of = collection.read_frontier();
1582
1583        // If the collection has an empty `as_of`, it was either never installed on the replica or
1584        // has since been dropped. In either case the replica does not expect any commands for it.
1585        if as_of.is_empty() {
1586            return;
1587        }
1588
1589        let ready = if id.is_transient() {
1590            // Always schedule transient collections immediately. The assumption is that those are
1591            // created by interactive user commands and we want to schedule them as quickly as
1592            // possible. Inputs might not yet be available, but when they become available, we
1593            // don't need to wait for the controller to become aware and for the scheduling check
1594            // to run again.
1595            true
1596        } else {
1597            // Ignore self-dependencies. Any self-dependencies do not need to be
1598            // available at the as_of for the dataflow to make progress, so we
1599            // can ignore them here. At the moment, only continual tasks have
1600            // self-dependencies, but this logic is correct for any dataflow, so
1601            // we don't special case it to CTs.
1602            let not_self_dep = |x: &GlobalId| *x != id;
1603
1604            // Check dependency frontiers to determine if all inputs are
1605            // available. An input is available when its frontier is greater
1606            // than the `as_of`, i.e., all input data up to and including the
1607            // `as_of` has been sealed.
1608            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
1609            let compute_frontiers = compute_deps.map(|id| {
1610                let dep = &self.expect_collection(id);
1611                dep.write_frontier()
1612            });
1613
1614            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
1615            let storage_frontiers = self
1616                .storage_collections
1617                .collections_frontiers(storage_deps.collect())
1618                .expect("must exist");
1619            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);
1620
1621            let ready = compute_frontiers
1622                .chain(storage_frontiers)
1623                .all(|frontier| PartialOrder::less_than(&as_of, &frontier));
1624
1625            ready
1626        };
1627
1628        if ready {
1629            self.send(ComputeCommand::Schedule(id));
1630            let collection = self.expect_collection_mut(id);
1631            collection.scheduled = true;
1632        }
1633    }
1634
1635    /// Schedule any unscheduled collections that are ready.
1636    fn schedule_collections(&mut self) {
1637        let ids: Vec<_> = self.collections.keys().copied().collect();
1638        for id in ids {
1639            self.maybe_schedule_collection(id);
1640        }
1641    }
1642
1643    /// Drops the read capability for the given collections and allows their resources to be
1644    /// reclaimed.
1645    #[mz_ore::instrument(level = "debug")]
1646    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1647        for id in &ids {
1648            let collection = self.collection_mut(*id)?;
1649
1650            // Mark the collection as dropped to allow it to be removed from the controller state.
1651            collection.dropped = true;
1652
1653            // Drop the implied and warmup read holds to announce that clients are not
1654            // interested in the collection anymore.
1655            collection.implied_read_hold.release();
1656            collection.warmup_read_hold.release();
1657
1658            // If the collection is a subscribe, stop tracking it. This ensures that the controller
1659            // ceases to produce `SubscribeResponse`s for this subscribe.
1660            self.subscribes.remove(id);
1661            // If the collection is a copy to, stop tracking it. This ensures that the controller
1662            // ceases to produce `CopyToResponse`s` for this copy to.
1663            self.copy_tos.remove(id);
1664        }
1665
1666        Ok(())
1667    }
1668
1669    /// Initiate a peek request for the contents of `id` at `timestamp`.
1670    #[mz_ore::instrument(level = "debug")]
1671    pub fn peek(
1672        &mut self,
1673        peek_target: PeekTarget,
1674        literal_constraints: Option<Vec<Row>>,
1675        uuid: Uuid,
1676        timestamp: T,
1677        result_desc: RelationDesc,
1678        finishing: RowSetFinishing,
1679        map_filter_project: mz_expr::SafeMfpPlan,
1680        mut read_hold: ReadHold<T>,
1681        target_replica: Option<ReplicaId>,
1682        peek_response_tx: oneshot::Sender<PeekResponse>,
1683    ) -> Result<(), PeekError> {
1684        use PeekError::*;
1685
1686        // Downgrade the provided read hold to the peek time.
1687        let target_id = peek_target.id();
1688        if read_hold.id() != target_id {
1689            return Err(ReadHoldIdMismatch(read_hold.id()));
1690        }
1691        read_hold
1692            .try_downgrade(Antichain::from_elem(timestamp.clone()))
1693            .map_err(|_| ReadHoldInsufficient(target_id))?;
1694
1695        if let Some(target) = target_replica {
1696            if !self.replica_exists(target) {
1697                return Err(ReplicaMissing(target));
1698            }
1699        }
1700
1701        let otel_ctx = OpenTelemetryContext::obtain();
1702
1703        self.peeks.insert(
1704            uuid,
1705            PendingPeek {
1706                target_replica,
1707                // TODO(guswynn): can we just hold the `tracing::Span` here instead?
1708                otel_ctx: otel_ctx.clone(),
1709                requested_at: Instant::now(),
1710                read_hold,
1711                peek_response_tx,
1712                limit: finishing.limit.map(usize::cast_from),
1713                offset: finishing.offset,
1714            },
1715        );
1716
1717        let peek = Peek {
1718            literal_constraints,
1719            uuid,
1720            timestamp,
1721            finishing,
1722            map_filter_project,
1723            // Obtain an `OpenTelemetryContext` from the thread-local tracing
1724            // tree to forward it on to the compute worker.
1725            otel_ctx,
1726            target: peek_target,
1727            result_desc,
1728        };
1729        self.send(ComputeCommand::Peek(Box::new(peek)));
1730
1731        Ok(())
1732    }
1733
1734    /// Cancels an existing peek request.
1735    #[mz_ore::instrument(level = "debug")]
1736    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1737        let Some(peek) = self.peeks.get_mut(&uuid) else {
1738            tracing::warn!("did not find pending peek for {uuid}");
1739            return;
1740        };
1741
1742        let duration = peek.requested_at.elapsed();
1743        self.metrics
1744            .observe_peek_response(&PeekResponse::Canceled, duration);
1745
1746        // Enqueue a notification for the cancellation.
1747        let otel_ctx = peek.otel_ctx.clone();
1748        otel_ctx.attach_as_parent();
1749
1750        self.deliver_response(ComputeControllerResponse::PeekNotification(
1751            uuid,
1752            PeekNotification::Canceled,
1753            otel_ctx,
1754        ));
1755
1756        // Finish the peek.
1757        // This will also propagate the cancellation to the replicas.
1758        self.finish_peek(uuid, reason);
1759    }
1760
1761    /// Assigns a read policy to specific identifiers.
1762    ///
1763    /// The policies are assigned in the order presented, and repeated identifiers should
1764    /// conclude with the last policy. Changing a policy will immediately downgrade the read
1765    /// capability if appropriate, but it will not "recover" the read capability if the prior
1766    /// capability is already ahead of it.
1767    ///
1768    /// Identifiers not present in `policies` retain their existing read policies.
1769    ///
1770    /// It is an error to attempt to set a read policy for a collection that is not readable in the
1771    /// context of compute. At this time, only indexes are readable compute collections.
1772    #[mz_ore::instrument(level = "debug")]
1773    pub fn set_read_policy(
1774        &mut self,
1775        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1776    ) -> Result<(), ReadPolicyError> {
1777        // Do error checking upfront, to avoid introducing inconsistencies between a collection's
1778        // `implied_capability` and `read_capabilities`.
1779        for (id, _policy) in &policies {
1780            let collection = self.collection(*id)?;
1781            if collection.read_policy.is_none() {
1782                return Err(ReadPolicyError::WriteOnlyCollection(*id));
1783            }
1784        }
1785
1786        for (id, new_policy) in policies {
1787            let collection = self.expect_collection_mut(id);
1788            let new_since = new_policy.frontier(collection.write_frontier().borrow());
1789            let _ = collection.implied_read_hold.try_downgrade(new_since);
1790            collection.read_policy = Some(new_policy);
1791        }
1792
1793        Ok(())
1794    }
1795
1796    /// Advance the global write frontier of the given collection.
1797    ///
1798    /// Frontier regressions are gracefully ignored.
1799    ///
1800    /// # Panics
1801    ///
1802    /// Panics if the identified collection does not exist.
1803    #[mz_ore::instrument(level = "debug")]
1804    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
1805        let collection = self.expect_collection_mut(id);
1806
1807        let advanced = collection.shared.lock_write_frontier(|f| {
1808            let advanced = PartialOrder::less_than(f, &new_frontier);
1809            if advanced {
1810                f.clone_from(&new_frontier);
1811            }
1812            advanced
1813        });
1814
1815        if !advanced {
1816            return;
1817        }
1818
1819        // Relax the implied read hold according to the read policy.
1820        let new_since = match &collection.read_policy {
1821            Some(read_policy) => {
1822                // For readable collections the read frontier is determined by applying the
1823                // client-provided read policy to the write frontier.
1824                read_policy.frontier(new_frontier.borrow())
1825            }
1826            None => {
1827                // Write-only collections cannot be read within the context of the compute
1828                // controller, so their read frontier only controls the read holds taken on their
1829                // inputs. We can safely downgrade the input read holds to any time less than the
1830                // write frontier.
1831                //
1832                // Note that some write-only collections (continual tasks) need to observe changes
1833                // at their current write frontier during hydration. Thus, we cannot downgrade the
1834                // read frontier to the write frontier and instead step it back by one.
1835                Antichain::from_iter(
1836                    new_frontier
1837                        .iter()
1838                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
1839                )
1840            }
1841        };
1842        let _ = collection.implied_read_hold.try_downgrade(new_since);
1843
1844        // Report the frontier advancement.
1845        self.deliver_response(ComputeControllerResponse::FrontierUpper {
1846            id,
1847            upper: new_frontier,
1848        });
1849    }
1850
1851    /// Apply a collection read hold change.
1852    fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
1853        let Some(collection) = self.collections.get_mut(&id) else {
1854            soft_panic_or_log!(
1855                "read hold change for absent collection (id={id}, changes={update:?})"
1856            );
1857            return;
1858        };
1859
1860        let new_since = collection.shared.lock_read_capabilities(|caps| {
1861            // Sanity check to prevent corrupted `read_capabilities`, which can cause hard-to-debug
1862            // issues (usually stuck read frontiers).
1863            let read_frontier = caps.frontier();
1864            for (time, diff) in update.iter() {
1865                let count = caps.count_for(time) + diff;
1866                assert!(
1867                    count >= 0,
1868                    "invalid read capabilities update: negative capability \
1869             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1870                );
1871                assert!(
1872                    count == 0 || read_frontier.less_equal(time),
1873                    "invalid read capabilities update: frontier regression \
1874             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1875                );
1876            }
1877
1878            // Apply read capability updates and learn about resulting changes to the read
1879            // frontier.
1880            let changes = caps.update_iter(update.drain());
1881
1882            let changed = changes.count() > 0;
1883            changed.then(|| caps.frontier().to_owned())
1884        });
1885
1886        let Some(new_since) = new_since else {
1887            return; // read frontier did not change
1888        };
1889
1890        // Propagate read frontier update to dependencies.
1891        for read_hold in collection.compute_dependencies.values_mut() {
1892            read_hold
1893                .try_downgrade(new_since.clone())
1894                .expect("frontiers don't regress");
1895        }
1896        for read_hold in collection.storage_dependencies.values_mut() {
1897            read_hold
1898                .try_downgrade(new_since.clone())
1899                .expect("frontiers don't regress");
1900        }
1901
1902        // Produce `AllowCompaction` command.
1903        self.send(ComputeCommand::AllowCompaction {
1904            id,
1905            frontier: new_since,
1906        });
1907    }
1908
1909    /// Fulfills a registered peek and cleans up associated state.
1910    ///
1911    /// As part of this we:
1912    ///  * Send a `PeekResponse` through the peek's response channel.
1913    ///  * Emit a `CancelPeek` command to instruct replicas to stop spending resources on this
1914    ///    peek, and to allow the `ComputeCommandHistory` to reduce away the corresponding `Peek`
1915    ///    command.
1916    ///  * Remove the read hold for this peek, unblocking compaction that might have waited on it.
1917    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1918        let Some(peek) = self.peeks.remove(&uuid) else {
1919            return;
1920        };
1921
1922        // The recipient might not be interested in the peek response anymore, which is fine.
1923        let _ = peek.peek_response_tx.send(response);
1924
1925        // NOTE: We need to send the `CancelPeek` command _before_ we release the peek's read hold
1926        // (by dropping it), to avoid the edge case that caused database-issues#4812.
1927        self.send(ComputeCommand::CancelPeek { uuid });
1928
1929        drop(peek.read_hold);
1930    }
1931
1932    /// Handles a response from a replica. Replica IDs are re-used across replica restarts, so we
1933    /// use the replica epoch to drop stale responses.
1934    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
1935        // Filter responses from non-existing or stale replicas.
1936        if self
1937            .replicas
1938            .get(&replica_id)
1939            .filter(|replica| replica.epoch == epoch)
1940            .is_none()
1941        {
1942            return;
1943        }
1944
1945        // Invariant: the replica exists and has the expected epoch.
1946
1947        match response {
1948            ComputeResponse::Frontiers(id, frontiers) => {
1949                self.handle_frontiers_response(id, frontiers, replica_id);
1950            }
1951            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
1952                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
1953            }
1954            ComputeResponse::CopyToResponse(id, response) => {
1955                self.handle_copy_to_response(id, response, replica_id);
1956            }
1957            ComputeResponse::SubscribeResponse(id, response) => {
1958                self.handle_subscribe_response(id, response, replica_id);
1959            }
1960            ComputeResponse::Status(response) => {
1961                self.handle_status_response(response, replica_id);
1962            }
1963        }
1964    }
1965
1966    /// Handle new frontiers, returning any compute response that needs to
1967    /// be sent to the client.
1968    fn handle_frontiers_response(
1969        &mut self,
1970        id: GlobalId,
1971        frontiers: FrontiersResponse<T>,
1972        replica_id: ReplicaId,
1973    ) {
1974        if !self.collections.contains_key(&id) {
1975            soft_panic_or_log!(
1976                "frontiers update for an unknown collection \
1977                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1978            );
1979            return;
1980        }
1981        let Some(replica) = self.replicas.get_mut(&replica_id) else {
1982            soft_panic_or_log!(
1983                "frontiers update for an unknown replica \
1984                 (replica_id={replica_id}, frontiers={frontiers:?})"
1985            );
1986            return;
1987        };
1988        let Some(replica_collection) = replica.collections.get_mut(&id) else {
1989            soft_panic_or_log!(
1990                "frontiers update for an unknown replica collection \
1991                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1992            );
1993            return;
1994        };
1995
1996        if let Some(new_frontier) = frontiers.input_frontier {
1997            replica_collection.update_input_frontier(new_frontier.clone());
1998        }
1999        if let Some(new_frontier) = frontiers.output_frontier {
2000            replica_collection.update_output_frontier(new_frontier.clone());
2001        }
2002        if let Some(new_frontier) = frontiers.write_frontier {
2003            replica_collection.update_write_frontier(new_frontier.clone());
2004            self.maybe_update_global_write_frontier(id, new_frontier);
2005        }
2006    }
2007
2008    #[mz_ore::instrument(level = "debug")]
2009    fn handle_peek_response(
2010        &mut self,
2011        uuid: Uuid,
2012        response: PeekResponse,
2013        otel_ctx: OpenTelemetryContext,
2014        replica_id: ReplicaId,
2015    ) {
2016        otel_ctx.attach_as_parent();
2017
2018        // We might not be tracking this peek anymore, because we have served a response already or
2019        // because it was canceled. If this is the case, we ignore the response.
2020        let Some(peek) = self.peeks.get(&uuid) else {
2021            return;
2022        };
2023
2024        // If the peek is targeting a replica, ignore responses from other replicas.
2025        let target_replica = peek.target_replica.unwrap_or(replica_id);
2026        if target_replica != replica_id {
2027            return;
2028        }
2029
2030        let duration = peek.requested_at.elapsed();
2031        self.metrics.observe_peek_response(&response, duration);
2032
2033        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
2034        // NOTE: We use the `otel_ctx` from the response, not the pending peek, because we
2035        // currently want the parent to be whatever the compute worker did with this peek.
2036        self.deliver_response(ComputeControllerResponse::PeekNotification(
2037            uuid,
2038            notification,
2039            otel_ctx,
2040        ));
2041
2042        self.finish_peek(uuid, response)
2043    }
2044
2045    fn handle_copy_to_response(
2046        &mut self,
2047        sink_id: GlobalId,
2048        response: CopyToResponse,
2049        replica_id: ReplicaId,
2050    ) {
2051        if !self.collections.contains_key(&sink_id) {
2052            soft_panic_or_log!(
2053                "received response for an unknown copy-to \
2054                 (sink_id={sink_id}, replica_id={replica_id})",
2055            );
2056            return;
2057        }
2058        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2059            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
2060            return;
2061        };
2062        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
2063            soft_panic_or_log!(
2064                "copy-to response for an unknown replica collection \
2065                 (sink_id={sink_id}, replica_id={replica_id})"
2066            );
2067            return;
2068        };
2069
2070        // Downgrade the replica frontiers, to enable dropping of input read holds and clean up of
2071        // collection state.
2072        // TODO(database-issues#4701): report copy-to frontiers through `Frontiers` responses
2073        replica_collection.update_write_frontier(Antichain::new());
2074        replica_collection.update_input_frontier(Antichain::new());
2075        replica_collection.update_output_frontier(Antichain::new());
2076
2077        // We might not be tracking this COPY TO because we have already returned a response
2078        // from one of the replicas. In that case, we ignore the response.
2079        if !self.copy_tos.remove(&sink_id) {
2080            return;
2081        }
2082
2083        let result = match response {
2084            CopyToResponse::RowCount(count) => Ok(count),
2085            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
2086            // We should never get here: Replicas only drop copy to collections in response
2087            // to the controller allowing them to do so, and when the controller drops a
2088            // copy to it also removes it from the list of tracked copy_tos (see
2089            // [`Instance::drop_collections`]).
2090            CopyToResponse::Dropped => {
2091                tracing::error!(
2092                    %sink_id, %replica_id,
2093                    "received `Dropped` response for a tracked copy to",
2094                );
2095                return;
2096            }
2097        };
2098
2099        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
2100    }
2101
2102    fn handle_subscribe_response(
2103        &mut self,
2104        subscribe_id: GlobalId,
2105        response: SubscribeResponse<T>,
2106        replica_id: ReplicaId,
2107    ) {
2108        if !self.collections.contains_key(&subscribe_id) {
2109            soft_panic_or_log!(
2110                "received response for an unknown subscribe \
2111                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
2112            );
2113            return;
2114        }
2115        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2116            soft_panic_or_log!(
2117                "subscribe response for an unknown replica (replica_id={replica_id})"
2118            );
2119            return;
2120        };
2121        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
2122            soft_panic_or_log!(
2123                "subscribe response for an unknown replica collection \
2124                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
2125            );
2126            return;
2127        };
2128
2129        // Always apply replica write frontier updates. Even if the subscribe is not tracked
2130        // anymore, there might still be replicas reading from its inputs, so we need to track the
2131        // frontiers until all replicas have advanced to the empty one.
2132        let write_frontier = match &response {
2133            SubscribeResponse::Batch(batch) => batch.upper.clone(),
2134            SubscribeResponse::DroppedAt(_) => Antichain::new(),
2135        };
2136
2137        // For subscribes we downgrade all replica frontiers based on write frontiers. This should
2138        // be fine because the input and output frontier of a subscribe track its write frontier.
2139        // TODO(database-issues#4701): report subscribe frontiers through `Frontiers` responses
2140        replica_collection.update_write_frontier(write_frontier.clone());
2141        replica_collection.update_input_frontier(write_frontier.clone());
2142        replica_collection.update_output_frontier(write_frontier.clone());
2143
2144        // If the subscribe is not tracked, or targets a different replica, there is nothing to do.
2145        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
2146            return;
2147        };
2148        let replica_targeted = subscribe.target_replica.unwrap_or(replica_id) == replica_id;
2149        if !replica_targeted {
2150            return;
2151        }
2152
2153        // Apply a global frontier update.
2154        // If this is a replica-targeted subscribe, it is important that we advance the global
2155        // frontier only based on responses from the targeted replica. Otherwise, another replica
2156        // could advance to the empty frontier, making us drop the subscribe on the targeted
2157        // replica prematurely.
2158        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);
2159
2160        match response {
2161            SubscribeResponse::Batch(batch) => {
2162                let upper = batch.upper;
2163                let mut updates = batch.updates;
2164
2165                // If this batch advances the subscribe's frontier, we emit all updates at times
2166                // greater or equal to the last frontier (to avoid emitting duplicate updates).
2167                if PartialOrder::less_than(&subscribe.frontier, &upper) {
2168                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());
2169
2170                    if upper.is_empty() {
2171                        // This subscribe cannot produce more data. Stop tracking it.
2172                        self.subscribes.remove(&subscribe_id);
2173                    } else {
2174                        // This subscribe can produce more data. Update our tracking of it.
2175                        self.subscribes.insert(subscribe_id, subscribe);
2176                    }
2177
2178                    if let Ok(updates) = updates.as_mut() {
2179                        updates.retain(|(time, _data, _diff)| lower.less_equal(time));
2180                    }
2181                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
2182                        subscribe_id,
2183                        SubscribeBatch {
2184                            lower,
2185                            upper,
2186                            updates,
2187                        },
2188                    ));
2189                }
2190            }
2191            SubscribeResponse::DroppedAt(frontier) => {
2192                // We should never get here: Replicas only drop subscribe collections in response
2193                // to the controller allowing them to do so, and when the controller drops a
2194                // subscribe it also removes it from the list of tracked subscribes (see
2195                // [`Instance::drop_collections`]).
2196                tracing::error!(
2197                    %subscribe_id,
2198                    %replica_id,
2199                    frontier = ?frontier.elements(),
2200                    "received `DroppedAt` response for a tracked subscribe",
2201                );
2202                self.subscribes.remove(&subscribe_id);
2203            }
2204        }
2205    }
2206
2207    fn handle_status_response(&mut self, response: StatusResponse, replica_id: ReplicaId) {
2208        match response {
2209            StatusResponse::OperatorHydration(status) => {
2210                self.update_operator_hydration_status(replica_id, status)
2211            }
2212        }
2213    }
2214
2215    /// Return the write frontiers of the dependencies of the given collection.
2216    fn dependency_write_frontiers<'b>(
2217        &'b self,
2218        collection: &'b CollectionState<T>,
2219    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2220        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2221            let collection = self.collections.get(&dep_id);
2222            collection.map(|c| c.write_frontier())
2223        });
2224        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2225            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2226            frontiers.map(|f| f.write_frontier)
2227        });
2228
2229        compute_frontiers.chain(storage_frontiers)
2230    }
2231
2232    /// Return the write frontiers of transitive storage dependencies of the given collection.
2233    fn transitive_storage_dependency_write_frontiers<'b>(
2234        &'b self,
2235        collection: &'b CollectionState<T>,
2236    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2237        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2238        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2239        let mut done = BTreeSet::new();
2240
2241        while let Some(id) = todo.pop() {
2242            if done.contains(&id) {
2243                continue;
2244            }
2245            if let Some(dep) = self.collections.get(&id) {
2246                storage_ids.extend(dep.storage_dependency_ids());
2247                todo.extend(dep.compute_dependency_ids())
2248            }
2249            done.insert(id);
2250        }
2251
2252        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2253            let frontiers = self.storage_collections.collection_frontiers(id).ok();
2254            frontiers.map(|f| f.write_frontier)
2255        });
2256
2257        storage_frontiers
2258    }
2259
2260    /// Downgrade the warmup capabilities of collections as much as possible.
2261    ///
2262    /// The only requirement we have for a collection's warmup capability is that it is for a time
2263    /// that is available in all of the collection's inputs. For each input the latest time that is
2264    /// the case for is `write_frontier - 1`. So the farthest we can downgrade a collection's
2265    /// warmup capability is the minimum of `write_frontier - 1` of all its inputs.
2266    ///
2267    /// This method expects to be periodically called as part of instance maintenance work.
2268    /// We would like to instead update the warmup capabilities synchronously in response to
2269    /// frontier updates of dependency collections, but that is not generally possible because we
2270    /// don't learn about frontier updates of storage collections synchronously. We could do
2271    /// synchronous updates for compute dependencies, but we refrain from doing for simplicity.
2272    fn downgrade_warmup_capabilities(&mut self) {
2273        let mut new_capabilities = BTreeMap::new();
2274        for (id, collection) in &self.collections {
2275            // For write-only collections that have advanced to the empty frontier, we can drop the
2276            // warmup capability entirely. There is no reason why we would need to hydrate those
2277            // collections again, so being able to warm them up is not useful.
2278            if collection.read_policy.is_none()
2279                && collection.shared.lock_write_frontier(|f| f.is_empty())
2280            {
2281                new_capabilities.insert(*id, Antichain::new());
2282                continue;
2283            }
2284
2285            let mut new_capability = Antichain::new();
2286            for frontier in self.dependency_write_frontiers(collection) {
2287                for time in frontier {
2288                    new_capability.insert(time.step_back().unwrap_or(time));
2289                }
2290            }
2291
2292            new_capabilities.insert(*id, new_capability);
2293        }
2294
2295        for (id, new_capability) in new_capabilities {
2296            let collection = self.expect_collection_mut(id);
2297            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
2298        }
2299    }
2300
2301    /// Forward the implied capabilities of collections, if possible.
2302    ///
2303    /// The implied capability of a collection controls (a) which times are still readable (for
2304    /// indexes) and (b) with which as-of the collection gets installed on a new replica. We are
2305    /// usually not allowed to advance an implied capability beyond the frontier that follows from
2306    /// the collection's read policy applied to its write frontier:
2307    ///
2308    ///  * For sink collections, some external consumer might rely on seeing all distinct times in
2309    ///    the input reflected in the output. If we'd forward the implied capability of a sink,
2310    ///    we'd risk skipping times in the output across replica restarts.
2311    ///  * For index collections, we might make the index unreadable by advancing its read frontier
2312    ///    beyond its write frontier.
2313    ///
2314    /// There is one case where forwarding an implied capability is fine though: an index installed
2315    /// on a cluster that has no replicas. Such indexes are not readable anyway until a new replica
2316    /// is added, so advancing its read frontier can't make it unreadable. We can thus advance the
2317    /// implied capability as long as we make sure that when a new replica is added, the expected
2318    /// relationship between write frontier, read policy, and implied capability can be restored
2319    /// immediately (modulo computation time).
2320    ///
2321    /// Forwarding implied capabilities is not necessary for the correct functioning of the
2322    /// controller but an optimization that is beneficial in two ways:
2323    ///
2324    ///  * It relaxes read holds on inputs to forwarded collections, allowing their compaction.
2325    ///  * It reduces the amount of historical detail new replicas need to process when computing
2326    ///    forwarded collections, as forwarding the implied capability also forwards the corresponding
2327    ///    dataflow as-of.
2328    fn forward_implied_capabilities(&mut self) {
2329        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
2330            return;
2331        }
2332        if !self.replicas.is_empty() {
2333            return;
2334        }
2335
2336        let mut new_capabilities = BTreeMap::new();
2337        for (id, collection) in &self.collections {
2338            let Some(read_policy) = &collection.read_policy else {
2339                // Collection is write-only, i.e. a sink.
2340                continue;
2341            };
2342
2343            // When a new replica is started, it will immediately be able to compute all collection
2344            // output up to the write frontier of its transitive storage inputs. So the new implied
2345            // read capability should be the read policy applied to that frontier.
2346            let mut dep_frontier = Antichain::new();
2347            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
2348                dep_frontier.extend(frontier);
2349            }
2350
2351            let new_capability = read_policy.frontier(dep_frontier.borrow());
2352            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
2353                new_capabilities.insert(*id, new_capability);
2354            }
2355        }
2356
2357        for (id, new_capability) in new_capabilities {
2358            let collection = self.expect_collection_mut(id);
2359            let _ = collection.implied_read_hold.try_downgrade(new_capability);
2360        }
2361    }
2362
    /// Process pending maintenance work.
    ///
    /// This method is invoked periodically by the global controller.
    /// It is a good place to perform maintenance work that arises from various controller state
    /// changes and that cannot conveniently be handled synchronously with those state changes.
    ///
    /// All maintenance steps run unconditionally on every invocation, in the order in which they
    /// appear in the body.
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        self.rehydrate_failed_replicas();
        // Read hold maintenance: relax warmup capabilities and, where allowed, forward implied
        // capabilities.
        self.downgrade_warmup_capabilities();
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
2379}
2380
/// State maintained about individual compute collections.
///
/// A compute collection is either an index, or a storage sink, or a subscribe, exported by a
/// compute dataflow.
#[derive(Debug)]
struct CollectionState<T: ComputeControllerTimestamp> {
    /// Whether this collection is a log collection.
    ///
    /// Log collections are special in that they are only maintained by a subset of all replicas.
    log_collection: bool,
    /// Whether this collection has been dropped by a controller client.
    ///
    /// The controller is allowed to remove the `CollectionState` for a collection only when
    /// `dropped == true`. Otherwise, clients might still expect to be able to query information
    /// about this collection.
    dropped: bool,
    /// Whether this collection has been scheduled, i.e., the controller has sent a `Schedule`
    /// command for it.
    scheduled: bool,

    /// State shared with the `ComputeController`.
    shared: SharedCollectionState<T>,

    /// A read hold maintaining the implied capability of the collection.
    ///
    /// This capability is kept to ensure that the collection remains readable according to its
    /// `read_policy`. It also ensures that read holds on the collection's dependencies are kept at
    /// some time not greater than the collection's `write_frontier`, guaranteeing that the
    /// collection's next outputs can always be computed without skipping times.
    implied_read_hold: ReadHold<T>,
    /// A read hold held to enable dataflow warmup.
    ///
    /// Dataflow warmup is an optimization that allows dataflows to immediately start hydrating
    /// even when their next output time (as implied by the `write_frontier`) is in the future.
    /// By installing a read capability derived from the write frontiers of the collection's
    /// inputs, we ensure that the as-of of new dataflows installed for the collection is at a time
    /// that is immediately available, so hydration can begin immediately too.
    warmup_read_hold: ReadHold<T>,
    /// The policy to use to downgrade `self.implied_read_hold`.
    ///
    /// If `None`, the collection is a write-only collection (i.e. a sink). For write-only
    /// collections, the `implied_read_hold` is only required for maintaining read holds on the
    /// inputs, so we can immediately downgrade it to the `write_frontier`.
    read_policy: Option<ReadPolicy<T>>,

    /// Storage identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
    /// Compute identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,

    /// Introspection state associated with this collection.
    introspection: CollectionIntrospection<T>,

    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
    /// introspection update.
    ///
    /// Keys are `(period, lag, labels)` triples, values are counts.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection. That is the case for
    /// transient collections, see [`CollectionState::new`].
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}
2453
2454impl<T: ComputeControllerTimestamp> CollectionState<T> {
2455    /// Creates a new collection state, with an initial read policy valid from `since`.
2456    fn new(
2457        collection_id: GlobalId,
2458        as_of: Antichain<T>,
2459        shared: SharedCollectionState<T>,
2460        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2461        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2462        read_hold_tx: read_holds::ChangeTx<T>,
2463        introspection: CollectionIntrospection<T>,
2464    ) -> Self {
2465        // A collection is not readable before the `as_of`.
2466        let since = as_of.clone();
2467        // A collection won't produce updates for times before the `as_of`.
2468        let upper = as_of;
2469
2470        // Ensure that the provided `shared` is valid for the given `as_of`.
2471        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
2472        assert!(shared.lock_write_frontier(|f| f == &upper));
2473
2474        // Initialize collection read holds.
2475        // Note that the implied read hold was already added to the `read_capabilities` when
2476        // `shared` was created, so we only need to add the warmup read hold here.
2477        let implied_read_hold =
2478            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
2479        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);
2480
2481        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
2482        shared.lock_read_capabilities(|c| {
2483            c.update_iter(updates);
2484        });
2485
2486        // In an effort to keep the produced wallclock lag introspection data small and
2487        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2488        // select indexes and subscribes.
2489        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
2490            true => None,
2491            false => Some(Default::default()),
2492        };
2493
2494        Self {
2495            log_collection: false,
2496            dropped: false,
2497            scheduled: false,
2498            shared,
2499            implied_read_hold,
2500            warmup_read_hold,
2501            read_policy: Some(ReadPolicy::ValidFrom(since)),
2502            storage_dependencies,
2503            compute_dependencies,
2504            introspection,
2505            wallclock_lag_histogram_stash,
2506        }
2507    }
2508
2509    /// Creates a new collection state for a log collection.
2510    fn new_log_collection(
2511        id: GlobalId,
2512        shared: SharedCollectionState<T>,
2513        read_hold_tx: read_holds::ChangeTx<T>,
2514        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2515    ) -> Self {
2516        let since = Antichain::from_elem(T::minimum());
2517        let introspection =
2518            CollectionIntrospection::new(id, introspection_tx, since.clone(), false, None, None);
2519        let mut state = Self::new(
2520            id,
2521            since,
2522            shared,
2523            Default::default(),
2524            Default::default(),
2525            read_hold_tx,
2526            introspection,
2527        );
2528        state.log_collection = true;
2529        // Log collections are created and scheduled implicitly as part of replica initialization.
2530        state.scheduled = true;
2531        state
2532    }
2533
2534    /// Reports the current read frontier.
2535    fn read_frontier(&self) -> Antichain<T> {
2536        self.shared
2537            .lock_read_capabilities(|c| c.frontier().to_owned())
2538    }
2539
2540    /// Reports the current write frontier.
2541    fn write_frontier(&self) -> Antichain<T> {
2542        self.shared.lock_write_frontier(|f| f.clone())
2543    }
2544
2545    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2546        self.storage_dependencies.keys().copied()
2547    }
2548
2549    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2550        self.compute_dependencies.keys().copied()
2551    }
2552
2553    /// Reports the IDs of the dependencies of this collection.
2554    fn dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2555        self.compute_dependency_ids()
2556            .chain(self.storage_dependency_ids())
2557    }
2558}
2559
/// Collection state shared with the `ComputeController`.
///
/// Having this allows certain controller APIs, such as `ComputeController::collection_frontiers`
/// and `ComputeController::acquire_read_hold` to be non-`async`. This comes at the cost of
/// complexity (by introducing shared mutable state) and performance (by introducing locking). We
/// should aim to reduce the amount of shared state over time, rather than expand it.
///
/// Note that [`SharedCollectionState`]s are initialized by the `ComputeController` prior to the
/// collection's creation in the [`Instance`]. This is to allow compute clients to query frontiers
/// and take new read holds immediately, without having to wait for the [`Instance`] to update.
///
/// Both fields are reference-counted, so clones of a `SharedCollectionState` refer to the same
/// underlying state.
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState<T> {
    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulation contains the capabilities held by all [`ReadHold`]s given out for the
    /// collection, including `implied_read_hold` and `warmup_read_hold`.
    ///
    /// NOTE: This field may only be modified by [`Instance::apply_read_hold_change`] and
    /// `ComputeController::acquire_read_hold`. Nobody else should modify read capabilities
    /// directly. Instead, collection users should manage read holds through [`ReadHold`] objects
    /// acquired through `ComputeController::acquire_read_hold`.
    ///
    /// TODO(teskje): Restructure the code to enforce the above in the type system.
    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
    /// The write frontier of this collection.
    write_frontier: Arc<Mutex<Antichain<T>>>,
}
2587
2588impl<T: Timestamp> SharedCollectionState<T> {
2589    pub fn new(as_of: Antichain<T>) -> Self {
2590        // A collection is not readable before the `as_of`.
2591        let since = as_of.clone();
2592        // A collection won't produce updates for times before the `as_of`.
2593        let upper = as_of;
2594
2595        // Initialize read capabilities to the `since`.
2596        // The is the implied read capability. The corresponding [`ReadHold`] is created in
2597        // [`CollectionState::new`].
2598        let mut read_capabilities = MutableAntichain::new();
2599        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2600
2601        Self {
2602            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2603            write_frontier: Arc::new(Mutex::new(upper)),
2604        }
2605    }
2606
2607    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2608    where
2609        F: FnOnce(&mut MutableAntichain<T>) -> R,
2610    {
2611        let mut caps = self.read_capabilities.lock().expect("poisoned");
2612        f(&mut *caps)
2613    }
2614
2615    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2616    where
2617        F: FnOnce(&mut Antichain<T>) -> R,
2618    {
2619        let mut frontier = self.write_frontier.lock().expect("poisoned");
2620        f(&mut *frontier)
2621    }
2622}
2623
/// Manages certain introspection relations associated with a collection. Upon creation, it adds
/// rows to introspection relations. When dropped, it retracts its managed rows.
///
/// The managed relations are `IntrospectionType::Frontiers` and
/// `IntrospectionType::ComputeMaterializedViewRefreshes`. Each of them is only populated when the
/// corresponding optional state below is `Some`.
///
/// TODO: `ComputeDependencies` could be moved under this.
#[derive(Debug)]
struct CollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Introspection state for `IntrospectionType::Frontiers`.
    ///
    /// `Some` if the collection does _not_ sink into a storage collection (i.e. is not an MV). If
    /// the collection sinks into storage, the storage controller reports its frontiers instead.
    frontiers: Option<FrontiersIntrospectionState<T>>,
    /// Introspection state for `IntrospectionType::ComputeMaterializedViewRefreshes`.
    ///
    /// `Some` if the collection is a REFRESH MV.
    refresh: Option<RefreshIntrospectionState<T>>,
}
2644
2645impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
2646    fn new(
2647        collection_id: GlobalId,
2648        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2649        as_of: Antichain<T>,
2650        storage_sink: bool,
2651        initial_as_of: Option<Antichain<T>>,
2652        refresh_schedule: Option<RefreshSchedule>,
2653    ) -> Self {
2654        let refresh =
2655            match (refresh_schedule, initial_as_of) {
2656                (Some(refresh_schedule), Some(initial_as_of)) => Some(
2657                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2658                ),
2659                (refresh_schedule, _) => {
2660                    // If we have a `refresh_schedule`, then the collection is a MV, so we should also have
2661                    // an `initial_as_of`.
2662                    soft_assert_or_log!(
2663                        refresh_schedule.is_none(),
2664                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2665                    );
2666                    None
2667                }
2668            };
2669        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2670
2671        let self_ = Self {
2672            collection_id,
2673            introspection_tx,
2674            frontiers,
2675            refresh,
2676        };
2677
2678        self_.report_initial_state();
2679        self_
2680    }
2681
2682    /// Reports the initial introspection state.
2683    fn report_initial_state(&self) {
2684        if let Some(frontiers) = &self.frontiers {
2685            let row = frontiers.row_for_collection(self.collection_id);
2686            let updates = vec![(row, Diff::ONE)];
2687            self.send(IntrospectionType::Frontiers, updates);
2688        }
2689
2690        if let Some(refresh) = &self.refresh {
2691            let row = refresh.row_for_collection(self.collection_id);
2692            let updates = vec![(row, Diff::ONE)];
2693            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2694        }
2695    }
2696
2697    /// Observe the given current collection frontiers and update the introspection state as
2698    /// necessary.
2699    fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2700        self.update_frontier_introspection(read_frontier, write_frontier);
2701        self.update_refresh_introspection(write_frontier);
2702    }
2703
2704    fn update_frontier_introspection(
2705        &mut self,
2706        read_frontier: &Antichain<T>,
2707        write_frontier: &Antichain<T>,
2708    ) {
2709        let Some(frontiers) = &mut self.frontiers else {
2710            return;
2711        };
2712
2713        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2714        {
2715            return; // no change
2716        };
2717
2718        let retraction = frontiers.row_for_collection(self.collection_id);
2719        frontiers.update(read_frontier, write_frontier);
2720        let insertion = frontiers.row_for_collection(self.collection_id);
2721        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2722        self.send(IntrospectionType::Frontiers, updates);
2723    }
2724
2725    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
2726        let Some(refresh) = &mut self.refresh else {
2727            return;
2728        };
2729
2730        let retraction = refresh.row_for_collection(self.collection_id);
2731        refresh.frontier_update(write_frontier);
2732        let insertion = refresh.row_for_collection(self.collection_id);
2733
2734        if retraction == insertion {
2735            return; // no change
2736        }
2737
2738        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2739        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2740    }
2741
2742    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2743        // Failure to send means the `ComputeController` has been dropped and doesn't care about
2744        // introspection updates anymore.
2745        let _ = self.introspection_tx.send((introspection_type, updates));
2746    }
2747}
2748
2749impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
2750    fn drop(&mut self) {
2751        // Retract collection frontiers.
2752        if let Some(frontiers) = &self.frontiers {
2753            let row = frontiers.row_for_collection(self.collection_id);
2754            let updates = vec![(row, Diff::MINUS_ONE)];
2755            self.send(IntrospectionType::Frontiers, updates);
2756        }
2757
2758        // Retract MV refresh state.
2759        if let Some(refresh) = &self.refresh {
2760            let retraction = refresh.row_for_collection(self.collection_id);
2761            let updates = vec![(retraction, Diff::MINUS_ONE)];
2762            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2763        }
2764    }
2765}
2766
/// The collection frontiers most recently recorded for `IntrospectionType::Frontiers`.
///
/// The introspection row of a collection is derived from these frontiers, see
/// `FrontiersIntrospectionState::row_for_collection`.
#[derive(Debug)]
struct FrontiersIntrospectionState<T> {
    /// The recorded read frontier.
    read_frontier: Antichain<T>,
    /// The recorded write frontier.
    write_frontier: Antichain<T>,
}
2772
2773impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
2774    fn new(as_of: Antichain<T>) -> Self {
2775        Self {
2776            read_frontier: as_of.clone(),
2777            write_frontier: as_of,
2778        }
2779    }
2780
2781    /// Return a `Row` reflecting the current collection frontiers.
2782    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2783        let read_frontier = self
2784            .read_frontier
2785            .as_option()
2786            .map_or(Datum::Null, |ts| ts.clone().into());
2787        let write_frontier = self
2788            .write_frontier
2789            .as_option()
2790            .map_or(Datum::Null, |ts| ts.clone().into());
2791        Row::pack_slice(&[
2792            Datum::String(&collection_id.to_string()),
2793            read_frontier,
2794            write_frontier,
2795        ])
2796    }
2797
2798    /// Update the introspection state with the given new frontiers.
2799    fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2800        if read_frontier != &self.read_frontier {
2801            self.read_frontier.clone_from(read_frontier);
2802        }
2803        if write_frontier != &self.write_frontier {
2804            self.write_frontier.clone_from(write_frontier);
2805        }
2806    }
2807}
2808
/// Information needed to compute introspection updates for a REFRESH materialized view when the
/// write frontier advances.
#[derive(Debug)]
struct RefreshIntrospectionState<T> {
    // Immutable properties of the MV
    /// The refresh schedule of the MV.
    refresh_schedule: RefreshSchedule,
    /// The initial as-of of the MV. Write frontiers that are not beyond it are treated as
    /// "before the first refresh".
    initial_as_of: Antichain<T>,
    // Refresh state
    /// The next refresh. Null or an MzTimestamp.
    next_refresh: Datum<'static>,
    /// The last completed refresh. Null or an MzTimestamp.
    last_completed_refresh: Datum<'static>,
}
2820
2821impl<T> RefreshIntrospectionState<T> {
2822    /// Return a `Row` reflecting the current refresh introspection state.
2823    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2824        Row::pack_slice(&[
2825            Datum::String(&collection_id.to_string()),
2826            self.last_completed_refresh,
2827            self.next_refresh,
2828        ])
2829    }
2830}
2831
2832impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
2833    /// Construct a new [`RefreshIntrospectionState`], and apply an initial `frontier_update()` at
2834    /// the `upper`.
2835    fn new(
2836        refresh_schedule: RefreshSchedule,
2837        initial_as_of: Antichain<T>,
2838        upper: &Antichain<T>,
2839    ) -> Self {
2840        let mut self_ = Self {
2841            refresh_schedule: refresh_schedule.clone(),
2842            initial_as_of: initial_as_of.clone(),
2843            next_refresh: Datum::Null,
2844            last_completed_refresh: Datum::Null,
2845        };
2846        self_.frontier_update(upper);
2847        self_
2848    }
2849
2850    /// Should be called whenever the write frontier of the collection advances. It updates the
2851    /// state that should be recorded in introspection relations, but doesn't send the updates yet.
2852    fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
2853        if write_frontier.is_empty() {
2854            self.last_completed_refresh =
2855                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2856                    last_refresh.into()
2857                } else {
2858                    // If there is no last refresh, then we have a `REFRESH EVERY`, in which case
2859                    // the saturating roundup puts a refresh at the maximum possible timestamp.
2860                    T::maximum().into()
2861                };
2862            self.next_refresh = Datum::Null;
2863        } else {
2864            if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2865                // We are before the first refresh.
2866                self.last_completed_refresh = Datum::Null;
2867                let initial_as_of = self.initial_as_of.as_option().expect(
2868                    "initial_as_of can't be [], because then there would be no refreshes at all",
2869                );
2870                let first_refresh = initial_as_of
2871                    .round_up(&self.refresh_schedule)
2872                    .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2873                soft_assert_or_log!(
2874                    first_refresh == *initial_as_of,
2875                    "initial_as_of should be set to the first refresh"
2876                );
2877                self.next_refresh = first_refresh.into();
2878            } else {
2879                // The first refresh has already happened.
2880                let write_frontier = write_frontier.as_option().expect("checked above");
2881                self.last_completed_refresh = write_frontier
2882                    .round_down_minus_1(&self.refresh_schedule)
2883                    .map_or_else(
2884                        || {
2885                            soft_panic_or_log!(
2886                                "rounding down should have returned the first refresh or later"
2887                            );
2888                            Datum::Null
2889                        },
2890                        |last_completed_refresh| last_completed_refresh.into(),
2891                    );
2892                self.next_refresh = write_frontier.clone().into();
2893            }
2894        }
2895    }
2896}
2897
/// A note of an outstanding peek response.
///
/// Bundles everything that is kept around for a peek between the time it was requested and the
/// time its response is passed on through `peek_response_tx`.
#[derive(Debug)]
struct PendingPeek<T: Timestamp> {
    /// For replica-targeted peeks, this specifies the replica whose response we should pass on.
    ///
    /// If this value is `None`, we pass on the first response.
    target_replica: Option<ReplicaId>,
    /// The OpenTelemetry context for this peek.
    otel_ctx: OpenTelemetryContext,
    /// The time at which the peek was requested.
    ///
    /// Used to track peek durations.
    requested_at: Instant,
    /// The read hold installed to serve this peek.
    read_hold: ReadHold<T>,
    /// The oneshot channel through which the peek's response is sent.
    peek_response_tx: oneshot::Sender<PeekResponse>,
    /// An optional limit of the peek's result size.
    ///
    /// NOTE(review): presumably `None` means "no limit" -- confirm against the peek handling code.
    limit: Option<usize>,
    /// The offset into the peek's result.
    offset: usize,
}
2920
/// State tracked for an active subscribe.
#[derive(Debug, Clone)]
struct ActiveSubscribe<T> {
    /// Current upper frontier of this subscribe.
    ///
    /// Starts out at the minimum timestamp when created through `ActiveSubscribe::new`.
    frontier: Antichain<T>,
    /// For replica-targeted subscribes, this specifies the replica whose responses we should pass on.
    ///
    /// If this value is `None`, we pass on the first response for each time slice.
    target_replica: Option<ReplicaId>,
}
2930
2931impl<T: ComputeControllerTimestamp> ActiveSubscribe<T> {
2932    fn new(target_replica: Option<ReplicaId>) -> Self {
2933        Self {
2934            frontier: Antichain::from_elem(T::minimum()),
2935            target_replica,
2936        }
2937    }
2938}
2939
/// State maintained about individual replicas.
#[derive(Debug)]
struct ReplicaState<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    id: ReplicaId,
    /// Client for the running replica task.
    client: ReplicaClient<T>,
    /// The replica configuration.
    config: ReplicaConfig,
    /// Replica metrics.
    ///
    /// Also used to derive the per-collection metrics when a collection is added.
    metrics: ReplicaMetrics,
    /// A channel through which introspection updates are delivered.
    ///
    /// A clone of this sender is handed to the introspection state of every added collection.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Per-replica collection state, keyed by collection ID.
    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
    /// The epoch of the replica.
    epoch: u64,
}
2958
2959impl<T: ComputeControllerTimestamp> ReplicaState<T> {
2960    fn new(
2961        id: ReplicaId,
2962        client: ReplicaClient<T>,
2963        config: ReplicaConfig,
2964        metrics: ReplicaMetrics,
2965        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2966        epoch: u64,
2967    ) -> Self {
2968        Self {
2969            id,
2970            client,
2971            config,
2972            metrics,
2973            introspection_tx,
2974            epoch,
2975            collections: Default::default(),
2976        }
2977    }
2978
2979    /// Add a collection to the replica state.
2980    ///
2981    /// # Panics
2982    ///
2983    /// Panics if a collection with the same ID exists already.
2984    fn add_collection(
2985        &mut self,
2986        id: GlobalId,
2987        as_of: Antichain<T>,
2988        input_read_holds: Vec<ReadHold<T>>,
2989    ) {
2990        let metrics = self.metrics.for_collection(id);
2991        let introspection = ReplicaCollectionIntrospection::new(
2992            self.id,
2993            id,
2994            self.introspection_tx.clone(),
2995            as_of.clone(),
2996        );
2997        let mut state =
2998            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
2999
3000        // In an effort to keep the produced wallclock lag introspection data small and
3001        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
3002        // select indexes and subscribes.
3003        if id.is_transient() {
3004            state.wallclock_lag_max = None;
3005        }
3006
3007        if let Some(previous) = self.collections.insert(id, state) {
3008            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
3009        }
3010    }
3011
3012    /// Remove state for a collection.
3013    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
3014        self.collections.remove(&id)
3015    }
3016
3017    /// Returns whether all replica frontiers of the given collection are empty.
3018    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
3019        self.collections.get(&id).map_or(true, |c| {
3020            c.write_frontier.is_empty()
3021                && c.input_frontier.is_empty()
3022                && c.output_frontier.is_empty()
3023        })
3024    }
3025
3026    /// Returns the state of the [`ReplicaState`] formatted as JSON.
3027    ///
3028    /// The returned value is not guaranteed to be stable and may change at any point in time.
3029    #[mz_ore::instrument(level = "debug")]
3030    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3031        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
3032        // returned object as a tradeoff between usability and stability. `serde_json` will fail
3033        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
3034        // prevents a future unrelated change from silently breaking this method.
3035
3036        // Destructure `self` here so we don't forget to consider dumping newly added fields.
3037        let Self {
3038            id,
3039            client: _,
3040            config: _,
3041            metrics: _,
3042            introspection_tx: _,
3043            epoch,
3044            collections,
3045        } = self;
3046
3047        fn field(
3048            key: &str,
3049            value: impl Serialize,
3050        ) -> Result<(String, serde_json::Value), anyhow::Error> {
3051            let value = serde_json::to_value(value)?;
3052            Ok((key.to_string(), value))
3053        }
3054
3055        let collections: BTreeMap<_, _> = collections
3056            .iter()
3057            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
3058            .collect();
3059
3060        let map = serde_json::Map::from_iter([
3061            field("id", id.to_string())?,
3062            field("collections", collections)?,
3063            field("epoch", epoch)?,
3064        ]);
3065        Ok(serde_json::Value::Object(map))
3066    }
3067}
3068
/// State maintained about a single collection on a single replica.
#[derive(Debug)]
struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
    /// The replica write frontier of this collection.
    ///
    /// See [`FrontiersResponse::write_frontier`].
    write_frontier: Antichain<T>,
    /// The replica input frontier of this collection.
    ///
    /// See [`FrontiersResponse::input_frontier`].
    input_frontier: Antichain<T>,
    /// The replica output frontier of this collection.
    ///
    /// See [`FrontiersResponse::output_frontier`].
    output_frontier: Antichain<T>,

    /// Metrics tracked for this collection.
    ///
    /// If this is `None`, no metrics are collected.
    metrics: Option<ReplicaCollectionMetrics>,
    /// As-of frontier with which this collection was installed on the replica.
    ///
    /// This is also the initial value of the three replica frontiers above, and the baseline
    /// against which hydration is determined.
    as_of: Antichain<T>,
    /// Tracks introspection state for this collection.
    introspection: ReplicaCollectionIntrospection<T>,
    /// Read holds on storage inputs to this collection.
    ///
    /// These read holds are kept to ensure that the replica is able to read from storage inputs at
    /// all times it hasn't read yet. We only need to install read holds for storage inputs since
    /// compaction of compute inputs is implicitly held back by Timely/DD.
    ///
    /// The holds are downgraded whenever the replica input frontier advances.
    input_read_holds: Vec<ReadHold<T>>,

    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection. That is the case for
    /// transient collections, see `ReplicaState::add_collection`.
    wallclock_lag_max: Option<WallclockLag>,
}
3104
3105impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
3106    fn new(
3107        metrics: Option<ReplicaCollectionMetrics>,
3108        as_of: Antichain<T>,
3109        introspection: ReplicaCollectionIntrospection<T>,
3110        input_read_holds: Vec<ReadHold<T>>,
3111    ) -> Self {
3112        Self {
3113            write_frontier: as_of.clone(),
3114            input_frontier: as_of.clone(),
3115            output_frontier: as_of.clone(),
3116            metrics,
3117            as_of,
3118            introspection,
3119            input_read_holds,
3120            wallclock_lag_max: Some(WallclockLag::MIN),
3121        }
3122    }
3123
3124    /// Returns whether this collection is hydrated.
3125    fn hydrated(&self) -> bool {
3126        // If the observed frontier is greater than the collection's as-of, the collection has
3127        // produced some output and is therefore hydrated.
3128        //
3129        // We need to consider the edge case where the as-of is the empty frontier. Such an as-of
3130        // is not useful for indexes, because they wouldn't be readable. For write-only
3131        // collections, an empty as-of means that the collection has been fully written and no new
3132        // dataflow needs to be created for it. Consequently, no hydration will happen either.
3133        //
3134        // Based on this, we could respond in two ways:
3135        //  * `false`, as in "the dataflow was never created"
3136        //  * `true`, as in "the dataflow completed immediately"
3137        //
3138        // Since hydration is often used as a measure of dataflow progress and we don't want to
3139        // give the impression that certain dataflows are somehow stuck when they are not, we go
3140        // with the second interpretation here.
3141        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3142    }
3143
3144    /// Updates the replica write frontier of this collection.
3145    fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
3146        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3147            soft_panic_or_log!(
3148                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3149                self.write_frontier,
3150            );
3151            return;
3152        } else if new_frontier == self.write_frontier {
3153            return;
3154        }
3155
3156        self.write_frontier = new_frontier;
3157    }
3158
3159    /// Updates the replica input frontier of this collection.
3160    fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
3161        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3162            soft_panic_or_log!(
3163                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3164                self.input_frontier,
3165            );
3166            return;
3167        } else if new_frontier == self.input_frontier {
3168            return;
3169        }
3170
3171        self.input_frontier = new_frontier;
3172
3173        // Relax our read holds on collection inputs.
3174        for read_hold in &mut self.input_read_holds {
3175            let result = read_hold.try_downgrade(self.input_frontier.clone());
3176            soft_assert_or_log!(
3177                result.is_ok(),
3178                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3179                self.input_frontier,
3180            );
3181        }
3182    }
3183
3184    /// Updates the replica output frontier of this collection.
3185    fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
3186        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3187            soft_panic_or_log!(
3188                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3189                self.output_frontier,
3190            );
3191            return;
3192        } else if new_frontier == self.output_frontier {
3193            return;
3194        }
3195
3196        self.output_frontier = new_frontier;
3197    }
3198}
3199
/// Maintains the introspection state for a given replica and collection, and ensures that reported
/// introspection data is retracted when the collection is dropped.
///
/// The retraction happens in the type's `Drop` implementation.
#[derive(Debug)]
struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// Operator-level hydration state.
    ///
    /// Maps `(lir_id, worker_id)` to whether that operator is hydrated on that worker.
    operators: BTreeMap<(LirId, usize), bool>,
    /// The collection's reported replica write frontier.
    ///
    /// This is the frontier that was last reported, so it is the one that must be retracted
    /// before a new frontier is reported.
    write_frontier: Antichain<T>,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
}
3216
3217impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
3218    /// Create a new `HydrationState` and initialize introspection.
3219    fn new(
3220        replica_id: ReplicaId,
3221        collection_id: GlobalId,
3222        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3223        as_of: Antichain<T>,
3224    ) -> Self {
3225        let self_ = Self {
3226            replica_id,
3227            collection_id,
3228            operators: Default::default(),
3229            write_frontier: as_of,
3230            introspection_tx,
3231        };
3232
3233        self_.report_initial_state();
3234        self_
3235    }
3236
3237    /// Reports the initial introspection state.
3238    fn report_initial_state(&self) {
3239        let row = self.write_frontier_row();
3240        let updates = vec![(row, Diff::ONE)];
3241        self.send(IntrospectionType::ReplicaFrontiers, updates);
3242    }
3243
3244    /// Update the given (lir_id, worker_id) pair as hydrated.
3245    fn operator_hydrated(&mut self, lir_id: LirId, worker_id: usize, hydrated: bool) {
3246        let retraction = self.operator_hydration_row(lir_id, worker_id);
3247        self.operators.insert((lir_id, worker_id), hydrated);
3248        let insertion = self.operator_hydration_row(lir_id, worker_id);
3249
3250        if retraction == insertion {
3251            return; // no change
3252        }
3253
3254        let updates = retraction
3255            .map(|r| (r, Diff::MINUS_ONE))
3256            .into_iter()
3257            .chain(insertion.map(|r| (r, Diff::ONE)))
3258            .collect();
3259        self.send(IntrospectionType::ComputeOperatorHydrationStatus, updates);
3260    }
3261
3262    /// Return a `Row` reflecting the current hydration status of the identified operator.
3263    ///
3264    /// Returns `None` if the identified operator is not tracked.
3265    fn operator_hydration_row(&self, lir_id: LirId, worker_id: usize) -> Option<Row> {
3266        self.operators.get(&(lir_id, worker_id)).map(|hydrated| {
3267            Row::pack_slice(&[
3268                Datum::String(&self.collection_id.to_string()),
3269                Datum::UInt64(lir_id.into()),
3270                Datum::String(&self.replica_id.to_string()),
3271                Datum::UInt64(u64::cast_from(worker_id)),
3272                Datum::from(*hydrated),
3273            ])
3274        })
3275    }
3276
3277    /// Observe the given current write frontier and update the introspection state as necessary.
3278    fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
3279        if self.write_frontier == *write_frontier {
3280            return; // no change
3281        }
3282
3283        let retraction = self.write_frontier_row();
3284        self.write_frontier.clone_from(write_frontier);
3285        let insertion = self.write_frontier_row();
3286
3287        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3288        self.send(IntrospectionType::ReplicaFrontiers, updates);
3289    }
3290
3291    /// Return a `Row` reflecting the current replica write frontier.
3292    fn write_frontier_row(&self) -> Row {
3293        let write_frontier = self
3294            .write_frontier
3295            .as_option()
3296            .map_or(Datum::Null, |ts| ts.clone().into());
3297        Row::pack_slice(&[
3298            Datum::String(&self.collection_id.to_string()),
3299            Datum::String(&self.replica_id.to_string()),
3300            write_frontier,
3301        ])
3302    }
3303
3304    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3305        // Failure to send means the `ComputeController` has been dropped and doesn't care about
3306        // introspection updates anymore.
3307        let _ = self.introspection_tx.send((introspection_type, updates));
3308    }
3309}
3310
3311impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
3312    fn drop(&mut self) {
3313        // Retract operator hydration status.
3314        let operators: Vec<_> = self.operators.keys().collect();
3315        let updates: Vec<_> = operators
3316            .into_iter()
3317            .flat_map(|(lir_id, worker_id)| self.operator_hydration_row(*lir_id, *worker_id))
3318            .map(|r| (r, Diff::MINUS_ONE))
3319            .collect();
3320        if !updates.is_empty() {
3321            self.send(IntrospectionType::ComputeOperatorHydrationStatus, updates);
3322        }
3323
3324        // Retract the write frontier.
3325        let row = self.write_frontier_row();
3326        let updates = vec![(row, Diff::MINUS_ONE)];
3327        self.send(IntrospectionType::ReplicaFrontiers, updates);
3328    }
3329}