mz_compute_client/controller/instance.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller for a compute instance.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::ComputeInstanceId;
21use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
22use mz_compute_types::plan::render_plan::RenderPlan;
23use mz_compute_types::sinks::{
24    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
25};
26use mz_compute_types::sources::SourceInstanceDesc;
27use mz_controller_types::dyncfgs::{
28    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
29};
30use mz_dyncfg::ConfigSet;
31use mz_expr::RowSetFinishing;
32use mz_ore::cast::CastFrom;
33use mz_ore::channel::instrumented_unbounded_channel;
34use mz_ore::now::NowFn;
35use mz_ore::tracing::OpenTelemetryContext;
36use mz_ore::{soft_assert_or_log, soft_panic_or_log};
37use mz_persist_types::PersistLocation;
38use mz_repr::adt::timestamp::CheckedTimestamp;
39use mz_repr::refresh_schedule::RefreshSchedule;
40use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
41use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
42use mz_storage_types::read_holds::{self, ReadHold};
43use mz_storage_types::read_policy::ReadPolicy;
44use thiserror::Error;
45use timely::PartialOrder;
46use timely::progress::frontier::MutableAntichain;
47use timely::progress::{Antichain, ChangeBatch, Timestamp};
48use tokio::sync::mpsc::error::SendError;
49use tokio::sync::{mpsc, oneshot};
50use tracing::debug_span;
51use uuid::Uuid;
52
53use crate::controller::error::{
54    CollectionLookupError, CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
55};
56use crate::controller::replica::{ReplicaClient, ReplicaConfig};
57use crate::controller::{
58    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
59    ReplicaId, StorageCollections,
60};
61use crate::logging::LogVariant;
62use crate::metrics::IntCounter;
63use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
64use crate::protocol::command::{
65    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
66};
67use crate::protocol::history::ComputeCommandHistory;
68use crate::protocol::response::{
69    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
70    SubscribeBatch, SubscribeResponse,
71};
72
73#[derive(Error, Debug)]
74#[error("replica exists already: {0}")]
75pub(super) struct ReplicaExists(pub ReplicaId);
76
77#[derive(Error, Debug)]
78#[error("replica does not exist: {0}")]
79pub(super) struct ReplicaMissing(pub ReplicaId);
80
81#[derive(Error, Debug)]
82pub(super) enum DataflowCreationError {
83    #[error("collection does not exist: {0}")]
84    CollectionMissing(GlobalId),
85    #[error("replica does not exist: {0}")]
86    ReplicaMissing(ReplicaId),
87    #[error("dataflow definition lacks an as_of value")]
88    MissingAsOf,
89    #[error("subscribe dataflow has an empty as_of")]
90    EmptyAsOfForSubscribe,
91    #[error("copy to dataflow has an empty as_of")]
92    EmptyAsOfForCopyTo,
93    #[error("no read hold provided for dataflow import: {0}")]
94    ReadHoldMissing(GlobalId),
95    #[error("insufficient read hold provided for dataflow import: {0}")]
96    ReadHoldInsufficient(GlobalId),
97}
98
99impl From<CollectionMissing> for DataflowCreationError {
100    fn from(error: CollectionMissing) -> Self {
101        Self::CollectionMissing(error.0)
102    }
103}
104
105#[derive(Error, Debug)]
106#[error("the instance has shut down")]
107pub(super) struct InstanceShutDown;
108
109/// Errors arising during peek processing.
110#[derive(Error, Debug)]
111pub enum PeekError {
112    /// The replica that the peek was issued against does not exist.
113    #[error("replica does not exist: {0}")]
114    ReplicaMissing(ReplicaId),
115    /// The read hold that was passed in is against the wrong collection.
116    #[error("read hold ID does not match peeked collection: {0}")]
117    ReadHoldIdMismatch(GlobalId),
118    /// The read hold that was passed in is for a later time than the peek's timestamp.
119    #[error("insufficient read hold provided: {0}")]
120    ReadHoldInsufficient(GlobalId),
121    /// The peek's target instance has shut down.
122    #[error("the instance has shut down")]
123    InstanceShutDown,
124}
125
126impl From<InstanceShutDown> for PeekError {
127    fn from(_error: InstanceShutDown) -> Self {
128        Self::InstanceShutDown
129    }
130}
131
132#[derive(Error, Debug)]
133pub(super) enum ReadPolicyError {
134    #[error("collection does not exist: {0}")]
135    CollectionMissing(GlobalId),
136    #[error("collection is write-only: {0}")]
137    WriteOnlyCollection(GlobalId),
138}
139
140impl From<CollectionMissing> for ReadPolicyError {
141    fn from(error: CollectionMissing) -> Self {
142        Self::CollectionMissing(error.0)
143    }
144}
145
146/// A command sent to an [`Instance`] task.
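/// Commands are closures executed by the instance task inside [`Instance::run`]; see
/// [`Client::call`] and [`Client::call_sync`] for how they are submitted.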
147type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;
148
149/// A client for an `Instance` task.
150#[derive(Clone, derivative::Derivative)]
151#[derivative(Debug)]
152pub struct Client<T: ComputeControllerTimestamp> {
153    /// A sender for commands for the instance.
154    command_tx: mpsc::UnboundedSender<Command<T>>,
155    /// A sender for read hold changes for collections installed on the instance.
156    #[derivative(Debug = "ignore")]
157    read_hold_tx: read_holds::ChangeTx<T>,
158}
159
160impl<T: ComputeControllerTimestamp> Client<T> {
161    pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx<T> {
162        Arc::clone(&self.read_hold_tx)
163    }
164
165    /// Call a method to be run on the instance task by sending a message to the instance.
166    /// Does not wait for a response message.
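    ///
    /// A minimal usage sketch (not a doctest; `params` is a hypothetical `ComputeParameters`
    /// value):
    /// ```ignore
    /// client.call(move |instance| instance.update_configuration(params))?;
    /// ```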
167    pub(super) fn call<F>(&self, f: F) -> Result<(), InstanceShutDown>
168    where
169        F: FnOnce(&mut Instance<T>) + Send + 'static,
170    {
171        let otel_ctx = OpenTelemetryContext::obtain();
172        self.command_tx
173            .send(Box::new(move |instance| {
174                let _span = debug_span!("instance::call").entered();
175                otel_ctx.attach_as_parent();
176
177                f(instance)
178            }))
179            .map_err(|_send_error| InstanceShutDown)
180    }
181
182    /// Call a method to be run on the instance task by sending a message to the instance and
183    /// waiting for a response message.
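    ///
    /// A minimal usage sketch (not a doctest), mirroring how `peek` and
    /// `acquire_read_holds_and_collection_write_frontiers` use this method:
    /// ```ignore
    /// // First `?`: the instance task has shut down; second `?`: the collection is missing.
    /// let frontier = client
    ///     .call_sync(move |instance| instance.collection_write_frontier(id))
    ///     .await??;
    /// ```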
184    pub(super) async fn call_sync<F, R>(&self, f: F) -> Result<R, InstanceShutDown>
185    where
186        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
187        R: Send + 'static,
188    {
189        let (tx, rx) = oneshot::channel();
190        let otel_ctx = OpenTelemetryContext::obtain();
191        self.command_tx
192            .send(Box::new(move |instance| {
193                let _span = debug_span!("instance::call_sync").entered();
194                otel_ctx.attach_as_parent();
195                let result = f(instance);
196                let _ = tx.send(result);
197            }))
198            .map_err(|_send_error| InstanceShutDown)?;
199
200        rx.await.map_err(|_| InstanceShutDown)
201    }
202}
203
204impl<T> Client<T>
205where
206    T: ComputeControllerTimestamp,
207{
208    pub(super) fn spawn(
209        id: ComputeInstanceId,
210        build_info: &'static BuildInfo,
211        storage: StorageCollections<T>,
212        peek_stash_persist_location: PersistLocation,
213        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
214        metrics: InstanceMetrics,
215        now: NowFn,
216        wallclock_lag: WallclockLagFn<T>,
217        dyncfg: Arc<ConfigSet>,
218        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
219        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
220        read_only: bool,
221    ) -> Self {
222        let (command_tx, command_rx) = mpsc::unbounded_channel();
223
224        let read_hold_tx: read_holds::ChangeTx<_> = {
225            let command_tx = command_tx.clone();
226            Arc::new(move |id, change: ChangeBatch<_>| {
227                let cmd: Command<_> = {
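                    // Clone the change batch so the original `(id, change)` can be returned in a
                    // `SendError` below if the instance task has already shut down.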
228                    let change = change.clone();
229                    Box::new(move |i| i.apply_read_hold_change(id, change.clone()))
230                };
231                command_tx.send(cmd).map_err(|_| SendError((id, change)))
232            })
233        };
234
235        mz_ore::task::spawn(
236            || format!("compute-instance-{id}"),
237            Instance::new(
238                build_info,
239                storage,
240                peek_stash_persist_location,
241                arranged_logs,
242                metrics,
243                now,
244                wallclock_lag,
245                dyncfg,
246                command_rx,
247                response_tx,
248                Arc::clone(&read_hold_tx),
249                introspection_tx,
250                read_only,
251            )
252            .run(),
253        );
254
255        Self {
256            command_tx,
257            read_hold_tx,
258        }
259    }
260
261    /// Acquires a `ReadHold` and collection write frontier for each of the identified compute
262    /// collections.
263    pub async fn acquire_read_holds_and_collection_write_frontiers(
264        &self,
265        ids: Vec<GlobalId>,
266    ) -> Result<Vec<(GlobalId, ReadHold<T>, Antichain<T>)>, CollectionLookupError> {
267        self.call_sync(move |i| {
268            let mut result = Vec::new();
269            for id in ids.into_iter() {
270                result.push((
271                    id,
272                    i.acquire_read_hold(id)?,
273                    i.collection_write_frontier(id)?,
274                ));
275            }
276            Ok(result)
277        })
278        .await?
279    }
280
281    /// Issue a peek by calling into the instance task.
282    ///
283    /// If this returns an error, then it didn't modify any `Instance` state.
284    pub async fn peek(
285        &self,
286        peek_target: PeekTarget,
287        literal_constraints: Option<Vec<Row>>,
288        uuid: Uuid,
289        timestamp: T,
290        result_desc: RelationDesc,
291        finishing: RowSetFinishing,
292        map_filter_project: mz_expr::SafeMfpPlan,
293        target_read_hold: ReadHold<T>,
294        target_replica: Option<ReplicaId>,
295        peek_response_tx: oneshot::Sender<PeekResponse>,
296    ) -> Result<(), PeekError> {
297        self.call_sync(move |i| {
298            i.peek(
299                peek_target,
300                literal_constraints,
301                uuid,
302                timestamp,
303                result_desc,
304                finishing,
305                map_filter_project,
306                target_read_hold,
307                target_replica,
308                peek_response_tx,
309            )
310        })
311        .await?
312    }
313}
314
315/// A response from a replica, composed of a replica ID, the replica's current epoch, and the
316/// compute response itself.
317pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);
318
319/// The state we keep for a compute instance.
320pub(super) struct Instance<T: ComputeControllerTimestamp> {
321    /// Build info for spawning replicas
322    build_info: &'static BuildInfo,
323    /// A handle providing access to storage collections.
324    storage_collections: StorageCollections<T>,
325    /// Whether instance initialization has been completed.
326    initialized: bool,
327    /// Whether this instance is in read-only mode.
328    ///
329    /// When in read-only mode, this instance will not update persistent state, such as
330    /// wallclock lag introspection.
331    read_only: bool,
332    /// The workload class of this instance.
333    ///
334    /// This is currently only used to annotate metrics.
335    workload_class: Option<String>,
336    /// The replicas of this compute instance.
337    replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
338    /// Currently installed compute collections.
339    ///
340    /// New entries are added for all collections exported from dataflows created through
341    /// [`Instance::create_dataflow`].
342    ///
343    /// Entries are removed by [`Instance::cleanup_collections`]. See that method's documentation
344    /// about the conditions for removing collection state.
345    collections: BTreeMap<GlobalId, CollectionState<T>>,
346    /// IDs of log sources maintained by this compute instance.
347    log_sources: BTreeMap<LogVariant, GlobalId>,
348    /// Currently outstanding peeks.
349    ///
350    /// New entries are added for all peeks initiated through [`Instance::peek`].
351    ///
352    /// The entry for a peek is only removed once all replicas have responded to the peek. This is
353    /// currently required to ensure all replicas have stopped reading from the peeked collection's
354    /// inputs before we allow them to compact. database-issues#4822 tracks changing this so we only have to wait
355    /// for the first peek response.
356    peeks: BTreeMap<Uuid, PendingPeek<T>>,
357    /// Currently in-progress subscribes.
358    ///
359    /// New entries are added for all subscribes exported from dataflows created through
360    /// [`Instance::create_dataflow`].
361    ///
362    /// The entry for a subscribe is removed once at least one replica has reported the subscribe
363    /// to have advanced to the empty frontier or to have been dropped, implying that no further
364    /// updates will be emitted for this subscribe.
365    ///
366    /// Note that subscribes are tracked both in `collections` and `subscribes`. `collections`
367    /// keeps track of the subscribe's upper and since frontiers and ensures appropriate read holds
368    /// on the subscribe's input. `subscribes` is only used to track which updates have been
369    /// emitted, to decide if new ones should be emitted or suppressed.
370    subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
371    /// Tracks all in-progress COPY TOs.
372    ///
373    /// New entries are added for all S3 oneshot sinks (corresponding to a COPY TO) exported from
374    /// dataflows created through [`Instance::create_dataflow`].
375    ///
376    /// The entry for a COPY TO is removed once at least one replica has finished
377    /// or the exporting collection is dropped.
378    copy_tos: BTreeSet<GlobalId>,
379    /// The command history, used when introducing new replicas or restarting existing replicas.
380    history: ComputeCommandHistory<UIntGauge, T>,
381    /// Receiver for commands to be executed.
382    command_rx: mpsc::UnboundedReceiver<Command<T>>,
383    /// Sender for responses to be delivered.
384    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
385    /// Sender for introspection updates to be recorded.
386    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
387    /// The registry the controller uses to report metrics.
388    metrics: InstanceMetrics,
389    /// Dynamic system configuration.
390    dyncfg: Arc<ConfigSet>,
391
392    /// The persist location where we can stash large peek results.
393    peek_stash_persist_location: PersistLocation,
394
395    /// A function that produces the current wallclock time.
396    now: NowFn,
397    /// A function that computes the lag between the given time and wallclock time.
398    wallclock_lag: WallclockLagFn<T>,
399    /// The last time wallclock lag introspection was recorded.
400    wallclock_lag_last_recorded: DateTime<Utc>,
401
402    /// Sender for updates to collection read holds.
403    ///
404    /// Copies of this sender are given to [`ReadHold`]s that are created in
405    /// [`CollectionState::new`].
406    read_hold_tx: read_holds::ChangeTx<T>,
407    /// A sender for responses from replicas.
408    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
409    /// A receiver for responses from replicas.
410    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
411}
412
413impl<T: ComputeControllerTimestamp> Instance<T> {
414    /// Acquire a handle to the collection state associated with `id`.
415    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
416        self.collections.get(&id).ok_or(CollectionMissing(id))
417    }
418
419    /// Acquire a mutable handle to the collection state associated with `id`.
420    fn collection_mut(
421        &mut self,
422        id: GlobalId,
423    ) -> Result<&mut CollectionState<T>, CollectionMissing> {
424        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
425    }
426
427    /// Acquire a handle to the collection state associated with `id`.
428    ///
429    /// # Panics
430    ///
431    /// Panics if the identified collection does not exist.
432    fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
433        self.collections.get(&id).expect("collection must exist")
434    }
435
436    /// Acquire a mutable handle to the collection state associated with `id`.
437    ///
438    /// # Panics
439    ///
440    /// Panics if the identified collection does not exist.
441    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
442        self.collections
443            .get_mut(&id)
444            .expect("collection must exist")
445    }
446
447    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
448        self.collections.iter().map(|(id, coll)| (*id, coll))
449    }
450
451    /// Add a collection to the instance state.
452    ///
453    /// # Panics
454    ///
455    /// Panics if a collection with the same ID exists already.
456    fn add_collection(
457        &mut self,
458        id: GlobalId,
459        as_of: Antichain<T>,
460        shared: SharedCollectionState<T>,
461        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
462        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
463        replica_input_read_holds: Vec<ReadHold<T>>,
464        write_only: bool,
465        storage_sink: bool,
466        initial_as_of: Option<Antichain<T>>,
467        refresh_schedule: Option<RefreshSchedule>,
468    ) {
469        // Add global collection state.
470        let introspection = CollectionIntrospection::new(
471            id,
472            self.introspection_tx.clone(),
473            as_of.clone(),
474            storage_sink,
475            initial_as_of,
476            refresh_schedule,
477        );
478        let mut state = CollectionState::new(
479            id,
480            as_of.clone(),
481            shared,
482            storage_dependencies,
483            compute_dependencies,
484            Arc::clone(&self.read_hold_tx),
485            introspection,
486        );
487        // If the collection is write-only, clear its read policy to reflect that.
488        if write_only {
489            state.read_policy = None;
490        }
491
492        if let Some(previous) = self.collections.insert(id, state) {
493            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
494        }
495
496        // Add per-replica collection state.
497        for replica in self.replicas.values_mut() {
498            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
499        }
500
501        // Update introspection.
502        self.report_dependency_updates(id, Diff::ONE);
503    }
504
505    fn remove_collection(&mut self, id: GlobalId) {
506        // Update introspection.
507        self.report_dependency_updates(id, Diff::MINUS_ONE);
508
509        // Remove per-replica collection state.
510        for replica in self.replicas.values_mut() {
511            replica.remove_collection(id);
512        }
513
514        // Remove global collection state.
515        self.collections.remove(&id);
516    }
517
518    fn add_replica_state(
519        &mut self,
520        id: ReplicaId,
521        client: ReplicaClient<T>,
522        config: ReplicaConfig,
523        epoch: u64,
524    ) {
525        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();
526
527        let metrics = self.metrics.for_replica(id);
528        let mut replica = ReplicaState::new(
529            id,
530            client,
531            config,
532            metrics,
533            self.introspection_tx.clone(),
534            epoch,
535        );
536
537        // Add per-replica collection state.
538        for (collection_id, collection) in &self.collections {
539            // Skip log collections not maintained by this replica.
540            if collection.log_collection && !log_ids.contains(collection_id) {
541                continue;
542            }
543
544            let as_of = if collection.log_collection {
545                // For log collections, we don't send a `CreateDataflow` command to the replica, so
546                // it doesn't know which as-of the controller chose and defaults to the minimum
547                // frontier instead. We need to initialize the controller-side tracking with the
548                // same frontier, to avoid observing regressions in the reported frontiers.
549                Antichain::from_elem(T::minimum())
550            } else {
551                collection.read_frontier().to_owned()
552            };
553
554            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
555            replica.add_collection(*collection_id, as_of, input_read_holds);
556        }
557
558        self.replicas.insert(id, replica);
559    }
560
561    /// Enqueue the given response for delivery to the controller clients.
562    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
563        // Failure to send means the `ComputeController` has been dropped and doesn't care about
564        // responses anymore.
565        let _ = self.response_tx.send(response);
566    }
567
568    /// Enqueue the given introspection updates for recording.
569    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
570        // Failure to send means the `ComputeController` has been dropped and doesn't care about
571        // introspection updates anymore.
572        let _ = self.introspection_tx.send((type_, updates));
573    }
574
575    /// Returns whether the identified replica exists.
576    fn replica_exists(&self, id: ReplicaId) -> bool {
577        self.replicas.contains_key(&id)
578    }
579
580    /// Return the IDs of pending peeks targeting the specified replica.
581    fn peeks_targeting(
582        &self,
583        replica_id: ReplicaId,
584    ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
585        self.peeks.iter().filter_map(move |(uuid, peek)| {
586            if peek.target_replica == Some(replica_id) {
587                Some((*uuid, peek))
588            } else {
589                None
590            }
591        })
592    }
593
594    /// Return the IDs of in-progress subscribes targeting the specified replica.
595    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
596        self.subscribes.iter().filter_map(move |(id, subscribe)| {
597            let targeting = subscribe.target_replica == Some(replica_id);
598            targeting.then_some(*id)
599        })
600    }
601
602    /// Update introspection with the current collection frontiers.
603    ///
604    /// We could also do this directly in response to frontier changes, but doing it periodically
605    /// lets us avoid emitting some introspection updates that can be consolidated (e.g. a write
606    /// frontier update immediately followed by a read frontier update).
607    ///
608    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
609    /// per second during normal operation.
610    fn update_frontier_introspection(&mut self) {
611        for collection in self.collections.values_mut() {
612            collection
613                .introspection
614                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
615        }
616
617        for replica in self.replicas.values_mut() {
618            for collection in replica.collections.values_mut() {
619                collection
620                    .introspection
621                    .observe_frontier(&collection.write_frontier);
622            }
623        }
624    }
625
626    /// Refresh the controller state metrics for this instance.
627    ///
628    /// We could also do state metric updates directly in response to state changes, but that would
629    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
630    /// a single method is less noisy.
631    ///
632    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
633    /// per second during normal operation.
634    fn refresh_state_metrics(&self) {
635        let unscheduled_collections_count =
636            self.collections.values().filter(|c| !c.scheduled).count();
637        let connected_replica_count = self
638            .replicas
639            .values()
640            .filter(|r| r.client.is_connected())
641            .count();
642
643        self.metrics
644            .replica_count
645            .set(u64::cast_from(self.replicas.len()));
646        self.metrics
647            .collection_count
648            .set(u64::cast_from(self.collections.len()));
649        self.metrics
650            .collection_unscheduled_count
651            .set(u64::cast_from(unscheduled_collections_count));
652        self.metrics
653            .peek_count
654            .set(u64::cast_from(self.peeks.len()));
655        self.metrics
656            .subscribe_count
657            .set(u64::cast_from(self.subscribes.len()));
658        self.metrics
659            .copy_to_count
660            .set(u64::cast_from(self.copy_tos.len()));
661        self.metrics
662            .connected_replica_count
663            .set(u64::cast_from(connected_replica_count));
664    }
665
666    /// Refresh the wallclock lag introspection and metrics with the current lag values.
667    ///
668    /// This method produces wallclock lag metrics of two different shapes:
669    ///
670    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
671    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
672    ///   over the last minute, together with the current time.
673    /// * Histograms: For each collection, we measure the lag of the write frontier behind
674    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
675    ///   together with the current histogram period.
676    ///
677    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
678    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
679    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
680    /// Prometheus.
681    ///
682    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
683    /// per second during normal operation.
684    fn refresh_wallclock_lag(&mut self) {
685        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
686            Some(ts) => (self.wallclock_lag)(ts.clone()),
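            // An empty frontier means the collection has finished and will not advance further;
            // report zero lag for it.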
687            None => Duration::ZERO,
688        };
689
690        let now_ms = (self.now)();
691        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
692        let histogram_labels = match &self.workload_class {
693            Some(wc) => [("workload_class", wc.clone())].into(),
694            None => BTreeMap::new(),
695        };
696
697        // First, iterate over all collections and collect histogram measurements.
698        // We keep a record of unreadable collections, so we can emit undefined lags for those here
699        // and below when we collect history measurements.
700        let mut unreadable_collections = BTreeSet::new();
701        for (id, collection) in &mut self.collections {
702            // We need to ask the storage controller for the read frontiers of storage collections.
703            let read_frontier = match self.storage_collections.collection_frontiers(*id) {
704                Ok(f) => f.read_capabilities,
705                Err(_) => collection.read_frontier(),
706            };
707            let write_frontier = collection.write_frontier();
708            let collection_unreadable = PartialOrder::less_equal(&write_frontier, &read_frontier);
709            if collection_unreadable {
710                unreadable_collections.insert(id);
711            }
712
713            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
714                let bucket = if collection_unreadable {
715                    WallclockLag::Undefined
716                } else {
717                    let lag = frontier_lag(&write_frontier);
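                    // Round the lag up to the next power of two of seconds, so the histogram
                    // uses a small, fixed set of buckets.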
718                    let lag = lag.as_secs().next_power_of_two();
719                    WallclockLag::Seconds(lag)
720                };
721
722                let key = (histogram_period, bucket, histogram_labels.clone());
723                *stash.entry(key).or_default() += Diff::ONE;
724            }
725        }
726
727        // Second, iterate over all per-replica collections and collect history measurements.
728        for replica in self.replicas.values_mut() {
729            for (id, collection) in &mut replica.collections {
730                let lag = if unreadable_collections.contains(&id) {
731                    WallclockLag::Undefined
732                } else {
733                    let lag = frontier_lag(&collection.write_frontier);
734                    WallclockLag::Seconds(lag.as_secs())
735                };
736
737                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
738                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
739                }
740
741                if let Some(metrics) = &mut collection.metrics {
742                    // No way to specify values as undefined in Prometheus metrics, so we use the
743                    // maximum value instead.
744                    let secs = lag.unwrap_seconds_or(u64::MAX);
745                    metrics.wallclock_lag.observe(secs);
746                };
747            }
748        }
749
750        // Record lags to persist, if it's time.
751        self.maybe_record_wallclock_lag();
752    }
753
754    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
755    /// last recording.
756    ///
757    /// We emit new introspection updates if the system time has passed into a new multiple of the
758    /// recording interval (typically 1 minute) since the last refresh. The storage controller uses
759    /// the same approach, ensuring that both controllers commit their lags at roughly the same
760    /// time, avoiding confusion caused by inconsistencies.
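    ///
    /// For example, with a 1-minute interval a refresh at 12:03:45 truncates to 12:03:00, and we
    /// record new lags only if the previous recording happened before that multiple.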
761    fn maybe_record_wallclock_lag(&mut self) {
762        if self.read_only {
763            return;
764        }
765
766        let duration_trunc = |datetime: DateTime<_>, interval| {
767            let td = TimeDelta::from_std(interval).ok()?;
768            datetime.duration_trunc(td).ok()
769        };
770
771        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
772        let now_dt = mz_ore::now::to_datetime((self.now)());
773        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
774            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
775            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
776            duration_trunc(now_dt, *default).unwrap()
777        });
778        if now_trunc <= self.wallclock_lag_last_recorded {
779            return;
780        }
781
782        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");
783
784        let mut history_updates = Vec::new();
785        for (replica_id, replica) in &mut self.replicas {
786            for (collection_id, collection) in &mut replica.collections {
787                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
788                    continue;
789                };
790
791                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
792                let row = Row::pack_slice(&[
793                    Datum::String(&collection_id.to_string()),
794                    Datum::String(&replica_id.to_string()),
795                    max_lag.into_interval_datum(),
796                    Datum::TimestampTz(now_ts),
797                ]);
798                history_updates.push((row, Diff::ONE));
799            }
800        }
801        if !history_updates.is_empty() {
802            self.deliver_introspection_updates(
803                IntrospectionType::WallclockLagHistory,
804                history_updates,
805            );
806        }
807
808        let mut histogram_updates = Vec::new();
809        let mut row_buf = Row::default();
810        for (collection_id, collection) in &mut self.collections {
811            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
812                continue;
813            };
814
815            for ((period, lag, labels), count) in std::mem::take(stash) {
816                let mut packer = row_buf.packer();
817                packer.extend([
818                    Datum::TimestampTz(period.start),
819                    Datum::TimestampTz(period.end),
820                    Datum::String(&collection_id.to_string()),
821                    lag.into_uint64_datum(),
822                ]);
823                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
824                packer.push_dict(labels);
825
826                histogram_updates.push((row_buf.clone(), count));
827            }
828        }
829        if !histogram_updates.is_empty() {
830            self.deliver_introspection_updates(
831                IntrospectionType::WallclockLagHistogram,
832                histogram_updates,
833            );
834        }
835
836        self.wallclock_lag_last_recorded = now_trunc;
837    }
838
839    /// Report updates (inserts or retractions) to the identified collection's dependencies.
840    ///
841    /// # Panics
842    ///
843    /// Panics if the identified collection does not exist.
844    fn report_dependency_updates(&self, id: GlobalId, diff: Diff) {
845        let collection = self.expect_collection(id);
846        let dependencies = collection.dependency_ids();
847
848        let updates = dependencies
849            .map(|dependency_id| {
850                let row = Row::pack_slice(&[
851                    Datum::String(&id.to_string()),
852                    Datum::String(&dependency_id.to_string()),
853                ]);
854                (row, diff)
855            })
856            .collect();
857
858        self.deliver_introspection_updates(IntrospectionType::ComputeDependencies, updates);
859    }
860
861    /// Returns `true` if the given collection is hydrated on at least one
862    /// replica.
863    ///
864    /// This also returns `true` if this cluster does not have any
865    /// replicas.
866    #[mz_ore::instrument(level = "debug")]
867    pub fn collection_hydrated(
868        &self,
869        collection_id: GlobalId,
870    ) -> Result<bool, CollectionLookupError> {
871        if self.replicas.is_empty() {
872            return Ok(true);
873        }
874
875        for replica_state in self.replicas.values() {
876            let collection_state = replica_state
877                .collections
878                .get(&collection_id)
879                .ok_or(CollectionLookupError::CollectionMissing(collection_id))?;
880
881            if collection_state.hydrated() {
882                return Ok(true);
883            }
884        }
885
886        Ok(false)
887    }
888
889    /// Returns `true` if each non-transient, non-excluded collection is hydrated on at
890    /// least one replica.
891    ///
892    /// This also returns `true` if this cluster does not have any
893    /// replicas.
894    #[mz_ore::instrument(level = "debug")]
895    pub fn collections_hydrated_on_replicas(
896        &self,
897        target_replica_ids: Option<Vec<ReplicaId>>,
898        exclude_collections: &BTreeSet<GlobalId>,
899    ) -> Result<bool, HydrationCheckBadTarget> {
900        if self.replicas.is_empty() {
901            return Ok(true);
902        }
903        let mut all_hydrated = true;
904        let target_replicas: BTreeSet<ReplicaId> = self
905            .replicas
906            .keys()
907            .filter_map(|id| match target_replica_ids {
908                None => Some(id.clone()),
909                Some(ref ids) if ids.contains(id) => Some(id.clone()),
910                Some(_) => None,
911            })
912            .collect();
913        if let Some(targets) = target_replica_ids {
914            if target_replicas.is_empty() {
915                return Err(HydrationCheckBadTarget(targets));
916            }
917        }
918
919        for (id, _collection) in self.collections_iter() {
920            if id.is_transient() || exclude_collections.contains(&id) {
921                continue;
922            }
923
924            let mut collection_hydrated = false;
925            for replica_state in self.replicas.values() {
926                if !target_replicas.contains(&replica_state.id) {
927                    continue;
928                }
929                let collection_state = replica_state
930                    .collections
931                    .get(&id)
932                    .expect("missing collection state");
933
934                if collection_state.hydrated() {
935                    collection_hydrated = true;
936                    break;
937                }
938            }
939
940            if !collection_hydrated {
941                tracing::info!("collection {id} is not hydrated on any replica");
942                all_hydrated = false;
943                // We continue with our loop instead of breaking out early, so
944                // that we log all non-hydrated collections.
945            }
946        }
947
948        Ok(all_hydrated)
949    }
950
951    /// Clean up collection state that is not needed anymore.
952    ///
953    /// Three conditions need to be true before we can remove state for a collection:
954    ///
955    ///  1. A client must have explicitly dropped the collection. If that is not the case, clients
956    ///     can still reasonably assume that the controller knows about the collection and can
957    ///     answer queries about it.
958    ///  2. There must be no outstanding read capabilities on the collection. As long as someone
959    ///     still holds read capabilities on a collection, we need to keep it around to be able
960    ///     to properly handle downgrading of said capabilities.
961    ///  3. All replica frontiers for the collection must have advanced to the empty frontier.
962    ///     Advancement to the empty frontiers signals that replicas are done computing the
963    ///     collection and that they won't send more `ComputeResponse`s for it. As long as we might
964    ///     receive responses for a collection we want to keep it around to be able to validate and
965    ///     handle these responses.
966    fn cleanup_collections(&mut self) {
967        let to_remove: Vec<_> = self
968            .collections_iter()
969            .filter(|(id, collection)| {
970                collection.dropped
971                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
972                    && self
973                        .replicas
974                        .values()
975                        .all(|r| r.collection_frontiers_empty(*id))
976            })
977            .map(|(id, _collection)| id)
978            .collect();
979
980        for id in to_remove {
981            self.remove_collection(id);
982        }
983    }
984
985    /// Returns the state of the [`Instance`] formatted as JSON.
986    ///
987    /// The returned value is not guaranteed to be stable and may change at any point in time.
988    #[mz_ore::instrument(level = "debug")]
989    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
990        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
991        // returned object as a tradeoff between usability and stability. `serde_json` will fail
992        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
993        // prevents a future unrelated change from silently breaking this method.
994
995        // Destructure `self` here so we don't forget to consider dumping newly added fields.
996        let Self {
997            build_info: _,
998            storage_collections: _,
999            peek_stash_persist_location: _,
1000            initialized,
1001            read_only,
1002            workload_class,
1003            replicas,
1004            collections,
1005            log_sources: _,
1006            peeks,
1007            subscribes,
1008            copy_tos,
1009            history: _,
1010            command_rx: _,
1011            response_tx: _,
1012            introspection_tx: _,
1013            metrics: _,
1014            dyncfg: _,
1015            now: _,
1016            wallclock_lag: _,
1017            wallclock_lag_last_recorded,
1018            read_hold_tx: _,
1019            replica_tx: _,
1020            replica_rx: _,
1021        } = self;
1022
1023        let replicas: BTreeMap<_, _> = replicas
1024            .iter()
1025            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
1026            .collect::<Result<_, anyhow::Error>>()?;
1027        let collections: BTreeMap<_, _> = collections
1028            .iter()
1029            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
1030            .collect();
1031        let peeks: BTreeMap<_, _> = peeks
1032            .iter()
1033            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
1034            .collect();
1035        let subscribes: BTreeMap<_, _> = subscribes
1036            .iter()
1037            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
1038            .collect();
1039        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
1040        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");
1041
1042        Ok(serde_json::json!({
1043            "initialized": initialized,
1044            "read_only": read_only,
1045            "workload_class": workload_class,
1046            "replicas": replicas,
1047            "collections": collections,
1048            "peeks": peeks,
1049            "subscribes": subscribes,
1050            "copy_tos": copy_tos,
1051            "wallclock_lag_last_recorded": wallclock_lag_last_recorded,
1052        }))
1053    }
1054
1055    /// Reports the current write frontier for the identified compute collection.
1056    fn collection_write_frontier(&self, id: GlobalId) -> Result<Antichain<T>, CollectionMissing> {
1057        Ok(self.collection(id)?.write_frontier())
1058    }
1059}
1060
1061impl<T> Instance<T>
1062where
1063    T: ComputeControllerTimestamp,
1064{
1065    fn new(
1066        build_info: &'static BuildInfo,
1067        storage: StorageCollections<T>,
1068        peek_stash_persist_location: PersistLocation,
1069        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
1070        metrics: InstanceMetrics,
1071        now: NowFn,
1072        wallclock_lag: WallclockLagFn<T>,
1073        dyncfg: Arc<ConfigSet>,
1074        command_rx: mpsc::UnboundedReceiver<Command<T>>,
1075        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
1076        read_hold_tx: read_holds::ChangeTx<T>,
1077        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
1078        read_only: bool,
1079    ) -> Self {
1080        let mut collections = BTreeMap::new();
1081        let mut log_sources = BTreeMap::new();
1082        for (log, id, shared) in arranged_logs {
1083            let collection = CollectionState::new_log_collection(
1084                id,
1085                shared,
1086                Arc::clone(&read_hold_tx),
1087                introspection_tx.clone(),
1088            );
1089            collections.insert(id, collection);
1090            log_sources.insert(log, id);
1091        }
1092
1093        let history = ComputeCommandHistory::new(metrics.for_history());
1094
1095        let send_count = metrics.response_send_count.clone();
1096        let recv_count = metrics.response_recv_count.clone();
1097        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);
1098
1099        let now_dt = mz_ore::now::to_datetime(now());
1100
1101        Self {
1102            build_info,
1103            storage_collections: storage,
1104            peek_stash_persist_location,
1105            initialized: false,
1106            read_only,
1107            workload_class: None,
1108            replicas: Default::default(),
1109            collections,
1110            log_sources,
1111            peeks: Default::default(),
1112            subscribes: Default::default(),
1113            copy_tos: Default::default(),
1114            history,
1115            command_rx,
1116            response_tx,
1117            introspection_tx,
1118            metrics,
1119            dyncfg,
1120            now,
1121            wallclock_lag,
1122            wallclock_lag_last_recorded: now_dt,
1123            read_hold_tx,
1124            replica_tx,
1125            replica_rx,
1126        }
1127    }
1128
1129    async fn run(mut self) {
1130        self.send(ComputeCommand::Hello {
1131            // The nonce is protocol iteration-specific and will be set in
1132            // `ReplicaTask::specialize_command`.
1133            nonce: Uuid::default(),
1134        });
1135
1136        let instance_config = InstanceConfig {
1137            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
1138            // The remaining fields are replica-specific and will be set in
1139            // `ReplicaTask::specialize_command`.
1140            logging: Default::default(),
1141            expiration_offset: Default::default(),
1142        };
1143
1144        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));
1145
1146        loop {
1147            tokio::select! {
1148                command = self.command_rx.recv() => match command {
1149                    Some(cmd) => cmd(&mut self),
1150                    None => break,
1151                },
1152                response = self.replica_rx.recv() => match response {
1153                    Some(response) => self.handle_response(response),
1154                    None => unreachable!("self owns a sender side of the channel"),
1155                }
1156            }
1157        }
1158    }
1159
1160    /// Update instance configuration.
1161    #[mz_ore::instrument(level = "debug")]
1162    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
1163        if let Some(workload_class) = &config_params.workload_class {
1164            self.workload_class = workload_class.clone();
1165        }
1166
1167        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
1168        self.send(command);
1169    }
1170
1171    /// Marks the end of any initialization commands.
1172    ///
1173    /// Intended to be called by `Controller`, rather than by other code.
1174    /// Calling this method repeatedly has no effect.
1175    #[mz_ore::instrument(level = "debug")]
1176    pub fn initialization_complete(&mut self) {
1177        // The compute protocol requires that `InitializationComplete` is sent only once.
1178        if !self.initialized {
1179            self.send(ComputeCommand::InitializationComplete);
1180            self.initialized = true;
1181        }
1182    }
1183
1184    /// Allows the identified collection to affect writes to external systems (persist).
1185    ///
1186    /// Calling this method repeatedly has no effect.
1187    #[mz_ore::instrument(level = "debug")]
1188    pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
1189        let collection = self.collection_mut(collection_id)?;
1190
1191        // Do not send redundant allow-writes commands.
1192        if !collection.read_only {
1193            return Ok(());
1194        }
1195
1196        // Don't send allow-writes for collections that are not installed.
1197        let as_of = collection.read_frontier();
1198
1199        // If the collection has an empty `as_of`, it was either never installed on the replica or
1200        // has since been dropped. In either case the replica does not expect any commands for it.
1201        if as_of.is_empty() {
1202            return Ok(());
1203        }
1204
1205        collection.read_only = false;
1206        self.send(ComputeCommand::AllowWrites(collection_id));
1207
1208        Ok(())
1209    }
1210
1211    /// Shut down this instance.
1212    ///
1213    /// This method runs various assertions ensuring the instance state is empty. It exists to help
1214    /// us find bugs where the client drops a compute instance that still has replicas or
1215    /// collections installed, and later assumes that said replicas/collections still exist.
1216    ///
1217    /// # Panics
1218    ///
1219    /// Panics if the compute instance still has active replicas.
1220    /// Panics if the compute instance still has collections installed.
1221    #[mz_ore::instrument(level = "debug")]
1222    pub fn shutdown(&mut self) {
1223        // Taking the `command_rx` ensures that the [`Instance::run`] loop terminates.
1224        let (_tx, rx) = mpsc::unbounded_channel();
1225        let mut command_rx = std::mem::replace(&mut self.command_rx, rx);
1226
1227        // Apply all outstanding read hold changes. This might cause read hold downgrades to be
1228        // added to `command_tx`, so we need to apply those in a loop.
1229        //
1230        // TODO(teskje): Make `Command` an enum and assert that all received commands are read
1231        // hold downgrades.
1232        while let Ok(cmd) = command_rx.try_recv() {
1233            cmd(self);
1234        }
1235
1236        // Collections might have been dropped but not cleaned up yet.
1237        self.cleanup_collections();
1238
1239        let stray_replicas: Vec<_> = self.replicas.keys().collect();
1240        soft_assert_or_log!(
1241            stray_replicas.is_empty(),
1242            "dropped instance still has provisioned replicas: {stray_replicas:?}",
1243        );
1244
1245        let collections = self.collections.iter();
1246        let stray_collections: Vec<_> = collections
1247            .filter(|(_, c)| !c.log_collection)
1248            .map(|(id, _)| id)
1249            .collect();
1250        soft_assert_or_log!(
1251            stray_collections.is_empty(),
1252            "dropped instance still has installed collections: {stray_collections:?}",
1253        );
1254    }
1255
1256    /// Sends a command to all replicas of this instance.
1257    #[mz_ore::instrument(level = "debug")]
1258    fn send(&mut self, cmd: ComputeCommand<T>) {
1259        // Record the command so that new replicas can be brought up to speed.
1260        self.history.push(cmd.clone());
1261
1262        // Clone the command for each active replica.
1263        for replica in self.replicas.values_mut() {
1264            // Swallow error, we'll notice because the replica task has stopped.
1265            let _ = replica.client.send(cmd.clone());
1266        }
1267    }
1268
1269    /// Add a new instance replica, by ID.
1270    #[mz_ore::instrument(level = "debug")]
1271    pub fn add_replica(
1272        &mut self,
1273        id: ReplicaId,
1274        mut config: ReplicaConfig,
1275        epoch: Option<u64>,
1276    ) -> Result<(), ReplicaExists> {
1277        if self.replica_exists(id) {
1278            return Err(ReplicaExists(id));
1279        }
1280
1281        config.logging.index_logs = self.log_sources.clone();
1282
1283        let epoch = epoch.unwrap_or(1);
1284        let metrics = self.metrics.for_replica(id);
1285        let client = ReplicaClient::spawn(
1286            id,
1287            self.build_info,
1288            config.clone(),
1289            epoch,
1290            metrics.clone(),
1291            Arc::clone(&self.dyncfg),
1292            self.replica_tx.clone(),
1293        );
1294
1295        // Take this opportunity to clean up the history we should present.
1296        self.history.reduce();
1297
1298        // Advance the uppers of source imports
1299        self.history.update_source_uppers(&self.storage_collections);
1300
1301        // Replay the commands at the client, creating new dataflow identifiers.
1302        for command in self.history.iter() {
1303            if client.send(command.clone()).is_err() {
1304                // We swallow the error here. On the next send, we will fail again, and
1305                // restart the connection as well as this rehydration.
1306                tracing::warn!("Replica {:?} connection terminated during hydration", id);
1307                break;
1308            }
1309        }
1310
1311        // Add replica to tracked state.
1312        self.add_replica_state(id, client, config, epoch);
1313
1314        Ok(())
1315    }
1316
1317    /// Remove an existing instance replica, by ID.
1318    #[mz_ore::instrument(level = "debug")]
1319    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
1320        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;
1321
1322        // Subscribes targeting this replica either won't be served anymore (if the replica is
1323        // dropped) or might produce inconsistent output (if the target collection is an
1324        // introspection index). We produce an error to inform upstream.
1325        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
1326        for subscribe_id in to_drop {
1327            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
1328            let response = ComputeControllerResponse::SubscribeResponse(
1329                subscribe_id,
1330                SubscribeBatch {
1331                    lower: subscribe.frontier.clone(),
1332                    upper: subscribe.frontier,
1333                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
1334                },
1335            );
1336            self.deliver_response(response);
1337        }
1338
1339        // Peeks targeting this replica might not be served anymore (if the replica is dropped).
1340        // If the replica has failed it might come back and respond to the peek later, but it still
1341        // seems like a good idea to cancel the peek to inform the caller about the failure. This
1342        // is consistent with how we handle targeted subscribes above.
1343        let mut peek_responses = Vec::new();
1344        let mut to_drop = Vec::new();
1345        for (uuid, peek) in self.peeks_targeting(id) {
1346            peek_responses.push(ComputeControllerResponse::PeekNotification(
1347                uuid,
1348                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
1349                peek.otel_ctx.clone(),
1350            ));
1351            to_drop.push(uuid);
1352        }
1353        for response in peek_responses {
1354            self.deliver_response(response);
1355        }
1356        for uuid in to_drop {
1357            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
1358            self.finish_peek(uuid, response);
1359        }
1360
1361        // We might have a chance to forward implied capabilities and reduce the cost of bringing
1362        // up the next replica, if the dropped replica was the only one in the cluster.
1363        self.forward_implied_capabilities();
1364
1365        Ok(())
1366    }
1367
1368    /// Rehydrate the given instance replica.
1369    ///
1370    /// # Panics
1371    ///
1372    /// Panics if the specified replica does not exist.
1373    fn rehydrate_replica(&mut self, id: ReplicaId) {
1374        let config = self.replicas[&id].config.clone();
1375        let epoch = self.replicas[&id].epoch + 1;
1376
1377        self.remove_replica(id).expect("replica must exist");
1378        let result = self.add_replica(id, config, Some(epoch));
1379
1380        match result {
1381            Ok(()) => (),
1382            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1383        }
1384    }
1385
1386    /// Rehydrate any failed replicas of this instance.
1387    fn rehydrate_failed_replicas(&mut self) {
1388        let replicas = self.replicas.iter();
1389        let failed_replicas: Vec<_> = replicas
1390            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1391            .collect();
1392
1393        for replica_id in failed_replicas {
1394            self.rehydrate_replica(replica_id);
1395        }
1396    }
1397
1398    /// Creates the described dataflow and initializes state for its output.
1399    ///
1400    /// This method expects a `DataflowDescription` with an `as_of` frontier specified, as well as
1401    /// for each imported collection a read hold in `import_read_holds` at at least the `as_of`.
1402    ///
1403    /// If a `subscribe_target_replica` is given, any subscribes exported by the dataflow are
1404    /// configured to target that replica, i.e., only subscribe responses sent by that replica are
1405    /// considered.
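    ///
    /// A minimal sketch of the expected call pattern (hypothetical ids and read holds; marked
    /// `ignore` because `Instance` is not publicly constructible):
    /// ```ignore
    /// // Read holds on every source and index import, each valid at the dataflow `as_of`.
    /// let import_read_holds = vec![source_hold, index_hold];
    /// // No replica targeting, no pre-existing shared collection state.
    /// instance.create_dataflow(dataflow, import_read_holds, None, BTreeMap::new())?;
    /// ```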
1406    #[mz_ore::instrument(level = "debug")]
1407    pub fn create_dataflow(
1408        &mut self,
1409        dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
1410        import_read_holds: Vec<ReadHold<T>>,
1411        subscribe_target_replica: Option<ReplicaId>,
1412        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
1413    ) -> Result<(), DataflowCreationError> {
1414        use DataflowCreationError::*;
1415
1416        if let Some(replica_id) = subscribe_target_replica {
1417            if !self.replica_exists(replica_id) {
1418                return Err(ReplicaMissing(replica_id));
1419            }
1420        }
1421
1422        // Simple sanity checks around `as_of`
1423        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
1424        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
1425            return Err(EmptyAsOfForSubscribe);
1426        }
1427        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
1428            return Err(EmptyAsOfForCopyTo);
1429        }
1430
1431        // Collect all dependencies of the dataflow, and read holds on them at the `as_of`.
1432        let mut storage_dependencies = BTreeMap::new();
1433        let mut compute_dependencies = BTreeMap::new();
1434
1435        // When we install per-replica input read holds, we cannot use the `as_of` because of
1436        // reconciliation: Existing slow replicas might be reading from the inputs at times before
1437        // the `as_of` and we would rather not crash them by allowing their inputs to compact too
1438        // far. So instead we take read holds at the least time available.
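        // For example (illustrative): if the dataflow `as_of` is `{10}` but a provided import
        // hold is still at since `{7}`, the per-replica hold keeps `{7}` while the hold stored
        // with the collection is downgraded to `{10}`.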
1439        let mut replica_input_read_holds = Vec::new();
1440
1441        let mut import_read_holds: BTreeMap<_, _> =
1442            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();
1443
1444        for &id in dataflow.source_imports.keys() {
1445            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1446            replica_input_read_holds.push(read_hold.clone());
1447
1448            read_hold
1449                .try_downgrade(as_of.clone())
1450                .map_err(|_| ReadHoldInsufficient(id))?;
1451            storage_dependencies.insert(id, read_hold);
1452        }
1453
1454        for &id in dataflow.index_imports.keys() {
1455            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1456            read_hold
1457                .try_downgrade(as_of.clone())
1458                .map_err(|_| ReadHoldInsufficient(id))?;
1459            compute_dependencies.insert(id, read_hold);
1460        }
1461
1462        // If the `as_of` is empty, we are not going to create a dataflow, so replicas won't read
1463        // from the inputs.
1464        if as_of.is_empty() {
1465            replica_input_read_holds = Default::default();
1466        }
1467
1468        // Install collection state for each of the exports.
1469        for export_id in dataflow.export_ids() {
1470            let shared = shared_collection_state
1471                .remove(&export_id)
1472                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
1473            let write_only = dataflow.sink_exports.contains_key(&export_id);
1474            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);
1475
1476            self.add_collection(
1477                export_id,
1478                as_of.clone(),
1479                shared,
1480                storage_dependencies.clone(),
1481                compute_dependencies.clone(),
1482                replica_input_read_holds.clone(),
1483                write_only,
1484                storage_sink,
1485                dataflow.initial_storage_as_of.clone(),
1486                dataflow.refresh_schedule.clone(),
1487            );
1488
1489            // If the export is a storage sink, we can advance its write frontier to the write
1490            // frontier of the target storage collection.
1491            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
1492                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
1493            }
1494        }
1495
1496        // Initialize tracking of subscribes.
1497        for subscribe_id in dataflow.subscribe_ids() {
1498            self.subscribes
1499                .insert(subscribe_id, ActiveSubscribe::new(subscribe_target_replica));
1500        }
1501
1502        // Initialize tracking of copy tos.
1503        for copy_to_id in dataflow.copy_to_ids() {
1504            self.copy_tos.insert(copy_to_id);
1505        }
1506
1507        // Here we augment all imported sources and all exported sinks with the appropriate
1508        // storage metadata needed by the compute instance.
1509        let mut source_imports = BTreeMap::new();
1510        for (id, (si, monotonic, _upper)) in dataflow.source_imports {
1511            let frontiers = self
1512                .storage_collections
1513                .collection_frontiers(id)
1514                .expect("collection exists");
1515
1516            let collection_metadata = self
1517                .storage_collections
1518                .collection_metadata(id)
1519                .expect("we have a read hold on this collection");
1520
1521            let desc = SourceInstanceDesc {
1522                storage_metadata: collection_metadata.clone(),
1523                arguments: si.arguments,
1524                typ: si.typ.clone(),
1525            };
1526            source_imports.insert(id, (desc, monotonic, frontiers.write_frontier));
1527        }
1528
1529        let mut sink_exports = BTreeMap::new();
1530        for (id, se) in dataflow.sink_exports {
1531            let connection = match se.connection {
1532                ComputeSinkConnection::MaterializedView(conn) => {
1533                    let metadata = self
1534                        .storage_collections
1535                        .collection_metadata(id)
1536                        .map_err(|_| CollectionMissing(id))?
1537                        .clone();
1538                    let conn = MaterializedViewSinkConnection {
1539                        value_desc: conn.value_desc,
1540                        storage_metadata: metadata,
1541                    };
1542                    ComputeSinkConnection::MaterializedView(conn)
1543                }
1544                ComputeSinkConnection::ContinualTask(conn) => {
1545                    let metadata = self
1546                        .storage_collections
1547                        .collection_metadata(id)
1548                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
1549                        .clone();
1550                    let conn = ContinualTaskConnection {
1551                        input_id: conn.input_id,
1552                        storage_metadata: metadata,
1553                    };
1554                    ComputeSinkConnection::ContinualTask(conn)
1555                }
1556                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
1557                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1558                    ComputeSinkConnection::CopyToS3Oneshot(conn)
1559                }
1560            };
1561            let desc = ComputeSinkDesc {
1562                from: se.from,
1563                from_desc: se.from_desc,
1564                connection,
1565                with_snapshot: se.with_snapshot,
1566                up_to: se.up_to,
1567                non_null_assertions: se.non_null_assertions,
1568                refresh_schedule: se.refresh_schedule,
1569            };
1570            sink_exports.insert(id, desc);
1571        }
1572
1573        // Flatten the dataflow plans into the representation expected by replicas.
1574        let objects_to_build = dataflow
1575            .objects_to_build
1576            .into_iter()
1577            .map(|object| BuildDesc {
1578                id: object.id,
1579                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
1580            })
1581            .collect();
1582
1583        let augmented_dataflow = DataflowDescription {
1584            source_imports,
1585            sink_exports,
1586            objects_to_build,
1587            // The rest of the fields are identical
1588            index_imports: dataflow.index_imports,
1589            index_exports: dataflow.index_exports,
1590            as_of: dataflow.as_of.clone(),
1591            until: dataflow.until,
1592            initial_storage_as_of: dataflow.initial_storage_as_of,
1593            refresh_schedule: dataflow.refresh_schedule,
1594            debug_name: dataflow.debug_name,
1595            time_dependence: dataflow.time_dependence,
1596        };
1597
1598        if augmented_dataflow.is_transient() {
1599            tracing::debug!(
1600                name = %augmented_dataflow.debug_name,
1601                import_ids = %augmented_dataflow.display_import_ids(),
1602                export_ids = %augmented_dataflow.display_export_ids(),
1603                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1604                until = ?augmented_dataflow.until.elements(),
1605                "creating dataflow",
1606            );
1607        } else {
1608            tracing::info!(
1609                name = %augmented_dataflow.debug_name,
1610                import_ids = %augmented_dataflow.display_import_ids(),
1611                export_ids = %augmented_dataflow.display_export_ids(),
1612                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1613                until = ?augmented_dataflow.until.elements(),
1614                "creating dataflow",
1615            );
1616        }
1617
1618        // Skip the actual dataflow creation for an empty `as_of`. (Happens e.g. for the
1619        // bootstrapping of a REFRESH AT materialized view that is past its last refresh.)
1620        if as_of.is_empty() {
1621            tracing::info!(
1622                name = %augmented_dataflow.debug_name,
1623                "not sending `CreateDataflow`, because of empty `as_of`",
1624            );
1625        } else {
1626            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
1627            let dataflow = Box::new(augmented_dataflow);
1628            self.send(ComputeCommand::CreateDataflow(dataflow));
1629
1630            for id in collections {
1631                self.maybe_schedule_collection(id);
1632            }
1633        }
1634
1635        Ok(())
1636    }
1637
1638    /// Schedule the identified collection if all its inputs are available.
1639    ///
1640    /// # Panics
1641    ///
1642    /// Panics if the identified collection does not exist.
1643    fn maybe_schedule_collection(&mut self, id: GlobalId) {
1644        let collection = self.expect_collection(id);
1645
1646        // Don't schedule collections twice.
1647        if collection.scheduled {
1648            return;
1649        }
1650
1651        let as_of = collection.read_frontier();
1652
1653        // If the collection has an empty `as_of`, it was either never installed on the replica or
1654        // has since been dropped. In either case the replica does not expect any commands for it.
1655        if as_of.is_empty() {
1656            return;
1657        }
1658
1659        let ready = if id.is_transient() {
1660            // Always schedule transient collections immediately. The assumption is that those are
1661            // created by interactive user commands and we want to schedule them as quickly as
1662            // possible. Inputs might not yet be available, but when they become available, we
1663            // don't need to wait for the controller to become aware and for the scheduling check
1664            // to run again.
1665            true
1666        } else {
1667            // Ignore self-dependencies. Any self-dependencies do not need to be
1668            // available at the as_of for the dataflow to make progress, so we
1669            // can ignore them here. At the moment, only continual tasks have
1670            // self-dependencies, but this logic is correct for any dataflow, so
1671            // we don't special-case it to continual tasks.
1672            let not_self_dep = |x: &GlobalId| *x != id;
1673
1674            // Check dependency frontiers to determine if all inputs are
1675            // available. An input is available when its frontier is greater
1676            // than the `as_of`, i.e., all input data up to and including the
1677            // `as_of` has been sealed.
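            // For example (illustrative): with `as_of = {5}`, an input whose write frontier is
            // `{6}` is available (all data at times <= 5 is complete), while one whose write
            // frontier is still `{5}` is not.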
1678            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
1679            let compute_frontiers = compute_deps.map(|id| {
1680                let dep = &self.expect_collection(id);
1681                dep.write_frontier()
1682            });
1683
1684            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
1685            let storage_frontiers = self
1686                .storage_collections
1687                .collections_frontiers(storage_deps.collect())
1688                .expect("must exist");
1689            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);
1690
1691            let ready = compute_frontiers
1692                .chain(storage_frontiers)
1693                .all(|frontier| PartialOrder::less_than(&as_of, &frontier));
1694
1695            ready
1696        };
1697
1698        if ready {
1699            self.send(ComputeCommand::Schedule(id));
1700            let collection = self.expect_collection_mut(id);
1701            collection.scheduled = true;
1702        }
1703    }
1704
1705    /// Schedule any unscheduled collections that are ready.
1706    fn schedule_collections(&mut self) {
1707        let ids: Vec<_> = self.collections.keys().copied().collect();
1708        for id in ids {
1709            self.maybe_schedule_collection(id);
1710        }
1711    }
1712
1713    /// Drops the read capability for the given collections and allows their resources to be
1714    /// reclaimed.
1715    #[mz_ore::instrument(level = "debug")]
1716    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1717        for id in &ids {
1718            let collection = self.collection_mut(*id)?;
1719
1720            // Mark the collection as dropped to allow it to be removed from the controller state.
1721            collection.dropped = true;
1722
1723            // Drop the implied and warmup read holds to announce that clients are not
1724            // interested in the collection anymore.
1725            collection.implied_read_hold.release();
1726            collection.warmup_read_hold.release();
1727
1728            // If the collection is a subscribe, stop tracking it. This ensures that the controller
1729            // ceases to produce `SubscribeResponse`s for this subscribe.
1730            self.subscribes.remove(id);
1731            // If the collection is a copy to, stop tracking it. This ensures that the controller
1732            // ceases to produce `CopyToResponse`s for this copy to.
1733            self.copy_tos.remove(id);
1734        }
1735
1736        Ok(())
1737    }
1738
1739    /// Initiate a peek request for the contents of `id` at `timestamp`.
1740    ///
1741    /// If this returns an error, then it didn't modify any `Instance` state.
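    ///
    /// The provided `read_hold` must be on `peek_target.id()` and downgradable to `timestamp`;
    /// otherwise this returns `ReadHoldIdMismatch` or `ReadHoldInsufficient` without registering
    /// the peek.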
1742    #[mz_ore::instrument(level = "debug")]
1743    pub fn peek(
1744        &mut self,
1745        peek_target: PeekTarget,
1746        literal_constraints: Option<Vec<Row>>,
1747        uuid: Uuid,
1748        timestamp: T,
1749        result_desc: RelationDesc,
1750        finishing: RowSetFinishing,
1751        map_filter_project: mz_expr::SafeMfpPlan,
1752        mut read_hold: ReadHold<T>,
1753        target_replica: Option<ReplicaId>,
1754        peek_response_tx: oneshot::Sender<PeekResponse>,
1755    ) -> Result<(), PeekError> {
1756        use PeekError::*;
1757
1758        let target_id = peek_target.id();
1759
1760        // Downgrade the provided read hold to the peek time.
1761        if read_hold.id() != target_id {
1762            return Err(ReadHoldIdMismatch(read_hold.id()));
1763        }
1764        read_hold
1765            .try_downgrade(Antichain::from_elem(timestamp.clone()))
1766            .map_err(|_| ReadHoldInsufficient(target_id))?;
1767
1768        if let Some(target) = target_replica {
1769            if !self.replica_exists(target) {
1770                return Err(ReplicaMissing(target));
1771            }
1772        }
1773
1774        let otel_ctx = OpenTelemetryContext::obtain();
1775
1776        self.peeks.insert(
1777            uuid,
1778            PendingPeek {
1779                target_replica,
1780                // TODO(guswynn): can we just hold the `tracing::Span` here instead?
1781                otel_ctx: otel_ctx.clone(),
1782                requested_at: Instant::now(),
1783                read_hold,
1784                peek_response_tx,
1785                limit: finishing.limit.map(usize::cast_from),
1786                offset: finishing.offset,
1787            },
1788        );
1789
1790        let peek = Peek {
1791            literal_constraints,
1792            uuid,
1793            timestamp,
1794            finishing,
1795            map_filter_project,
1796            // Obtain an `OpenTelemetryContext` from the thread-local tracing
1797            // tree to forward it on to the compute worker.
1798            otel_ctx,
1799            target: peek_target,
1800            result_desc,
1801        };
1802        self.send(ComputeCommand::Peek(Box::new(peek)));
1803
1804        Ok(())
1805    }
1806
1807    /// Cancels an existing peek request.
1808    #[mz_ore::instrument(level = "debug")]
1809    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1810        let Some(peek) = self.peeks.get_mut(&uuid) else {
1811            tracing::warn!("did not find pending peek for {uuid}");
1812            return;
1813        };
1814
1815        let duration = peek.requested_at.elapsed();
1816        self.metrics
1817            .observe_peek_response(&PeekResponse::Canceled, duration);
1818
1819        // Enqueue a notification for the cancellation.
1820        let otel_ctx = peek.otel_ctx.clone();
1821        otel_ctx.attach_as_parent();
1822
1823        self.deliver_response(ComputeControllerResponse::PeekNotification(
1824            uuid,
1825            PeekNotification::Canceled,
1826            otel_ctx,
1827        ));
1828
1829        // Finish the peek.
1830        // This will also propagate the cancellation to the replicas.
1831        self.finish_peek(uuid, reason);
1832    }
1833
1834    /// Assigns a read policy to specific identifiers.
1835    ///
1836    /// The policies are assigned in the order presented; if an identifier appears multiple times,
1837    /// the last policy provided for it wins. Changing a policy will immediately downgrade the read
1838    /// capability if appropriate, but it will not "recover" the read capability if the prior
1839    /// capability is already ahead of it.
1840    ///
1841    /// Identifiers not present in `policies` retain their existing read policies.
1842    ///
1843    /// It is an error to attempt to set a read policy for a collection that is not readable in the
1844    /// context of compute. At this time, only indexes are readable compute collections.
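    ///
    /// A minimal usage sketch (hypothetical `index_id` and `timestamp`; `ValidFrom` pins the read
    /// frontier to a fixed `since`; marked `ignore` because `Instance` is not publicly
    /// constructible):
    /// ```ignore
    /// let since = Antichain::from_elem(timestamp);
    /// instance.set_read_policy(vec![(index_id, ReadPolicy::ValidFrom(since))])?;
    /// ```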
1845    #[mz_ore::instrument(level = "debug")]
1846    pub fn set_read_policy(
1847        &mut self,
1848        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1849    ) -> Result<(), ReadPolicyError> {
1850        // Do error checking upfront, to avoid introducing inconsistencies between a collection's
1851        // `implied_capability` and `read_capabilities`.
1852        for (id, _policy) in &policies {
1853            let collection = self.collection(*id)?;
1854            if collection.read_policy.is_none() {
1855                return Err(ReadPolicyError::WriteOnlyCollection(*id));
1856            }
1857        }
1858
1859        for (id, new_policy) in policies {
1860            let collection = self.expect_collection_mut(id);
1861            let new_since = new_policy.frontier(collection.write_frontier().borrow());
1862            let _ = collection.implied_read_hold.try_downgrade(new_since);
1863            collection.read_policy = Some(new_policy);
1864        }
1865
1866        Ok(())
1867    }
1868
1869    /// Advance the global write frontier of the given collection.
1870    ///
1871    /// Frontier regressions are gracefully ignored.
1872    ///
1873    /// # Panics
1874    ///
1875    /// Panics if the identified collection does not exist.
1876    #[mz_ore::instrument(level = "debug")]
1877    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
1878        let collection = self.expect_collection_mut(id);
1879
1880        let advanced = collection.shared.lock_write_frontier(|f| {
1881            let advanced = PartialOrder::less_than(f, &new_frontier);
1882            if advanced {
1883                f.clone_from(&new_frontier);
1884            }
1885            advanced
1886        });
1887
1888        if !advanced {
1889            return;
1890        }
1891
1892        // Relax the implied read hold according to the read policy.
1893        let new_since = match &collection.read_policy {
1894            Some(read_policy) => {
1895                // For readable collections the read frontier is determined by applying the
1896                // client-provided read policy to the write frontier.
1897                read_policy.frontier(new_frontier.borrow())
1898            }
1899            None => {
1900                // Write-only collections cannot be read within the context of the compute
1901                // controller, so their read frontier only controls the read holds taken on their
1902                // inputs. We can safely downgrade the input read holds to any time less than the
1903                // write frontier.
1904                //
1905                // Note that some write-only collections (continual tasks) need to observe changes
1906                // at their current write frontier during hydration. Thus, we cannot downgrade the
1907                // read frontier to the write frontier and instead step it back by one.
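                // For example (illustrative): a write frontier of `{10}` yields a new implied
                // read frontier of `{9}`.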
1908                Antichain::from_iter(
1909                    new_frontier
1910                        .iter()
1911                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
1912                )
1913            }
1914        };
1915        let _ = collection.implied_read_hold.try_downgrade(new_since);
1916
1917        // Report the frontier advancement.
1918        self.deliver_response(ComputeControllerResponse::FrontierUpper {
1919            id,
1920            upper: new_frontier,
1921        });
1922    }
1923
1924    /// Apply a collection read hold change.
1925    fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
1926        let Some(collection) = self.collections.get_mut(&id) else {
1927            soft_panic_or_log!(
1928                "read hold change for absent collection (id={id}, changes={update:?})"
1929            );
1930            return;
1931        };
1932
1933        let new_since = collection.shared.lock_read_capabilities(|caps| {
1934            // Sanity check to prevent corrupted `read_capabilities`, which can cause hard-to-debug
1935            // issues (usually stuck read frontiers).
1936            let read_frontier = caps.frontier();
1937            for (time, diff) in update.iter() {
1938                let count = caps.count_for(time) + diff;
1939                assert!(
1940                    count >= 0,
1941                    "invalid read capabilities update: negative capability \
1942             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1943                );
1944                assert!(
1945                    count == 0 || read_frontier.less_equal(time),
1946                    "invalid read capabilities update: frontier regression \
1947             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1948                );
1949            }
1950
1951            // Apply read capability updates and learn about resulting changes to the read
1952            // frontier.
1953            let changes = caps.update_iter(update.drain());
1954
1955            let changed = changes.count() > 0;
1956            changed.then(|| caps.frontier().to_owned())
1957        });
1958
1959        let Some(new_since) = new_since else {
1960            return; // read frontier did not change
1961        };
1962
1963        // Propagate read frontier update to dependencies.
1964        for read_hold in collection.compute_dependencies.values_mut() {
1965            read_hold
1966                .try_downgrade(new_since.clone())
1967                .expect("frontiers don't regress");
1968        }
1969        for read_hold in collection.storage_dependencies.values_mut() {
1970            read_hold
1971                .try_downgrade(new_since.clone())
1972                .expect("frontiers don't regress");
1973        }
1974
1975        // Produce `AllowCompaction` command.
1976        self.send(ComputeCommand::AllowCompaction {
1977            id,
1978            frontier: new_since,
1979        });
1980    }
1981
1982    /// Fulfills a registered peek and cleans up associated state.
1983    ///
1984    /// As part of this we:
1985    ///  * Send a `PeekResponse` through the peek's response channel.
1986    ///  * Emit a `CancelPeek` command to instruct replicas to stop spending resources on this
1987    ///    peek, and to allow the `ComputeCommandHistory` to reduce away the corresponding `Peek`
1988    ///    command.
1989    ///  * Remove the read hold for this peek, unblocking compaction that might have waited on it.
1990    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1991        let Some(peek) = self.peeks.remove(&uuid) else {
1992            return;
1993        };
1994
1995        // The recipient might not be interested in the peek response anymore, which is fine.
1996        let _ = peek.peek_response_tx.send(response);
1997
1998        // NOTE: We need to send the `CancelPeek` command _before_ we release the peek's read hold
1999        // (by dropping it), to avoid the edge case that caused database-issues#4812.
2000        self.send(ComputeCommand::CancelPeek { uuid });
2001
2002        drop(peek.read_hold);
2003    }
2004
2005    /// Handles a response from a replica. Replica IDs are re-used across replica restarts, so we
2006    /// use the replica epoch to drop stale responses.
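    ///
    /// For example (illustrative): after `rehydrate_replica` bumps a replica from epoch 3 to
    /// epoch 4, responses still in flight from the epoch-3 incarnation are discarded here.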
2007    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
2008        // Filter responses from non-existing or stale replicas.
2009        if self
2010            .replicas
2011            .get(&replica_id)
2012            .filter(|replica| replica.epoch == epoch)
2013            .is_none()
2014        {
2015            return;
2016        }
2017
2018        // Invariant: the replica exists and has the expected epoch.
2019
2020        match response {
2021            ComputeResponse::Frontiers(id, frontiers) => {
2022                self.handle_frontiers_response(id, frontiers, replica_id);
2023            }
2024            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
2025                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
2026            }
2027            ComputeResponse::CopyToResponse(id, response) => {
2028                self.handle_copy_to_response(id, response, replica_id);
2029            }
2030            ComputeResponse::SubscribeResponse(id, response) => {
2031                self.handle_subscribe_response(id, response, replica_id);
2032            }
2033            ComputeResponse::Status(response) => {
2034                self.handle_status_response(response, replica_id);
2035            }
2036        }
2037    }
2038
2039    /// Handle a `Frontiers` response from a replica, updating the per-replica frontier tracking
2040    /// and, if necessary, the collection's global write frontier.
2041    fn handle_frontiers_response(
2042        &mut self,
2043        id: GlobalId,
2044        frontiers: FrontiersResponse<T>,
2045        replica_id: ReplicaId,
2046    ) {
2047        if !self.collections.contains_key(&id) {
2048            soft_panic_or_log!(
2049                "frontiers update for an unknown collection \
2050                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
2051            );
2052            return;
2053        }
2054        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2055            soft_panic_or_log!(
2056                "frontiers update for an unknown replica \
2057                 (replica_id={replica_id}, frontiers={frontiers:?})"
2058            );
2059            return;
2060        };
2061        let Some(replica_collection) = replica.collections.get_mut(&id) else {
2062            soft_panic_or_log!(
2063                "frontiers update for an unknown replica collection \
2064                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
2065            );
2066            return;
2067        };
2068
2069        if let Some(new_frontier) = frontiers.input_frontier {
2070            replica_collection.update_input_frontier(new_frontier.clone());
2071        }
2072        if let Some(new_frontier) = frontiers.output_frontier {
2073            replica_collection.update_output_frontier(new_frontier.clone());
2074        }
2075        if let Some(new_frontier) = frontiers.write_frontier {
2076            replica_collection.update_write_frontier(new_frontier.clone());
2077            self.maybe_update_global_write_frontier(id, new_frontier);
2078        }
2079    }
2080
2081    #[mz_ore::instrument(level = "debug")]
2082    fn handle_peek_response(
2083        &mut self,
2084        uuid: Uuid,
2085        response: PeekResponse,
2086        otel_ctx: OpenTelemetryContext,
2087        replica_id: ReplicaId,
2088    ) {
2089        otel_ctx.attach_as_parent();
2090
2091        // We might not be tracking this peek anymore, because we have served a response already or
2092        // because it was canceled. If this is the case, we ignore the response.
2093        let Some(peek) = self.peeks.get(&uuid) else {
2094            return;
2095        };
2096
2097        // If the peek is targeting a replica, ignore responses from other replicas.
2098        let target_replica = peek.target_replica.unwrap_or(replica_id);
2099        if target_replica != replica_id {
2100            return;
2101        }
2102
2103        let duration = peek.requested_at.elapsed();
2104        self.metrics.observe_peek_response(&response, duration);
2105
2106        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
2107        // NOTE: We use the `otel_ctx` from the response, not the pending peek, because we
2108        // currently want the parent to be whatever the compute worker did with this peek.
2109        self.deliver_response(ComputeControllerResponse::PeekNotification(
2110            uuid,
2111            notification,
2112            otel_ctx,
2113        ));
2114
2115        self.finish_peek(uuid, response)
2116    }
2117
2118    fn handle_copy_to_response(
2119        &mut self,
2120        sink_id: GlobalId,
2121        response: CopyToResponse,
2122        replica_id: ReplicaId,
2123    ) {
2124        if !self.collections.contains_key(&sink_id) {
2125            soft_panic_or_log!(
2126                "received response for an unknown copy-to \
2127                 (sink_id={sink_id}, replica_id={replica_id})",
2128            );
2129            return;
2130        }
2131        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2132            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
2133            return;
2134        };
2135        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
2136            soft_panic_or_log!(
2137                "copy-to response for an unknown replica collection \
2138                 (sink_id={sink_id}, replica_id={replica_id})"
2139            );
2140            return;
2141        };
2142
2143        // Downgrade the replica frontiers, to enable dropping of input read holds and clean up of
2144        // collection state.
2145        // TODO(database-issues#4701): report copy-to frontiers through `Frontiers` responses
2146        replica_collection.update_write_frontier(Antichain::new());
2147        replica_collection.update_input_frontier(Antichain::new());
2148        replica_collection.update_output_frontier(Antichain::new());
2149
2150        // We might not be tracking this COPY TO because we have already returned a response
2151        // from one of the replicas. In that case, we ignore the response.
2152        if !self.copy_tos.remove(&sink_id) {
2153            return;
2154        }
2155
2156        let result = match response {
2157            CopyToResponse::RowCount(count) => Ok(count),
2158            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
2159            // We should never get here: Replicas only drop copy to collections in response
2160            // to the controller allowing them to do so, and when the controller drops a
2161            // copy to it also removes it from the list of tracked copy_tos (see
2162            // [`Instance::drop_collections`]).
2163            CopyToResponse::Dropped => {
2164                tracing::error!(
2165                    %sink_id, %replica_id,
2166                    "received `Dropped` response for a tracked copy to",
2167                );
2168                return;
2169            }
2170        };
2171
2172        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
2173    }
2174
2175    fn handle_subscribe_response(
2176        &mut self,
2177        subscribe_id: GlobalId,
2178        response: SubscribeResponse<T>,
2179        replica_id: ReplicaId,
2180    ) {
2181        if !self.collections.contains_key(&subscribe_id) {
2182            soft_panic_or_log!(
2183                "received response for an unknown subscribe \
2184                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
2185            );
2186            return;
2187        }
2188        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2189            soft_panic_or_log!(
2190                "subscribe response for an unknown replica (replica_id={replica_id})"
2191            );
2192            return;
2193        };
2194        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
2195            soft_panic_or_log!(
2196                "subscribe response for an unknown replica collection \
2197                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
2198            );
2199            return;
2200        };
2201
2202        // Always apply replica write frontier updates. Even if the subscribe is not tracked
2203        // anymore, there might still be replicas reading from its inputs, so we need to track the
2204        // frontiers until all replicas have advanced to the empty one.
2205        let write_frontier = match &response {
2206            SubscribeResponse::Batch(batch) => batch.upper.clone(),
2207            SubscribeResponse::DroppedAt(_) => Antichain::new(),
2208        };
2209
2210        // For subscribes we downgrade all replica frontiers based on write frontiers. This should
2211        // be fine because the input and output frontier of a subscribe track its write frontier.
2212        // TODO(database-issues#4701): report subscribe frontiers through `Frontiers` responses
2213        replica_collection.update_write_frontier(write_frontier.clone());
2214        replica_collection.update_input_frontier(write_frontier.clone());
2215        replica_collection.update_output_frontier(write_frontier.clone());
2216
2217        // If the subscribe is not tracked, or targets a different replica, there is nothing to do.
2218        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
2219            return;
2220        };
2221        let replica_targeted = subscribe.target_replica.unwrap_or(replica_id) == replica_id;
2222        if !replica_targeted {
2223            return;
2224        }
2225
2226        // Apply a global frontier update.
2227        // If this is a replica-targeted subscribe, it is important that we advance the global
2228        // frontier only based on responses from the targeted replica. Otherwise, another replica
2229        // could advance to the empty frontier, making us drop the subscribe on the targeted
2230        // replica prematurely.
2231        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);
2232
2233        match response {
2234            SubscribeResponse::Batch(batch) => {
2235                let upper = batch.upper;
2236                let mut updates = batch.updates;
2237
2238                // If this batch advances the subscribe's frontier, we emit all updates at times
2239                // greater than or equal to the last frontier (to avoid emitting duplicate updates).
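                // For example (illustrative): if the tracked frontier is `{5}` and the batch
                // upper is `{8}`, only updates at times in `[5, 8)` are forwarded, and the
                // tracked frontier advances to `{8}`.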
2240                if PartialOrder::less_than(&subscribe.frontier, &upper) {
2241                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());
2242
2243                    if upper.is_empty() {
2244                        // This subscribe cannot produce more data. Stop tracking it.
2245                        self.subscribes.remove(&subscribe_id);
2246                    } else {
2247                        // This subscribe can produce more data. Update our tracking of it.
2248                        self.subscribes.insert(subscribe_id, subscribe);
2249                    }
2250
2251                    if let Ok(updates) = updates.as_mut() {
2252                        updates.retain(|(time, _data, _diff)| lower.less_equal(time));
2253                    }
2254                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
2255                        subscribe_id,
2256                        SubscribeBatch {
2257                            lower,
2258                            upper,
2259                            updates,
2260                        },
2261                    ));
2262                }
2263            }
2264            SubscribeResponse::DroppedAt(frontier) => {
2265                // We should never get here: Replicas only drop subscribe collections in response
2266                // to the controller allowing them to do so, and when the controller drops a
2267                // subscribe it also removes it from the list of tracked subscribes (see
2268                // [`Instance::drop_collections`]).
2269                tracing::error!(
2270                    %subscribe_id,
2271                    %replica_id,
2272                    frontier = ?frontier.elements(),
2273                    "received `DroppedAt` response for a tracked subscribe",
2274                );
2275                self.subscribes.remove(&subscribe_id);
2276            }
2277        }
2278    }
2279
2280    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
2281        match response {
2282            StatusResponse::Placeholder => {}
2283        }
2284    }
2285
2286    /// Return the write frontiers of the dependencies of the given collection.
2287    fn dependency_write_frontiers<'b>(
2288        &'b self,
2289        collection: &'b CollectionState<T>,
2290    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2291        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2292            let collection = self.collections.get(&dep_id);
2293            collection.map(|c| c.write_frontier())
2294        });
2295        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2296            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2297            frontiers.map(|f| f.write_frontier)
2298        });
2299
2300        compute_frontiers.chain(storage_frontiers)
2301    }
2302
2303    /// Return the write frontiers of transitive storage dependencies of the given collection.
2304    fn transitive_storage_dependency_write_frontiers<'b>(
2305        &'b self,
2306        collection: &'b CollectionState<T>,
2307    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2308        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2309        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2310        let mut done = BTreeSet::new();
2311
2312        while let Some(id) = todo.pop() {
2313            if done.contains(&id) {
2314                continue;
2315            }
2316            if let Some(dep) = self.collections.get(&id) {
2317                storage_ids.extend(dep.storage_dependency_ids());
2318                todo.extend(dep.compute_dependency_ids())
2319            }
2320            done.insert(id);
2321        }
2322
2323        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2324            let frontiers = self.storage_collections.collection_frontiers(id).ok();
2325            frontiers.map(|f| f.write_frontier)
2326        });
2327
2328        storage_frontiers
2329    }
2330
2331    /// Downgrade the warmup capabilities of collections as much as possible.
2332    ///
2333    /// The only requirement we have for a collection's warmup capability is that it is for a time
2334    /// that is available in all of the collection's inputs. For each input the latest time that is
2335    /// the case for is `write_frontier - 1`. So the farthest we can downgrade a collection's
2336    /// warmup capability is the minimum of `write_frontier - 1` of all its inputs.
2337    ///
2338    /// This method expects to be periodically called as part of instance maintenance work.
2339    /// We would like to instead update the warmup capabilities synchronously in response to
2340    /// frontier updates of dependency collections, but that is not generally possible because we
2341    /// don't learn about frontier updates of storage collections synchronously. We could do
2342    /// synchronous updates for compute dependencies, but we refrain from doing so for simplicity.
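    ///
    /// For example (illustrative): a collection whose inputs have write frontiers `{7}` and `{5}`
    /// can have its warmup capability downgraded to `{4}`, since time `4` is complete in both
    /// inputs.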
2343    fn downgrade_warmup_capabilities(&mut self) {
2344        let mut new_capabilities = BTreeMap::new();
2345        for (id, collection) in &self.collections {
2346            // For write-only collections that have advanced to the empty frontier, we can drop the
2347            // warmup capability entirely. There is no reason why we would need to hydrate those
2348            // collections again, so being able to warm them up is not useful.
2349            if collection.read_policy.is_none()
2350                && collection.shared.lock_write_frontier(|f| f.is_empty())
2351            {
2352                new_capabilities.insert(*id, Antichain::new());
2353                continue;
2354            }
2355
2356            let mut new_capability = Antichain::new();
2357            for frontier in self.dependency_write_frontiers(collection) {
2358                for time in frontier {
2359                    new_capability.insert(time.step_back().unwrap_or(time));
2360                }
2361            }
2362
2363            new_capabilities.insert(*id, new_capability);
2364        }
2365
2366        for (id, new_capability) in new_capabilities {
2367            let collection = self.expect_collection_mut(id);
2368            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
2369        }
2370    }
2371
2372    /// Forward the implied capabilities of collections, if possible.
2373    ///
2374    /// The implied capability of a collection controls (a) which times are still readable (for
2375    /// indexes) and (b) with which as-of the collection gets installed on a new replica. We are
2376    /// usually not allowed to advance an implied capability beyond the frontier that follows from
2377    /// the collection's read policy applied to its write frontier:
2378    ///
2379    ///  * For sink collections, some external consumer might rely on seeing all distinct times in
2380    ///    the input reflected in the output. If we'd forward the implied capability of a sink,
2381    ///    we'd risk skipping times in the output across replica restarts.
2382    ///  * For index collections, we might make the index unreadable by advancing its read frontier
2383    ///    beyond its write frontier.
2384    ///
2385    /// There is one case where forwarding an implied capability is fine though: an index installed
2386    /// on a cluster that has no replicas. Such indexes are not readable anyway until a new replica
2387    /// is added, so advancing its read frontier can't make it unreadable. We can thus advance the
2388    /// implied capability as long as we make sure that when a new replica is added, the expected
2389    /// relationship between write frontier, read policy, and implied capability can be restored
2390    /// immediately (modulo computation time).
2391    ///
2392    /// Forwarding implied capabilities is not necessary for the correct functioning of the
2393    /// controller but an optimization that is beneficial in two ways:
2394    ///
2395    ///  * It relaxes read holds on inputs to forwarded collections, allowing their compaction.
2396    ///  * It reduces the amount of historical detail new replicas need to process when computing
2397    ///    forwarded collections, as forwarding the implied capability also forwards the corresponding
2398    ///    dataflow as-of.
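    ///
    /// For example (illustrative): an index on a replica-less cluster whose transitive storage
    /// inputs have write frontier `{100}` can have its implied capability forwarded to the read
    /// policy applied to `{100}`, even if its own write frontier is still far behind.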
2399    fn forward_implied_capabilities(&mut self) {
2400        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
2401            return;
2402        }
2403        if !self.replicas.is_empty() {
2404            return;
2405        }
2406
2407        let mut new_capabilities = BTreeMap::new();
2408        for (id, collection) in &self.collections {
2409            let Some(read_policy) = &collection.read_policy else {
2410                // Collection is write-only, i.e. a sink.
2411                continue;
2412            };
2413
2414            // When a new replica is started, it will immediately be able to compute all collection
2415            // output up to the write frontier of its transitive storage inputs. So the new implied
2416            // read capability should be the read policy applied to that frontier.
2417            let mut dep_frontier = Antichain::new();
2418            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
2419                dep_frontier.extend(frontier);
2420            }
2421
2422            let new_capability = read_policy.frontier(dep_frontier.borrow());
2423            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
2424                new_capabilities.insert(*id, new_capability);
2425            }
2426        }
2427
2428        for (id, new_capability) in new_capabilities {
2429            let collection = self.expect_collection_mut(id);
2430            let _ = collection.implied_read_hold.try_downgrade(new_capability);
2431        }
2432    }
2433
2434    /// Acquires a `ReadHold` for the identified compute collection.
2435    ///
2436    /// This mirrors the logic used by the controller-side `InstanceState::acquire_read_hold`,
2437    /// but executes on the instance task itself.
2438    fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
2439        // Similarly to InstanceState::acquire_read_hold and StorageCollections::acquire_read_holds,
2440        // we acquire read holds at the earliest possible time rather than returning a copy
2441        // of the implied read hold. This is so that dependents can acquire read holds on
2442        // compute dependencies at frontiers that are held back by other read holds the caller
2443        // has previously taken.
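        // For example (illustrative): if the collection's implied read hold is at `{20}` but
        // another outstanding hold keeps the overall read frontier at `{15}`, the returned hold
        // is at `{15}`.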
2444        let collection = self.collection(id)?;
2445        let since = collection.shared.lock_read_capabilities(|caps| {
2446            let since = caps.frontier().to_owned();
2447            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
2448            since
2449        });
2450        let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
2451        Ok(hold)
2452    }
2453
2454    /// Process pending maintenance work.
2455    ///
2456    /// This method is invoked periodically by the global controller.
2457    /// It is a good place to perform maintenance work that arises from various controller state
2458    /// changes and that cannot conveniently be handled synchronously with those state changes.
2459    #[mz_ore::instrument(level = "debug")]
2460    pub fn maintain(&mut self) {
2461        self.rehydrate_failed_replicas();
2462        self.downgrade_warmup_capabilities();
2463        self.forward_implied_capabilities();
2464        self.schedule_collections();
2465        self.cleanup_collections();
2466        self.update_frontier_introspection();
2467        self.refresh_state_metrics();
2468        self.refresh_wallclock_lag();
2469    }
2470}
2471
2472/// State maintained about individual compute collections.
2473///
2474/// A compute collection is either an index, or a storage sink, or a subscribe, exported by a
2475/// compute dataflow.
2476#[derive(Debug)]
2477struct CollectionState<T: ComputeControllerTimestamp> {
2478    /// Whether this collection is a log collection.
2479    ///
2480    /// Log collections are special in that they are only maintained by a subset of all replicas.
2481    log_collection: bool,
2482    /// Whether this collection has been dropped by a controller client.
2483    ///
2484    /// The controller is allowed to remove the `CollectionState` for a collection only when
2485    /// `dropped == true`. Otherwise, clients might still expect to be able to query information
2486    /// about this collection.
2487    dropped: bool,
2488    /// Whether this collection has been scheduled, i.e., the controller has sent a `Schedule`
2489    /// command for it.
2490    scheduled: bool,
2491
2492    /// Whether this collection is in read-only mode.
2493    ///
2494    /// When in read-only mode, the dataflow is not allowed to affect external state (largely persist).
2495    read_only: bool,
2496
2497    /// State shared with the `ComputeController`.
2498    shared: SharedCollectionState<T>,
2499
2500    /// A read hold maintaining the implicit capability of the collection.
2501    ///
2502    /// This capability is kept to ensure that the collection remains readable according to its
2503    /// `read_policy`. It also ensures that read holds on the collection's dependencies are kept at
2504    /// some time not greater than the collection's `write_frontier`, guaranteeing that the
2505    /// collection's next outputs can always be computed without skipping times.
2506    implied_read_hold: ReadHold<T>,
2507    /// A read hold held to enable dataflow warmup.
2508    ///
2509    /// Dataflow warmup is an optimization that allows dataflows to immediately start hydrating
2510    /// even when their next output time (as implied by the `write_frontier`) is in the future.
2511    /// By installing a read capability derived from the write frontiers of the collection's
2512    /// inputs, we ensure that the as-of of new dataflows installed for the collection is at a time
2513    /// that is immediately available, so hydration can begin immediately too.
2514    warmup_read_hold: ReadHold<T>,
2515    /// The policy to use to downgrade `self.implied_read_hold`.
2516    ///
2517    /// If `None`, the collection is a write-only collection (i.e. a sink). For write-only
2518    /// collections, the `implied_read_hold` is only required for maintaining read holds on the
2519    /// inputs, so we can immediately downgrade it to the `write_frontier`.
2520    read_policy: Option<ReadPolicy<T>>,
2521
2522    /// Storage identifiers on which this collection depends, and read holds this collection
2523    /// requires on them.
2524    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2525    /// Compute identifiers on which this collection depends, and read holds this collection
2526    /// requires on them.
2527    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2528
2529    /// Introspection state associated with this collection.
2530    introspection: CollectionIntrospection<T>,
2531
2532    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
2533    /// introspection update.
2534    ///
2535    /// Keys are `(period, lag, labels)` triples, values are counts.
2536    ///
2537    /// If this is `None`, wallclock lag is not tracked for this collection.
2538    wallclock_lag_histogram_stash: Option<
2539        BTreeMap<
2540            (
2541                WallclockLagHistogramPeriod,
2542                WallclockLag,
2543                BTreeMap<&'static str, String>,
2544            ),
2545            Diff,
2546        >,
2547    >,
2548}
2549
2550impl<T: ComputeControllerTimestamp> CollectionState<T> {
2551    /// Creates a new collection state, with an initial read policy valid from `since`.
2552    fn new(
2553        collection_id: GlobalId,
2554        as_of: Antichain<T>,
2555        shared: SharedCollectionState<T>,
2556        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2557        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2558        read_hold_tx: read_holds::ChangeTx<T>,
2559        introspection: CollectionIntrospection<T>,
2560    ) -> Self {
2561        // A collection is not readable before the `as_of`.
2562        let since = as_of.clone();
2563        // A collection won't produce updates for times before the `as_of`.
2564        let upper = as_of;
2565
2566        // Ensure that the provided `shared` is valid for the given `as_of`.
2567        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
2568        assert!(shared.lock_write_frontier(|f| f == &upper));
2569
2570        // Initialize collection read holds.
2571        // Note that the implied read hold was already added to the `read_capabilities` when
2572        // `shared` was created, so we only need to add the warmup read hold here.
2573        let implied_read_hold =
2574            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
2575        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);
2576
2577        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
2578        shared.lock_read_capabilities(|c| {
2579            c.update_iter(updates);
2580        });
2581
2582        // In an effort to keep the produced wallclock lag introspection data small and
2583        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2584        // select indexes and subscribes.
2585        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
2586            true => None,
2587            false => Some(Default::default()),
2588        };
2589
2590        Self {
2591            log_collection: false,
2592            dropped: false,
2593            scheduled: false,
2594            read_only: true,
2595            shared,
2596            implied_read_hold,
2597            warmup_read_hold,
2598            read_policy: Some(ReadPolicy::ValidFrom(since)),
2599            storage_dependencies,
2600            compute_dependencies,
2601            introspection,
2602            wallclock_lag_histogram_stash,
2603        }
2604    }
2605
2606    /// Creates a new collection state for a log collection.
2607    fn new_log_collection(
2608        id: GlobalId,
2609        shared: SharedCollectionState<T>,
2610        read_hold_tx: read_holds::ChangeTx<T>,
2611        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2612    ) -> Self {
2613        let since = Antichain::from_elem(T::minimum());
2614        let introspection =
2615            CollectionIntrospection::new(id, introspection_tx, since.clone(), false, None, None);
2616        let mut state = Self::new(
2617            id,
2618            since,
2619            shared,
2620            Default::default(),
2621            Default::default(),
2622            read_hold_tx,
2623            introspection,
2624        );
2625        state.log_collection = true;
2626        // Log collections are created and scheduled implicitly as part of replica initialization.
2627        state.scheduled = true;
2628        state
2629    }
2630
2631    /// Reports the current read frontier.
2632    fn read_frontier(&self) -> Antichain<T> {
2633        self.shared
2634            .lock_read_capabilities(|c| c.frontier().to_owned())
2635    }
2636
2637    /// Reports the current write frontier.
2638    fn write_frontier(&self) -> Antichain<T> {
2639        self.shared.lock_write_frontier(|f| f.clone())
2640    }
2641
2642    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2643        self.storage_dependencies.keys().copied()
2644    }
2645
2646    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2647        self.compute_dependencies.keys().copied()
2648    }
2649
2650    /// Reports the IDs of the dependencies of this collection.
2651    fn dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2652        self.compute_dependency_ids()
2653            .chain(self.storage_dependency_ids())
2654    }
2655}
2656
2657/// Collection state shared with the `ComputeController`.
2658///
2659/// Having this allows certain controller APIs, such as `ComputeController::collection_frontiers`
2660/// and `ComputeController::acquire_read_hold`, to be non-`async`. This comes at the cost of
2661/// complexity (by introducing shared mutable state) and performance (by introducing locking). We
2662/// should aim to reduce the amount of shared state over time, rather than expand it.
2663///
2664/// Note that [`SharedCollectionState`]s are initialized by the `ComputeController` prior to the
2665/// collection's creation in the [`Instance`]. This is to allow compute clients to query frontiers
2666/// and take new read holds immediately, without having to wait for the [`Instance`] to update.
2667#[derive(Clone, Debug)]
2668pub(super) struct SharedCollectionState<T> {
2669    /// Accumulation of read capabilities for the collection.
2670    ///
2671    /// This accumulation contains the capabilities held by all [`ReadHold`]s given out for the
2672    /// collection, including `implied_read_hold` and `warmup_read_hold`.
2673    ///
2674    /// NOTE: This field may only be modified by [`Instance::apply_read_hold_change`],
2675    /// [`Instance::acquire_read_hold`], and `ComputeController::acquire_read_hold`.
2676    /// Nobody else should modify read capabilities directly. Instead, collection users should
2677    /// manage read holds through [`ReadHold`] objects acquired through
2678    /// `ComputeController::acquire_read_hold`.
2679    ///
2680    /// TODO(teskje): Restructure the code to enforce the above in the type system.
2681    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
2682    /// The write frontier of this collection.
2683    write_frontier: Arc<Mutex<Antichain<T>>>,
2684}
2685
2686impl<T: Timestamp> SharedCollectionState<T> {
2687    pub fn new(as_of: Antichain<T>) -> Self {
2688        // A collection is not readable before the `as_of`.
2689        let since = as_of.clone();
2690        // A collection won't produce updates for times before the `as_of`.
2691        let upper = as_of;
2692
2693        // Initialize read capabilities to the `since`.
2694        // The is the implied read capability. The corresponding [`ReadHold`] is created in
2695        // [`CollectionState::new`].
2696        let mut read_capabilities = MutableAntichain::new();
2697        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2698
2699        Self {
2700            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2701            write_frontier: Arc::new(Mutex::new(upper)),
2702        }
2703    }
2704
2705    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2706    where
2707        F: FnOnce(&mut MutableAntichain<T>) -> R,
2708    {
2709        let mut caps = self.read_capabilities.lock().expect("poisoned");
2710        f(&mut *caps)
2711    }
2712
2713    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2714    where
2715        F: FnOnce(&mut Antichain<T>) -> R,
2716    {
2717        let mut frontier = self.write_frontier.lock().expect("poisoned");
2718        f(&mut *frontier)
2719    }
2720}
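
// Minimal std-only sketch (names hypothetical) of the closure-based accessors
// above: callers never see the `MutexGuard`, so they cannot hold the lock
// across an `.await` or stash it away; the lock is released as soon as the
// closure returns.
#[cfg(test)]
mod lock_accessor_sketch {
    use std::sync::{Arc, Mutex};

    #[derive(Clone)]
    struct Shared {
        frontier: Arc<Mutex<Vec<u64>>>,
    }

    impl Shared {
        fn lock_frontier<F, R>(&self, f: F) -> R
        where
            F: FnOnce(&mut Vec<u64>) -> R,
        {
            let mut guard = self.frontier.lock().expect("poisoned");
            f(&mut *guard)
        }
    }

    #[test]
    fn closure_scoped_locking() {
        let shared = Shared {
            frontier: Arc::new(Mutex::new(vec![0])),
        };
        // Reads and writes both go through the same scoped accessor.
        shared.lock_frontier(|f| *f = vec![5]);
        assert_eq!(shared.lock_frontier(|f| f.clone()), vec![5u64]);
    }
}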
2721
2722/// Manages certain introspection relations associated with a collection. Upon creation, it adds
2723/// rows to introspection relations. When dropped, it retracts its managed rows.
2724///
2725/// TODO: `ComputeDependencies` could be moved under this.
2726#[derive(Debug)]
2727struct CollectionIntrospection<T: ComputeControllerTimestamp> {
2728    /// The ID of the compute collection.
2729    collection_id: GlobalId,
2730    /// A channel through which introspection updates are delivered.
2731    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2732    /// Introspection state for `IntrospectionType::Frontiers`.
2733    ///
2734    /// `Some` if the collection does _not_ sink into a storage collection (i.e. is not an MV). If
2735    /// the collection sinks into storage, the storage controller reports its frontiers instead.
2736    frontiers: Option<FrontiersIntrospectionState<T>>,
2737    /// Introspection state for `IntrospectionType::ComputeMaterializedViewRefreshes`.
2738    ///
2739    /// `Some` if the collection is a REFRESH MV.
2740    refresh: Option<RefreshIntrospectionState<T>>,
2741}
2742
2743impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
2744    fn new(
2745        collection_id: GlobalId,
2746        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2747        as_of: Antichain<T>,
2748        storage_sink: bool,
2749        initial_as_of: Option<Antichain<T>>,
2750        refresh_schedule: Option<RefreshSchedule>,
2751    ) -> Self {
2752        let refresh =
2753            match (refresh_schedule, initial_as_of) {
2754                (Some(refresh_schedule), Some(initial_as_of)) => Some(
2755                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2756                ),
2757                (refresh_schedule, _) => {
2758                    // If we have a `refresh_schedule`, then the collection is an MV, so we should also have
2759                    // an `initial_as_of`.
2760                    soft_assert_or_log!(
2761                        refresh_schedule.is_none(),
2762                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2763                    );
2764                    None
2765                }
2766            };
2767        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2768
2769        let self_ = Self {
2770            collection_id,
2771            introspection_tx,
2772            frontiers,
2773            refresh,
2774        };
2775
2776        self_.report_initial_state();
2777        self_
2778    }
2779
2780    /// Reports the initial introspection state.
2781    fn report_initial_state(&self) {
2782        if let Some(frontiers) = &self.frontiers {
2783            let row = frontiers.row_for_collection(self.collection_id);
2784            let updates = vec![(row, Diff::ONE)];
2785            self.send(IntrospectionType::Frontiers, updates);
2786        }
2787
2788        if let Some(refresh) = &self.refresh {
2789            let row = refresh.row_for_collection(self.collection_id);
2790            let updates = vec![(row, Diff::ONE)];
2791            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2792        }
2793    }
2794
2795    /// Observe the given current collection frontiers and update the introspection state as
2796    /// necessary.
2797    fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2798        self.update_frontier_introspection(read_frontier, write_frontier);
2799        self.update_refresh_introspection(write_frontier);
2800    }
2801
2802    fn update_frontier_introspection(
2803        &mut self,
2804        read_frontier: &Antichain<T>,
2805        write_frontier: &Antichain<T>,
2806    ) {
2807        let Some(frontiers) = &mut self.frontiers else {
2808            return;
2809        };
2810
2811        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2812        {
2813            return; // no change
2814        };
2815
2816        let retraction = frontiers.row_for_collection(self.collection_id);
2817        frontiers.update(read_frontier, write_frontier);
2818        let insertion = frontiers.row_for_collection(self.collection_id);
2819        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2820        self.send(IntrospectionType::Frontiers, updates);
2821    }
2822
2823    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
2824        let Some(refresh) = &mut self.refresh else {
2825            return;
2826        };
2827
2828        let retraction = refresh.row_for_collection(self.collection_id);
2829        refresh.frontier_update(write_frontier);
2830        let insertion = refresh.row_for_collection(self.collection_id);
2831
2832        if retraction == insertion {
2833            return; // no change
2834        }
2835
2836        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2837        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2838    }
2839
2840    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2841        // Failure to send means the `ComputeController` has been dropped and doesn't care about
2842        // introspection updates anymore.
2843        let _ = self.introspection_tx.send((introspection_type, updates));
2844    }
2845}
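
// Minimal sketch of the retraction/insertion pattern used by
// `update_frontier_introspection` above: when tracked state changes, emit the
// old row with diff -1 and the new row with diff +1, so a consumer that sums
// diffs per row always converges to exactly the current state.
#[cfg(test)]
mod diff_update_sketch {
    use std::collections::BTreeMap;

    fn apply(acc: &mut BTreeMap<String, i64>, updates: Vec<(String, i64)>) {
        for (row, diff) in updates {
            *acc.entry(row).or_insert(0) += diff;
        }
        acc.retain(|_, diff| *diff != 0);
    }

    #[test]
    fn retract_then_insert_converges() {
        let mut acc = BTreeMap::new();
        // Initial report.
        apply(&mut acc, vec![("u1 @ frontier 5".into(), 1)]);
        // Frontier advanced: retract the stale row, insert the new one.
        apply(
            &mut acc,
            vec![("u1 @ frontier 5".into(), -1), ("u1 @ frontier 8".into(), 1)],
        );
        assert_eq!(acc.len(), 1);
        assert!(acc.contains_key("u1 @ frontier 8"));
    }
}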
2846
2847impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
2848    fn drop(&mut self) {
2849        // Retract collection frontiers.
2850        if let Some(frontiers) = &self.frontiers {
2851            let row = frontiers.row_for_collection(self.collection_id);
2852            let updates = vec![(row, Diff::MINUS_ONE)];
2853            self.send(IntrospectionType::Frontiers, updates);
2854        }
2855
2856        // Retract MV refresh state.
2857        if let Some(refresh) = &self.refresh {
2858            let retraction = refresh.row_for_collection(self.collection_id);
2859            let updates = vec![(retraction, Diff::MINUS_ONE)];
2860            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2861        }
2862    }
2863}
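
// Sketch of the drop-based retraction above (types simplified and
// hypothetical): the introspection helper sends a +1 update on creation and
// the matching -1 from its `Drop` impl, so dropping the collection state is
// enough to clean up the introspection relation.
#[cfg(test)]
mod drop_retraction_sketch {
    use tokio::sync::mpsc;

    struct Reported {
        row: String,
        tx: mpsc::UnboundedSender<(String, i64)>,
    }

    impl Reported {
        fn new(row: String, tx: mpsc::UnboundedSender<(String, i64)>) -> Self {
            let _ = tx.send((row.clone(), 1));
            Self { row, tx }
        }
    }

    impl Drop for Reported {
        fn drop(&mut self) {
            // Failure to send just means nobody is listening anymore.
            let _ = self.tx.send((self.row.clone(), -1));
        }
    }

    #[test]
    fn create_then_drop_nets_to_zero() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        drop(Reported::new("u1".into(), tx));

        let mut sum = 0;
        while let Ok((_, diff)) = rx.try_recv() {
            sum += diff;
        }
        assert_eq!(sum, 0);
    }
}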
2864
2865#[derive(Debug)]
2866struct FrontiersIntrospectionState<T> {
2867    read_frontier: Antichain<T>,
2868    write_frontier: Antichain<T>,
2869}
2870
2871impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
2872    fn new(as_of: Antichain<T>) -> Self {
2873        Self {
2874            read_frontier: as_of.clone(),
2875            write_frontier: as_of,
2876        }
2877    }
2878
2879    /// Return a `Row` reflecting the current collection frontiers.
2880    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2881        let read_frontier = self
2882            .read_frontier
2883            .as_option()
2884            .map_or(Datum::Null, |ts| ts.clone().into());
2885        let write_frontier = self
2886            .write_frontier
2887            .as_option()
2888            .map_or(Datum::Null, |ts| ts.clone().into());
2889        Row::pack_slice(&[
2890            Datum::String(&collection_id.to_string()),
2891            read_frontier,
2892            write_frontier,
2893        ])
2894    }
2895
2896    /// Update the introspection state with the given new frontiers.
2897    fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2898        if read_frontier != &self.read_frontier {
2899            self.read_frontier.clone_from(read_frontier);
2900        }
2901        if write_frontier != &self.write_frontier {
2902            self.write_frontier.clone_from(write_frontier);
2903        }
2904    }
2905}
2906
2907/// Information needed to compute introspection updates for a REFRESH materialized view when the
2908/// write frontier advances.
2909#[derive(Debug)]
2910struct RefreshIntrospectionState<T> {
2911    // Immutable properties of the MV
2912    refresh_schedule: RefreshSchedule,
2913    initial_as_of: Antichain<T>,
2914    // Refresh state
2915    next_refresh: Datum<'static>,           // Null or an MzTimestamp
2916    last_completed_refresh: Datum<'static>, // Null or an MzTimestamp
2917}
2918
2919impl<T> RefreshIntrospectionState<T> {
2920    /// Return a `Row` reflecting the current refresh introspection state.
2921    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2922        Row::pack_slice(&[
2923            Datum::String(&collection_id.to_string()),
2924            self.last_completed_refresh,
2925            self.next_refresh,
2926        ])
2927    }
2928}
2929
2930impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
2931    /// Construct a new [`RefreshIntrospectionState`], and apply an initial `frontier_update()` at
2932    /// the `upper`.
2933    fn new(
2934        refresh_schedule: RefreshSchedule,
2935        initial_as_of: Antichain<T>,
2936        upper: &Antichain<T>,
2937    ) -> Self {
2938        let mut self_ = Self {
2939            refresh_schedule: refresh_schedule.clone(),
2940            initial_as_of: initial_as_of.clone(),
2941            next_refresh: Datum::Null,
2942            last_completed_refresh: Datum::Null,
2943        };
2944        self_.frontier_update(upper);
2945        self_
2946    }
2947
2948    /// Should be called whenever the write frontier of the collection advances. It updates the
2949    /// state that should be recorded in introspection relations, but doesn't send the updates yet.
2950    fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
2951        if write_frontier.is_empty() {
2952            self.last_completed_refresh =
2953                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2954                    last_refresh.into()
2955                } else {
2956                    // If there is no last refresh, then we have a `REFRESH EVERY`, in which case
2957                    // the saturating roundup puts a refresh at the maximum possible timestamp.
2958                    T::maximum().into()
2959                };
2960            self.next_refresh = Datum::Null;
2961        } else {
2962            if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2963                // We are before the first refresh.
2964                self.last_completed_refresh = Datum::Null;
2965                let initial_as_of = self.initial_as_of.as_option().expect(
2966                    "initial_as_of can't be [], because then there would be no refreshes at all",
2967                );
2968                let first_refresh = initial_as_of
2969                    .round_up(&self.refresh_schedule)
2970                    .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2971                soft_assert_or_log!(
2972                    first_refresh == *initial_as_of,
2973                    "initial_as_of should be set to the first refresh"
2974                );
2975                self.next_refresh = first_refresh.into();
2976            } else {
2977                // The first refresh has already happened.
2978                let write_frontier = write_frontier.as_option().expect("checked above");
2979                self.last_completed_refresh = write_frontier
2980                    .round_down_minus_1(&self.refresh_schedule)
2981                    .map_or_else(
2982                        || {
2983                            soft_panic_or_log!(
2984                                "rounding down should have returned the first refresh or later"
2985                            );
2986                            Datum::Null
2987                        },
2988                        |last_completed_refresh| last_completed_refresh.into(),
2989                    );
2990                self.next_refresh = write_frontier.clone().into();
2991            }
2992        }
2993    }
2994}
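
// Hypothetical worked example of the rounding described above. The helpers are
// illustrative stand-ins, not the real `RefreshSchedule` API: with a fixed
// `REFRESH EVERY`-style schedule, the MV's write frontier parks at the next
// scheduled refresh, so `frontier_update` can take `next_refresh` directly
// from the frontier and derive `last_completed_refresh` by rounding it down.
#[cfg(test)]
mod refresh_rounding_sketch {
    /// Hypothetical: smallest refresh time (multiple of `every`) at or after `t`.
    fn round_up(t: u64, every: u64) -> u64 {
        ((t + every - 1) / every) * every
    }

    /// Hypothetical: largest refresh time strictly before `t`, if any.
    fn round_down_minus_1(t: u64, every: u64) -> Option<u64> {
        let prev = (t.saturating_sub(1) / every) * every;
        (prev > 0).then_some(prev)
    }

    #[test]
    fn refresh_columns_from_the_write_frontier() {
        let every = 10;
        // The initial as-of is already rounded up to the first refresh.
        assert_eq!(round_up(10, every), 10);
        // With the write frontier parked at the third refresh (30), the last
        // completed refresh is the second one (20); `next_refresh` is the
        // frontier itself.
        assert_eq!(round_down_minus_1(30, every), Some(20));
    }
}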
2995
2996/// A record of an outstanding peek, awaiting its response.
2997#[derive(Debug)]
2998struct PendingPeek<T: Timestamp> {
2999    /// For replica-targeted peeks, this specifies the replica whose response we should pass on.
3000    ///
3001    /// If this value is `None`, we pass on the first response.
3002    target_replica: Option<ReplicaId>,
3003    /// The OpenTelemetry context for this peek.
3004    otel_ctx: OpenTelemetryContext,
3005    /// The time at which the peek was requested.
3006    ///
3007    /// Used to track peek durations.
3008    requested_at: Instant,
3009    /// The read hold installed to serve this peek.
3010    read_hold: ReadHold<T>,
3011    /// The channel to send peek results.
3012    peek_response_tx: oneshot::Sender<PeekResponse>,
3013    /// An optional limit of the peek's result size.
3014    limit: Option<usize>,
3015    /// The offset into the peek's result.
3016    offset: usize,
3017}
3018
3019#[derive(Debug, Clone)]
3020struct ActiveSubscribe<T> {
3021    /// Current upper frontier of this subscribe.
3022    frontier: Antichain<T>,
3023    /// For replica-targeted subscribes, this specifies the replica whose responses we should pass on.
3024    ///
3025    /// If this value is `None`, we pass on the first response for each time slice.
3026    target_replica: Option<ReplicaId>,
3027}
3028
3029impl<T: ComputeControllerTimestamp> ActiveSubscribe<T> {
3030    fn new(target_replica: Option<ReplicaId>) -> Self {
3031        Self {
3032            frontier: Antichain::from_elem(T::minimum()),
3033            target_replica,
3034        }
3035    }
3036}
3037
3038/// State maintained about individual replicas.
3039#[derive(Debug)]
3040struct ReplicaState<T: ComputeControllerTimestamp> {
3041    /// The ID of the replica.
3042    id: ReplicaId,
3043    /// Client for the running replica task.
3044    client: ReplicaClient<T>,
3045    /// The replica configuration.
3046    config: ReplicaConfig,
3047    /// Replica metrics.
3048    metrics: ReplicaMetrics,
3049    /// A channel through which introspection updates are delivered.
3050    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3051    /// Per-replica collection state.
3052    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
3053    /// The epoch of the replica.
3054    epoch: u64,
3055}
3056
3057impl<T: ComputeControllerTimestamp> ReplicaState<T> {
3058    fn new(
3059        id: ReplicaId,
3060        client: ReplicaClient<T>,
3061        config: ReplicaConfig,
3062        metrics: ReplicaMetrics,
3063        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3064        epoch: u64,
3065    ) -> Self {
3066        Self {
3067            id,
3068            client,
3069            config,
3070            metrics,
3071            introspection_tx,
3072            epoch,
3073            collections: Default::default(),
3074        }
3075    }
3076
3077    /// Add a collection to the replica state.
3078    ///
3079    /// # Panics
3080    ///
3081    /// Panics if a collection with the same ID exists already.
3082    fn add_collection(
3083        &mut self,
3084        id: GlobalId,
3085        as_of: Antichain<T>,
3086        input_read_holds: Vec<ReadHold<T>>,
3087    ) {
3088        let metrics = self.metrics.for_collection(id);
3089        let introspection = ReplicaCollectionIntrospection::new(
3090            self.id,
3091            id,
3092            self.introspection_tx.clone(),
3093            as_of.clone(),
3094        );
3095        let mut state =
3096            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
3097
3098        // In an effort to keep the produced wallclock lag introspection data small and
3099        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
3100        // select indexes and subscribes.
3101        if id.is_transient() {
3102            state.wallclock_lag_max = None;
3103        }
3104
3105        if let Some(previous) = self.collections.insert(id, state) {
3106            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
3107        }
3108    }
3109
3110    /// Remove state for a collection.
3111    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
3112        self.collections.remove(&id)
3113    }
3114
3115    /// Returns whether all replica frontiers of the given collection are empty.
3116    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
3117        self.collections.get(&id).map_or(true, |c| {
3118            c.write_frontier.is_empty()
3119                && c.input_frontier.is_empty()
3120                && c.output_frontier.is_empty()
3121        })
3122    }
3123
3124    /// Returns the state of the [`ReplicaState`] formatted as JSON.
3125    ///
3126    /// The returned value is not guaranteed to be stable and may change at any point in time.
3127    #[mz_ore::instrument(level = "debug")]
3128    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3129        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
3130        // returned object as a tradeoff between usability and stability. `serde_json` will fail
3131        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
3132        // prevents a future unrelated change from silently breaking this method.
3133
3134        // Destructure `self` here so we don't forget to consider dumping newly added fields.
3135        let Self {
3136            id,
3137            client: _,
3138            config: _,
3139            metrics: _,
3140            introspection_tx: _,
3141            epoch,
3142            collections,
3143        } = self;
3144
3145        let collections: BTreeMap<_, _> = collections
3146            .iter()
3147            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
3148            .collect();
3149
3150        Ok(serde_json::json!({
3151            "id": id.to_string(),
3152            "collections": collections,
3153            "epoch": epoch,
3154        }))
3155    }
3156}
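
// Minimal sketch of the `dump` serialization strategy above: values are
// `Debug`-formatted into strings before being handed to `serde_json`, so a
// future field whose type does not serialize cleanly cannot silently break the
// dump. The value type here is a hypothetical stand-in.
#[cfg(test)]
mod debug_value_dump_sketch {
    use std::collections::BTreeMap;

    #[test]
    fn dump_values_via_debug_formatting() {
        // Hypothetical stand-in for non-serializable per-collection state.
        #[derive(Debug)]
        struct CollectionSnapshot {
            write_frontier: Vec<u64>,
        }

        let mut collections = BTreeMap::new();
        collections.insert(
            "u1".to_string(),
            CollectionSnapshot { write_frontier: vec![42] },
        );

        // `Debug`-format every value so `serde_json` only ever sees strings.
        let dumped: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, state)| (id.clone(), format!("{state:?}")))
            .collect();

        let value = serde_json::json!({ "collections": dumped });
        assert!(value["collections"]["u1"].is_string());
    }
}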
3157
3158#[derive(Debug)]
3159struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
3160    /// The replica write frontier of this collection.
3161    ///
3162    /// See [`FrontiersResponse::write_frontier`].
3163    write_frontier: Antichain<T>,
3164    /// The replica input frontier of this collection.
3165    ///
3166    /// See [`FrontiersResponse::input_frontier`].
3167    input_frontier: Antichain<T>,
3168    /// The replica output frontier of this collection.
3169    ///
3170    /// See [`FrontiersResponse::output_frontier`].
3171    output_frontier: Antichain<T>,
3172
3173    /// Metrics tracked for this collection.
3174    ///
3175    /// If this is `None`, no metrics are collected.
3176    metrics: Option<ReplicaCollectionMetrics>,
3177    /// As-of frontier with which this collection was installed on the replica.
3178    as_of: Antichain<T>,
3179    /// Tracks introspection state for this collection.
3180    introspection: ReplicaCollectionIntrospection<T>,
3181    /// Read holds on storage inputs to this collection.
3182    ///
3183    /// These read holds are kept to ensure that the replica is able to read from its storage
3184    /// inputs at all times it has not yet read. We only need to install read holds for storage
3185    /// inputs, since compaction of compute inputs is implicitly held back by Timely/DD.
3186    input_read_holds: Vec<ReadHold<T>>,
3187
3188    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
3189    ///
3190    /// If this is `None`, wallclock lag is not tracked for this collection.
3191    wallclock_lag_max: Option<WallclockLag>,
3192}
3193
3194impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
3195    fn new(
3196        metrics: Option<ReplicaCollectionMetrics>,
3197        as_of: Antichain<T>,
3198        introspection: ReplicaCollectionIntrospection<T>,
3199        input_read_holds: Vec<ReadHold<T>>,
3200    ) -> Self {
3201        Self {
3202            write_frontier: as_of.clone(),
3203            input_frontier: as_of.clone(),
3204            output_frontier: as_of.clone(),
3205            metrics,
3206            as_of,
3207            introspection,
3208            input_read_holds,
3209            wallclock_lag_max: Some(WallclockLag::MIN),
3210        }
3211    }
3212
3213    /// Returns whether this collection is hydrated.
3214    fn hydrated(&self) -> bool {
3215        // If the observed frontier is greater than the collection's as-of, the collection has
3216        // produced some output and is therefore hydrated.
3217        //
3218        // We need to consider the edge case where the as-of is the empty frontier. Such an as-of
3219        // is not useful for indexes, because they wouldn't be readable. For write-only
3220        // collections, an empty as-of means that the collection has been fully written and no new
3221        // dataflow needs to be created for it. Consequently, no hydration will happen either.
3222        //
3223        // Based on this, we could respond in two ways:
3224        //  * `false`, as in "the dataflow was never created"
3225        //  * `true`, as in "the dataflow completed immediately"
3226        //
3227        // Since hydration is often used as a measure of dataflow progress and we don't want to
3228        // give the impression that certain dataflows are somehow stuck when they are not, we go
3229        // with the second interpretation here.
3230        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3231    }
3232
3233    /// Updates the replica write frontier of this collection.
3234    fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
3235        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3236            soft_panic_or_log!(
3237                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3238                self.write_frontier,
3239            );
3240            return;
3241        } else if new_frontier == self.write_frontier {
3242            return;
3243        }
3244
3245        self.write_frontier = new_frontier;
3246    }
3247
3248    /// Updates the replica input frontier of this collection.
3249    fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
3250        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3251            soft_panic_or_log!(
3252                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3253                self.input_frontier,
3254            );
3255            return;
3256        } else if new_frontier == self.input_frontier {
3257            return;
3258        }
3259
3260        self.input_frontier = new_frontier;
3261
3262        // Relax our read holds on collection inputs.
3263        for read_hold in &mut self.input_read_holds {
3264            let result = read_hold.try_downgrade(self.input_frontier.clone());
3265            soft_assert_or_log!(
3266                result.is_ok(),
3267                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3268                self.input_frontier,
3269            );
3270        }
3271    }
3272
3273    /// Updates the replica output frontier of this collection.
3274    fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
3275        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3276            soft_panic_or_log!(
3277                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3278                self.output_frontier,
3279            );
3280            return;
3281        } else if new_frontier == self.output_frontier {
3282            return;
3283        }
3284
3285        self.output_frontier = new_frontier;
3286    }
3287}
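
// Small sketch of the hydration edge case discussed in `hydrated` above: an
// empty as-of counts as hydrated, and otherwise the output frontier must have
// advanced strictly past the as-of. Times are made up for illustration.
#[cfg(test)]
mod hydration_sketch {
    use timely::PartialOrder;
    use timely::progress::Antichain;

    fn hydrated(as_of: &Antichain<u64>, output_frontier: &Antichain<u64>) -> bool {
        as_of.is_empty() || PartialOrder::less_than(as_of, output_frontier)
    }

    #[test]
    fn hydration_edge_cases() {
        let empty = Antichain::new();
        let five = Antichain::from_elem(5_u64);
        let six = Antichain::from_elem(6_u64);

        // Write-only collection with an empty as-of: report it as hydrated.
        assert!(hydrated(&empty, &empty));
        // No output strictly beyond the as-of yet: not hydrated.
        assert!(!hydrated(&five, &five));
        // Output frontier advanced past the as-of: hydrated.
        assert!(hydrated(&five, &six));
    }
}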
3288
3289/// Maintains the introspection state for a given replica and collection, and ensures that reported
3290/// introspection data is retracted when the collection is dropped.
3291#[derive(Debug)]
3292struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
3293    /// The ID of the replica.
3294    replica_id: ReplicaId,
3295    /// The ID of the compute collection.
3296    collection_id: GlobalId,
3297    /// The collection's reported replica write frontier.
3298    write_frontier: Antichain<T>,
3299    /// A channel through which introspection updates are delivered.
3300    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3301}
3302
3303impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
3304    /// Create a new `ReplicaCollectionIntrospection` and initialize introspection.
3305    fn new(
3306        replica_id: ReplicaId,
3307        collection_id: GlobalId,
3308        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3309        as_of: Antichain<T>,
3310    ) -> Self {
3311        let self_ = Self {
3312            replica_id,
3313            collection_id,
3314            write_frontier: as_of,
3315            introspection_tx,
3316        };
3317
3318        self_.report_initial_state();
3319        self_
3320    }
3321
3322    /// Reports the initial introspection state.
3323    fn report_initial_state(&self) {
3324        let row = self.write_frontier_row();
3325        let updates = vec![(row, Diff::ONE)];
3326        self.send(IntrospectionType::ReplicaFrontiers, updates);
3327    }
3328
3329    /// Observe the given current write frontier and update the introspection state as necessary.
3330    fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
3331        if self.write_frontier == *write_frontier {
3332            return; // no change
3333        }
3334
3335        let retraction = self.write_frontier_row();
3336        self.write_frontier.clone_from(write_frontier);
3337        let insertion = self.write_frontier_row();
3338
3339        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3340        self.send(IntrospectionType::ReplicaFrontiers, updates);
3341    }
3342
3343    /// Return a `Row` reflecting the current replica write frontier.
3344    fn write_frontier_row(&self) -> Row {
3345        let write_frontier = self
3346            .write_frontier
3347            .as_option()
3348            .map_or(Datum::Null, |ts| ts.clone().into());
3349        Row::pack_slice(&[
3350            Datum::String(&self.collection_id.to_string()),
3351            Datum::String(&self.replica_id.to_string()),
3352            write_frontier,
3353        ])
3354    }
3355
3356    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3357        // Failure to send means the `ComputeController` has been dropped and doesn't care about
3358        // introspection updates anymore.
3359        let _ = self.introspection_tx.send((introspection_type, updates));
3360    }
3361}
3362
3363impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
3364    fn drop(&mut self) {
3365        // Retract the write frontier.
3366        let row = self.write_frontier_row();
3367        let updates = vec![(row, Diff::MINUS_ONE)];
3368        self.send(IntrospectionType::ReplicaFrontiers, updates);
3369    }
3370}