Skip to main content

mz_compute_client/controller/
instance.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller for a compute instance.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::ComputeInstanceId;
21use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
22use mz_compute_types::plan::render_plan::RenderPlan;
23use mz_compute_types::sinks::{
24    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
25};
26use mz_compute_types::sources::SourceInstanceDesc;
27use mz_controller_types::dyncfgs::{
28    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
29};
30use mz_dyncfg::ConfigSet;
31use mz_expr::RowSetFinishing;
32use mz_ore::cast::CastFrom;
33use mz_ore::channel::instrumented_unbounded_channel;
34use mz_ore::now::NowFn;
35use mz_ore::tracing::OpenTelemetryContext;
36use mz_ore::{soft_assert_or_log, soft_panic_or_log};
37use mz_persist_types::PersistLocation;
38use mz_repr::adt::timestamp::CheckedTimestamp;
39use mz_repr::refresh_schedule::RefreshSchedule;
40use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
41use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
42use mz_storage_types::read_holds::{self, ReadHold};
43use mz_storage_types::read_policy::ReadPolicy;
44use thiserror::Error;
45use timely::PartialOrder;
46use timely::progress::frontier::MutableAntichain;
47use timely::progress::{Antichain, ChangeBatch, Timestamp};
48use tokio::sync::mpsc::error::SendError;
49use tokio::sync::{mpsc, oneshot};
50use tracing::debug_span;
51use uuid::Uuid;
52
53use crate::controller::error::{
54    CollectionLookupError, CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
55};
56use crate::controller::replica::{ReplicaClient, ReplicaConfig};
57use crate::controller::{
58    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
59    ReplicaId, StorageCollections,
60};
61use crate::logging::LogVariant;
62use crate::metrics::IntCounter;
63use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
64use crate::protocol::command::{
65    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
66};
67use crate::protocol::history::ComputeCommandHistory;
68use crate::protocol::response::{
69    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
70    SubscribeBatch, SubscribeResponse,
71};
72
/// Error returned when attempting to add a replica with an ID that is already in use.
#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);
76
/// Error returned when referencing a replica that is not known to the instance.
#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);
80
/// Errors arising during dataflow creation.
#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    /// One of the dataflow's inputs or exports does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The targeted replica does not exist.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The dataflow description came without an as-of frontier.
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    /// Subscribe dataflows must have a non-empty as-of.
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    /// Copy-to dataflows must have a non-empty as-of.
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    /// No read hold was provided for one of the dataflow's imports.
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    /// A provided read hold was not sufficient for the dataflow's as-of.
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}
98
99impl From<CollectionMissing> for DataflowCreationError {
100    fn from(error: CollectionMissing) -> Self {
101        Self::CollectionMissing(error.0)
102    }
103}
104
/// Error returned when the instance task is gone, i.e. its command receiver has been dropped.
#[derive(Error, Debug)]
#[error("the instance has shut down")]
pub(super) struct InstanceShutDown;
108
/// Errors arising during peek processing.
#[derive(Error, Debug)]
pub enum PeekError {
    /// The replica that the peek was issued against does not exist.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The read hold that was passed in is against a collection other than the peeked one.
    #[error("read hold ID does not match peeked collection: {0}")]
    ReadHoldIdMismatch(GlobalId),
    /// The read hold that was passed in is for a later time than the peek's timestamp.
    #[error("insufficient read hold provided: {0}")]
    ReadHoldInsufficient(GlobalId),
    /// The peek's target instance has shut down.
    #[error("the instance has shut down")]
    InstanceShutDown,
}
125
126impl From<InstanceShutDown> for PeekError {
127    fn from(_error: InstanceShutDown) -> Self {
128        Self::InstanceShutDown
129    }
130}
131
/// Errors arising from read-policy updates.
#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    /// The targeted collection does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The targeted collection is write-only, so no read policy applies to it.
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}
139
140impl From<CollectionMissing> for ReadPolicyError {
141    fn from(error: CollectionMissing) -> Self {
142        Self::CollectionMissing(error.0)
143    }
144}
145
/// A command sent to an [`Instance`] task.
///
/// Commands are boxed closures that the instance task executes against its own state.
type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;
148
/// A client for an `Instance` task.
///
/// Cloning is cheap: all clones share the same command channel and read hold sender.
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub struct Client<T: ComputeControllerTimestamp> {
    /// A sender for commands for the instance.
    command_tx: mpsc::UnboundedSender<Command<T>>,
    /// A sender for read hold changes for collections installed on the instance.
    // Excluded from the derived `Debug` impl.
    #[derivative(Debug = "ignore")]
    read_hold_tx: read_holds::ChangeTx<T>,
}
159
160impl<T: ComputeControllerTimestamp> Client<T> {
161    pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx<T> {
162        Arc::clone(&self.read_hold_tx)
163    }
164
165    /// Call a method to be run on the instance task, by sending a message to the instance.
166    /// Does not wait for a response message.
167    pub(super) fn call<F>(&self, f: F) -> Result<(), InstanceShutDown>
168    where
169        F: FnOnce(&mut Instance<T>) + Send + 'static,
170    {
171        let otel_ctx = OpenTelemetryContext::obtain();
172        self.command_tx
173            .send(Box::new(move |instance| {
174                let _span = debug_span!("instance::call").entered();
175                otel_ctx.attach_as_parent();
176
177                f(instance)
178            }))
179            .map_err(|_send_error| InstanceShutDown)
180    }
181
182    /// Call a method to be run on the instance task, by sending a message to the instance and
183    /// waiting for a response message.
184    pub(super) async fn call_sync<F, R>(&self, f: F) -> Result<R, InstanceShutDown>
185    where
186        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
187        R: Send + 'static,
188    {
189        let (tx, rx) = oneshot::channel();
190        let otel_ctx = OpenTelemetryContext::obtain();
191        self.command_tx
192            .send(Box::new(move |instance| {
193                let _span = debug_span!("instance::call_sync").entered();
194                otel_ctx.attach_as_parent();
195                let result = f(instance);
196                let _ = tx.send(result);
197            }))
198            .map_err(|_send_error| InstanceShutDown)?;
199
200        rx.await.map_err(|_| InstanceShutDown)
201    }
202}
203
204impl<T> Client<T>
205where
206    T: ComputeControllerTimestamp,
207{
208    pub(super) fn spawn(
209        id: ComputeInstanceId,
210        build_info: &'static BuildInfo,
211        storage: StorageCollections<T>,
212        peek_stash_persist_location: PersistLocation,
213        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
214        metrics: InstanceMetrics,
215        now: NowFn,
216        wallclock_lag: WallclockLagFn<T>,
217        dyncfg: Arc<ConfigSet>,
218        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
219        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
220        read_only: bool,
221    ) -> Self {
222        let (command_tx, command_rx) = mpsc::unbounded_channel();
223
224        let read_hold_tx: read_holds::ChangeTx<_> = {
225            let command_tx = command_tx.clone();
226            Arc::new(move |id, change: ChangeBatch<_>| {
227                let cmd: Command<_> = {
228                    let change = change.clone();
229                    Box::new(move |i| i.apply_read_hold_change(id, change.clone()))
230                };
231                command_tx.send(cmd).map_err(|_| SendError((id, change)))
232            })
233        };
234
235        mz_ore::task::spawn(
236            || format!("compute-instance-{id}"),
237            Instance::new(
238                build_info,
239                storage,
240                peek_stash_persist_location,
241                arranged_logs,
242                metrics,
243                now,
244                wallclock_lag,
245                dyncfg,
246                command_rx,
247                response_tx,
248                Arc::clone(&read_hold_tx),
249                introspection_tx,
250                read_only,
251            )
252            .run(),
253        );
254
255        Self {
256            command_tx,
257            read_hold_tx,
258        }
259    }
260
261    /// Acquires a `ReadHold` and collection write frontier for each of the identified compute
262    /// collections.
263    pub async fn acquire_read_holds_and_collection_write_frontiers(
264        &self,
265        ids: Vec<GlobalId>,
266    ) -> Result<Vec<(GlobalId, ReadHold<T>, Antichain<T>)>, CollectionLookupError> {
267        self.call_sync(move |i| {
268            let mut result = Vec::new();
269            for id in ids.into_iter() {
270                result.push((
271                    id,
272                    i.acquire_read_hold(id)?,
273                    i.collection_write_frontier(id)?,
274                ));
275            }
276            Ok(result)
277        })
278        .await?
279    }
280
281    /// Issue a peek by calling into the instance task.
282    ///
283    /// If this returns an error, then it didn't modify any `Instance` state.
284    pub async fn peek(
285        &self,
286        peek_target: PeekTarget,
287        literal_constraints: Option<Vec<Row>>,
288        uuid: Uuid,
289        timestamp: T,
290        result_desc: RelationDesc,
291        finishing: RowSetFinishing,
292        map_filter_project: mz_expr::SafeMfpPlan,
293        target_read_hold: ReadHold<T>,
294        target_replica: Option<ReplicaId>,
295        peek_response_tx: oneshot::Sender<PeekResponse>,
296    ) -> Result<(), PeekError> {
297        self.call_sync(move |i| {
298            i.peek(
299                peek_target,
300                literal_constraints,
301                uuid,
302                timestamp,
303                result_desc,
304                finishing,
305                map_filter_project,
306                target_read_hold,
307                target_replica,
308                peek_response_tx,
309            )
310        })
311        .await?
312    }
313}
314
/// A response from a replica, composed of a replica ID, the replica's current epoch, and the
/// compute response itself.
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);
318
/// The state we keep for a compute instance.
pub(super) struct Instance<T: ComputeControllerTimestamp> {
    /// Build info for spawning replicas.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections<T>,
    /// Whether instance initialization has been completed.
    initialized: bool,
    /// Whether this instance is in read-only mode.
    ///
    /// When in read-only mode, this instance will not update persistent state, such as
    /// wallclock lag introspection.
    read_only: bool,
    /// The workload class of this instance.
    ///
    /// This is currently only used to annotate metrics.
    workload_class: Option<String>,
    /// The replicas of this compute instance.
    replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
    /// Currently installed compute collections.
    ///
    /// New entries are added for all collections exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// Entries are removed by [`Instance::cleanup_collections`]. See that method's documentation
    /// about the conditions for removing collection state.
    collections: BTreeMap<GlobalId, CollectionState<T>>,
    /// IDs of log sources maintained by this compute instance.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Currently outstanding peeks.
    ///
    /// New entries are added for all peeks initiated through [`Instance::peek`].
    ///
    /// The entry for a peek is only removed once all replicas have responded to the peek. This is
    /// currently required to ensure all replicas have stopped reading from the peeked collection's
    /// inputs before we allow them to compact. database-issues#4822 tracks changing this so we only have to wait
    /// for the first peek response.
    peeks: BTreeMap<Uuid, PendingPeek<T>>,
    /// Currently in-progress subscribes.
    ///
    /// New entries are added for all subscribes exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// The entry for a subscribe is removed once at least one replica has reported the subscribe
    /// to have advanced to the empty frontier or to have been dropped, implying that no further
    /// updates will be emitted for this subscribe.
    ///
    /// Note that subscribes are tracked both in `collections` and `subscribes`. `collections`
    /// keeps track of the subscribe's upper and since frontiers and ensures appropriate read holds
    /// on the subscribe's input. `subscribes` is only used to track which updates have been
    /// emitted, to decide if new ones should be emitted or suppressed.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
    /// Tracks all in-progress COPY TOs.
    ///
    /// New entries are added for all s3 oneshot sinks (corresponding to a COPY TO) exported from
    /// dataflows created through [`Instance::create_dataflow`].
    ///
    /// The entry for a copy to is removed once at least one replica has finished
    /// or the exporting collection is dropped.
    copy_tos: BTreeSet<GlobalId>,
    /// The command history, used when introducing new replicas or restarting existing replicas.
    history: ComputeCommandHistory<UIntGauge, T>,
    /// Receiver for commands to be executed.
    command_rx: mpsc::UnboundedReceiver<Command<T>>,
    /// Sender for responses to be delivered.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Sender for introspection updates to be recorded.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// The registry the controller uses to report metrics.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// The persist location where we can stash large peek results.
    peek_stash_persist_location: PersistLocation,

    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for updates to collection read holds.
    ///
    /// Copies of this sender are given to [`ReadHold`]s that are created in
    /// [`CollectionState::new`].
    read_hold_tx: read_holds::ChangeTx<T>,
    /// A sender for responses from replicas.
    ///
    /// Handed to replica clients so their responses flow back into `replica_rx`.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// A receiver for responses from replicas.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
}
412
413impl<T: ComputeControllerTimestamp> Instance<T> {
414    /// Acquire a handle to the collection state associated with `id`.
415    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
416        self.collections.get(&id).ok_or(CollectionMissing(id))
417    }
418
419    /// Acquire a mutable handle to the collection state associated with `id`.
420    fn collection_mut(
421        &mut self,
422        id: GlobalId,
423    ) -> Result<&mut CollectionState<T>, CollectionMissing> {
424        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
425    }
426
427    /// Acquire a handle to the collection state associated with `id`.
428    ///
429    /// # Panics
430    ///
431    /// Panics if the identified collection does not exist.
432    fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
433        self.collections.get(&id).expect("collection must exist")
434    }
435
436    /// Acquire a mutable handle to the collection state associated with `id`.
437    ///
438    /// # Panics
439    ///
440    /// Panics if the identified collection does not exist.
441    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
442        self.collections
443            .get_mut(&id)
444            .expect("collection must exist")
445    }
446
447    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
448        self.collections.iter().map(|(id, coll)| (*id, coll))
449    }
450
451    /// Add a collection to the instance state.
452    ///
453    /// # Panics
454    ///
455    /// Panics if a collection with the same ID exists already.
456    fn add_collection(
457        &mut self,
458        id: GlobalId,
459        as_of: Antichain<T>,
460        shared: SharedCollectionState<T>,
461        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
462        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
463        replica_input_read_holds: Vec<ReadHold<T>>,
464        write_only: bool,
465        storage_sink: bool,
466        initial_as_of: Option<Antichain<T>>,
467        refresh_schedule: Option<RefreshSchedule>,
468    ) {
469        // Add global collection state.
470        let introspection = CollectionIntrospection::new(
471            id,
472            self.introspection_tx.clone(),
473            as_of.clone(),
474            storage_sink,
475            initial_as_of,
476            refresh_schedule,
477        );
478        let mut state = CollectionState::new(
479            id,
480            as_of.clone(),
481            shared,
482            storage_dependencies,
483            compute_dependencies,
484            Arc::clone(&self.read_hold_tx),
485            introspection,
486        );
487        // If the collection is write-only, clear its read policy to reflect that.
488        if write_only {
489            state.read_policy = None;
490        }
491
492        if let Some(previous) = self.collections.insert(id, state) {
493            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
494        }
495
496        // Add per-replica collection state.
497        for replica in self.replicas.values_mut() {
498            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
499        }
500
501        // Update introspection.
502        self.report_dependency_updates(id, Diff::ONE);
503    }
504
505    fn remove_collection(&mut self, id: GlobalId) {
506        // Update introspection.
507        self.report_dependency_updates(id, Diff::MINUS_ONE);
508
509        // Remove per-replica collection state.
510        for replica in self.replicas.values_mut() {
511            replica.remove_collection(id);
512        }
513
514        // Remove global collection state.
515        self.collections.remove(&id);
516    }
517
    /// Add state for a new replica and initialize per-replica tracking for all installed
    /// collections.
    fn add_replica_state(
        &mut self,
        id: ReplicaId,
        client: ReplicaClient<T>,
        config: ReplicaConfig,
        epoch: u64,
    ) {
        // The set of log collections this replica maintains.
        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();

        let metrics = self.metrics.for_replica(id);
        let mut replica = ReplicaState::new(
            id,
            client,
            config,
            metrics,
            self.introspection_tx.clone(),
            epoch,
        );

        // Add per-replica collection state.
        for (collection_id, collection) in &self.collections {
            // Skip log collections not maintained by this replica.
            if collection.log_collection && !log_ids.contains(collection_id) {
                continue;
            }

            let as_of = if collection.log_collection {
                // For log collections, we don't send a `CreateDataflow` command to the replica, so
                // it doesn't know which as-of the controller chose and defaults to the minimum
                // frontier instead. We need to initialize the controller-side tracking with the
                // same frontier, to avoid observing regressions in the reported frontiers.
                Antichain::from_elem(T::minimum())
            } else {
                collection.read_frontier().to_owned()
            };

            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
            replica.add_collection(*collection_id, as_of, input_read_holds);
        }

        self.replicas.insert(id, replica);
    }
560
561    /// Enqueue the given response for delivery to the controller clients.
562    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
563        // Failure to send means the `ComputeController` has been dropped and doesn't care about
564        // responses anymore.
565        let _ = self.response_tx.send(response);
566    }
567
568    /// Enqueue the given introspection updates for recording.
569    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
570        // Failure to send means the `ComputeController` has been dropped and doesn't care about
571        // introspection updates anymore.
572        let _ = self.introspection_tx.send((type_, updates));
573    }
574
575    /// Returns whether the identified replica exists.
576    fn replica_exists(&self, id: ReplicaId) -> bool {
577        self.replicas.contains_key(&id)
578    }
579
580    /// Return the IDs of pending peeks targeting the specified replica.
581    fn peeks_targeting(
582        &self,
583        replica_id: ReplicaId,
584    ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
585        self.peeks.iter().filter_map(move |(uuid, peek)| {
586            if peek.target_replica == Some(replica_id) {
587                Some((*uuid, peek))
588            } else {
589                None
590            }
591        })
592    }
593
594    /// Return the IDs of in-progress subscribes targeting the specified replica.
595    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
596        self.subscribes.iter().filter_map(move |(id, subscribe)| {
597            let targeting = subscribe.target_replica == Some(replica_id);
598            targeting.then_some(*id)
599        })
600    }
601
602    /// Update introspection with the current collection frontiers.
603    ///
604    /// We could also do this directly in response to frontier changes, but doing it periodically
605    /// lets us avoid emitting some introspection updates that can be consolidated (e.g. a write
606    /// frontier updated immediately followed by a read frontier update).
607    ///
608    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
609    /// per second during normal operation.
610    fn update_frontier_introspection(&mut self) {
611        for collection in self.collections.values_mut() {
612            collection
613                .introspection
614                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
615        }
616
617        for replica in self.replicas.values_mut() {
618            for collection in replica.collections.values_mut() {
619                collection
620                    .introspection
621                    .observe_frontier(&collection.write_frontier);
622            }
623        }
624    }
625
626    /// Refresh the controller state metrics for this instance.
627    ///
628    /// We could also do state metric updates directly in response to state changes, but that would
629    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
630    /// a single method is less noisy.
631    ///
632    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
633    /// per second during normal operation.
634    fn refresh_state_metrics(&self) {
635        let unscheduled_collections_count =
636            self.collections.values().filter(|c| !c.scheduled).count();
637        let connected_replica_count = self
638            .replicas
639            .values()
640            .filter(|r| r.client.is_connected())
641            .count();
642
643        self.metrics
644            .replica_count
645            .set(u64::cast_from(self.replicas.len()));
646        self.metrics
647            .collection_count
648            .set(u64::cast_from(self.collections.len()));
649        self.metrics
650            .collection_unscheduled_count
651            .set(u64::cast_from(unscheduled_collections_count));
652        self.metrics
653            .peek_count
654            .set(u64::cast_from(self.peeks.len()));
655        self.metrics
656            .subscribe_count
657            .set(u64::cast_from(self.subscribes.len()));
658        self.metrics
659            .copy_to_count
660            .set(u64::cast_from(self.copy_tos.len()));
661        self.metrics
662            .connected_replica_count
663            .set(u64::cast_from(connected_replica_count));
664    }
665
    /// Refresh the wallclock lag introspection and metrics with the current lag values.
    ///
    /// This method produces wallclock lag metrics of two different shapes:
    ///
    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
    ///   over the last minute, together with the current time.
    /// * Histograms: For each collection, we measure the lag of the write frontier behind
    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
    ///   together with the current histogram period.
    ///
    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
    /// Prometheus.
    ///
    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
    /// per second during normal operation.
    fn refresh_wallclock_lag(&mut self) {
        // Lag of a frontier behind wallclock time; an empty frontier has no lag.
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        // First, iterate over all collections and collect histogram measurements.
        // We keep a record of unreadable collections, so we can emit undefined lags for those here
        // and below when we collect history measurements.
        let mut unreadable_collections = BTreeSet::new();
        for (id, collection) in &mut self.collections {
            // We need to ask the storage controller for the read frontiers of storage collections.
            let read_frontier = match self.storage_collections.collection_frontiers(*id) {
                Ok(f) => f.read_capabilities,
                Err(_) => collection.read_frontier(),
            };
            let write_frontier = collection.write_frontier();
            // A collection is unreadable when its write frontier has not advanced beyond its
            // read frontier, i.e. there are no readable times.
            let collection_unreadable = PartialOrder::less_equal(&write_frontier, &read_frontier);
            if collection_unreadable {
                unreadable_collections.insert(id);
            }

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                let bucket = if collection_unreadable {
                    WallclockLag::Undefined
                } else {
                    // Bucket lags by rounding up to the next power of two seconds.
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        // Second, iterate over all per-replica collections and collect history measurements.
        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                let lag = if unreadable_collections.contains(&id) {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                };

                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    // No way to specify values as undefined in Prometheus metrics, so we use the
                    // maximum value instead.
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                };
            }
        }

        // Record lags to persist, if it's time.
        self.maybe_record_wallclock_lag();
    }
753
    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
    /// last recording.
    ///
    /// We emit new introspection updates if the system time has passed into a new multiple of the
    /// recording interval (typically 1 minute) since the last refresh. The storage controller uses
    /// the same approach, ensuring that both controllers commit their lags at roughly the same
    /// time, avoiding confusion caused by inconsistencies.
    fn maybe_record_wallclock_lag(&mut self) {
        // Read-only instances must not emit introspection updates.
        if self.read_only {
            return;
        }

        // Truncate `datetime` down to the previous multiple of `interval`. Returns `None` if the
        // interval is not representable as a `TimeDelta` or the truncation itself fails.
        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        // Fall back to the default interval if the configured one cannot be used for truncation.
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        // Nothing to do if we are still within the recording interval of the last refresh.
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        // Collect the per-replica maximum lags into history updates, resetting each maximum for
        // the next recording period via `mem::replace`.
        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        // Drain the per-collection histogram stashes into histogram updates.
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }
838
839    /// Report updates (inserts or retractions) to the identified collection's dependencies.
840    ///
841    /// # Panics
842    ///
843    /// Panics if the identified collection does not exist.
844    fn report_dependency_updates(&self, id: GlobalId, diff: Diff) {
845        let collection = self.expect_collection(id);
846        let dependencies = collection.dependency_ids();
847
848        let updates = dependencies
849            .map(|dependency_id| {
850                let row = Row::pack_slice(&[
851                    Datum::String(&id.to_string()),
852                    Datum::String(&dependency_id.to_string()),
853                ]);
854                (row, diff)
855            })
856            .collect();
857
858        self.deliver_introspection_updates(IntrospectionType::ComputeDependencies, updates);
859    }
860
861    /// Returns `true` if the given collection is hydrated on at least one
862    /// replica.
863    ///
864    /// This also returns `true` in case this cluster does not have any
865    /// replicas.
866    #[mz_ore::instrument(level = "debug")]
867    pub fn collection_hydrated(
868        &self,
869        collection_id: GlobalId,
870    ) -> Result<bool, CollectionLookupError> {
871        if self.replicas.is_empty() {
872            return Ok(true);
873        }
874
875        for replica_state in self.replicas.values() {
876            let collection_state = replica_state
877                .collections
878                .get(&collection_id)
879                .ok_or(CollectionLookupError::CollectionMissing(collection_id))?;
880
881            if collection_state.hydrated() {
882                return Ok(true);
883            }
884        }
885
886        Ok(false)
887    }
888
889    /// Returns `true` if each non-transient, non-excluded collection is hydrated on at
890    /// least one replica.
891    ///
892    /// This also returns `true` in case this cluster does not have any
893    /// replicas.
894    #[mz_ore::instrument(level = "debug")]
895    pub fn collections_hydrated_on_replicas(
896        &self,
897        target_replica_ids: Option<Vec<ReplicaId>>,
898        exclude_collections: &BTreeSet<GlobalId>,
899    ) -> Result<bool, HydrationCheckBadTarget> {
900        if self.replicas.is_empty() {
901            return Ok(true);
902        }
903        let mut all_hydrated = true;
904        let target_replicas: BTreeSet<ReplicaId> = self
905            .replicas
906            .keys()
907            .filter_map(|id| match target_replica_ids {
908                None => Some(id.clone()),
909                Some(ref ids) if ids.contains(id) => Some(id.clone()),
910                Some(_) => None,
911            })
912            .collect();
913        if let Some(targets) = target_replica_ids {
914            if target_replicas.is_empty() {
915                return Err(HydrationCheckBadTarget(targets));
916            }
917        }
918
919        for (id, _collection) in self.collections_iter() {
920            if id.is_transient() || exclude_collections.contains(&id) {
921                continue;
922            }
923
924            let mut collection_hydrated = false;
925            for replica_state in self.replicas.values() {
926                if !target_replicas.contains(&replica_state.id) {
927                    continue;
928                }
929                let collection_state = replica_state
930                    .collections
931                    .get(&id)
932                    .expect("missing collection state");
933
934                if collection_state.hydrated() {
935                    collection_hydrated = true;
936                    break;
937                }
938            }
939
940            if !collection_hydrated {
941                tracing::info!("collection {id} is not hydrated on any replica");
942                all_hydrated = false;
943                // We continue with our loop instead of breaking out early, so
944                // that we log all non-hydrated replicas.
945            }
946        }
947
948        Ok(all_hydrated)
949    }
950
951    /// Clean up collection state that is not needed anymore.
952    ///
953    /// Three conditions need to be true before we can remove state for a collection:
954    ///
955    ///  1. A client must have explicitly dropped the collection. If that is not the case, clients
956    ///     can still reasonably assume that the controller knows about the collection and can
957    ///     answer queries about it.
958    ///  2. There must be no outstanding read capabilities on the collection. As long as someone
959    ///     still holds read capabilities on a collection, we need to keep it around to be able
960    ///     to properly handle downgrading of said capabilities.
961    ///  3. All replica frontiers for the collection must have advanced to the empty frontier.
962    ///     Advancement to the empty frontiers signals that replicas are done computing the
963    ///     collection and that they won't send more `ComputeResponse`s for it. As long as we might
964    ///     receive responses for a collection we want to keep it around to be able to validate and
965    ///     handle these responses.
966    fn cleanup_collections(&mut self) {
967        let to_remove: Vec<_> = self
968            .collections_iter()
969            .filter(|(id, collection)| {
970                collection.dropped
971                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
972                    && self
973                        .replicas
974                        .values()
975                        .all(|r| r.collection_frontiers_empty(*id))
976            })
977            .map(|(id, _collection)| id)
978            .collect();
979
980        for id in to_remove {
981            self.remove_collection(id);
982        }
983    }
984
    /// Returns the state of the [`Instance`] formatted as JSON.
    ///
    /// The returned value is not guaranteed to be stable and may change at any point in time.
    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
        // returned object as a tradeoff between usability and stability. `serde_json` will fail
        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
        // prevents a future unrelated change from silently breaking this method.

        // Destructure `self` here so we don't forget to consider dumping newly added fields.
        // Fields bound to `_` are deliberately omitted from the dump.
        let Self {
            build_info: _,
            storage_collections: _,
            peek_stash_persist_location: _,
            initialized,
            read_only,
            workload_class,
            replicas,
            collections,
            log_sources: _,
            peeks,
            subscribes,
            copy_tos,
            history: _,
            command_rx: _,
            response_tx: _,
            introspection_tx: _,
            metrics: _,
            dyncfg: _,
            now: _,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            read_hold_tx: _,
            replica_tx: _,
            replica_rx: _,
        } = self;

        // Replica dumps can themselves fail; propagate the first error, if any.
        let replicas: BTreeMap<_, _> = replicas
            .iter()
            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
            .collect::<Result<_, anyhow::Error>>()?;
        // The remaining maps are `Debug`-formatted values keyed by stringified IDs.
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();
        let peeks: BTreeMap<_, _> = peeks
            .iter()
            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
            .collect();
        let subscribes: BTreeMap<_, _> = subscribes
            .iter()
            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
            .collect();
        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");

        Ok(serde_json::json!({
            "initialized": initialized,
            "read_only": read_only,
            "workload_class": workload_class,
            "replicas": replicas,
            "collections": collections,
            "peeks": peeks,
            "subscribes": subscribes,
            "copy_tos": copy_tos,
            "wallclock_lag_last_recorded": wallclock_lag_last_recorded,
        }))
    }
1054
1055    /// Reports the current write frontier for the identified compute collection.
1056    fn collection_write_frontier(&self, id: GlobalId) -> Result<Antichain<T>, CollectionMissing> {
1057        Ok(self.collection(id)?.write_frontier())
1058    }
1059}
1060
1061impl<T> Instance<T>
1062where
1063    T: ComputeControllerTimestamp,
1064{
    /// Creates a new `Instance`.
    ///
    /// Installs collection state for each of the given arranged log collections, initializes the
    /// command history, and sets up the instrumented channel on which replica tasks deliver
    /// their responses. The returned instance starts without any replicas.
    fn new(
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        command_rx: mpsc::UnboundedReceiver<Command<T>>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        read_only: bool,
    ) -> Self {
        // Seed collection state for the built-in log collections and remember the mapping from
        // log variant to collection ID.
        let mut collections = BTreeMap::new();
        let mut log_sources = BTreeMap::new();
        for (log, id, shared) in arranged_logs {
            let collection = CollectionState::new_log_collection(
                id,
                shared,
                Arc::clone(&read_hold_tx),
                introspection_tx.clone(),
            );
            collections.insert(id, collection);
            log_sources.insert(log, id);
        }

        let history = ComputeCommandHistory::new(metrics.for_history());

        // Channel through which replica tasks deliver responses, instrumented with
        // send/receive counters.
        let send_count = metrics.response_send_count.clone();
        let recv_count = metrics.response_recv_count.clone();
        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);

        // Wallclock lag recording starts relative to the current time.
        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            storage_collections: storage,
            peek_stash_persist_location,
            initialized: false,
            read_only,
            workload_class: None,
            replicas: Default::default(),
            collections,
            log_sources,
            peeks: Default::default(),
            subscribes: Default::default(),
            copy_tos: Default::default(),
            history,
            command_rx,
            response_tx,
            introspection_tx,
            metrics,
            dyncfg,
            now,
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            read_hold_tx,
            replica_tx,
            replica_rx,
        }
    }
1128
    /// Runs this instance's event loop.
    ///
    /// Sends the initial `Hello` and `CreateInstance` commands, then applies incoming client
    /// commands and handles replica responses until the command channel is closed.
    async fn run(mut self) {
        self.send(ComputeCommand::Hello {
            // The nonce is protocol iteration-specific and will be set in
            // `ReplicaTask::specialize_command`.
            nonce: Uuid::default(),
        });

        let instance_config = InstanceConfig {
            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
            // The remaining fields are replica-specific and will be set in
            // `ReplicaTask::specialize_command`.
            logging: Default::default(),
            expiration_offset: Default::default(),
        };

        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));

        loop {
            tokio::select! {
                command = self.command_rx.recv() => match command {
                    // Commands are closures applied to this instance.
                    Some(cmd) => cmd(&mut self),
                    // All command senders are gone; terminate the event loop.
                    None => break,
                },
                response = self.replica_rx.recv() => match response {
                    Some(response) => self.handle_response(response),
                    None => unreachable!("self owns a sender side of the channel"),
                }
            }
        }
    }
1159
1160    /// Update instance configuration.
1161    #[mz_ore::instrument(level = "debug")]
1162    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
1163        if let Some(workload_class) = &config_params.workload_class {
1164            self.workload_class = workload_class.clone();
1165        }
1166
1167        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
1168        self.send(command);
1169    }
1170
1171    /// Marks the end of any initialization commands.
1172    ///
1173    /// Intended to be called by `Controller`, rather than by other code.
1174    /// Calling this method repeatedly has no effect.
1175    #[mz_ore::instrument(level = "debug")]
1176    pub fn initialization_complete(&mut self) {
1177        // The compute protocol requires that `InitializationComplete` is sent only once.
1178        if !self.initialized {
1179            self.send(ComputeCommand::InitializationComplete);
1180            self.initialized = true;
1181        }
1182    }
1183
1184    /// Allows collections to affect writes to external systems (persist).
1185    ///
1186    /// Calling this method repeatedly has no effect.
1187    #[mz_ore::instrument(level = "debug")]
1188    pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
1189        let collection = self.collection_mut(collection_id)?;
1190
1191        // Do not send redundant allow-writes commands.
1192        if !collection.read_only {
1193            return Ok(());
1194        }
1195
1196        // Don't send allow-writes for collections that are not installed.
1197        let as_of = collection.read_frontier();
1198
1199        // If the collection has an empty `as_of`, it was either never installed on the replica or
1200        // has since been dropped. In either case the replica does not expect any commands for it.
1201        if as_of.is_empty() {
1202            return Ok(());
1203        }
1204
1205        collection.read_only = false;
1206        self.send(ComputeCommand::AllowWrites(collection_id));
1207
1208        Ok(())
1209    }
1210
1211    /// Shut down this instance.
1212    ///
1213    /// This method runs various assertions ensuring the instance state is empty. It exists to help
1214    /// us find bugs where the client drops a compute instance that still has replicas or
1215    /// collections installed, and later assumes that said replicas/collections still exists.
1216    ///
1217    /// # Panics
1218    ///
1219    /// Panics if the compute instance still has active replicas.
1220    /// Panics if the compute instance still has collections installed.
1221    #[mz_ore::instrument(level = "debug")]
1222    pub fn shutdown(&mut self) {
1223        // Taking the `command_rx` ensures that the [`Instance::run`] loop terminates.
1224        let (_tx, rx) = mpsc::unbounded_channel();
1225        let mut command_rx = std::mem::replace(&mut self.command_rx, rx);
1226
1227        // Apply all outstanding read hold changes. This might cause read hold downgrades to be
1228        // added to `command_tx`, so we need to apply those in a loop.
1229        //
1230        // TODO(teskje): Make `Command` an enum and assert that all received commands are read
1231        // hold downgrades.
1232        while let Ok(cmd) = command_rx.try_recv() {
1233            cmd(self);
1234        }
1235
1236        // Collections might have been dropped but not cleaned up yet.
1237        self.cleanup_collections();
1238
1239        let stray_replicas: Vec<_> = self.replicas.keys().collect();
1240        soft_assert_or_log!(
1241            stray_replicas.is_empty(),
1242            "dropped instance still has provisioned replicas: {stray_replicas:?}",
1243        );
1244
1245        let collections = self.collections.iter();
1246        let stray_collections: Vec<_> = collections
1247            .filter(|(_, c)| !c.log_collection)
1248            .map(|(id, _)| id)
1249            .collect();
1250        soft_assert_or_log!(
1251            stray_collections.is_empty(),
1252            "dropped instance still has installed collections: {stray_collections:?}",
1253        );
1254    }
1255
1256    /// Sends a command to all replicas of this instance.
1257    #[mz_ore::instrument(level = "debug")]
1258    fn send(&mut self, cmd: ComputeCommand<T>) {
1259        // Record the command so that new replicas can be brought up to speed.
1260        self.history.push(cmd.clone());
1261
1262        // Clone the command for each active replica.
1263        for replica in self.replicas.values_mut() {
1264            // Swallow error, we'll notice because the replica task has stopped.
1265            let _ = replica.client.send(cmd.clone());
1266        }
1267    }
1268
    /// Add a new instance replica, by ID.
    ///
    /// Spawns a client task for the replica with the given `config` and brings the replica up to
    /// date by replaying the recorded command history at it. If `epoch` is `None`, the replica
    /// starts at epoch 1.
    ///
    /// # Errors
    ///
    /// Returns [`ReplicaExists`] if a replica with the given ID is already registered.
    #[mz_ore::instrument(level = "debug")]
    pub fn add_replica(
        &mut self,
        id: ReplicaId,
        mut config: ReplicaConfig,
        epoch: Option<u64>,
    ) -> Result<(), ReplicaExists> {
        if self.replica_exists(id) {
            return Err(ReplicaExists(id));
        }

        // Every replica serves the instance's arranged log collections.
        config.logging.index_logs = self.log_sources.clone();

        let epoch = epoch.unwrap_or(1);
        let metrics = self.metrics.for_replica(id);
        let client = ReplicaClient::spawn(
            id,
            self.build_info,
            config.clone(),
            epoch,
            metrics.clone(),
            Arc::clone(&self.dyncfg),
            self.replica_tx.clone(),
        );

        // Take this opportunity to clean up the history we should present.
        self.history.reduce();

        // Advance the uppers of source imports
        self.history.update_source_uppers(&self.storage_collections);

        // Replay the commands at the client, creating new dataflow identifiers.
        for command in self.history.iter() {
            if client.send(command.clone()).is_err() {
                // We swallow the error here. On the next send, we will fail again, and
                // restart the connection as well as this rehydration.
                tracing::warn!("Replica {:?} connection terminated during hydration", id);
                break;
            }
        }

        // Add replica to tracked state.
        self.add_replica_state(id, client, config, epoch);

        Ok(())
    }
1316
    /// Remove an existing instance replica, by ID.
    ///
    /// Subscribes and peeks that target the removed replica are canceled, and error responses
    /// for them are delivered upstream.
    ///
    /// # Errors
    ///
    /// Returns [`ReplicaMissing`] if no replica with the given ID exists.
    #[mz_ore::instrument(level = "debug")]
    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;

        // Subscribes targeting this replica either won't be served anymore (if the replica is
        // dropped) or might produce inconsistent output (if the target collection is an
        // introspection index). We produce an error to inform upstream.
        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
        for subscribe_id in to_drop {
            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
            let response = ComputeControllerResponse::SubscribeResponse(
                subscribe_id,
                SubscribeBatch {
                    lower: subscribe.frontier.clone(),
                    upper: subscribe.frontier,
                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
                },
            );
            self.deliver_response(response);
        }

        // Peeks targeting this replica might not be served anymore (if the replica is dropped).
        // If the replica has failed it might come back and respond to the peek later, but it still
        // seems like a good idea to cancel the peek to inform the caller about the failure. This
        // is consistent with how we handle targeted subscribes above.
        //
        // Notifications and affected UUIDs are collected first and processed in separate loops
        // afterwards, so the borrow taken by `peeks_targeting` ends before we mutate state.
        let mut peek_responses = Vec::new();
        let mut to_drop = Vec::new();
        for (uuid, peek) in self.peeks_targeting(id) {
            peek_responses.push(ComputeControllerResponse::PeekNotification(
                uuid,
                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
                peek.otel_ctx.clone(),
            ));
            to_drop.push(uuid);
        }
        for response in peek_responses {
            self.deliver_response(response);
        }
        for uuid in to_drop {
            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
            self.finish_peek(uuid, response);
        }

        // We might have a chance to forward implied capabilities and reduce the cost of bringing
        // up the next replica, if the dropped replica was the only one in the cluster.
        self.forward_implied_capabilities();

        Ok(())
    }
1367
1368    /// Rehydrate the given instance replica.
1369    ///
1370    /// # Panics
1371    ///
1372    /// Panics if the specified replica does not exist.
1373    fn rehydrate_replica(&mut self, id: ReplicaId) {
1374        let config = self.replicas[&id].config.clone();
1375        let epoch = self.replicas[&id].epoch + 1;
1376
1377        self.remove_replica(id).expect("replica must exist");
1378        let result = self.add_replica(id, config, Some(epoch));
1379
1380        match result {
1381            Ok(()) => (),
1382            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1383        }
1384    }
1385
1386    /// Rehydrate any failed replicas of this instance.
1387    fn rehydrate_failed_replicas(&mut self) {
1388        let replicas = self.replicas.iter();
1389        let failed_replicas: Vec<_> = replicas
1390            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1391            .collect();
1392
1393        for replica_id in failed_replicas {
1394            self.rehydrate_replica(replica_id);
1395        }
1396    }
1397
1398    /// Creates the described dataflow and initializes state for its output.
1399    ///
1400    /// This method expects a `DataflowDescription` with an `as_of` frontier specified, as well as
1401    /// for each imported collection a read hold in `import_read_holds` at at least the `as_of`.
1402    ///
1403    /// If a `subscribe_target_replica` is given, any subscribes exported by the dataflow are
1404    /// configured to target that replica, i.e., only subscribe responses sent by that replica are
1405    /// considered.
1406    #[mz_ore::instrument(level = "debug")]
1407    pub fn create_dataflow(
1408        &mut self,
1409        dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
1410        import_read_holds: Vec<ReadHold<T>>,
1411        subscribe_target_replica: Option<ReplicaId>,
1412        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
1413    ) -> Result<(), DataflowCreationError> {
1414        use DataflowCreationError::*;
1415
1416        if let Some(replica_id) = subscribe_target_replica {
1417            if !self.replica_exists(replica_id) {
1418                return Err(ReplicaMissing(replica_id));
1419            }
1420        }
1421
1422        // Simple sanity checks around `as_of`
1423        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
1424        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
1425            return Err(EmptyAsOfForSubscribe);
1426        }
1427        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
1428            return Err(EmptyAsOfForCopyTo);
1429        }
1430
1431        // Collect all dependencies of the dataflow, and read holds on them at the `as_of`.
1432        let mut storage_dependencies = BTreeMap::new();
1433        let mut compute_dependencies = BTreeMap::new();
1434
1435        // When we install per-replica input read holds, we cannot use the `as_of` because of
1436        // reconciliation: Existing slow replicas might be reading from the inputs at times before
1437        // the `as_of` and we would rather not crash them by allowing their inputs to compact too
1438        // far. So instead we take read holds at the least time available.
1439        let mut replica_input_read_holds = Vec::new();
1440
1441        let mut import_read_holds: BTreeMap<_, _> =
1442            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();
1443
1444        for &id in dataflow.source_imports.keys() {
1445            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1446            replica_input_read_holds.push(read_hold.clone());
1447
1448            read_hold
1449                .try_downgrade(as_of.clone())
1450                .map_err(|_| ReadHoldInsufficient(id))?;
1451            storage_dependencies.insert(id, read_hold);
1452        }
1453
1454        for &id in dataflow.index_imports.keys() {
1455            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1456            read_hold
1457                .try_downgrade(as_of.clone())
1458                .map_err(|_| ReadHoldInsufficient(id))?;
1459            compute_dependencies.insert(id, read_hold);
1460        }
1461
1462        // If the `as_of` is empty, we are not going to create a dataflow, so replicas won't read
1463        // from the inputs.
1464        if as_of.is_empty() {
1465            replica_input_read_holds = Default::default();
1466        }
1467
1468        // Install collection state for each of the exports.
1469        for export_id in dataflow.export_ids() {
1470            let shared = shared_collection_state
1471                .remove(&export_id)
1472                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
1473            let write_only = dataflow.sink_exports.contains_key(&export_id);
1474            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);
1475
1476            self.add_collection(
1477                export_id,
1478                as_of.clone(),
1479                shared,
1480                storage_dependencies.clone(),
1481                compute_dependencies.clone(),
1482                replica_input_read_holds.clone(),
1483                write_only,
1484                storage_sink,
1485                dataflow.initial_storage_as_of.clone(),
1486                dataflow.refresh_schedule.clone(),
1487            );
1488
1489            // If the export is a storage sink, we can advance its write frontier to the write
1490            // frontier of the target storage collection.
1491            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
1492                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
1493            }
1494        }
1495
1496        // Initialize tracking of subscribes.
1497        for subscribe_id in dataflow.subscribe_ids() {
1498            self.subscribes
1499                .insert(subscribe_id, ActiveSubscribe::new(subscribe_target_replica));
1500        }
1501
1502        // Initialize tracking of copy tos.
1503        for copy_to_id in dataflow.copy_to_ids() {
1504            self.copy_tos.insert(copy_to_id);
1505        }
1506
1507        // Here we augment all imported sources and all exported sinks with the appropriate
1508        // storage metadata needed by the compute instance.
1509        let mut source_imports = BTreeMap::new();
1510        for (id, import) in dataflow.source_imports {
1511            let frontiers = self
1512                .storage_collections
1513                .collection_frontiers(id)
1514                .expect("collection exists");
1515
1516            let collection_metadata = self
1517                .storage_collections
1518                .collection_metadata(id)
1519                .expect("we have a read hold on this collection");
1520
1521            let desc = SourceInstanceDesc {
1522                storage_metadata: collection_metadata.clone(),
1523                arguments: import.desc.arguments,
1524                typ: import.desc.typ.clone(),
1525            };
1526            source_imports.insert(
1527                id,
1528                mz_compute_types::dataflows::SourceImport {
1529                    desc,
1530                    monotonic: import.monotonic,
1531                    with_snapshot: import.with_snapshot,
1532                    upper: frontiers.write_frontier,
1533                },
1534            );
1535        }
1536
1537        let mut sink_exports = BTreeMap::new();
1538        for (id, se) in dataflow.sink_exports {
1539            let connection = match se.connection {
1540                ComputeSinkConnection::MaterializedView(conn) => {
1541                    let metadata = self
1542                        .storage_collections
1543                        .collection_metadata(id)
1544                        .map_err(|_| CollectionMissing(id))?
1545                        .clone();
1546                    let conn = MaterializedViewSinkConnection {
1547                        value_desc: conn.value_desc,
1548                        storage_metadata: metadata,
1549                    };
1550                    ComputeSinkConnection::MaterializedView(conn)
1551                }
1552                ComputeSinkConnection::ContinualTask(conn) => {
1553                    let metadata = self
1554                        .storage_collections
1555                        .collection_metadata(id)
1556                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
1557                        .clone();
1558                    let conn = ContinualTaskConnection {
1559                        input_id: conn.input_id,
1560                        storage_metadata: metadata,
1561                    };
1562                    ComputeSinkConnection::ContinualTask(conn)
1563                }
1564                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
1565                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1566                    ComputeSinkConnection::CopyToS3Oneshot(conn)
1567                }
1568            };
1569            let desc = ComputeSinkDesc {
1570                from: se.from,
1571                from_desc: se.from_desc,
1572                connection,
1573                with_snapshot: se.with_snapshot,
1574                up_to: se.up_to,
1575                non_null_assertions: se.non_null_assertions,
1576                refresh_schedule: se.refresh_schedule,
1577            };
1578            sink_exports.insert(id, desc);
1579        }
1580
1581        // Flatten the dataflow plans into the representation expected by replicas.
1582        let objects_to_build = dataflow
1583            .objects_to_build
1584            .into_iter()
1585            .map(|object| BuildDesc {
1586                id: object.id,
1587                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
1588            })
1589            .collect();
1590
1591        let augmented_dataflow = DataflowDescription {
1592            source_imports,
1593            sink_exports,
1594            objects_to_build,
1595            // The rest of the fields are identical
1596            index_imports: dataflow.index_imports,
1597            index_exports: dataflow.index_exports,
1598            as_of: dataflow.as_of.clone(),
1599            until: dataflow.until,
1600            initial_storage_as_of: dataflow.initial_storage_as_of,
1601            refresh_schedule: dataflow.refresh_schedule,
1602            debug_name: dataflow.debug_name,
1603            time_dependence: dataflow.time_dependence,
1604        };
1605
1606        if augmented_dataflow.is_transient() {
1607            tracing::debug!(
1608                name = %augmented_dataflow.debug_name,
1609                import_ids = %augmented_dataflow.display_import_ids(),
1610                export_ids = %augmented_dataflow.display_export_ids(),
1611                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1612                until = ?augmented_dataflow.until.elements(),
1613                "creating dataflow",
1614            );
1615        } else {
1616            tracing::info!(
1617                name = %augmented_dataflow.debug_name,
1618                import_ids = %augmented_dataflow.display_import_ids(),
1619                export_ids = %augmented_dataflow.display_export_ids(),
1620                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1621                until = ?augmented_dataflow.until.elements(),
1622                "creating dataflow",
1623            );
1624        }
1625
1626        // Skip the actual dataflow creation for an empty `as_of`. (Happens e.g. for the
1627        // bootstrapping of a REFRESH AT mat view that is past its last refresh.)
1628        if as_of.is_empty() {
1629            tracing::info!(
1630                name = %augmented_dataflow.debug_name,
1631                "not sending `CreateDataflow`, because of empty `as_of`",
1632            );
1633        } else {
1634            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
1635            let dataflow = Box::new(augmented_dataflow);
1636            self.send(ComputeCommand::CreateDataflow(dataflow));
1637
1638            for id in collections {
1639                self.maybe_schedule_collection(id);
1640            }
1641        }
1642
1643        Ok(())
1644    }
1645
1646    /// Schedule the identified collection if all its inputs are available.
1647    ///
1648    /// # Panics
1649    ///
1650    /// Panics if the identified collection does not exist.
1651    fn maybe_schedule_collection(&mut self, id: GlobalId) {
1652        let collection = self.expect_collection(id);
1653
1654        // Don't schedule collections twice.
1655        if collection.scheduled {
1656            return;
1657        }
1658
1659        let as_of = collection.read_frontier();
1660
1661        // If the collection has an empty `as_of`, it was either never installed on the replica or
1662        // has since been dropped. In either case the replica does not expect any commands for it.
1663        if as_of.is_empty() {
1664            return;
1665        }
1666
1667        let ready = if id.is_transient() {
1668            // Always schedule transient collections immediately. The assumption is that those are
1669            // created by interactive user commands and we want to schedule them as quickly as
1670            // possible. Inputs might not yet be available, but when they become available, we
1671            // don't need to wait for the controller to become aware and for the scheduling check
1672            // to run again.
1673            true
1674        } else {
1675            // Ignore self-dependencies. Any self-dependencies do not need to be
1676            // available at the as_of for the dataflow to make progress, so we
1677            // can ignore them here. At the moment, only continual tasks have
1678            // self-dependencies, but this logic is correct for any dataflow, so
1679            // we don't special case it to CTs.
1680            let not_self_dep = |x: &GlobalId| *x != id;
1681
1682            // Check dependency frontiers to determine if all inputs are
1683            // available. An input is available when its frontier is greater
1684            // than the `as_of`, i.e., all input data up to and including the
1685            // `as_of` has been sealed.
1686            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
1687            let compute_frontiers = compute_deps.map(|id| {
1688                let dep = &self.expect_collection(id);
1689                dep.write_frontier()
1690            });
1691
1692            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
1693            let storage_frontiers = self
1694                .storage_collections
1695                .collections_frontiers(storage_deps.collect())
1696                .expect("must exist");
1697            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);
1698
1699            let ready = compute_frontiers
1700                .chain(storage_frontiers)
1701                .all(|frontier| PartialOrder::less_than(&as_of, &frontier));
1702
1703            ready
1704        };
1705
1706        if ready {
1707            self.send(ComputeCommand::Schedule(id));
1708            let collection = self.expect_collection_mut(id);
1709            collection.scheduled = true;
1710        }
1711    }
1712
1713    /// Schedule any unscheduled collections that are ready.
1714    fn schedule_collections(&mut self) {
1715        let ids: Vec<_> = self.collections.keys().copied().collect();
1716        for id in ids {
1717            self.maybe_schedule_collection(id);
1718        }
1719    }
1720
1721    /// Drops the read capability for the given collections and allows their resources to be
1722    /// reclaimed.
1723    #[mz_ore::instrument(level = "debug")]
1724    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1725        for id in &ids {
1726            let collection = self.collection_mut(*id)?;
1727
1728            // Mark the collection as dropped to allow it to be removed from the controller state.
1729            collection.dropped = true;
1730
1731            // Drop the implied and warmup read holds to announce that clients are not
1732            // interested in the collection anymore.
1733            collection.implied_read_hold.release();
1734            collection.warmup_read_hold.release();
1735
1736            // If the collection is a subscribe, stop tracking it. This ensures that the controller
1737            // ceases to produce `SubscribeResponse`s for this subscribe.
1738            self.subscribes.remove(id);
1739            // If the collection is a copy to, stop tracking it. This ensures that the controller
1740            // ceases to produce `CopyToResponse`s` for this copy to.
1741            self.copy_tos.remove(id);
1742        }
1743
1744        Ok(())
1745    }
1746
1747    /// Initiate a peek request for the contents of `id` at `timestamp`.
1748    ///
1749    /// If this returns an error, then it didn't modify any `Instance` state.
1750    #[mz_ore::instrument(level = "debug")]
1751    pub fn peek(
1752        &mut self,
1753        peek_target: PeekTarget,
1754        literal_constraints: Option<Vec<Row>>,
1755        uuid: Uuid,
1756        timestamp: T,
1757        result_desc: RelationDesc,
1758        finishing: RowSetFinishing,
1759        map_filter_project: mz_expr::SafeMfpPlan,
1760        mut read_hold: ReadHold<T>,
1761        target_replica: Option<ReplicaId>,
1762        peek_response_tx: oneshot::Sender<PeekResponse>,
1763    ) -> Result<(), PeekError> {
1764        use PeekError::*;
1765
1766        let target_id = peek_target.id();
1767
1768        // Downgrade the provided read hold to the peek time.
1769        if read_hold.id() != target_id {
1770            return Err(ReadHoldIdMismatch(read_hold.id()));
1771        }
1772        read_hold
1773            .try_downgrade(Antichain::from_elem(timestamp.clone()))
1774            .map_err(|_| ReadHoldInsufficient(target_id))?;
1775
1776        if let Some(target) = target_replica {
1777            if !self.replica_exists(target) {
1778                return Err(ReplicaMissing(target));
1779            }
1780        }
1781
1782        let otel_ctx = OpenTelemetryContext::obtain();
1783
1784        self.peeks.insert(
1785            uuid,
1786            PendingPeek {
1787                target_replica,
1788                // TODO(guswynn): can we just hold the `tracing::Span` here instead?
1789                otel_ctx: otel_ctx.clone(),
1790                requested_at: Instant::now(),
1791                read_hold,
1792                peek_response_tx,
1793                limit: finishing.limit.map(usize::cast_from),
1794                offset: finishing.offset,
1795            },
1796        );
1797
1798        let peek = Peek {
1799            literal_constraints,
1800            uuid,
1801            timestamp,
1802            finishing,
1803            map_filter_project,
1804            // Obtain an `OpenTelemetryContext` from the thread-local tracing
1805            // tree to forward it on to the compute worker.
1806            otel_ctx,
1807            target: peek_target,
1808            result_desc,
1809        };
1810        self.send(ComputeCommand::Peek(Box::new(peek)));
1811
1812        Ok(())
1813    }
1814
1815    /// Cancels an existing peek request.
1816    #[mz_ore::instrument(level = "debug")]
1817    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1818        let Some(peek) = self.peeks.get_mut(&uuid) else {
1819            tracing::warn!("did not find pending peek for {uuid}");
1820            return;
1821        };
1822
1823        let duration = peek.requested_at.elapsed();
1824        self.metrics
1825            .observe_peek_response(&PeekResponse::Canceled, duration);
1826
1827        // Enqueue a notification for the cancellation.
1828        let otel_ctx = peek.otel_ctx.clone();
1829        otel_ctx.attach_as_parent();
1830
1831        self.deliver_response(ComputeControllerResponse::PeekNotification(
1832            uuid,
1833            PeekNotification::Canceled,
1834            otel_ctx,
1835        ));
1836
1837        // Finish the peek.
1838        // This will also propagate the cancellation to the replicas.
1839        self.finish_peek(uuid, reason);
1840    }
1841
1842    /// Assigns a read policy to specific identifiers.
1843    ///
1844    /// The policies are assigned in the order presented, and repeated identifiers should
1845    /// conclude with the last policy. Changing a policy will immediately downgrade the read
1846    /// capability if appropriate, but it will not "recover" the read capability if the prior
1847    /// capability is already ahead of it.
1848    ///
1849    /// Identifiers not present in `policies` retain their existing read policies.
1850    ///
1851    /// It is an error to attempt to set a read policy for a collection that is not readable in the
1852    /// context of compute. At this time, only indexes are readable compute collections.
1853    #[mz_ore::instrument(level = "debug")]
1854    pub fn set_read_policy(
1855        &mut self,
1856        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1857    ) -> Result<(), ReadPolicyError> {
1858        // Do error checking upfront, to avoid introducing inconsistencies between a collection's
1859        // `implied_capability` and `read_capabilities`.
1860        for (id, _policy) in &policies {
1861            let collection = self.collection(*id)?;
1862            if collection.read_policy.is_none() {
1863                return Err(ReadPolicyError::WriteOnlyCollection(*id));
1864            }
1865        }
1866
1867        for (id, new_policy) in policies {
1868            let collection = self.expect_collection_mut(id);
1869            let new_since = new_policy.frontier(collection.write_frontier().borrow());
1870            let _ = collection.implied_read_hold.try_downgrade(new_since);
1871            collection.read_policy = Some(new_policy);
1872        }
1873
1874        Ok(())
1875    }
1876
    /// Advance the global write frontier of the given collection.
    ///
    /// Frontier regressions are gracefully ignored.
    ///
    /// # Panics
    ///
    /// Panics if the identified collection does not exist.
    #[mz_ore::instrument(level = "debug")]
    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
        let collection = self.expect_collection_mut(id);

        // Update the shared write frontier, but only if `new_frontier` is strictly greater than
        // the current one; anything else would be a regression (or a no-op) and is skipped.
        let advanced = collection.shared.lock_write_frontier(|f| {
            let advanced = PartialOrder::less_than(f, &new_frontier);
            if advanced {
                f.clone_from(&new_frontier);
            }
            advanced
        });

        if !advanced {
            return;
        }

        // Relax the implied read hold according to the read policy.
        let new_since = match &collection.read_policy {
            Some(read_policy) => {
                // For readable collections the read frontier is determined by applying the
                // client-provided read policy to the write frontier.
                read_policy.frontier(new_frontier.borrow())
            }
            None => {
                // Write-only collections cannot be read within the context of the compute
                // controller, so their read frontier only controls the read holds taken on their
                // inputs. We can safely downgrade the input read holds to any time less than the
                // write frontier.
                //
                // Note that some write-only collections (continual tasks) need to observe changes
                // at their current write frontier during hydration. Thus, we cannot downgrade the
                // read frontier to the write frontier and instead step it back by one.
                Antichain::from_iter(
                    new_frontier
                        .iter()
                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                )
            }
        };
        // A failed downgrade means the hold is already beyond `new_since`; that's acceptable.
        let _ = collection.implied_read_hold.try_downgrade(new_since);

        // Report the frontier advancement.
        self.deliver_response(ComputeControllerResponse::FrontierUpper {
            id,
            upper: new_frontier,
        });
    }
1931
    /// Apply a collection read hold change.
    ///
    /// The `update` batch contains `(time, diff)` pairs describing how many read capabilities to
    /// add (positive diff) or remove (negative diff) at each time. If applying the batch changes
    /// the collection's read frontier, the new frontier is propagated to the collection's compute
    /// and storage dependencies, and an `AllowCompaction` command is sent to replicas.
    fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
        let Some(collection) = self.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "read hold change for absent collection (id={id}, changes={update:?})"
            );
            return;
        };

        let new_since = collection.shared.lock_read_capabilities(|caps| {
            // Sanity check to prevent corrupted `read_capabilities`, which can cause hard-to-debug
            // issues (usually stuck read frontiers).
            let read_frontier = caps.frontier();
            for (time, diff) in update.iter() {
                // Capability counts must never go negative ...
                let count = caps.count_for(time) + diff;
                assert!(
                    count >= 0,
                    "invalid read capabilities update: negative capability \
             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
                // ... and a (nonzero) capability must not be installed before the current read
                // frontier, which would be a frontier regression.
                assert!(
                    count == 0 || read_frontier.less_equal(time),
                    "invalid read capabilities update: frontier regression \
             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
            }

            // Apply read capability updates and learn about resulting changes to the read
            // frontier.
            let changes = caps.update_iter(update.drain());

            // Return the new read frontier only if it actually changed.
            let changed = changes.count() > 0;
            changed.then(|| caps.frontier().to_owned())
        });

        let Some(new_since) = new_since else {
            return; // read frontier did not change
        };

        // Propagate read frontier update to dependencies.
        for read_hold in collection.compute_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }
        for read_hold in collection.storage_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }

        // Produce `AllowCompaction` command.
        self.send(ComputeCommand::AllowCompaction {
            id,
            frontier: new_since,
        });
    }
1989
1990    /// Fulfills a registered peek and cleans up associated state.
1991    ///
1992    /// As part of this we:
1993    ///  * Send a `PeekResponse` through the peek's response channel.
1994    ///  * Emit a `CancelPeek` command to instruct replicas to stop spending resources on this
1995    ///    peek, and to allow the `ComputeCommandHistory` to reduce away the corresponding `Peek`
1996    ///    command.
1997    ///  * Remove the read hold for this peek, unblocking compaction that might have waited on it.
1998    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1999        let Some(peek) = self.peeks.remove(&uuid) else {
2000            return;
2001        };
2002
2003        // The recipient might not be interested in the peek response anymore, which is fine.
2004        let _ = peek.peek_response_tx.send(response);
2005
2006        // NOTE: We need to send the `CancelPeek` command _before_ we release the peek's read hold
2007        // (by dropping it), to avoid the edge case that caused database-issues#4812.
2008        self.send(ComputeCommand::CancelPeek { uuid });
2009
2010        drop(peek.read_hold);
2011    }
2012
2013    /// Handles a response from a replica. Replica IDs are re-used across replica restarts, so we
2014    /// use the replica epoch to drop stale responses.
2015    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
2016        // Filter responses from non-existing or stale replicas.
2017        if self
2018            .replicas
2019            .get(&replica_id)
2020            .filter(|replica| replica.epoch == epoch)
2021            .is_none()
2022        {
2023            return;
2024        }
2025
2026        // Invariant: the replica exists and has the expected epoch.
2027
2028        match response {
2029            ComputeResponse::Frontiers(id, frontiers) => {
2030                self.handle_frontiers_response(id, frontiers, replica_id);
2031            }
2032            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
2033                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
2034            }
2035            ComputeResponse::CopyToResponse(id, response) => {
2036                self.handle_copy_to_response(id, response, replica_id);
2037            }
2038            ComputeResponse::SubscribeResponse(id, response) => {
2039                self.handle_subscribe_response(id, response, replica_id);
2040            }
2041            ComputeResponse::Status(response) => {
2042                self.handle_status_response(response, replica_id);
2043            }
2044        }
2045    }
2046
2047    /// Handle new frontiers, returning any compute response that needs to
2048    /// be sent to the client.
2049    fn handle_frontiers_response(
2050        &mut self,
2051        id: GlobalId,
2052        frontiers: FrontiersResponse<T>,
2053        replica_id: ReplicaId,
2054    ) {
2055        if !self.collections.contains_key(&id) {
2056            soft_panic_or_log!(
2057                "frontiers update for an unknown collection \
2058                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
2059            );
2060            return;
2061        }
2062        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2063            soft_panic_or_log!(
2064                "frontiers update for an unknown replica \
2065                 (replica_id={replica_id}, frontiers={frontiers:?})"
2066            );
2067            return;
2068        };
2069        let Some(replica_collection) = replica.collections.get_mut(&id) else {
2070            soft_panic_or_log!(
2071                "frontiers update for an unknown replica collection \
2072                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
2073            );
2074            return;
2075        };
2076
2077        if let Some(new_frontier) = frontiers.input_frontier {
2078            replica_collection.update_input_frontier(new_frontier.clone());
2079        }
2080        if let Some(new_frontier) = frontiers.output_frontier {
2081            replica_collection.update_output_frontier(new_frontier.clone());
2082        }
2083        if let Some(new_frontier) = frontiers.write_frontier {
2084            replica_collection.update_write_frontier(new_frontier.clone());
2085            self.maybe_update_global_write_frontier(id, new_frontier);
2086        }
2087    }
2088
2089    #[mz_ore::instrument(level = "debug")]
2090    fn handle_peek_response(
2091        &mut self,
2092        uuid: Uuid,
2093        response: PeekResponse,
2094        otel_ctx: OpenTelemetryContext,
2095        replica_id: ReplicaId,
2096    ) {
2097        otel_ctx.attach_as_parent();
2098
2099        // We might not be tracking this peek anymore, because we have served a response already or
2100        // because it was canceled. If this is the case, we ignore the response.
2101        let Some(peek) = self.peeks.get(&uuid) else {
2102            return;
2103        };
2104
2105        // If the peek is targeting a replica, ignore responses from other replicas.
2106        let target_replica = peek.target_replica.unwrap_or(replica_id);
2107        if target_replica != replica_id {
2108            return;
2109        }
2110
2111        let duration = peek.requested_at.elapsed();
2112        self.metrics.observe_peek_response(&response, duration);
2113
2114        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
2115        // NOTE: We use the `otel_ctx` from the response, not the pending peek, because we
2116        // currently want the parent to be whatever the compute worker did with this peek.
2117        self.deliver_response(ComputeControllerResponse::PeekNotification(
2118            uuid,
2119            notification,
2120            otel_ctx,
2121        ));
2122
2123        self.finish_peek(uuid, response)
2124    }
2125
2126    fn handle_copy_to_response(
2127        &mut self,
2128        sink_id: GlobalId,
2129        response: CopyToResponse,
2130        replica_id: ReplicaId,
2131    ) {
2132        if !self.collections.contains_key(&sink_id) {
2133            soft_panic_or_log!(
2134                "received response for an unknown copy-to \
2135                 (sink_id={sink_id}, replica_id={replica_id})",
2136            );
2137            return;
2138        }
2139        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2140            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
2141            return;
2142        };
2143        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
2144            soft_panic_or_log!(
2145                "copy-to response for an unknown replica collection \
2146                 (sink_id={sink_id}, replica_id={replica_id})"
2147            );
2148            return;
2149        };
2150
2151        // Downgrade the replica frontiers, to enable dropping of input read holds and clean up of
2152        // collection state.
2153        // TODO(database-issues#4701): report copy-to frontiers through `Frontiers` responses
2154        replica_collection.update_write_frontier(Antichain::new());
2155        replica_collection.update_input_frontier(Antichain::new());
2156        replica_collection.update_output_frontier(Antichain::new());
2157
2158        // We might not be tracking this COPY TO because we have already returned a response
2159        // from one of the replicas. In that case, we ignore the response.
2160        if !self.copy_tos.remove(&sink_id) {
2161            return;
2162        }
2163
2164        let result = match response {
2165            CopyToResponse::RowCount(count) => Ok(count),
2166            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
2167            // We should never get here: Replicas only drop copy to collections in response
2168            // to the controller allowing them to do so, and when the controller drops a
2169            // copy to it also removes it from the list of tracked copy_tos (see
2170            // [`Instance::drop_collections`]).
2171            CopyToResponse::Dropped => {
2172                tracing::error!(
2173                    %sink_id, %replica_id,
2174                    "received `Dropped` response for a tracked copy to",
2175                );
2176                return;
2177            }
2178        };
2179
2180        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
2181    }
2182
2183    fn handle_subscribe_response(
2184        &mut self,
2185        subscribe_id: GlobalId,
2186        response: SubscribeResponse<T>,
2187        replica_id: ReplicaId,
2188    ) {
2189        if !self.collections.contains_key(&subscribe_id) {
2190            soft_panic_or_log!(
2191                "received response for an unknown subscribe \
2192                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
2193            );
2194            return;
2195        }
2196        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2197            soft_panic_or_log!(
2198                "subscribe response for an unknown replica (replica_id={replica_id})"
2199            );
2200            return;
2201        };
2202        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
2203            soft_panic_or_log!(
2204                "subscribe response for an unknown replica collection \
2205                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
2206            );
2207            return;
2208        };
2209
2210        // Always apply replica write frontier updates. Even if the subscribe is not tracked
2211        // anymore, there might still be replicas reading from its inputs, so we need to track the
2212        // frontiers until all replicas have advanced to the empty one.
2213        let write_frontier = match &response {
2214            SubscribeResponse::Batch(batch) => batch.upper.clone(),
2215            SubscribeResponse::DroppedAt(_) => Antichain::new(),
2216        };
2217
2218        // For subscribes we downgrade all replica frontiers based on write frontiers. This should
2219        // be fine because the input and output frontier of a subscribe track its write frontier.
2220        // TODO(database-issues#4701): report subscribe frontiers through `Frontiers` responses
2221        replica_collection.update_write_frontier(write_frontier.clone());
2222        replica_collection.update_input_frontier(write_frontier.clone());
2223        replica_collection.update_output_frontier(write_frontier.clone());
2224
2225        // If the subscribe is not tracked, or targets a different replica, there is nothing to do.
2226        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
2227            return;
2228        };
2229        let replica_targeted = subscribe.target_replica.unwrap_or(replica_id) == replica_id;
2230        if !replica_targeted {
2231            return;
2232        }
2233
2234        // Apply a global frontier update.
2235        // If this is a replica-targeted subscribe, it is important that we advance the global
2236        // frontier only based on responses from the targeted replica. Otherwise, another replica
2237        // could advance to the empty frontier, making us drop the subscribe on the targeted
2238        // replica prematurely.
2239        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);
2240
2241        match response {
2242            SubscribeResponse::Batch(batch) => {
2243                let upper = batch.upper;
2244                let mut updates = batch.updates;
2245
2246                // If this batch advances the subscribe's frontier, we emit all updates at times
2247                // greater or equal to the last frontier (to avoid emitting duplicate updates).
2248                if PartialOrder::less_than(&subscribe.frontier, &upper) {
2249                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());
2250
2251                    if upper.is_empty() {
2252                        // This subscribe cannot produce more data. Stop tracking it.
2253                        self.subscribes.remove(&subscribe_id);
2254                    } else {
2255                        // This subscribe can produce more data. Update our tracking of it.
2256                        self.subscribes.insert(subscribe_id, subscribe);
2257                    }
2258
2259                    if let Ok(updates) = updates.as_mut() {
2260                        updates.retain(|(time, _data, _diff)| lower.less_equal(time));
2261                    }
2262                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
2263                        subscribe_id,
2264                        SubscribeBatch {
2265                            lower,
2266                            upper,
2267                            updates,
2268                        },
2269                    ));
2270                }
2271            }
2272            SubscribeResponse::DroppedAt(frontier) => {
2273                // We should never get here: Replicas only drop subscribe collections in response
2274                // to the controller allowing them to do so, and when the controller drops a
2275                // subscribe it also removes it from the list of tracked subscribes (see
2276                // [`Instance::drop_collections`]).
2277                tracing::error!(
2278                    %subscribe_id,
2279                    %replica_id,
2280                    frontier = ?frontier.elements(),
2281                    "received `DroppedAt` response for a tracked subscribe",
2282                );
2283                self.subscribes.remove(&subscribe_id);
2284            }
2285        }
2286    }
2287
    /// Handle a status response received from a replica.
    ///
    /// `StatusResponse` currently has only the `Placeholder` variant, which carries no
    /// information, so there is nothing to do yet. The exhaustive `match` (no catch-all arm)
    /// forces this handler to be revisited when new status variants are added.
    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
        match response {
            StatusResponse::Placeholder => {}
        }
    }
2293
2294    /// Return the write frontiers of the dependencies of the given collection.
2295    fn dependency_write_frontiers<'b>(
2296        &'b self,
2297        collection: &'b CollectionState<T>,
2298    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2299        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2300            let collection = self.collections.get(&dep_id);
2301            collection.map(|c| c.write_frontier())
2302        });
2303        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2304            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2305            frontiers.map(|f| f.write_frontier)
2306        });
2307
2308        compute_frontiers.chain(storage_frontiers)
2309    }
2310
2311    /// Return the write frontiers of transitive storage dependencies of the given collection.
2312    fn transitive_storage_dependency_write_frontiers<'b>(
2313        &'b self,
2314        collection: &'b CollectionState<T>,
2315    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2316        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2317        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2318        let mut done = BTreeSet::new();
2319
2320        while let Some(id) = todo.pop() {
2321            if done.contains(&id) {
2322                continue;
2323            }
2324            if let Some(dep) = self.collections.get(&id) {
2325                storage_ids.extend(dep.storage_dependency_ids());
2326                todo.extend(dep.compute_dependency_ids())
2327            }
2328            done.insert(id);
2329        }
2330
2331        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2332            let frontiers = self.storage_collections.collection_frontiers(id).ok();
2333            frontiers.map(|f| f.write_frontier)
2334        });
2335
2336        storage_frontiers
2337    }
2338
2339    /// Downgrade the warmup capabilities of collections as much as possible.
2340    ///
2341    /// The only requirement we have for a collection's warmup capability is that it is for a time
2342    /// that is available in all of the collection's inputs. For each input the latest time that is
2343    /// the case for is `write_frontier - 1`. So the farthest we can downgrade a collection's
2344    /// warmup capability is the minimum of `write_frontier - 1` of all its inputs.
2345    ///
2346    /// This method expects to be periodically called as part of instance maintenance work.
2347    /// We would like to instead update the warmup capabilities synchronously in response to
2348    /// frontier updates of dependency collections, but that is not generally possible because we
2349    /// don't learn about frontier updates of storage collections synchronously. We could do
2350    /// synchronous updates for compute dependencies, but we refrain from doing for simplicity.
2351    fn downgrade_warmup_capabilities(&mut self) {
2352        let mut new_capabilities = BTreeMap::new();
2353        for (id, collection) in &self.collections {
2354            // For write-only collections that have advanced to the empty frontier, we can drop the
2355            // warmup capability entirely. There is no reason why we would need to hydrate those
2356            // collections again, so being able to warm them up is not useful.
2357            if collection.read_policy.is_none()
2358                && collection.shared.lock_write_frontier(|f| f.is_empty())
2359            {
2360                new_capabilities.insert(*id, Antichain::new());
2361                continue;
2362            }
2363
2364            let mut new_capability = Antichain::new();
2365            for frontier in self.dependency_write_frontiers(collection) {
2366                for time in frontier {
2367                    new_capability.insert(time.step_back().unwrap_or(time));
2368                }
2369            }
2370
2371            new_capabilities.insert(*id, new_capability);
2372        }
2373
2374        for (id, new_capability) in new_capabilities {
2375            let collection = self.expect_collection_mut(id);
2376            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
2377        }
2378    }
2379
2380    /// Forward the implied capabilities of collections, if possible.
2381    ///
2382    /// The implied capability of a collection controls (a) which times are still readable (for
2383    /// indexes) and (b) with which as-of the collection gets installed on a new replica. We are
2384    /// usually not allowed to advance an implied capability beyond the frontier that follows from
2385    /// the collection's read policy applied to its write frontier:
2386    ///
2387    ///  * For sink collections, some external consumer might rely on seeing all distinct times in
2388    ///    the input reflected in the output. If we'd forward the implied capability of a sink,
2389    ///    we'd risk skipping times in the output across replica restarts.
2390    ///  * For index collections, we might make the index unreadable by advancing its read frontier
2391    ///    beyond its write frontier.
2392    ///
2393    /// There is one case where forwarding an implied capability is fine though: an index installed
2394    /// on a cluster that has no replicas. Such indexes are not readable anyway until a new replica
2395    /// is added, so advancing its read frontier can't make it unreadable. We can thus advance the
2396    /// implied capability as long as we make sure that when a new replica is added, the expected
2397    /// relationship between write frontier, read policy, and implied capability can be restored
2398    /// immediately (modulo computation time).
2399    ///
2400    /// Forwarding implied capabilities is not necessary for the correct functioning of the
2401    /// controller but an optimization that is beneficial in two ways:
2402    ///
2403    ///  * It relaxes read holds on inputs to forwarded collections, allowing their compaction.
2404    ///  * It reduces the amount of historical detail new replicas need to process when computing
2405    ///    forwarded collections, as forwarding the implied capability also forwards the corresponding
2406    ///    dataflow as-of.
2407    fn forward_implied_capabilities(&mut self) {
2408        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
2409            return;
2410        }
2411        if !self.replicas.is_empty() {
2412            return;
2413        }
2414
2415        let mut new_capabilities = BTreeMap::new();
2416        for (id, collection) in &self.collections {
2417            let Some(read_policy) = &collection.read_policy else {
2418                // Collection is write-only, i.e. a sink.
2419                continue;
2420            };
2421
2422            // When a new replica is started, it will immediately be able to compute all collection
2423            // output up to the write frontier of its transitive storage inputs. So the new implied
2424            // read capability should be the read policy applied to that frontier.
2425            let mut dep_frontier = Antichain::new();
2426            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
2427                dep_frontier.extend(frontier);
2428            }
2429
2430            let new_capability = read_policy.frontier(dep_frontier.borrow());
2431            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
2432                new_capabilities.insert(*id, new_capability);
2433            }
2434        }
2435
2436        for (id, new_capability) in new_capabilities {
2437            let collection = self.expect_collection_mut(id);
2438            let _ = collection.implied_read_hold.try_downgrade(new_capability);
2439        }
2440    }
2441
2442    /// Acquires a `ReadHold` for the identified compute collection.
2443    ///
2444    /// This mirrors the logic used by the controller-side `InstanceState::acquire_read_hold`,
2445    /// but executes on the instance task itself.
2446    fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
2447        // Similarly to InstanceState::acquire_read_hold and StorageCollections::acquire_read_holds,
2448        // we acquire read holds at the earliest possible time rather than returning a copy
2449        // of the implied read hold. This is so that dependents can acquire read holds on
2450        // compute dependencies at frontiers that are held back by other read holds the caller
2451        // has previously taken.
2452        let collection = self.collection(id)?;
2453        let since = collection.shared.lock_read_capabilities(|caps| {
2454            let since = caps.frontier().to_owned();
2455            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
2456            since
2457        });
2458        let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
2459        Ok(hold)
2460    }
2461
    /// Process pending maintenance work.
    ///
    /// This method is invoked periodically by the global controller.
    /// It is a good place to perform maintenance work that arises from various controller state
    /// changes and that cannot conveniently be handled synchronously with those state changes.
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        // NOTE(review): the relative order of these steps may be significant (e.g. capability
        // downgrades happening before frontier introspection updates) — confirm before
        // reordering.
        self.rehydrate_failed_replicas();
        self.downgrade_warmup_capabilities();
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
2478}
2479
/// State maintained about individual compute collections.
///
/// A compute collection is either an index, or a storage sink, or a subscribe, exported by a
/// compute dataflow.
///
/// The subset of this state that must also be accessible from the `ComputeController` lives in
/// the contained [`SharedCollectionState`].
#[derive(Debug)]
struct CollectionState<T: ComputeControllerTimestamp> {
    /// Whether this collection is a log collection.
    ///
    /// Log collections are special in that they are only maintained by a subset of all replicas.
    log_collection: bool,
    /// Whether this collection has been dropped by a controller client.
    ///
    /// The controller is allowed to remove the `CollectionState` for a collection only when
    /// `dropped == true`. Otherwise, clients might still expect to be able to query information
    /// about this collection.
    dropped: bool,
    /// Whether this collection has been scheduled, i.e., the controller has sent a `Schedule`
    /// command for it.
    scheduled: bool,

    /// Whether this collection is in read-only mode.
    ///
    /// When in read-only mode, the dataflow is not allowed to affect external state (largely persist).
    read_only: bool,

    /// State shared with the `ComputeController`.
    shared: SharedCollectionState<T>,

    /// A read hold maintaining the implicit capability of the collection.
    ///
    /// This capability is kept to ensure that the collection remains readable according to its
    /// `read_policy`. It also ensures that read holds on the collection's dependencies are kept at
    /// some time not greater than the collection's `write_frontier`, guaranteeing that the
    /// collection's next outputs can always be computed without skipping times.
    implied_read_hold: ReadHold<T>,
    /// A read hold held to enable dataflow warmup.
    ///
    /// Dataflow warmup is an optimization that allows dataflows to immediately start hydrating
    /// even when their next output time (as implied by the `write_frontier`) is in the future.
    /// By installing a read capability derived from the write frontiers of the collection's
    /// inputs, we ensure that the as-of of new dataflows installed for the collection is at a time
    /// that is immediately available, so hydration can begin immediately too.
    warmup_read_hold: ReadHold<T>,
    /// The policy to use to downgrade `self.implied_read_hold`.
    ///
    /// If `None`, the collection is a write-only collection (i.e. a sink). For write-only
    /// collections, the `implied_read_hold` is only required for maintaining read holds on the
    /// inputs, so we can immediately downgrade it to the `write_frontier`.
    read_policy: Option<ReadPolicy<T>>,

    /// Storage identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
    /// Compute identifiers on which this collection depends, and read holds this collection
    /// requires on them.
    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,

    /// Introspection state associated with this collection.
    introspection: CollectionIntrospection<T>,

    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
    /// introspection update.
    ///
    /// Keys are `(period, lag, labels)` triples, values are counts.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}
2557
impl<T: ComputeControllerTimestamp> CollectionState<T> {
    /// Creates a new collection state, with an initial read policy valid from `since`.
    ///
    /// The `shared` state must already reflect the given `as_of`: its read capabilities'
    /// frontier and its write frontier must both equal `as_of` (asserted below).
    fn new(
        collection_id: GlobalId,
        as_of: Antichain<T>,
        shared: SharedCollectionState<T>,
        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection: CollectionIntrospection<T>,
    ) -> Self {
        // A collection is not readable before the `as_of`.
        let since = as_of.clone();
        // A collection won't produce updates for times before the `as_of`.
        let upper = as_of;

        // Ensure that the provided `shared` is valid for the given `as_of`.
        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
        assert!(shared.lock_write_frontier(|f| f == &upper));

        // Initialize collection read holds.
        // Note that the implied read hold was already added to the `read_capabilities` when
        // `shared` was created, so we only need to add the warmup read hold here.
        let implied_read_hold =
            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);

        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
        shared.lock_read_capabilities(|c| {
            c.update_iter(updates);
        });

        // In an effort to keep the produced wallclock lag introspection data small and
        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
        // select indexes and subscribes.
        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
            true => None,
            false => Some(Default::default()),
        };

        Self {
            log_collection: false,
            dropped: false,
            scheduled: false,
            // New collections start out in read-only mode.
            read_only: true,
            shared,
            implied_read_hold,
            warmup_read_hold,
            read_policy: Some(ReadPolicy::ValidFrom(since)),
            storage_dependencies,
            compute_dependencies,
            introspection,
            wallclock_lag_histogram_stash,
        }
    }

    /// Creates a new collection state for a log collection.
    ///
    /// Log collections have no dependencies, a `since` of the minimum time, and are marked as
    /// already scheduled.
    fn new_log_collection(
        id: GlobalId,
        shared: SharedCollectionState<T>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    ) -> Self {
        let since = Antichain::from_elem(T::minimum());
        let introspection =
            CollectionIntrospection::new(id, introspection_tx, since.clone(), false, None, None);
        let mut state = Self::new(
            id,
            since,
            shared,
            Default::default(),
            Default::default(),
            read_hold_tx,
            introspection,
        );
        state.log_collection = true;
        // Log collections are created and scheduled implicitly as part of replica initialization.
        state.scheduled = true;
        state
    }

    /// Reports the current read frontier.
    fn read_frontier(&self) -> Antichain<T> {
        self.shared
            .lock_read_capabilities(|c| c.frontier().to_owned())
    }

    /// Reports the current write frontier.
    fn write_frontier(&self) -> Antichain<T> {
        self.shared.lock_write_frontier(|f| f.clone())
    }

    /// Reports the IDs of the storage collections this collection depends on.
    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.storage_dependencies.keys().copied()
    }

    /// Reports the IDs of the compute collections this collection depends on.
    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.compute_dependencies.keys().copied()
    }

    /// Reports the IDs of the dependencies of this collection.
    fn dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.compute_dependency_ids()
            .chain(self.storage_dependency_ids())
    }
}
2664
/// Collection state shared with the `ComputeController`.
///
/// Having this allows certain controller APIs, such as `ComputeController::collection_frontiers`
/// and `ComputeController::acquire_read_hold` to be non-`async`. This comes at the cost of
/// complexity (by introducing shared mutable state) and performance (by introducing locking). We
/// should aim to reduce the amount of shared state over time, rather than expand it.
///
/// Note that [`SharedCollectionState`]s are initialized by the `ComputeController` prior to the
/// collection's creation in the [`Instance`]. This is to allow compute clients to query frontiers
/// and take new read holds immediately, without having to wait for the [`Instance`] to update.
///
/// Cloning this type is cheap: both fields are behind `Arc`s, so clones share the same
/// underlying state.
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState<T> {
    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulation contains the capabilities held by all [`ReadHold`]s given out for the
    /// collection, including `implied_read_hold` and `warmup_read_hold`.
    ///
    /// NOTE: This field may only be modified by [`Instance::apply_read_hold_change`],
    /// [`Instance::acquire_read_hold`], and `ComputeController::acquire_read_hold`.
    /// Nobody else should modify read capabilities directly. Instead, collection users should
    /// manage read holds through [`ReadHold`] objects acquired through
    /// `ComputeController::acquire_read_hold`.
    ///
    /// TODO(teskje): Restructure the code to enforce the above in the type system.
    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
    /// The write frontier of this collection.
    write_frontier: Arc<Mutex<Antichain<T>>>,
}
2693
2694impl<T: Timestamp> SharedCollectionState<T> {
2695    pub fn new(as_of: Antichain<T>) -> Self {
2696        // A collection is not readable before the `as_of`.
2697        let since = as_of.clone();
2698        // A collection won't produce updates for times before the `as_of`.
2699        let upper = as_of;
2700
2701        // Initialize read capabilities to the `since`.
2702        // The is the implied read capability. The corresponding [`ReadHold`] is created in
2703        // [`CollectionState::new`].
2704        let mut read_capabilities = MutableAntichain::new();
2705        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2706
2707        Self {
2708            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2709            write_frontier: Arc::new(Mutex::new(upper)),
2710        }
2711    }
2712
2713    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2714    where
2715        F: FnOnce(&mut MutableAntichain<T>) -> R,
2716    {
2717        let mut caps = self.read_capabilities.lock().expect("poisoned");
2718        f(&mut *caps)
2719    }
2720
2721    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2722    where
2723        F: FnOnce(&mut Antichain<T>) -> R,
2724    {
2725        let mut frontier = self.write_frontier.lock().expect("poisoned");
2726        f(&mut *frontier)
2727    }
2728}
2729
/// Manages certain introspection relations associated with a collection. Upon creation, it adds
/// rows to introspection relations. When dropped, it retracts its managed rows.
///
/// The `Drop` implementation guarantees that every row reported by this type is eventually
/// retracted again (see `report_initial_state` and `Drop for CollectionIntrospection`).
///
/// TODO: `ComputeDependencies` could be moved under this.
#[derive(Debug)]
struct CollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Introspection state for `IntrospectionType::Frontiers`.
    ///
    /// `Some` if the collection does _not_ sink into a storage collection (i.e. is not an MV). If
    /// the collection sinks into storage, the storage controller reports its frontiers instead.
    frontiers: Option<FrontiersIntrospectionState<T>>,
    /// Introspection state for `IntrospectionType::ComputeMaterializedViewRefreshes`.
    ///
    /// `Some` if the collection is a REFRESH MV.
    refresh: Option<RefreshIntrospectionState<T>>,
}
2750
2751impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
2752    fn new(
2753        collection_id: GlobalId,
2754        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2755        as_of: Antichain<T>,
2756        storage_sink: bool,
2757        initial_as_of: Option<Antichain<T>>,
2758        refresh_schedule: Option<RefreshSchedule>,
2759    ) -> Self {
2760        let refresh =
2761            match (refresh_schedule, initial_as_of) {
2762                (Some(refresh_schedule), Some(initial_as_of)) => Some(
2763                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2764                ),
2765                (refresh_schedule, _) => {
2766                    // If we have a `refresh_schedule`, then the collection is a MV, so we should also have
2767                    // an `initial_as_of`.
2768                    soft_assert_or_log!(
2769                        refresh_schedule.is_none(),
2770                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2771                    );
2772                    None
2773                }
2774            };
2775        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2776
2777        let self_ = Self {
2778            collection_id,
2779            introspection_tx,
2780            frontiers,
2781            refresh,
2782        };
2783
2784        self_.report_initial_state();
2785        self_
2786    }
2787
2788    /// Reports the initial introspection state.
2789    fn report_initial_state(&self) {
2790        if let Some(frontiers) = &self.frontiers {
2791            let row = frontiers.row_for_collection(self.collection_id);
2792            let updates = vec![(row, Diff::ONE)];
2793            self.send(IntrospectionType::Frontiers, updates);
2794        }
2795
2796        if let Some(refresh) = &self.refresh {
2797            let row = refresh.row_for_collection(self.collection_id);
2798            let updates = vec![(row, Diff::ONE)];
2799            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2800        }
2801    }
2802
2803    /// Observe the given current collection frontiers and update the introspection state as
2804    /// necessary.
2805    fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2806        self.update_frontier_introspection(read_frontier, write_frontier);
2807        self.update_refresh_introspection(write_frontier);
2808    }
2809
2810    fn update_frontier_introspection(
2811        &mut self,
2812        read_frontier: &Antichain<T>,
2813        write_frontier: &Antichain<T>,
2814    ) {
2815        let Some(frontiers) = &mut self.frontiers else {
2816            return;
2817        };
2818
2819        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2820        {
2821            return; // no change
2822        };
2823
2824        let retraction = frontiers.row_for_collection(self.collection_id);
2825        frontiers.update(read_frontier, write_frontier);
2826        let insertion = frontiers.row_for_collection(self.collection_id);
2827        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2828        self.send(IntrospectionType::Frontiers, updates);
2829    }
2830
2831    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
2832        let Some(refresh) = &mut self.refresh else {
2833            return;
2834        };
2835
2836        let retraction = refresh.row_for_collection(self.collection_id);
2837        refresh.frontier_update(write_frontier);
2838        let insertion = refresh.row_for_collection(self.collection_id);
2839
2840        if retraction == insertion {
2841            return; // no change
2842        }
2843
2844        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2845        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2846    }
2847
2848    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2849        // Failure to send means the `ComputeController` has been dropped and doesn't care about
2850        // introspection updates anymore.
2851        let _ = self.introspection_tx.send((introspection_type, updates));
2852    }
2853}
2854
2855impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
2856    fn drop(&mut self) {
2857        // Retract collection frontiers.
2858        if let Some(frontiers) = &self.frontiers {
2859            let row = frontiers.row_for_collection(self.collection_id);
2860            let updates = vec![(row, Diff::MINUS_ONE)];
2861            self.send(IntrospectionType::Frontiers, updates);
2862        }
2863
2864        // Retract MV refresh state.
2865        if let Some(refresh) = &self.refresh {
2866            let retraction = refresh.row_for_collection(self.collection_id);
2867            let updates = vec![(retraction, Diff::MINUS_ONE)];
2868            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2869        }
2870    }
2871}
2872
/// Introspection state for `IntrospectionType::Frontiers`: the collection frontiers most
/// recently reported to introspection.
#[derive(Debug)]
struct FrontiersIntrospectionState<T> {
    /// The read frontier last reported for the collection.
    read_frontier: Antichain<T>,
    /// The write frontier last reported for the collection.
    write_frontier: Antichain<T>,
}
2878
2879impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
2880    fn new(as_of: Antichain<T>) -> Self {
2881        Self {
2882            read_frontier: as_of.clone(),
2883            write_frontier: as_of,
2884        }
2885    }
2886
2887    /// Return a `Row` reflecting the current collection frontiers.
2888    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2889        let read_frontier = self
2890            .read_frontier
2891            .as_option()
2892            .map_or(Datum::Null, |ts| ts.clone().into());
2893        let write_frontier = self
2894            .write_frontier
2895            .as_option()
2896            .map_or(Datum::Null, |ts| ts.clone().into());
2897        Row::pack_slice(&[
2898            Datum::String(&collection_id.to_string()),
2899            read_frontier,
2900            write_frontier,
2901        ])
2902    }
2903
2904    /// Update the introspection state with the given new frontiers.
2905    fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2906        if read_frontier != &self.read_frontier {
2907            self.read_frontier.clone_from(read_frontier);
2908        }
2909        if write_frontier != &self.write_frontier {
2910            self.write_frontier.clone_from(write_frontier);
2911        }
2912    }
2913}
2914
/// Information needed to compute introspection updates for a REFRESH materialized view when the
/// write frontier advances.
#[derive(Debug)]
struct RefreshIntrospectionState<T> {
    // Immutable properties of the MV
    /// The MV's declared refresh schedule.
    refresh_schedule: RefreshSchedule,
    /// The initial as-of of the MV's dataflow; expected to coincide with the first scheduled
    /// refresh (see the soft assertion in `frontier_update`).
    initial_as_of: Antichain<T>,
    // Refresh state
    next_refresh: Datum<'static>,           // Null or an MzTimestamp
    last_completed_refresh: Datum<'static>, // Null or an MzTimestamp
}
2926
2927impl<T> RefreshIntrospectionState<T> {
2928    /// Return a `Row` reflecting the current refresh introspection state.
2929    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2930        Row::pack_slice(&[
2931            Datum::String(&collection_id.to_string()),
2932            self.last_completed_refresh,
2933            self.next_refresh,
2934        ])
2935    }
2936}
2937
2938impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
2939    /// Construct a new [`RefreshIntrospectionState`], and apply an initial `frontier_update()` at
2940    /// the `upper`.
2941    fn new(
2942        refresh_schedule: RefreshSchedule,
2943        initial_as_of: Antichain<T>,
2944        upper: &Antichain<T>,
2945    ) -> Self {
2946        let mut self_ = Self {
2947            refresh_schedule: refresh_schedule.clone(),
2948            initial_as_of: initial_as_of.clone(),
2949            next_refresh: Datum::Null,
2950            last_completed_refresh: Datum::Null,
2951        };
2952        self_.frontier_update(upper);
2953        self_
2954    }
2955
2956    /// Should be called whenever the write frontier of the collection advances. It updates the
2957    /// state that should be recorded in introspection relations, but doesn't send the updates yet.
2958    fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
2959        if write_frontier.is_empty() {
2960            self.last_completed_refresh =
2961                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2962                    last_refresh.into()
2963                } else {
2964                    // If there is no last refresh, then we have a `REFRESH EVERY`, in which case
2965                    // the saturating roundup puts a refresh at the maximum possible timestamp.
2966                    T::maximum().into()
2967                };
2968            self.next_refresh = Datum::Null;
2969        } else {
2970            if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2971                // We are before the first refresh.
2972                self.last_completed_refresh = Datum::Null;
2973                let initial_as_of = self.initial_as_of.as_option().expect(
2974                    "initial_as_of can't be [], because then there would be no refreshes at all",
2975                );
2976                let first_refresh = initial_as_of
2977                    .round_up(&self.refresh_schedule)
2978                    .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2979                soft_assert_or_log!(
2980                    first_refresh == *initial_as_of,
2981                    "initial_as_of should be set to the first refresh"
2982                );
2983                self.next_refresh = first_refresh.into();
2984            } else {
2985                // The first refresh has already happened.
2986                let write_frontier = write_frontier.as_option().expect("checked above");
2987                self.last_completed_refresh = write_frontier
2988                    .round_down_minus_1(&self.refresh_schedule)
2989                    .map_or_else(
2990                        || {
2991                            soft_panic_or_log!(
2992                                "rounding down should have returned the first refresh or later"
2993                            );
2994                            Datum::Null
2995                        },
2996                        |last_completed_refresh| last_completed_refresh.into(),
2997                    );
2998                self.next_refresh = write_frontier.clone().into();
2999            }
3000        }
3001    }
3002}
3003
/// A note of an outstanding peek response.
#[derive(Debug)]
struct PendingPeek<T: Timestamp> {
    /// For replica-targeted peeks, this specifies the replica whose response we should pass on.
    ///
    /// If this value is `None`, we pass on the first response.
    target_replica: Option<ReplicaId>,
    /// The OpenTelemetry context for this peek.
    otel_ctx: OpenTelemetryContext,
    /// The time at which the peek was requested.
    ///
    /// Used to track peek durations.
    requested_at: Instant,
    /// The read hold installed to serve this peek.
    ///
    /// Keeps the peeked collection readable at the peek timestamp until the peek completes.
    read_hold: ReadHold<T>,
    /// The channel through which the peek result is sent back to the requester.
    peek_response_tx: oneshot::Sender<PeekResponse>,
    /// An optional limit on the peek's result size; `None` means unlimited.
    limit: Option<usize>,
    /// The offset into the peek's result.
    offset: usize,
}
3026
/// State tracked for an active subscribe.
#[derive(Debug, Clone)]
struct ActiveSubscribe<T> {
    /// Current upper frontier of this subscribe.
    frontier: Antichain<T>,
    /// For replica-targeted subscribes, this specifies the replica whose responses we should pass on.
    ///
    /// If this value is `None`, we pass on the first response for each time slice.
    target_replica: Option<ReplicaId>,
}
3036
3037impl<T: ComputeControllerTimestamp> ActiveSubscribe<T> {
3038    fn new(target_replica: Option<ReplicaId>) -> Self {
3039        Self {
3040            frontier: Antichain::from_elem(T::minimum()),
3041            target_replica,
3042        }
3043    }
3044}
3045
/// State maintained about individual replicas.
#[derive(Debug)]
struct ReplicaState<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    id: ReplicaId,
    /// Client for the running replica task.
    client: ReplicaClient<T>,
    /// The replica configuration.
    config: ReplicaConfig,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Per-replica collection state, keyed by collection ID.
    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
    /// The epoch of the replica.
    epoch: u64,
}
3064
3065impl<T: ComputeControllerTimestamp> ReplicaState<T> {
3066    fn new(
3067        id: ReplicaId,
3068        client: ReplicaClient<T>,
3069        config: ReplicaConfig,
3070        metrics: ReplicaMetrics,
3071        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3072        epoch: u64,
3073    ) -> Self {
3074        Self {
3075            id,
3076            client,
3077            config,
3078            metrics,
3079            introspection_tx,
3080            epoch,
3081            collections: Default::default(),
3082        }
3083    }
3084
3085    /// Add a collection to the replica state.
3086    ///
3087    /// # Panics
3088    ///
3089    /// Panics if a collection with the same ID exists already.
3090    fn add_collection(
3091        &mut self,
3092        id: GlobalId,
3093        as_of: Antichain<T>,
3094        input_read_holds: Vec<ReadHold<T>>,
3095    ) {
3096        let metrics = self.metrics.for_collection(id);
3097        let introspection = ReplicaCollectionIntrospection::new(
3098            self.id,
3099            id,
3100            self.introspection_tx.clone(),
3101            as_of.clone(),
3102        );
3103        let mut state =
3104            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
3105
3106        // In an effort to keep the produced wallclock lag introspection data small and
3107        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
3108        // select indexes and subscribes.
3109        if id.is_transient() {
3110            state.wallclock_lag_max = None;
3111        }
3112
3113        if let Some(previous) = self.collections.insert(id, state) {
3114            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
3115        }
3116    }
3117
3118    /// Remove state for a collection.
3119    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
3120        self.collections.remove(&id)
3121    }
3122
3123    /// Returns whether all replica frontiers of the given collection are empty.
3124    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
3125        self.collections.get(&id).map_or(true, |c| {
3126            c.write_frontier.is_empty()
3127                && c.input_frontier.is_empty()
3128                && c.output_frontier.is_empty()
3129        })
3130    }
3131
3132    /// Returns the state of the [`ReplicaState`] formatted as JSON.
3133    ///
3134    /// The returned value is not guaranteed to be stable and may change at any point in time.
3135    #[mz_ore::instrument(level = "debug")]
3136    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3137        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
3138        // returned object as a tradeoff between usability and stability. `serde_json` will fail
3139        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
3140        // prevents a future unrelated change from silently breaking this method.
3141
3142        // Destructure `self` here so we don't forget to consider dumping newly added fields.
3143        let Self {
3144            id,
3145            client: _,
3146            config: _,
3147            metrics: _,
3148            introspection_tx: _,
3149            epoch,
3150            collections,
3151        } = self;
3152
3153        let collections: BTreeMap<_, _> = collections
3154            .iter()
3155            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
3156            .collect();
3157
3158        Ok(serde_json::json!({
3159            "id": id.to_string(),
3160            "collections": collections,
3161            "epoch": epoch,
3162        }))
3163    }
3164}
3165
/// Per-replica state tracked for a single compute collection.
#[derive(Debug)]
struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
    /// The replica write frontier of this collection.
    ///
    /// See [`FrontiersResponse::write_frontier`].
    write_frontier: Antichain<T>,
    /// The replica input frontier of this collection.
    ///
    /// See [`FrontiersResponse::input_frontier`].
    input_frontier: Antichain<T>,
    /// The replica output frontier of this collection.
    ///
    /// See [`FrontiersResponse::output_frontier`].
    output_frontier: Antichain<T>,

    /// Metrics tracked for this collection.
    ///
    /// If this is `None`, no metrics are collected.
    metrics: Option<ReplicaCollectionMetrics>,
    /// As-of frontier with which this collection was installed on the replica.
    as_of: Antichain<T>,
    /// Tracks introspection state for this collection.
    introspection: ReplicaCollectionIntrospection<T>,
    /// Read holds on storage inputs to this collection.
    ///
    /// These read holds are kept to ensure that the replica is able to read from storage inputs at
    /// all times it hasn't read yet. We only need to install read holds for storage inputs since
    /// compaction of compute inputs is implicitly held back by Timely/DD.
    input_read_holds: Vec<ReadHold<T>>,

    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection.
    wallclock_lag_max: Option<WallclockLag>,
}
3201
3202impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
3203    fn new(
3204        metrics: Option<ReplicaCollectionMetrics>,
3205        as_of: Antichain<T>,
3206        introspection: ReplicaCollectionIntrospection<T>,
3207        input_read_holds: Vec<ReadHold<T>>,
3208    ) -> Self {
3209        Self {
3210            write_frontier: as_of.clone(),
3211            input_frontier: as_of.clone(),
3212            output_frontier: as_of.clone(),
3213            metrics,
3214            as_of,
3215            introspection,
3216            input_read_holds,
3217            wallclock_lag_max: Some(WallclockLag::MIN),
3218        }
3219    }
3220
3221    /// Returns whether this collection is hydrated.
3222    fn hydrated(&self) -> bool {
3223        // If the observed frontier is greater than the collection's as-of, the collection has
3224        // produced some output and is therefore hydrated.
3225        //
3226        // We need to consider the edge case where the as-of is the empty frontier. Such an as-of
3227        // is not useful for indexes, because they wouldn't be readable. For write-only
3228        // collections, an empty as-of means that the collection has been fully written and no new
3229        // dataflow needs to be created for it. Consequently, no hydration will happen either.
3230        //
3231        // Based on this, we could respond in two ways:
3232        //  * `false`, as in "the dataflow was never created"
3233        //  * `true`, as in "the dataflow completed immediately"
3234        //
3235        // Since hydration is often used as a measure of dataflow progress and we don't want to
3236        // give the impression that certain dataflows are somehow stuck when they are not, we go
3237        // with the second interpretation here.
3238        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3239    }
3240
3241    /// Updates the replica write frontier of this collection.
3242    fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
3243        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3244            soft_panic_or_log!(
3245                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3246                self.write_frontier,
3247            );
3248            return;
3249        } else if new_frontier == self.write_frontier {
3250            return;
3251        }
3252
3253        self.write_frontier = new_frontier;
3254    }
3255
3256    /// Updates the replica input frontier of this collection.
3257    fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
3258        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3259            soft_panic_or_log!(
3260                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3261                self.input_frontier,
3262            );
3263            return;
3264        } else if new_frontier == self.input_frontier {
3265            return;
3266        }
3267
3268        self.input_frontier = new_frontier;
3269
3270        // Relax our read holds on collection inputs.
3271        for read_hold in &mut self.input_read_holds {
3272            let result = read_hold.try_downgrade(self.input_frontier.clone());
3273            soft_assert_or_log!(
3274                result.is_ok(),
3275                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3276                self.input_frontier,
3277            );
3278        }
3279    }
3280
3281    /// Updates the replica output frontier of this collection.
3282    fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
3283        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3284            soft_panic_or_log!(
3285                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3286                self.output_frontier,
3287            );
3288            return;
3289        } else if new_frontier == self.output_frontier {
3290            return;
3291        }
3292
3293        self.output_frontier = new_frontier;
3294    }
3295}
3296
/// Maintains the introspection state for a given replica and collection, and ensures that reported
/// introspection data is retracted when the collection is dropped.
#[derive(Debug)]
struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// The collection's reported replica write frontier.
    ///
    /// Kept so the previously reported row can be retracted when the frontier changes or the
    /// collection is dropped.
    write_frontier: Antichain<T>,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
}
3310
3311impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
3312    /// Create a new `HydrationState` and initialize introspection.
3313    fn new(
3314        replica_id: ReplicaId,
3315        collection_id: GlobalId,
3316        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3317        as_of: Antichain<T>,
3318    ) -> Self {
3319        let self_ = Self {
3320            replica_id,
3321            collection_id,
3322            write_frontier: as_of,
3323            introspection_tx,
3324        };
3325
3326        self_.report_initial_state();
3327        self_
3328    }
3329
3330    /// Reports the initial introspection state.
3331    fn report_initial_state(&self) {
3332        let row = self.write_frontier_row();
3333        let updates = vec![(row, Diff::ONE)];
3334        self.send(IntrospectionType::ReplicaFrontiers, updates);
3335    }
3336
3337    /// Observe the given current write frontier and update the introspection state as necessary.
3338    fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
3339        if self.write_frontier == *write_frontier {
3340            return; // no change
3341        }
3342
3343        let retraction = self.write_frontier_row();
3344        self.write_frontier.clone_from(write_frontier);
3345        let insertion = self.write_frontier_row();
3346
3347        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3348        self.send(IntrospectionType::ReplicaFrontiers, updates);
3349    }
3350
3351    /// Return a `Row` reflecting the current replica write frontier.
3352    fn write_frontier_row(&self) -> Row {
3353        let write_frontier = self
3354            .write_frontier
3355            .as_option()
3356            .map_or(Datum::Null, |ts| ts.clone().into());
3357        Row::pack_slice(&[
3358            Datum::String(&self.collection_id.to_string()),
3359            Datum::String(&self.replica_id.to_string()),
3360            write_frontier,
3361        ])
3362    }
3363
3364    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3365        // Failure to send means the `ComputeController` has been dropped and doesn't care about
3366        // introspection updates anymore.
3367        let _ = self.introspection_tx.send((introspection_type, updates));
3368    }
3369}
3370
3371impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
3372    fn drop(&mut self) {
3373        // Retract the write frontier.
3374        let row = self.write_frontier_row();
3375        let updates = vec![(row, Diff::MINUS_ONE)];
3376        self.send(IntrospectionType::ReplicaFrontiers, updates);
3377    }
3378}