mz_compute_client/controller/instance.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A controller for a compute instance.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::num::NonZeroI64;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use mz_build_info::BuildInfo;
use mz_cluster_client::WallclockLagFn;
use mz_cluster_client::client::ClusterStartupEpoch;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_compute_types::sinks::{
    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
};
use mz_compute_types::sources::SourceInstanceDesc;
use mz_controller_types::dyncfgs::{
    ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION, WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
use mz_ore::channel::instrumented_unbounded_channel;
use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, soft_panic_or_log};
use mz_repr::adt::interval::Interval;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{Datum, Diff, GlobalId, Row};
use mz_storage_client::controller::{IntrospectionType, WallclockLagHistogramPeriod};
use mz_storage_types::read_holds::{self, ReadHold};
use mz_storage_types::read_policy::ReadPolicy;
use serde::Serialize;
use thiserror::Error;
use timely::PartialOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use crate::controller::error::{
    CollectionLookupError, CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
};
use crate::controller::replica::{ReplicaClient, ReplicaConfig};
use crate::controller::{
    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
    ReplicaId, StorageCollections,
};
use crate::logging::LogVariant;
use crate::metrics::IntCounter;
use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
use crate::protocol::command::{ComputeCommand, ComputeParameters, Peek, PeekTarget};
use crate::protocol::history::ComputeCommandHistory;
use crate::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, OperatorHydrationStatus, PeekResponse,
    StatusResponse, SubscribeBatch, SubscribeResponse,
};
use crate::service::{ComputeClient, ComputeGrpcClient};

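/// Error returned when a replica with the given ID already exists.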
#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);

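/// Error returned when a replica with the given ID does not exist.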
#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);

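/// Errors arising during dataflow creation.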
#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}

impl From<CollectionMissing> for DataflowCreationError {
    fn from(error: CollectionMissing) -> Self {
        Self::CollectionMissing(error.0)
    }
}

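/// Errors arising while issuing a peek.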
#[derive(Error, Debug)]
pub(super) enum PeekError {
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    #[error("read hold ID does not match peeked collection: {0}")]
    ReadHoldIdMismatch(GlobalId),
    #[error("insufficient read hold provided: {0}")]
    ReadHoldInsufficient(GlobalId),
}

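/// Errors arising while applying a read policy.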
#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}

impl From<CollectionMissing> for ReadPolicyError {
    fn from(error: CollectionMissing) -> Self {
        Self::CollectionMissing(error.0)
    }
}

/// A command sent to an [`Instance`] task.
pub type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;

/// A client for an [`Instance`] task.
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub(super) struct Client<T: ComputeControllerTimestamp> {
    /// A sender for commands for the instance.
    command_tx: mpsc::UnboundedSender<Command<T>>,
    /// A sender for read hold changes for collections installed on the instance.
    #[derivative(Debug = "ignore")]
    read_hold_tx: read_holds::ChangeTx<T>,
}

impl<T: ComputeControllerTimestamp> Client<T> {
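    /// Send a command to the instance task.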
    pub fn send(&self, command: Command<T>) -> Result<(), SendError<Command<T>>> {
        self.command_tx.send(command)
    }

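    /// Return a handle for submitting read hold changes to this instance.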
    pub fn read_hold_tx(&self) -> read_holds::ChangeTx<T> {
        Arc::clone(&self.read_hold_tx)
    }
}

impl<T> Client<T>
where
    T: ComputeControllerTimestamp,
    ComputeGrpcClient: ComputeClient<T>,
{
    pub fn spawn(
        id: ComputeInstanceId,
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        envd_epoch: NonZeroI64,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    ) -> Self {
        let (command_tx, command_rx) = mpsc::unbounded_channel();

        let read_hold_tx: read_holds::ChangeTx<_> = {
            let command_tx = command_tx.clone();
            Arc::new(move |id, change: ChangeBatch<_>| {
                let cmd: Command<_> = {
                    let change = change.clone();
                    Box::new(move |i| i.apply_read_hold_change(id, change.clone()))
                };
                command_tx.send(cmd).map_err(|_| SendError((id, change)))
            })
        };

        mz_ore::task::spawn(
            || format!("compute-instance-{id}"),
            Instance::new(
                build_info,
                storage,
                arranged_logs,
                envd_epoch,
                metrics,
                now,
                wallclock_lag,
                dyncfg,
                command_rx,
                response_tx,
                Arc::clone(&read_hold_tx),
                introspection_tx,
            )
            .run(),
        );

        Self {
            command_tx,
            read_hold_tx,
        }
    }
}

/// A response from a replica, composed of a replica ID, the replica's current epoch, and the
/// compute response itself.
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);

/// The state we keep for a compute instance.
pub(super) struct Instance<T: ComputeControllerTimestamp> {
    /// Build info for spawning replicas
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections<T>,
    /// Whether instance initialization has been completed.
    initialized: bool,
    /// Whether or not this instance is in read-only mode.
    ///
    /// When in read-only mode, neither the controller nor the instances
    /// controlled by it are allowed to effect changes to external systems
    /// (largely persist).
    read_only: bool,
    /// The workload class of this instance.
    ///
    /// This is currently only used to annotate metrics.
    workload_class: Option<String>,
    /// The replicas of this compute instance.
    replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
    /// Currently installed compute collections.
    ///
    /// New entries are added for all collections exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// Entries are removed by [`Instance::cleanup_collections`]. See that method's documentation
    /// about the conditions for removing collection state.
    collections: BTreeMap<GlobalId, CollectionState<T>>,
    /// IDs of log sources maintained by this compute instance.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Currently outstanding peeks.
    ///
    /// New entries are added for all peeks initiated through [`Instance::peek`].
    ///
    /// The entry for a peek is only removed once all replicas have responded to the peek. This is
    /// currently required to ensure all replicas have stopped reading from the peeked collection's
    /// inputs before we allow them to compact. database-issues#4822 tracks changing this so we only have to wait
    /// for the first peek response.
    peeks: BTreeMap<Uuid, PendingPeek<T>>,
    /// Currently in-progress subscribes.
    ///
    /// New entries are added for all subscribes exported from dataflows created through
    /// [`Instance::create_dataflow`].
    ///
    /// The entry for a subscribe is removed once at least one replica has reported the subscribe
    /// to have advanced to the empty frontier or to have been dropped, implying that no further
    /// updates will be emitted for this subscribe.
    ///
    /// Note that subscribes are tracked both in `collections` and `subscribes`. `collections`
    /// keeps track of the subscribe's upper and since frontiers and ensures appropriate read holds
    /// on the subscribe's input. `subscribes` is only used to track which updates have been
    /// emitted, to decide if new ones should be emitted or suppressed.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
    /// Tracks all in-progress COPY TOs.
    ///
    /// New entries are added for all s3 oneshot sinks (corresponding to a COPY TO) exported from
    /// dataflows created through [`Instance::create_dataflow`].
    ///
    /// The entry for a copy to is removed once at least one replica has finished
    /// or the exporting collection is dropped.
    copy_tos: BTreeSet<GlobalId>,
    /// The command history, used when introducing new replicas or restarting existing replicas.
    history: ComputeCommandHistory<UIntGauge, T>,
    /// Receiver for commands to be executed.
    command_rx: mpsc::UnboundedReceiver<Command<T>>,
    /// Sender for responses to be delivered.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Sender for introspection updates to be recorded.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// A number that increases with each restart of `environmentd`.
    envd_epoch: NonZeroI64,
    /// Numbers that increase with each restart of a replica.
    replica_epochs: BTreeMap<ReplicaId, u64>,
    /// The registry the controller uses to report metrics.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for updates to collection read holds.
    ///
    /// Copies of this sender are given to [`ReadHold`]s that are created in
    /// [`CollectionState::new`].
    read_hold_tx: read_holds::ChangeTx<T>,
    /// A sender for responses from replicas.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// A receiver for responses from replicas.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
}

impl<T: ComputeControllerTimestamp> Instance<T> {
    /// Acquire a handle to the collection state associated with `id`.
    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    /// Acquire a mutable handle to the collection state associated with `id`.
    fn collection_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut CollectionState<T>, CollectionMissing> {
        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
    }

    /// Acquire a handle to the collection state associated with `id`.
    ///
    /// # Panics
    ///
    /// Panics if the identified collection does not exist.
    fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
        self.collections.get(&id).expect("collection must exist")
    }

    /// Acquire a mutable handle to the collection state associated with `id`.
    ///
    /// # Panics
    ///
    /// Panics if the identified collection does not exist.
    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

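    /// Iterate over the IDs and states of all installed collections.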
    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
        self.collections.iter().map(|(id, coll)| (*id, coll))
    }

    /// Add a collection to the instance state.
    ///
    /// # Panics
    ///
    /// Panics if a collection with the same ID exists already.
    fn add_collection(
        &mut self,
        id: GlobalId,
        as_of: Antichain<T>,
        shared: SharedCollectionState<T>,
        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        replica_input_read_holds: Vec<ReadHold<T>>,
        write_only: bool,
        storage_sink: bool,
        initial_as_of: Option<Antichain<T>>,
        refresh_schedule: Option<RefreshSchedule>,
    ) {
        // Add global collection state.
        let introspection = CollectionIntrospection::new(
            id,
            self.introspection_tx.clone(),
            as_of.clone(),
            storage_sink,
            initial_as_of,
            refresh_schedule,
        );
        let mut state = CollectionState::new(
            id,
            as_of.clone(),
            shared,
            storage_dependencies,
            compute_dependencies,
            Arc::clone(&self.read_hold_tx),
            introspection,
        );
        // If the collection is write-only, clear its read policy to reflect that.
        if write_only {
            state.read_policy = None;
        }

        if let Some(previous) = self.collections.insert(id, state) {
            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
        }

        // Add per-replica collection state.
        for replica in self.replicas.values_mut() {
            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
        }

        // Update introspection.
        self.report_dependency_updates(id, Diff::ONE);
    }

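    /// Remove state for the identified collection, including its per-replica state and
    /// dependency introspection.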
    fn remove_collection(&mut self, id: GlobalId) {
        // Update introspection.
        self.report_dependency_updates(id, Diff::MINUS_ONE);

        // Remove per-replica collection state.
        for replica in self.replicas.values_mut() {
            replica.remove_collection(id);
        }

        // Remove global collection state.
        self.collections.remove(&id);
    }

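    /// Add state for a new replica, initializing per-replica state for the collections it
    /// maintains.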
    fn add_replica_state(
        &mut self,
        id: ReplicaId,
        client: ReplicaClient<T>,
        config: ReplicaConfig,
        epoch: ClusterStartupEpoch,
    ) {
        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();

        let metrics = self.metrics.for_replica(id);
        let mut replica = ReplicaState::new(
            id,
            client,
            config,
            metrics,
            self.introspection_tx.clone(),
            epoch,
        );

        // Add per-replica collection state.
        for (collection_id, collection) in &self.collections {
            // Skip log collections not maintained by this replica.
            if collection.log_collection && !log_ids.contains(collection_id) {
                continue;
            }

            let as_of = collection.read_frontier().to_owned();
            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
            replica.add_collection(*collection_id, as_of, input_read_holds);
        }

        self.replicas.insert(id, replica);
    }

    /// Enqueue the given response for delivery to the controller clients.
    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
        // Failure to send means the `ComputeController` has been dropped and doesn't care about
        // responses anymore.
        let _ = self.response_tx.send(response);
    }

    /// Enqueue the given introspection updates for recording.
    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
        // Failure to send means the `ComputeController` has been dropped and doesn't care about
        // introspection updates anymore.
        let _ = self.introspection_tx.send((type_, updates));
    }

    /// Returns whether the identified replica exists.
    fn replica_exists(&self, id: ReplicaId) -> bool {
        self.replicas.contains_key(&id)
    }

    /// Return the IDs of pending peeks targeting the specified replica.
    fn peeks_targeting(
        &self,
        replica_id: ReplicaId,
    ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
        self.peeks.iter().filter_map(move |(uuid, peek)| {
            if peek.target_replica == Some(replica_id) {
                Some((*uuid, peek))
            } else {
                None
            }
        })
    }

    /// Return the IDs of in-progress subscribes targeting the specified replica.
    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
        self.subscribes.iter().filter_map(move |(id, subscribe)| {
            let targeting = subscribe.target_replica == Some(replica_id);
            targeting.then_some(*id)
        })
    }

    /// Update introspection with the current collection frontiers.
    ///
    /// We could also do this directly in response to frontier changes, but doing it periodically
    /// lets us avoid emitting some introspection updates that can be consolidated (e.g. a write
    /// frontier updated immediately followed by a read frontier update).
    ///
    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
    /// per second during normal operation.
    fn update_frontier_introspection(&mut self) {
        for collection in self.collections.values_mut() {
            collection
                .introspection
                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
        }

        for replica in self.replicas.values_mut() {
            for collection in replica.collections.values_mut() {
                collection
                    .introspection
                    .observe_frontier(&collection.write_frontier);
            }
        }
    }

    /// Refresh the controller state metrics for this instance.
    ///
    /// We could also do state metric updates directly in response to state changes, but that would
    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
    /// a single method is less noisy.
    ///
    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
    /// per second during normal operation.
    fn refresh_state_metrics(&self) {
        let unscheduled_collections_count =
            self.collections.values().filter(|c| !c.scheduled).count();
        let connected_replica_count = self
            .replicas
            .values()
            .filter(|r| r.client.is_connected())
            .count();

        self.metrics
            .replica_count
            .set(u64::cast_from(self.replicas.len()));
        self.metrics
            .collection_count
            .set(u64::cast_from(self.collections.len()));
        self.metrics
            .collection_unscheduled_count
            .set(u64::cast_from(unscheduled_collections_count));
        self.metrics
            .peek_count
            .set(u64::cast_from(self.peeks.len()));
        self.metrics
            .subscribe_count
            .set(u64::cast_from(self.subscribes.len()));
        self.metrics
            .copy_to_count
            .set(u64::cast_from(self.copy_tos.len()));
        self.metrics
            .connected_replica_count
            .set(u64::cast_from(connected_replica_count));
    }

    /// Refresh the wallclock lag introspection and metrics with the current lag values.
    ///
    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
    /// per second during normal operation.
    fn refresh_wallclock_lag(&mut self) {
        // Record lags to persist, if it's time.
        self.maybe_record_wallclock_lag();

        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        for replica in self.replicas.values_mut() {
            for collection in replica.collections.values_mut() {
                let lag = frontier_lag(&collection.write_frontier);

                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = std::cmp::max(*wallclock_lag_max, lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    metrics.wallclock_lag.observe(lag);
                };
            }
        }

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        for collection in self.collections.values_mut() {
            if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(&self.dyncfg) {
                continue;
            }

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                let lag = collection.shared.lock_write_frontier(|f| frontier_lag(f));
                let bucket = lag.as_secs().next_power_of_two();

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }
    }

    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
    /// last recording.
    ///
    /// We emit new introspection updates if the system time has passed into a new multiple of the
    /// recording interval (typically 1 minute) since the last refresh. The storage controller uses
    /// the same approach, ensuring that both controllers commit their lags at roughly the same
    /// time, avoiding confusion caused by inconsistencies.
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                let max_lag = std::mem::take(wallclock_lag_max);
                let max_lag_us = i64::try_from(max_lag.as_micros()).expect("must fit");
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    Datum::Interval(Interval::new(0, 0, max_lag_us)),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    Datum::UInt64(lag),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }

    /// Report updates (inserts or retractions) to the identified collection's dependencies.
    ///
    /// # Panics
    ///
    /// Panics if the identified collection does not exist.
    fn report_dependency_updates(&self, id: GlobalId, diff: Diff) {
        let collection = self.expect_collection(id);
        let dependencies = collection.dependency_ids();

        let updates = dependencies
            .map(|dependency_id| {
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&dependency_id.to_string()),
                ]);
                (row, diff)
            })
            .collect();

        self.deliver_introspection_updates(IntrospectionType::ComputeDependencies, updates);
    }

    /// Update the tracked hydration status for an operator according to a received status update.
    fn update_operator_hydration_status(
        &mut self,
        replica_id: ReplicaId,
        status: OperatorHydrationStatus,
    ) {
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            tracing::error!(
                %replica_id, ?status,
                "status update for an unknown replica"
            );
            return;
        };
        let Some(collection) = replica.collections.get_mut(&status.collection_id) else {
            tracing::error!(
                %replica_id, ?status,
                "status update for an unknown collection"
            );
            return;
        };

        collection.introspection.operator_hydrated(
            status.lir_id,
            status.worker_id,
            status.hydrated,
        );
    }

    /// Returns `true` if the given collection is hydrated on at least one
    /// replica.
    ///
    /// This also returns `true` in case this cluster does not have any
    /// replicas.
    #[mz_ore::instrument(level = "debug")]
    pub fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, CollectionLookupError> {
        if self.replicas.is_empty() {
            return Ok(true);
        }

        for replica_state in self.replicas.values() {
            let collection_state = replica_state
                .collections
                .get(&collection_id)
                .ok_or(CollectionLookupError::CollectionMissing(collection_id))?;

            if collection_state.hydrated() {
                return Ok(true);
            }
        }

        Ok(false)
    }

    /// Returns `true` if each non-transient, non-excluded collection is hydrated on at
    /// least one replica.
    ///
    /// This also returns `true` in case this cluster does not have any
    /// replicas.
    #[mz_ore::instrument(level = "debug")]
    pub fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, HydrationCheckBadTarget> {
        if self.replicas.is_empty() {
            return Ok(true);
        }
        let mut all_hydrated = true;
        let target_replicas: BTreeSet<ReplicaId> = self
            .replicas
            .keys()
            .filter_map(|id| match target_replica_ids {
                None => Some(id.clone()),
                Some(ref ids) if ids.contains(id) => Some(id.clone()),
                Some(_) => None,
            })
            .collect();
        if let Some(targets) = target_replica_ids {
            if target_replicas.is_empty() {
                return Err(HydrationCheckBadTarget(targets));
            }
        }

        for (id, _collection) in self.collections_iter() {
            if id.is_transient() || exclude_collections.contains(&id) {
                continue;
            }

            let mut collection_hydrated = false;
            for replica_state in self.replicas.values() {
                if !target_replicas.contains(&replica_state.id) {
                    continue;
                }
                let collection_state = replica_state
                    .collections
                    .get(&id)
                    .expect("missing collection state");

                if collection_state.hydrated() {
                    collection_hydrated = true;
                    break;
                }
            }

            if !collection_hydrated {
                tracing::info!("collection {id} is not hydrated on any replica");
                all_hydrated = false;
                // We continue with our loop instead of breaking out early, so
                // that we log all non-hydrated collections.
            }
        }

        Ok(all_hydrated)
    }

    /// Returns `true` if all non-transient, non-excluded collections are hydrated on at least one
    /// replica.
    ///
    /// This also returns `true` in case this cluster does not have any
    /// replicas.
    #[mz_ore::instrument(level = "debug")]
    pub fn collections_hydrated(&self, exclude_collections: &BTreeSet<GlobalId>) -> bool {
        self.collections_hydrated_on_replicas(None, exclude_collections)
            .expect("Cannot error if target_replica_ids is None")
    }

    /// Clean up collection state that is not needed anymore.
    ///
    /// Three conditions need to be true before we can remove state for a collection:
    ///
    ///  1. A client must have explicitly dropped the collection. If that is not the case, clients
    ///     can still reasonably assume that the controller knows about the collection and can
    ///     answer queries about it.
    ///  2. There must be no outstanding read capabilities on the collection. As long as someone
    ///     still holds read capabilities on a collection, we need to keep it around to be able
    ///     to properly handle downgrading of said capabilities.
    ///  3. All replica frontiers for the collection must have advanced to the empty frontier.
    ///     Advancement to the empty frontiers signals that replicas are done computing the
    ///     collection and that they won't send more `ComputeResponse`s for it. As long as we might
    ///     receive responses for a collection we want to keep it around to be able to validate and
    ///     handle these responses.
    fn cleanup_collections(&mut self) {
        let to_remove: Vec<_> = self
            .collections_iter()
            .filter(|(id, collection)| {
                collection.dropped
                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
                    && self
                        .replicas
                        .values()
                        .all(|r| r.collection_frontiers_empty(*id))
            })
            .map(|(id, _collection)| id)
            .collect();

        for id in to_remove {
            self.remove_collection(id);
        }
    }

    /// Returns the state of the [`Instance`] formatted as JSON.
    ///
    /// The returned value is not guaranteed to be stable and may change at any point in time.
    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
        // returned object as a tradeoff between usability and stability. `serde_json` will fail
        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
        // prevents a future unrelated change from silently breaking this method.

        // Destructure `self` here so we don't forget to consider dumping newly added fields.
        let Self {
            build_info: _,
            storage_collections: _,
            initialized,
            read_only,
            workload_class,
            replicas,
            collections,
            log_sources: _,
            peeks,
            subscribes,
            copy_tos,
            history: _,
            command_rx: _,
            response_tx: _,
            introspection_tx: _,
            envd_epoch,
            replica_epochs,
            metrics: _,
            dyncfg: _,
            now: _,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            read_hold_tx: _,
            replica_tx: _,
            replica_rx: _,
        } = self;

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let replicas: BTreeMap<_, _> = replicas
            .iter()
            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
            .collect::<Result<_, anyhow::Error>>()?;
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();
        let peeks: BTreeMap<_, _> = peeks
            .iter()
            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
            .collect();
        let subscribes: BTreeMap<_, _> = subscribes
            .iter()
            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
            .collect();
        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
        let replica_epochs: BTreeMap<_, _> = replica_epochs
            .iter()
            .map(|(id, epoch)| (id.to_string(), epoch))
            .collect();
        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");

        let map = serde_json::Map::from_iter([
            field("initialized", initialized)?,
            field("read_only", read_only)?,
            field("workload_class", workload_class)?,
            field("replicas", replicas)?,
            field("collections", collections)?,
            field("peeks", peeks)?,
            field("subscribes", subscribes)?,
            field("copy_tos", copy_tos)?,
            field("envd_epoch", envd_epoch)?,
            field("replica_epochs", replica_epochs)?,
            field("wallclock_lag_last_recorded", wallclock_lag_last_recorded)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

impl<T> Instance<T>
where
    T: ComputeControllerTimestamp,
    ComputeGrpcClient: ComputeClient<T>,
{
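    /// Create a new `Instance`, installing collection state for the given arranged log
    /// collections.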
    fn new(
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        envd_epoch: NonZeroI64,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        command_rx: mpsc::UnboundedReceiver<Command<T>>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    ) -> Self {
        let mut collections = BTreeMap::new();
        let mut log_sources = BTreeMap::new();
        for (log, id, shared) in arranged_logs {
            let collection = CollectionState::new_log_collection(
                id,
                shared,
                Arc::clone(&read_hold_tx),
                introspection_tx.clone(),
            );
            collections.insert(id, collection);
            log_sources.insert(log, id);
        }

        let history = ComputeCommandHistory::new(metrics.for_history());

        let send_count = metrics.response_send_count.clone();
        let recv_count = metrics.response_recv_count.clone();
        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            storage_collections: storage,
            initialized: false,
            read_only: true,
            workload_class: None,
            replicas: Default::default(),
            collections,
            log_sources,
            peeks: Default::default(),
            subscribes: Default::default(),
            copy_tos: Default::default(),
            history,
            command_rx,
            response_tx,
            introspection_tx,
            envd_epoch,
            replica_epochs: Default::default(),
            metrics,
            dyncfg,
            now,
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            read_hold_tx,
            replica_tx,
            replica_rx,
        }
    }

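    /// Run this instance's task: issue the initial protocol commands, then serve controller
    /// commands and replica responses until the command channel is closed.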
    async fn run(mut self) {
        self.send(ComputeCommand::CreateTimely {
            config: Default::default(),
            epoch: ClusterStartupEpoch::new(self.envd_epoch, 0),
        });

        // Send a placeholder instance configuration for the replica task to fill in.
        let dummy_instance_config = Default::default();
        self.send(ComputeCommand::CreateInstance(dummy_instance_config));

        loop {
            tokio::select! {
                command = self.command_rx.recv() => match command {
                    Some(cmd) => cmd(&mut self),
                    None => break,
                },
                response = self.replica_rx.recv() => match response {
                    Some(response) => self.handle_response(response),
                    None => unreachable!("self owns a sender side of the channel"),
                }
            }
        }
    }

    /// Update instance configuration.
    #[mz_ore::instrument(level = "debug")]
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        if let Some(workload_class) = &config_params.workload_class {
            self.workload_class = workload_class.clone();
        }

        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
        self.send(command);
    }

    /// Marks the end of any initialization commands.
    ///
    /// Intended to be called by `Controller`, rather than by other code.
    /// Calling this method repeatedly has no effect.
    #[mz_ore::instrument(level = "debug")]
    pub fn initialization_complete(&mut self) {
        // The compute protocol requires that `InitializationComplete` is sent only once.
        if !self.initialized {
            self.send(ComputeCommand::InitializationComplete);
            self.initialized = true;
        }
    }

    /// Allows this instance to affect writes to external systems (persist).
    ///
    /// Calling this method repeatedly has no effect.
    #[mz_ore::instrument(level = "debug")]
    pub fn allow_writes(&mut self) {
        if self.read_only {
            self.read_only = false;
            self.send(ComputeCommand::AllowWrites);
        }
    }

    /// Shut down this instance.
    ///
    /// This method runs various assertions ensuring the instance state is empty. It exists to help
    /// us find bugs where the client drops a compute instance that still has replicas or
    /// collections installed, and later assumes that said replicas/collections still exist.
    ///
    /// # Panics
    ///
    /// Panics if the compute instance still has active replicas.
    /// Panics if the compute instance still has collections installed.
    #[mz_ore::instrument(level = "debug")]
    pub fn shutdown(&mut self) {
        // Taking the `command_rx` ensures that the [`Instance::run`] loop terminates.
        let (_tx, rx) = mpsc::unbounded_channel();
        let mut command_rx = std::mem::replace(&mut self.command_rx, rx);

        // Apply all outstanding read hold changes. This might cause read hold downgrades to be
        // added to `command_tx`, so we need to apply those in a loop.
        //
        // TODO(teskje): Make `Command` an enum and assert that all received commands are read
        // hold downgrades.
        while let Ok(cmd) = command_rx.try_recv() {
            cmd(self);
        }

        // Collections might have been dropped but not cleaned up yet.
        self.cleanup_collections();

        let stray_replicas: Vec<_> = self.replicas.keys().collect();
        soft_assert_or_log!(
            stray_replicas.is_empty(),
            "dropped instance still has provisioned replicas: {stray_replicas:?}",
        );

        let collections = self.collections.iter();
        let stray_collections: Vec<_> = collections
            .filter(|(_, c)| !c.log_collection)
            .map(|(id, _)| id)
            .collect();
        soft_assert_or_log!(
            stray_collections.is_empty(),
            "dropped instance still has installed collections: {stray_collections:?}",
        );
    }

    /// Sends a command to all replicas of this instance.
    #[mz_ore::instrument(level = "debug")]
    fn send(&mut self, cmd: ComputeCommand<T>) {
        // Record the command so that new replicas can be brought up to speed.
        self.history.push(cmd.clone());

        // Clone the command for each active replica.
        for replica in self.replicas.values_mut() {
            // Swallow error, we'll notice because the replica task has stopped.
            let _ = replica.client.send(cmd.clone());
        }
    }

    /// Add a new instance replica, by ID.
    #[mz_ore::instrument(level = "debug")]
    pub fn add_replica(
        &mut self,
        id: ReplicaId,
        mut config: ReplicaConfig,
    ) -> Result<(), ReplicaExists> {
        if self.replica_exists(id) {
            return Err(ReplicaExists(id));
        }

        config.logging.index_logs = self.log_sources.clone();

        let replica_epoch = self.replica_epochs.entry(id).or_default();
        *replica_epoch += 1;
        let metrics = self.metrics.for_replica(id);
        let epoch = ClusterStartupEpoch::new(self.envd_epoch, *replica_epoch);
        let client = ReplicaClient::spawn(
            id,
            self.build_info,
            config.clone(),
            epoch,
            metrics.clone(),
            Arc::clone(&self.dyncfg),
            self.replica_tx.clone(),
        );

        // Take this opportunity to clean up the history we should present.
        self.history.reduce();

        // Advance the uppers of source imports
        self.history.update_source_uppers(&self.storage_collections);

        // Replay the commands at the client, creating new dataflow identifiers.
        for command in self.history.iter() {
            if client.send(command.clone()).is_err() {
                // We swallow the error here. On the next send, we will fail again, and
                // restart the connection as well as this rehydration.
                tracing::warn!("Replica {:?} connection terminated during hydration", id);
                break;
            }
        }

        // Add replica to tracked state.
        self.add_replica_state(id, client, config, epoch);

        Ok(())
    }

    /// Remove an existing instance replica, by ID.
    #[mz_ore::instrument(level = "debug")]
    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;

        // Subscribes targeting this replica either won't be served anymore (if the replica is
        // dropped) or might produce inconsistent output (if the target collection is an
        // introspection index). We produce an error to inform upstream.
        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
        for subscribe_id in to_drop {
            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
            let response = ComputeControllerResponse::SubscribeResponse(
                subscribe_id,
                SubscribeBatch {
                    lower: subscribe.frontier.clone(),
                    upper: subscribe.frontier,
                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
                },
            );
            self.deliver_response(response);
        }

        // Peeks targeting this replica might not be served anymore (if the replica is dropped).
        // If the replica has failed it might come back and respond to the peek later, but it still
        // seems like a good idea to cancel the peek to inform the caller about the failure. This
        // is consistent with how we handle targeted subscribes above.
        let mut peek_responses = Vec::new();
        let mut to_drop = Vec::new();
        for (uuid, peek) in self.peeks_targeting(id) {
            peek_responses.push(ComputeControllerResponse::PeekNotification(
                uuid,
                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
                peek.otel_ctx.clone(),
            ));
            to_drop.push(uuid);
        }
        for response in peek_responses {
            self.deliver_response(response);
        }
        for uuid in to_drop {
            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
            self.finish_peek(uuid, response);
        }

        Ok(())
    }

    /// Rehydrate the given instance replica.
    ///
    /// # Panics
    ///
    /// Panics if the specified replica does not exist.
    fn rehydrate_replica(&mut self, id: ReplicaId) {
        let config = self.replicas[&id].config.clone();
        self.remove_replica(id).expect("replica must exist");
        let result = self.add_replica(id, config);

        match result {
            Ok(()) => (),
            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
        }
    }

    /// Rehydrate any failed replicas of this instance.
    fn rehydrate_failed_replicas(&mut self) {
        let replicas = self.replicas.iter();
        let failed_replicas: Vec<_> = replicas
            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
            .collect();

        for replica_id in failed_replicas {
            self.rehydrate_replica(replica_id);
        }
    }

    /// Creates the described dataflow and initializes state for its output.
    ///
    /// This method expects a `DataflowDescription` with an `as_of` frontier specified, as well as
    /// for each imported collection a read hold in `import_read_holds` that is valid at the `as_of`.
    ///
    /// If a `subscribe_target_replica` is given, any subscribes exported by the dataflow are
    /// configured to target that replica, i.e., only subscribe responses sent by that replica are
    /// considered.
    #[mz_ore::instrument(level = "debug")]
    pub fn create_dataflow(
        &mut self,
        dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        import_read_holds: Vec<ReadHold<T>>,
        subscribe_target_replica: Option<ReplicaId>,
        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        if let Some(replica_id) = subscribe_target_replica {
            if !self.replica_exists(replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // Simple sanity checks around `as_of`
        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        // Collect all dependencies of the dataflow, and read holds on them at the `as_of`.
        let mut storage_dependencies = BTreeMap::new();
        let mut compute_dependencies = BTreeMap::new();

        // When we install per-replica input read holds, we cannot use the `as_of` because of
        // reconciliation: Existing slow replicas might be reading from the inputs at times before
        // the `as_of` and we would rather not crash them by allowing their inputs to compact too
        // far. So instead we take read holds at the least time available.
        let mut replica_input_read_holds = Vec::new();

        let mut import_read_holds: BTreeMap<_, _> =
            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();

        for &id in dataflow.source_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            replica_input_read_holds.push(read_hold.clone());

            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            storage_dependencies.insert(id, read_hold);
        }

        for &id in dataflow.index_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            compute_dependencies.insert(id, read_hold);
        }

        // If the `as_of` is empty, we are not going to create a dataflow, so replicas won't read
        // from the inputs.
        if as_of.is_empty() {
            replica_input_read_holds = Default::default();
        }

        // Install collection state for each of the exports.
        for export_id in dataflow.export_ids() {
            let shared = shared_collection_state
                .remove(&export_id)
                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
            let write_only = dataflow.sink_exports.contains_key(&export_id);
            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);
            self.add_collection(
                export_id,
                as_of.clone(),
                shared,
                storage_dependencies.clone(),
                compute_dependencies.clone(),
                replica_input_read_holds.clone(),
                write_only,
                storage_sink,
                dataflow.initial_storage_as_of.clone(),
                dataflow.refresh_schedule.clone(),
            );

            // If the export is a storage sink, we can advance its write frontier to the write
            // frontier of the target storage collection.
            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
            }
        }

        // Initialize tracking of subscribes.
        for subscribe_id in dataflow.subscribe_ids() {
            self.subscribes
                .insert(subscribe_id, ActiveSubscribe::new(subscribe_target_replica));
        }

        // Initialize tracking of copy tos.
        for copy_to_id in dataflow.copy_to_ids() {
            self.copy_tos.insert(copy_to_id);
        }

        // Here we augment all imported sources and all exported sinks with the appropriate
        // storage metadata needed by the compute instance.
        let mut source_imports = BTreeMap::new();
        for (id, (si, monotonic, _upper)) in dataflow.source_imports {
            let frontiers = self
                .storage_collections
                .collection_frontiers(id)
                .expect("collection exists");

            let collection_metadata = self
                .storage_collections
                .collection_metadata(id)
                .expect("we have a read hold on this collection");

            let desc = SourceInstanceDesc {
                storage_metadata: collection_metadata.clone(),
                arguments: si.arguments,
                typ: si.typ.clone(),
            };
            source_imports.insert(id, (desc, monotonic, frontiers.write_frontier));
        }

        let mut sink_exports = BTreeMap::new();
        for (id, se) in dataflow.sink_exports {
            let connection = match se.connection {
                ComputeSinkConnection::MaterializedView(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| CollectionMissing(id))?
                        .clone();
                    let conn = MaterializedViewSinkConnection {
                        value_desc: conn.value_desc,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::MaterializedView(conn)
                }
                ComputeSinkConnection::ContinualTask(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
                        .clone();
                    let conn = ContinualTaskConnection {
                        input_id: conn.input_id,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::ContinualTask(conn)
                }
                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
1431                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1432                    ComputeSinkConnection::CopyToS3Oneshot(conn)
1433                }
1434            };
1435            let desc = ComputeSinkDesc {
1436                from: se.from,
1437                from_desc: se.from_desc,
1438                connection,
1439                with_snapshot: se.with_snapshot,
1440                up_to: se.up_to,
1441                non_null_assertions: se.non_null_assertions,
1442                refresh_schedule: se.refresh_schedule,
1443            };
1444            sink_exports.insert(id, desc);
1445        }
1446
1447        // Flatten the dataflow plans into the representation expected by replicas.
1448        let objects_to_build = dataflow
1449            .objects_to_build
1450            .into_iter()
1451            .map(|object| BuildDesc {
1452                id: object.id,
1453                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
1454            })
1455            .collect();
1456
1457        let augmented_dataflow = DataflowDescription {
1458            source_imports,
1459            sink_exports,
1460            objects_to_build,
1461            // The rest of the fields are identical
1462            index_imports: dataflow.index_imports,
1463            index_exports: dataflow.index_exports,
1464            as_of: dataflow.as_of.clone(),
1465            until: dataflow.until,
1466            initial_storage_as_of: dataflow.initial_storage_as_of,
1467            refresh_schedule: dataflow.refresh_schedule,
1468            debug_name: dataflow.debug_name,
1469            time_dependence: dataflow.time_dependence,
1470        };
1471
1472        if augmented_dataflow.is_transient() {
1473            tracing::debug!(
1474                name = %augmented_dataflow.debug_name,
1475                import_ids = %augmented_dataflow.display_import_ids(),
1476                export_ids = %augmented_dataflow.display_export_ids(),
1477                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1478                until = ?augmented_dataflow.until.elements(),
1479                "creating dataflow",
1480            );
1481        } else {
1482            tracing::info!(
1483                name = %augmented_dataflow.debug_name,
1484                import_ids = %augmented_dataflow.display_import_ids(),
1485                export_ids = %augmented_dataflow.display_export_ids(),
1486                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1487                until = ?augmented_dataflow.until.elements(),
1488                "creating dataflow",
1489            );
1490        }
1491
1492        // Skip the actual dataflow creation for an empty `as_of`. (Happens e.g. for the
1493        // bootstrapping of a REFRESH AT mat view that is past its last refresh.)
1494        if as_of.is_empty() {
1495            tracing::info!(
1496                name = %augmented_dataflow.debug_name,
1497                "not sending `CreateDataflow`, because of empty `as_of`",
1498            );
1499        } else {
1500            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
1501            let dataflow = Box::new(augmented_dataflow);
1502            self.send(ComputeCommand::CreateDataflow(dataflow));
1503
1504            for id in collections {
1505                self.maybe_schedule_collection(id);
1506            }
1507        }
1508
1509        Ok(())
1510    }
1511
1512    /// Schedule the identified collection if all its inputs are available.
1513    ///
1514    /// # Panics
1515    ///
1516    /// Panics if the identified collection does not exist.
1517    fn maybe_schedule_collection(&mut self, id: GlobalId) {
1518        let collection = self.expect_collection(id);
1519
1520        // Don't schedule collections twice.
1521        if collection.scheduled {
1522            return;
1523        }
1524
1525        let as_of = collection.read_frontier();
1526
1527        // If the collection has an empty `as_of`, it was either never installed on the replica or
1528        // has since been dropped. In either case the replica does not expect any commands for it.
1529        if as_of.is_empty() {
1530            return;
1531        }
1532
1533        let ready = if id.is_transient() {
1534            // Always schedule transient collections immediately. The assumption is that those are
1535            // created by interactive user commands and we want to schedule them as quickly as
1536            // possible. Inputs might not yet be available, but when they become available, we
1537            // don't need to wait for the controller to become aware and for the scheduling check
1538            // to run again.
1539            true
1540        } else {
1541            // Ignore self-dependencies: they do not need to be available at the
1542            // `as_of` for the dataflow to make progress, so we skip them here.
1543            // At the moment, only continual tasks have self-dependencies, but
1544            // this logic is correct for any dataflow, so we don't special-case
1545            // it to CTs.
1546            let not_self_dep = |x: &GlobalId| *x != id;
1547
1548            // Check dependency frontiers to determine if all inputs are
1549            // available. An input is available when its frontier is greater
1550            // than the `as_of`, i.e., all input data up to and including the
1551            // `as_of` has been sealed.
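            // As an illustrative example: with an `as_of` of {5}, an input whose write frontier
            // is {6} is available, while an input whose write frontier is still {5} is not,
            // since that input may still produce updates at time 5.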
1552            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
1553            let compute_frontiers = compute_deps.map(|id| {
1554                let dep = &self.expect_collection(id);
1555                dep.write_frontier()
1556            });
1557
1558            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
1559            let storage_frontiers = self
1560                .storage_collections
1561                .collections_frontiers(storage_deps.collect())
1562                .expect("must exist");
1563            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);
1564
1565            let ready = compute_frontiers
1566                .chain(storage_frontiers)
1567                .all(|frontier| PartialOrder::less_than(&as_of, &frontier));
1568
1569            ready
1570        };
1571
1572        if ready {
1573            self.send(ComputeCommand::Schedule(id));
1574            let collection = self.expect_collection_mut(id);
1575            collection.scheduled = true;
1576        }
1577    }
1578
1579    /// Schedule any unscheduled collections that are ready.
1580    fn schedule_collections(&mut self) {
1581        let ids: Vec<_> = self.collections.keys().copied().collect();
1582        for id in ids {
1583            self.maybe_schedule_collection(id);
1584        }
1585    }
1586
1587    /// Drops the read capability for the given collections and allows their resources to be
1588    /// reclaimed.
1589    #[mz_ore::instrument(level = "debug")]
1590    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1591        for id in &ids {
1592            let collection = self.collection_mut(*id)?;
1593
1594            // Mark the collection as dropped to allow it to be removed from the controller state.
1595            collection.dropped = true;
1596
1597            // Drop the implied and warmup read holds to announce that clients are not
1598            // interested in the collection anymore.
1599            collection.implied_read_hold.release();
1600            collection.warmup_read_hold.release();
1601
1602            // If the collection is a subscribe, stop tracking it. This ensures that the controller
1603            // ceases to produce `SubscribeResponse`s for this subscribe.
1604            self.subscribes.remove(id);
1605            // If the collection is a copy to, stop tracking it. This ensures that the controller
1606            // ceases to produce `CopyToResponse`s for this copy to.
1607            self.copy_tos.remove(id);
1608        }
1609
1610        Ok(())
1611    }
1612
1613    /// Initiate a peek request for the contents of `id` at `timestamp`.
1614    #[mz_ore::instrument(level = "debug")]
1615    pub fn peek(
1616        &mut self,
1617        peek_target: PeekTarget,
1618        literal_constraints: Option<Vec<Row>>,
1619        uuid: Uuid,
1620        timestamp: T,
1621        finishing: RowSetFinishing,
1622        map_filter_project: mz_expr::SafeMfpPlan,
1623        mut read_hold: ReadHold<T>,
1624        target_replica: Option<ReplicaId>,
1625        peek_response_tx: oneshot::Sender<PeekResponse>,
1626    ) -> Result<(), PeekError> {
1627        use PeekError::*;
1628
1629        // Downgrade the provided read hold to the peek time.
1630        let target_id = peek_target.id();
1631        if read_hold.id() != target_id {
1632            return Err(ReadHoldIdMismatch(read_hold.id()));
1633        }
1634        read_hold
1635            .try_downgrade(Antichain::from_elem(timestamp.clone()))
1636            .map_err(|_| ReadHoldInsufficient(target_id))?;
1637
1638        if let Some(target) = target_replica {
1639            if !self.replica_exists(target) {
1640                return Err(ReplicaMissing(target));
1641            }
1642        }
1643
1644        let otel_ctx = OpenTelemetryContext::obtain();
1645        self.peeks.insert(
1646            uuid,
1647            PendingPeek {
1648                target_replica,
1649                // TODO(guswynn): can we just hold the `tracing::Span` here instead?
1650                otel_ctx: otel_ctx.clone(),
1651                requested_at: Instant::now(),
1652                read_hold,
1653                peek_response_tx,
1654                limit: finishing.limit.map(usize::cast_from),
1655                offset: finishing.offset,
1656            },
1657        );
1658
1659        let peek = Peek {
1660            literal_constraints,
1661            uuid,
1662            timestamp,
1663            finishing,
1664            map_filter_project,
1665            // Obtain an `OpenTelemetryContext` from the thread-local tracing
1666            // tree to forward it on to the compute worker.
1667            otel_ctx,
1668            target: peek_target,
1669        };
1670        self.send(ComputeCommand::Peek(Box::new(peek)));
1671
1672        Ok(())
1673    }
1674
1675    /// Cancels an existing peek request.
1676    #[mz_ore::instrument(level = "debug")]
1677    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1678        let Some(peek) = self.peeks.get_mut(&uuid) else {
1679            tracing::warn!("did not find pending peek for {uuid}");
1680            return;
1681        };
1682
1683        let duration = peek.requested_at.elapsed();
1684        self.metrics
1685            .observe_peek_response(&PeekResponse::Canceled, duration);
1686
1687        // Enqueue a notification for the cancellation.
1688        let otel_ctx = peek.otel_ctx.clone();
1689        otel_ctx.attach_as_parent();
1690
1691        self.deliver_response(ComputeControllerResponse::PeekNotification(
1692            uuid,
1693            PeekNotification::Canceled,
1694            otel_ctx,
1695        ));
1696
1697        // Finish the peek.
1698        // This will also propagate the cancellation to the replicas.
1699        self.finish_peek(uuid, reason);
1700    }
1701
1702    /// Assigns a read policy to specific identifiers.
1703    ///
1704    /// The policies are applied in the order presented; if an identifier appears multiple times,
1705    /// the last provided policy takes effect. Changing a policy will immediately downgrade the read
1706    /// capability if appropriate, but it will not "recover" the read capability if the prior
1707    /// capability is already ahead of it.
1708    ///
1709    /// Identifiers not present in `policies` retain their existing read policies.
1710    ///
1711    /// It is an error to attempt to set a read policy for a collection that is not readable in the
1712    /// context of compute. At this time, only indexes are readable compute collections.
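    ///
    /// A sketch of intended usage (the identifiers below are hypothetical):
    ///
    /// ```ignore
    /// // Allow reads on the index at any time from `since` onward.
    /// instance.set_read_policy(vec![(index_id, ReadPolicy::ValidFrom(since))])?;
    /// ```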
1713    #[mz_ore::instrument(level = "debug")]
1714    pub fn set_read_policy(
1715        &mut self,
1716        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1717    ) -> Result<(), ReadPolicyError> {
1718        // Do error checking upfront, to avoid introducing inconsistencies between a collection's
1719        // `implied_capability` and `read_capabilities`.
1720        for (id, _policy) in &policies {
1721            let collection = self.collection(*id)?;
1722            if collection.read_policy.is_none() {
1723                return Err(ReadPolicyError::WriteOnlyCollection(*id));
1724            }
1725        }
1726
1727        for (id, new_policy) in policies {
1728            let collection = self.expect_collection_mut(id);
1729            let new_since = new_policy.frontier(collection.write_frontier().borrow());
1730            let _ = collection.implied_read_hold.try_downgrade(new_since);
1731            collection.read_policy = Some(new_policy);
1732        }
1733
1734        Ok(())
1735    }
1736
1737    /// Advance the global write frontier of the given collection.
1738    ///
1739    /// Frontier regressions are gracefully ignored.
1740    ///
1741    /// # Panics
1742    ///
1743    /// Panics if the identified collection does not exist.
1744    #[mz_ore::instrument(level = "debug")]
1745    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
1746        let collection = self.expect_collection_mut(id);
1747
1748        let advanced = collection.shared.lock_write_frontier(|f| {
1749            let advanced = PartialOrder::less_than(f, &new_frontier);
1750            if advanced {
1751                f.clone_from(&new_frontier);
1752            }
1753            advanced
1754        });
1755
1756        if !advanced {
1757            return;
1758        }
1759
1760        // Relax the implied read hold according to the read policy.
1761        let new_since = match &collection.read_policy {
1762            Some(read_policy) => {
1763                // For readable collections the read frontier is determined by applying the
1764                // client-provided read policy to the write frontier.
1765                read_policy.frontier(new_frontier.borrow())
1766            }
1767            None => {
1768                // Write-only collections cannot be read within the context of the compute
1769                // controller, so their read frontier only controls the read holds taken on their
1770                // inputs. We can safely downgrade the input read holds to any time less than the
1771                // write frontier.
1772                //
1773                // Note that some write-only collections (continual tasks) need to observe changes
1774                // at their current write frontier during hydration. Thus, we cannot downgrade the
1775                // read frontier to the write frontier and instead step it back by one.
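                // As an illustrative example, a write frontier of {10} is relaxed to a read
                // frontier of {9}, the latest time at which such a collection can still
                // observe changes.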
1776                Antichain::from_iter(
1777                    new_frontier
1778                        .iter()
1779                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
1780                )
1781            }
1782        };
1783        let _ = collection.implied_read_hold.try_downgrade(new_since);
1784
1785        // Report the frontier advancement.
1786        self.deliver_response(ComputeControllerResponse::FrontierUpper {
1787            id,
1788            upper: new_frontier,
1789        });
1790    }
1791
1792    /// Apply a collection read hold change.
1793    fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
1794        let Some(collection) = self.collections.get_mut(&id) else {
1795            soft_panic_or_log!(
1796                "read hold change for absent collection (id={id}, changes={update:?})"
1797            );
1798            return;
1799        };
1800
1801        let new_since = collection.shared.lock_read_capabilities(|caps| {
1802            // Sanity check to prevent corrupted `read_capabilities`, which can cause hard-to-debug
1803            // issues (usually stuck read frontiers).
1804            let read_frontier = caps.frontier();
1805            for (time, diff) in update.iter() {
1806                let count = caps.count_for(time) + diff;
1807                assert!(
1808                    count >= 0,
1809                    "invalid read capabilities update: negative capability \
1810             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1811                );
1812                assert!(
1813                    count == 0 || read_frontier.less_equal(time),
1814                    "invalid read capabilities update: frontier regression \
1815             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1816                );
1817            }
1818
1819            // Apply read capability updates and learn about resulting changes to the read
1820            // frontier.
1821            let changes = caps.update_iter(update.drain());
1822
1823            let changed = changes.count() > 0;
1824            changed.then(|| caps.frontier().to_owned())
1825        });
1826
1827        let Some(new_since) = new_since else {
1828            return; // read frontier did not change
1829        };
1830
1831        // Propagate read frontier update to dependencies.
1832        for read_hold in collection.compute_dependencies.values_mut() {
1833            read_hold
1834                .try_downgrade(new_since.clone())
1835                .expect("frontiers don't regress");
1836        }
1837        for read_hold in collection.storage_dependencies.values_mut() {
1838            read_hold
1839                .try_downgrade(new_since.clone())
1840                .expect("frontiers don't regress");
1841        }
1842
1843        // Produce `AllowCompaction` command.
1844        self.send(ComputeCommand::AllowCompaction {
1845            id,
1846            frontier: new_since,
1847        });
1848    }
1849
1850    /// Fulfills a registered peek and cleans up associated state.
1851    ///
1852    /// As part of this we:
1853    ///  * Send a `PeekResponse` through the peek's response channel.
1854    ///  * Emit a `CancelPeek` command to instruct replicas to stop spending resources on this
1855    ///    peek, and to allow the `ComputeCommandHistory` to reduce away the corresponding `Peek`
1856    ///    command.
1857    ///  * Remove the read hold for this peek, unblocking compaction that might have waited on it.
1858    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1859        let Some(peek) = self.peeks.remove(&uuid) else {
1860            return;
1861        };
1862
1863        // The recipient might not be interested in the peek response anymore, which is fine.
1864        let _ = peek.peek_response_tx.send(response);
1865
1866        // NOTE: We need to send the `CancelPeek` command _before_ we release the peek's read hold
1867        // (by dropping it), to avoid the edge case that caused database-issues#4812.
1868        self.send(ComputeCommand::CancelPeek { uuid });
1869
1870        drop(peek.read_hold);
1871    }
1872
1873    /// Handles a response from a replica. Replica IDs are re-used across replica restarts, so we
1874    /// use the replica incarnation to drop stale responses.
1875    fn handle_response(&mut self, (replica_id, incarnation, response): ReplicaResponse<T>) {
1876        // Filter responses from non-existing or stale replicas.
1877        if self
1878            .replicas
1879            .get(&replica_id)
1880            .filter(|replica| replica.epoch.replica() == incarnation)
1881            .is_none()
1882        {
1883            return;
1884        }
1885
1886        // Invariant: the replica exists and has the expected incarnation.
1887
1888        match response {
1889            ComputeResponse::Frontiers(id, frontiers) => {
1890                self.handle_frontiers_response(id, frontiers, replica_id);
1891            }
1892            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
1893                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
1894            }
1895            ComputeResponse::CopyToResponse(id, response) => {
1896                self.handle_copy_to_response(id, response, replica_id);
1897            }
1898            ComputeResponse::SubscribeResponse(id, response) => {
1899                self.handle_subscribe_response(id, response, replica_id);
1900            }
1901            ComputeResponse::Status(response) => {
1902                self.handle_status_response(response, replica_id);
1903            }
1904        }
1905    }
1906
1907    /// Handle new frontiers, returning any compute response that needs to
1908    /// be sent to the client.
1909    fn handle_frontiers_response(
1910        &mut self,
1911        id: GlobalId,
1912        frontiers: FrontiersResponse<T>,
1913        replica_id: ReplicaId,
1914    ) {
1915        if !self.collections.contains_key(&id) {
1916            soft_panic_or_log!(
1917                "frontiers update for an unknown collection \
1918                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1919            );
1920            return;
1921        }
1922        let Some(replica) = self.replicas.get_mut(&replica_id) else {
1923            soft_panic_or_log!(
1924                "frontiers update for an unknown replica \
1925                 (replica_id={replica_id}, frontiers={frontiers:?})"
1926            );
1927            return;
1928        };
1929        let Some(replica_collection) = replica.collections.get_mut(&id) else {
1930            soft_panic_or_log!(
1931                "frontiers update for an unknown replica collection \
1932                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1933            );
1934            return;
1935        };
1936
1937        if let Some(new_frontier) = frontiers.input_frontier {
1938            replica_collection.update_input_frontier(new_frontier.clone());
1939        }
1940        if let Some(new_frontier) = frontiers.output_frontier {
1941            replica_collection.update_output_frontier(new_frontier.clone());
1942        }
1943        if let Some(new_frontier) = frontiers.write_frontier {
1944            replica_collection.update_write_frontier(new_frontier.clone());
1945            self.maybe_update_global_write_frontier(id, new_frontier);
1946        }
1947    }
1948
1949    #[mz_ore::instrument(level = "debug")]
1950    fn handle_peek_response(
1951        &mut self,
1952        uuid: Uuid,
1953        response: PeekResponse,
1954        otel_ctx: OpenTelemetryContext,
1955        replica_id: ReplicaId,
1956    ) {
1957        otel_ctx.attach_as_parent();
1958
1959        // We might not be tracking this peek anymore, because we have served a response already or
1960        // because it was canceled. If this is the case, we ignore the response.
1961        let Some(peek) = self.peeks.get(&uuid) else {
1962            return;
1963        };
1964
1965        // If the peek is targeting a replica, ignore responses from other replicas.
1966        let target_replica = peek.target_replica.unwrap_or(replica_id);
1967        if target_replica != replica_id {
1968            return;
1969        }
1970
1971        let duration = peek.requested_at.elapsed();
1972        self.metrics.observe_peek_response(&response, duration);
1973
1974        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
1975        // NOTE: We use the `otel_ctx` from the response, not the pending peek, because we
1976        // currently want the parent to be whatever the compute worker did with this peek.
1977        self.deliver_response(ComputeControllerResponse::PeekNotification(
1978            uuid,
1979            notification,
1980            otel_ctx,
1981        ));
1982
1983        self.finish_peek(uuid, response)
1984    }
1985
1986    fn handle_copy_to_response(
1987        &mut self,
1988        sink_id: GlobalId,
1989        response: CopyToResponse,
1990        replica_id: ReplicaId,
1991    ) {
1992        // We might not be tracking this COPY TO because we have already returned a response
1993        // from one of the replicas. In that case, we ignore the response.
1994        if !self.copy_tos.remove(&sink_id) {
1995            return;
1996        }
1997
1998        let result = match response {
1999            CopyToResponse::RowCount(count) => Ok(count),
2000            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
2001            // We should never get here: Replicas only drop copy to collections in response
2002            // to the controller allowing them to do so, and when the controller drops a
2003            // copy to it also removes it from the list of tracked copy_tos (see
2004            // [`Instance::drop_collections`]).
2005            CopyToResponse::Dropped => {
2006                tracing::error!(
2007                    %sink_id, %replica_id,
2008                    "received `Dropped` response for a tracked copy to",
2009                );
2010                return;
2011            }
2012        };
2013
2014        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
2015    }
2016
2017    fn handle_subscribe_response(
2018        &mut self,
2019        subscribe_id: GlobalId,
2020        response: SubscribeResponse<T>,
2021        replica_id: ReplicaId,
2022    ) {
2023        if !self.collections.contains_key(&subscribe_id) {
2024            soft_panic_or_log!(
2025                "received response for an unknown subscribe \
2026                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
2027            );
2028            return;
2029        }
2030        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2031            soft_panic_or_log!(
2032                "subscribe response for an unknown replica (replica_id={replica_id})"
2033            );
2034            return;
2035        };
2036        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
2037            soft_panic_or_log!(
2038                "subscribe response for an unknown replica collection \
2039                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
2040            );
2041            return;
2042        };
2043
2044        // Always apply replica write frontier updates. Even if the subscribe is not tracked
2045        // anymore, there might still be replicas reading from its inputs, so we need to track the
2046        // frontiers until all replicas have advanced to the empty one.
2047        let write_frontier = match &response {
2048            SubscribeResponse::Batch(batch) => batch.upper.clone(),
2049            SubscribeResponse::DroppedAt(_) => Antichain::new(),
2050        };
2051
2052        // For subscribes we downgrade all replica frontiers based on write frontiers. This should
2053        // be fine because the input and output frontier of a subscribe track its write frontier.
2054        // TODO(database-issues#4701): report subscribe frontiers through `Frontiers` responses
2055        replica_collection.update_write_frontier(write_frontier.clone());
2056        replica_collection.update_input_frontier(write_frontier.clone());
2057        replica_collection.update_output_frontier(write_frontier.clone());
2058
2059        // If the subscribe is not tracked, or targets a different replica, there is nothing to do.
2060        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
2061            return;
2062        };
2063        let replica_targeted = subscribe.target_replica.unwrap_or(replica_id) == replica_id;
2064        if !replica_targeted {
2065            return;
2066        }
2067
2068        // Apply a global frontier update.
2069        // If this is a replica-targeted subscribe, it is important that we advance the global
2070        // frontier only based on responses from the targeted replica. Otherwise, another replica
2071        // could advance to the empty frontier, making us drop the subscribe on the targeted
2072        // replica prematurely.
2073        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);
2074
2075        match response {
2076            SubscribeResponse::Batch(batch) => {
2077                let upper = batch.upper;
2078                let mut updates = batch.updates;
2079
2080                // If this batch advances the subscribe's frontier, we emit all updates at times
2081                // greater than or equal to the last frontier (to avoid emitting duplicate updates).
2082                if PartialOrder::less_than(&subscribe.frontier, &upper) {
2083                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());
2084
2085                    if upper.is_empty() {
2086                        // This subscribe cannot produce more data. Stop tracking it.
2087                        self.subscribes.remove(&subscribe_id);
2088                    } else {
2089                        // This subscribe can produce more data. Update our tracking of it.
2090                        self.subscribes.insert(subscribe_id, subscribe);
2091                    }
2092
2093                    if let Ok(updates) = updates.as_mut() {
2094                        updates.retain(|(time, _data, _diff)| lower.less_equal(time));
2095                    }
2096                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
2097                        subscribe_id,
2098                        SubscribeBatch {
2099                            lower,
2100                            upper,
2101                            updates,
2102                        },
2103                    ));
2104                }
2105            }
2106            SubscribeResponse::DroppedAt(frontier) => {
2107                // We should never get here: Replicas only drop subscribe collections in response
2108                // to the controller allowing them to do so, and when the controller drops a
2109                // subscribe it also removes it from the list of tracked subscribes (see
2110                // [`Instance::drop_collections`]).
2111                tracing::error!(
2112                    %subscribe_id,
2113                    %replica_id,
2114                    frontier = ?frontier.elements(),
2115                    "received `DroppedAt` response for a tracked subscribe",
2116                );
2117                self.subscribes.remove(&subscribe_id);
2118            }
2119        }
2120    }
2121
2122    fn handle_status_response(&mut self, response: StatusResponse, replica_id: ReplicaId) {
2123        match response {
2124            StatusResponse::OperatorHydration(status) => {
2125                self.update_operator_hydration_status(replica_id, status)
2126            }
2127        }
2128    }
2129
2130    /// Downgrade the warmup capabilities of collections as much as possible.
2131    ///
2132    /// The only requirement we have for a collection's warmup capability is that it is for a time
2133    /// that is available in all of the collection's inputs. For each input, the latest such time
2134    /// is `write_frontier - 1`. So the farthest we can downgrade a collection's
2135    /// warmup capability is the minimum of `write_frontier - 1` of all its inputs.
2136    ///
2137    /// This method expects to be periodically called as part of instance maintenance work.
2138    /// We would like to instead update the warmup capabilities synchronously in response to
2139    /// frontier updates of dependency collections, but that is not generally possible because we
2140    /// don't learn about frontier updates of storage collections synchronously. We could do
2141    /// synchronous updates for compute dependencies, but we refrain from doing so for simplicity.
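    ///
    /// As an illustrative example: a collection whose two inputs have write frontiers `{10}` and
    /// `{7}` can have its warmup capability downgraded to `{6}`, the minimum of `10 - 1` and
    /// `7 - 1`.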
2142    fn downgrade_warmup_capabilities(&mut self) {
2143        let mut new_capabilities = BTreeMap::new();
2144        for (id, collection) in &self.collections {
2145            // For write-only collections that have advanced to the empty frontier, we can drop the
2146            // warmup capability entirely. There is no reason why we would need to hydrate those
2147            // collections again, so being able to warm them up is not useful.
2148            if collection.read_policy.is_none()
2149                && collection.shared.lock_write_frontier(|f| f.is_empty())
2150            {
2151                new_capabilities.insert(*id, Antichain::new());
2152                continue;
2153            }
2154
2155            let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2156                let collection = self.collections.get(&dep_id);
2157                collection.map(|c| c.write_frontier())
2158            });
2159            let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2160                let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2161                frontiers.map(|f| f.write_frontier)
2162            });
2163
2164            let mut new_capability = Antichain::new();
2165            for frontier in compute_frontiers.chain(storage_frontiers) {
2166                for time in frontier.iter() {
2167                    new_capability.insert(time.step_back().unwrap_or_else(|| time.clone()));
2168                }
2169            }
2170
2171            new_capabilities.insert(*id, new_capability);
2172        }
2173
2174        for (id, new_capability) in new_capabilities {
2175            let collection = self.expect_collection_mut(id);
2176            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
2177        }
2178    }
2179
2180    /// Process pending maintenance work.
2181    ///
2182    /// This method is invoked periodically by the global controller.
2183    /// It is a good place to perform maintenance work that arises from various controller state
2184    /// changes and that cannot conveniently be handled synchronously with those state changes.
2185    #[mz_ore::instrument(level = "debug")]
2186    pub fn maintain(&mut self) {
2187        self.rehydrate_failed_replicas();
2188        self.downgrade_warmup_capabilities();
2189        self.schedule_collections();
2190        self.cleanup_collections();
2191        self.update_frontier_introspection();
2192        self.refresh_state_metrics();
2193        self.refresh_wallclock_lag();
2194    }
2195}
2196
2197/// State maintained about individual compute collections.
2198///
2199/// A compute collection is an index, a storage sink, or a subscribe exported by a compute
2200/// dataflow.
2201#[derive(Debug)]
2202struct CollectionState<T: ComputeControllerTimestamp> {
2203    /// Whether this collection is a log collection.
2204    ///
2205    /// Log collections are special in that they are only maintained by a subset of all replicas.
2206    log_collection: bool,
2207    /// Whether this collection has been dropped by a controller client.
2208    ///
2209    /// The controller is allowed to remove the `CollectionState` for a collection only when
2210    /// `dropped == true`. Otherwise, clients might still expect to be able to query information
2211    /// about this collection.
2212    dropped: bool,
2213    /// Whether this collection has been scheduled, i.e., the controller has sent a `Schedule`
2214    /// command for it.
2215    scheduled: bool,
2216
2217    /// State shared with the `ComputeController`.
2218    shared: SharedCollectionState<T>,
2219
2220    /// A read hold maintaining the implicit capability of the collection.
2221    ///
2222    /// This capability is kept to ensure that the collection remains readable according to its
2223    /// `read_policy`. It also ensures that read holds on the collection's dependencies are kept at
2224    /// some time not greater than the collection's `write_frontier`, guaranteeing that the
2225    /// collection's next outputs can always be computed without skipping times.
2226    implied_read_hold: ReadHold<T>,
2227    /// A read hold held to enable dataflow warmup.
2228    ///
2229    /// Dataflow warmup is an optimization that allows dataflows to immediately start hydrating
2230    /// even when their next output time (as implied by the `write_frontier`) is in the future.
2231    /// By installing a read capability derived from the write frontiers of the collection's
2232    /// inputs, we ensure that the as-of of new dataflows installed for the collection is at a time
2233    /// that is immediately available, so hydration can begin immediately too.
2234    warmup_read_hold: ReadHold<T>,
2235    /// The policy to use to downgrade `self.implied_read_hold`.
2236    ///
2237    /// If `None`, the collection is a write-only collection (i.e. a sink). For write-only
2238    /// collections, the `implied_read_hold` is only required for maintaining read holds on the
2239    /// inputs, so we can immediately downgrade it to the `write_frontier`.
2240    read_policy: Option<ReadPolicy<T>>,
2241
2242    /// Storage identifiers on which this collection depends, and read holds this collection
2243    /// requires on them.
2244    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2245    /// Compute identifiers on which this collection depends, and read holds this collection
2246    /// requires on them.
2247    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2248
2249    /// Introspection state associated with this collection.
2250    introspection: CollectionIntrospection<T>,
2251
2252    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
2253    /// introspection update.
2254    ///
2255    /// Keys are `(period, lag, labels)` triples, values are counts.
2256    ///
2257    /// If this is `None`, wallclock lag is not tracked for this collection.
2258    wallclock_lag_histogram_stash: Option<
2259        BTreeMap<
2260            (
2261                WallclockLagHistogramPeriod,
2262                u64,
2263                BTreeMap<&'static str, String>,
2264            ),
2265            Diff,
2266        >,
2267    >,
2268}
2269
2270impl<T: ComputeControllerTimestamp> CollectionState<T> {
2271    /// Creates a new collection state, with an initial read policy valid from `since`.
2272    fn new(
2273        collection_id: GlobalId,
2274        as_of: Antichain<T>,
2275        shared: SharedCollectionState<T>,
2276        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2277        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2278        read_hold_tx: read_holds::ChangeTx<T>,
2279        introspection: CollectionIntrospection<T>,
2280    ) -> Self {
2281        // A collection is not readable before the `as_of`.
2282        let since = as_of.clone();
2283        // A collection won't produce updates for times before the `as_of`.
2284        let upper = as_of;
2285
2286        // Ensure that the provided `shared` is valid for the given `as_of`.
2287        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
2288        assert!(shared.lock_write_frontier(|f| f == &upper));
2289
2290        // Initialize collection read holds.
2291        // Note that the implied read hold was already added to the `read_capabilities` when
2292        // `shared` was created, so we only need to add the warmup read hold here.
2293        let implied_read_hold =
2294            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
2295        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);
2296
2297        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
2298        shared.lock_read_capabilities(|c| {
2299            c.update_iter(updates);
2300        });
2301
2302        // In an effort to keep the produced wallclock lag introspection data small and
2303        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2304        // select indexes and subscribes.
2305        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
2306            true => None,
2307            false => Some(Default::default()),
2308        };
2309
2310        Self {
2311            log_collection: false,
2312            dropped: false,
2313            scheduled: false,
2314            shared,
2315            implied_read_hold,
2316            warmup_read_hold,
2317            read_policy: Some(ReadPolicy::ValidFrom(since)),
2318            storage_dependencies,
2319            compute_dependencies,
2320            introspection,
2321            wallclock_lag_histogram_stash,
2322        }
2323    }
2324
2325    /// Creates a new collection state for a log collection.
2326    fn new_log_collection(
2327        id: GlobalId,
2328        shared: SharedCollectionState<T>,
2329        read_hold_tx: read_holds::ChangeTx<T>,
2330        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2331    ) -> Self {
2332        let since = Antichain::from_elem(T::minimum());
2333        let introspection =
2334            CollectionIntrospection::new(id, introspection_tx, since.clone(), false, None, None);
2335        let mut state = Self::new(
2336            id,
2337            since,
2338            shared,
2339            Default::default(),
2340            Default::default(),
2341            read_hold_tx,
2342            introspection,
2343        );
2344        state.log_collection = true;
2345        // Log collections are created and scheduled implicitly as part of replica initialization.
2346        state.scheduled = true;
2347        state
2348    }
2349
2350    /// Reports the current read frontier.
2351    fn read_frontier(&self) -> Antichain<T> {
2352        self.shared
2353            .lock_read_capabilities(|c| c.frontier().to_owned())
2354    }
2355
2356    /// Reports the current write frontier.
2357    fn write_frontier(&self) -> Antichain<T> {
2358        self.shared.lock_write_frontier(|f| f.clone())
2359    }
2360
2361    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2362        self.storage_dependencies.keys().copied()
2363    }
2364
2365    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2366        self.compute_dependencies.keys().copied()
2367    }
2368
2369    /// Reports the IDs of the dependencies of this collection.
2370    fn dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2371        self.compute_dependency_ids()
2372            .chain(self.storage_dependency_ids())
2373    }
2374}
2375
2376/// Collection state shared with the `ComputeController`.
2377///
2378/// Having this allows certain controller APIs, such as `ComputeController::collection_frontiers`
2379/// and `ComputeController::acquire_read_hold` to be non-`async`. This comes at the cost of
2380/// complexity (by introducing shared mutable state) and performance (by introducing locking). We
2381/// should aim to reduce the amount of shared state over time, rather than expand it.
2382///
2383/// Note that [`SharedCollectionState`]s are initialized by the `ComputeController` prior to the
2384/// collection's creation in the [`Instance`]. This is to allow compute clients to query frontiers
2385/// and take new read holds immediately, without having to wait for the [`Instance`] to update.
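///
/// A minimal sketch of how the locking helpers below are used (assuming a `shared` value of this
/// type):
///
/// ```ignore
/// let upper = shared.lock_write_frontier(|f| f.clone());
/// let since = shared.lock_read_capabilities(|caps| caps.frontier().to_owned());
/// ```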
2386#[derive(Clone, Debug)]
2387pub(super) struct SharedCollectionState<T> {
2388    /// Accumulation of read capabilities for the collection.
2389    ///
2390    /// This accumulation contains the capabilities held by all [`ReadHold`]s given out for the
2391    /// collection, including `implied_read_hold` and `warmup_read_hold`.
2392    ///
2393    /// NOTE: This field may only be modified by [`Instance::apply_read_hold_change`] and
2394    /// `ComputeController::acquire_read_hold`. Nobody else should modify read capabilities
2395    /// directly. Instead, collection users should manage read holds through [`ReadHold`] objects
2396    /// acquired through `ComputeController::acquire_read_hold`.
2397    ///
2398    /// TODO(teskje): Restructure the code to enforce the above in the type system.
2399    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
2400    /// The write frontier of this collection.
2401    write_frontier: Arc<Mutex<Antichain<T>>>,
2402}
2403
2404impl<T: Timestamp> SharedCollectionState<T> {
2405    pub fn new(as_of: Antichain<T>) -> Self {
2406        // A collection is not readable before the `as_of`.
2407        let since = as_of.clone();
2408        // A collection won't produce updates for times before the `as_of`.
2409        let upper = as_of;
2410
2411        // Initialize read capabilities to the `since`.
2412        // This is the implied read capability. The corresponding [`ReadHold`] is created in
2413        // [`CollectionState::new`].
2414        let mut read_capabilities = MutableAntichain::new();
2415        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2416
2417        Self {
2418            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2419            write_frontier: Arc::new(Mutex::new(upper)),
2420        }
2421    }
2422
2423    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2424    where
2425        F: FnOnce(&mut MutableAntichain<T>) -> R,
2426    {
2427        let mut caps = self.read_capabilities.lock().expect("poisoned");
2428        f(&mut *caps)
2429    }
2430
2431    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2432    where
2433        F: FnOnce(&mut Antichain<T>) -> R,
2434    {
2435        let mut frontier = self.write_frontier.lock().expect("poisoned");
2436        f(&mut *frontier)
2437    }
2438}
2439
2440/// Manages certain introspection relations associated with a collection. Upon creation, it adds
2441/// rows to introspection relations. When dropped, it retracts its managed rows.
2442///
2443/// TODO: `ComputeDependencies` could be moved under this.
2444#[derive(Debug)]
2445struct CollectionIntrospection<T: ComputeControllerTimestamp> {
2446    /// The ID of the compute collection.
2447    collection_id: GlobalId,
2448    /// A channel through which introspection updates are delivered.
2449    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2450    /// Introspection state for `IntrospectionType::Frontiers`.
2451    ///
2452    /// `Some` if the collection does _not_ sink into a storage collection (i.e. is not an MV). If
2453    /// the collection sinks into storage, the storage controller reports its frontiers instead.
2454    frontiers: Option<FrontiersIntrospectionState<T>>,
2455    /// Introspection state for `IntrospectionType::ComputeMaterializedViewRefreshes`.
2456    ///
2457    /// `Some` if the collection is a REFRESH MV.
2458    refresh: Option<RefreshIntrospectionState<T>>,
2459}
2460
2461impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
2462    fn new(
2463        collection_id: GlobalId,
2464        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2465        as_of: Antichain<T>,
2466        storage_sink: bool,
2467        initial_as_of: Option<Antichain<T>>,
2468        refresh_schedule: Option<RefreshSchedule>,
2469    ) -> Self {
2470        let refresh =
2471            match (refresh_schedule, initial_as_of) {
2472                (Some(refresh_schedule), Some(initial_as_of)) => Some(
2473                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2474                ),
2475                (refresh_schedule, _) => {
2476                    // If we have a `refresh_schedule`, then the collection is an MV, so we should also have
2477                    // an `initial_as_of`.
2478                    soft_assert_or_log!(
2479                        refresh_schedule.is_none(),
2480                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2481                    );
2482                    None
2483                }
2484            };
2485        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2486
2487        let self_ = Self {
2488            collection_id,
2489            introspection_tx,
2490            frontiers,
2491            refresh,
2492        };
2493
2494        self_.report_initial_state();
2495        self_
2496    }
2497
2498    /// Reports the initial introspection state.
2499    fn report_initial_state(&self) {
2500        if let Some(frontiers) = &self.frontiers {
2501            let row = frontiers.row_for_collection(self.collection_id);
2502            let updates = vec![(row, Diff::ONE)];
2503            self.send(IntrospectionType::Frontiers, updates);
2504        }
2505
2506        if let Some(refresh) = &self.refresh {
2507            let row = refresh.row_for_collection(self.collection_id);
2508            let updates = vec![(row, Diff::ONE)];
2509            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2510        }
2511    }
2512
2513    /// Observe the given current collection frontiers and update the introspection state as
2514    /// necessary.
2515    fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2516        self.update_frontier_introspection(read_frontier, write_frontier);
2517        self.update_refresh_introspection(write_frontier);
2518    }
2519
2520    fn update_frontier_introspection(
2521        &mut self,
2522        read_frontier: &Antichain<T>,
2523        write_frontier: &Antichain<T>,
2524    ) {
2525        let Some(frontiers) = &mut self.frontiers else {
2526            return;
2527        };
2528
2529        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2530        {
2531            return; // no change
2532        };
2533
2534        let retraction = frontiers.row_for_collection(self.collection_id);
2535        frontiers.update(read_frontier, write_frontier);
2536        let insertion = frontiers.row_for_collection(self.collection_id);
2537        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2538        self.send(IntrospectionType::Frontiers, updates);
2539    }
2540
2541    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
2542        let Some(refresh) = &mut self.refresh else {
2543            return;
2544        };
2545
2546        let retraction = refresh.row_for_collection(self.collection_id);
2547        refresh.frontier_update(write_frontier);
2548        let insertion = refresh.row_for_collection(self.collection_id);
2549
2550        if retraction == insertion {
2551            return; // no change
2552        }
2553
2554        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2555        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2556    }
2557
2558    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2559        // Failure to send means the `ComputeController` has been dropped and doesn't care about
2560        // introspection updates anymore.
2561        let _ = self.introspection_tx.send((introspection_type, updates));
2562    }
2563}
2564
2565impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
2566    fn drop(&mut self) {
2567        // Retract collection frontiers.
2568        if let Some(frontiers) = &self.frontiers {
2569            let row = frontiers.row_for_collection(self.collection_id);
2570            let updates = vec![(row, Diff::MINUS_ONE)];
2571            self.send(IntrospectionType::Frontiers, updates);
2572        }
2573
2574        // Retract MV refresh state.
2575        if let Some(refresh) = &self.refresh {
2576            let retraction = refresh.row_for_collection(self.collection_id);
2577            let updates = vec![(retraction, Diff::MINUS_ONE)];
2578            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2579        }
2580    }
2581}
2582
2583#[derive(Debug)]
2584struct FrontiersIntrospectionState<T> {
2585    read_frontier: Antichain<T>,
2586    write_frontier: Antichain<T>,
2587}
2588
2589impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
2590    fn new(as_of: Antichain<T>) -> Self {
2591        Self {
2592            read_frontier: as_of.clone(),
2593            write_frontier: as_of,
2594        }
2595    }
2596
2597    /// Return a `Row` reflecting the current collection frontiers.
2598    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2599        let read_frontier = self
2600            .read_frontier
2601            .as_option()
2602            .map_or(Datum::Null, |ts| ts.clone().into());
2603        let write_frontier = self
2604            .write_frontier
2605            .as_option()
2606            .map_or(Datum::Null, |ts| ts.clone().into());
2607        Row::pack_slice(&[
2608            Datum::String(&collection_id.to_string()),
2609            read_frontier,
2610            write_frontier,
2611        ])
2612    }
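    // A minimal sketch of the packed row (values illustrative): for a collection `u1` with read
    // frontier `[5]` and write frontier `[9]`, `row_for_collection` produces
    //
    //     ("u1", 5, 9)
    //
    // and an empty frontier is encoded as `Datum::Null`.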
2613
2614    /// Update the introspection state with the given new frontiers.
2615    fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2616        if read_frontier != &self.read_frontier {
2617            self.read_frontier.clone_from(read_frontier);
2618        }
2619        if write_frontier != &self.write_frontier {
2620            self.write_frontier.clone_from(write_frontier);
2621        }
2622    }
2623}
2624
2625/// Information needed to compute introspection updates for a REFRESH materialized view when the
2626/// write frontier advances.
2627#[derive(Debug)]
2628struct RefreshIntrospectionState<T> {
2629    // Immutable properties of the MV
2630    refresh_schedule: RefreshSchedule,
2631    initial_as_of: Antichain<T>,
2632    // Refresh state
2633    next_refresh: Datum<'static>,           // Null or an MzTimestamp
2634    last_completed_refresh: Datum<'static>, // Null or an MzTimestamp
2635}
2636
2637impl<T> RefreshIntrospectionState<T> {
2638    /// Return a `Row` reflecting the current refresh introspection state.
2639    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2640        Row::pack_slice(&[
2641            Datum::String(&collection_id.to_string()),
2642            self.last_completed_refresh,
2643            self.next_refresh,
2644        ])
2645    }
2646}
2647
2648impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
2649    /// Construct a new [`RefreshIntrospectionState`], and apply an initial `frontier_update()` at
2650    /// the `upper`.
2651    fn new(
2652        refresh_schedule: RefreshSchedule,
2653        initial_as_of: Antichain<T>,
2654        upper: &Antichain<T>,
2655    ) -> Self {
2656        let mut self_ = Self {
2657            refresh_schedule: refresh_schedule.clone(),
2658            initial_as_of: initial_as_of.clone(),
2659            next_refresh: Datum::Null,
2660            last_completed_refresh: Datum::Null,
2661        };
2662        self_.frontier_update(upper);
2663        self_
2664    }
2665
2666    /// Should be called whenever the write frontier of the collection advances. It updates the
2667    /// state that should be recorded in introspection relations, but doesn't send the updates yet.
2668    fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
2669        if write_frontier.is_empty() {
2670            self.last_completed_refresh =
2671                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2672                    last_refresh.into()
2673                } else {
2674                    // If there is no last refresh, then we have a `REFRESH EVERY`, in which case
2675                    // the saturating roundup puts a refresh at the maximum possible timestamp.
2676                    T::maximum().into()
2677                };
2678            self.next_refresh = Datum::Null;
2679        } else {
2680            if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2681                // We are before the first refresh.
2682                self.last_completed_refresh = Datum::Null;
2683                let initial_as_of = self.initial_as_of.as_option().expect(
2684                    "initial_as_of can't be [], because then there would be no refreshes at all",
2685                );
2686                let first_refresh = initial_as_of
2687                    .round_up(&self.refresh_schedule)
2688                    .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2689                soft_assert_or_log!(
2690                    first_refresh == *initial_as_of,
2691                    "initial_as_of should be set to the first refresh"
2692                );
2693                self.next_refresh = first_refresh.into();
2694            } else {
2695                // The first refresh has already happened.
2696                let write_frontier = write_frontier.as_option().expect("checked above");
2697                self.last_completed_refresh = write_frontier
2698                    .round_down_minus_1(&self.refresh_schedule)
2699                    .map_or_else(
2700                        || {
2701                            soft_panic_or_log!(
2702                                "rounding down should have returned the first refresh or later"
2703                            );
2704                            Datum::Null
2705                        },
2706                        |last_completed_refresh| last_completed_refresh.into(),
2707                    );
2708                self.next_refresh = write_frontier.clone().into();
2709            }
2710        }
2711    }
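    // A worked example (values hypothetical), assuming a `REFRESH EVERY '1 day'` schedule
    // aligned to midnight with `initial_as_of` at day 1:
    //  * write frontier `[day 1]`: before the first refresh, so `last_completed_refresh = Null`
    //    and `next_refresh = day 1`;
    //  * write frontier `[day 3]`: `round_down_minus_1` yields day 2, so
    //    `last_completed_refresh = day 2` and `next_refresh = day 3`;
    //  * empty write frontier: all refreshes are done, so `last_completed_refresh` is the
    //    schedule's last refresh (or `T::maximum()` for `REFRESH EVERY`) and
    //    `next_refresh = Null`.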
2712}
2713
2714/// A record of an outstanding peek, and how to handle its response.
2715#[derive(Debug)]
2716struct PendingPeek<T: Timestamp> {
2717    /// For replica-targeted peeks, this specifies the replica whose response we should pass on.
2718    ///
2719    /// If this value is `None`, we pass on the first response.
2720    target_replica: Option<ReplicaId>,
2721    /// The OpenTelemetry context for this peek.
2722    otel_ctx: OpenTelemetryContext,
2723    /// The time at which the peek was requested.
2724    ///
2725    /// Used to track peek durations.
2726    requested_at: Instant,
2727    /// The read hold installed to serve this peek.
2728    read_hold: ReadHold<T>,
2729    /// The channel to send peek results.
2730    peek_response_tx: oneshot::Sender<PeekResponse>,
2731    /// An optional limit of the peek's result size.
2732    limit: Option<usize>,
2733    /// The offset into the peek's result.
2734    offset: usize,
2735}
2736
2737#[derive(Debug, Clone)]
2738struct ActiveSubscribe<T> {
2739    /// Current upper frontier of this subscribe.
2740    frontier: Antichain<T>,
2741    /// For replica-targeted subscribes, this specifies the replica whose responses we should pass on.
2742    ///
2743    /// If this value is `None`, we pass on the first response for each time slice.
2744    target_replica: Option<ReplicaId>,
2745}
2746
2747impl<T: ComputeControllerTimestamp> ActiveSubscribe<T> {
2748    fn new(target_replica: Option<ReplicaId>) -> Self {
2749        Self {
2750            frontier: Antichain::from_elem(T::minimum()),
2751            target_replica,
2752        }
2753    }
2754}
2755
2756/// State maintained about individual replicas.
2757#[derive(Debug)]
2758struct ReplicaState<T: ComputeControllerTimestamp> {
2759    /// The ID of the replica.
2760    id: ReplicaId,
2761    /// Client for the running replica task.
2762    client: ReplicaClient<T>,
2763    /// The replica configuration.
2764    config: ReplicaConfig,
2765    /// Replica metrics.
2766    metrics: ReplicaMetrics,
2767    /// A channel through which introspection updates are delivered.
2768    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2769    /// Per-replica collection state.
2770    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
2771    /// The epoch of the replica.
2772    epoch: ClusterStartupEpoch,
2773}
2774
2775impl<T: ComputeControllerTimestamp> ReplicaState<T> {
2776    fn new(
2777        id: ReplicaId,
2778        client: ReplicaClient<T>,
2779        config: ReplicaConfig,
2780        metrics: ReplicaMetrics,
2781        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2782        epoch: ClusterStartupEpoch,
2783    ) -> Self {
2784        Self {
2785            id,
2786            client,
2787            config,
2788            metrics,
2789            introspection_tx,
2790            epoch,
2791            collections: Default::default(),
2792        }
2793    }
2794
2795    /// Add a collection to the replica state.
2796    ///
2797    /// # Panics
2798    ///
2799    /// Panics if a collection with the same ID exists already.
2800    fn add_collection(
2801        &mut self,
2802        id: GlobalId,
2803        as_of: Antichain<T>,
2804        input_read_holds: Vec<ReadHold<T>>,
2805    ) {
2806        let metrics = self.metrics.for_collection(id);
2807        let introspection = ReplicaCollectionIntrospection::new(
2808            self.id,
2809            id,
2810            self.introspection_tx.clone(),
2811            as_of.clone(),
2812        );
2813        let mut state =
2814            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
2815
2816        // In an effort to keep the produced wallclock lag introspection data small and
2817        // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2818        // select indexes and subscribes.
2819        if id.is_transient() {
2820            state.wallclock_lag_max = None;
2821        }
2822
2823        if let Some(previous) = self.collections.insert(id, state) {
2824            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
2825        }
2826    }
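    // A minimal usage sketch (identifiers hypothetical): when a dataflow export is installed on
    // this replica, the exported collection is registered together with read holds on the
    // dataflow's storage inputs. Transient collections (slow-path selects and subscribes)
    // additionally have wallclock lag tracking disabled, as described above.
    //
    //     replica.add_collection(exported_id, dataflow_as_of.clone(), storage_input_read_holds);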
2827
2828    /// Remove state for a collection.
2829    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
2830        self.collections.remove(&id)
2831    }
2832
2833    /// Returns whether all replica frontiers of the given collection are empty, treating an unknown collection as empty.
2834    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
2835        self.collections.get(&id).map_or(true, |c| {
2836            c.write_frontier.is_empty()
2837                && c.input_frontier.is_empty()
2838                && c.output_frontier.is_empty()
2839        })
2840    }
2841
2842    /// Returns the state of the [`ReplicaState`] formatted as JSON.
2843    ///
2844    /// The returned value is not guaranteed to be stable and may change at any point in time.
2845    #[mz_ore::instrument(level = "debug")]
2846    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
2847        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
2848        // returned object as a tradeoff between usability and stability. `serde_json` will fail
2849        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
2850        // prevents a future unrelated change from silently breaking this method.
2851
2852        // Destructure `self` here so we don't forget to consider dumping newly added fields.
2853        let Self {
2854            id,
2855            client: _,
2856            config: _,
2857            metrics: _,
2858            introspection_tx: _,
2859            epoch,
2860            collections,
2861        } = self;
2862
2863        fn field(
2864            key: &str,
2865            value: impl Serialize,
2866        ) -> Result<(String, serde_json::Value), anyhow::Error> {
2867            let value = serde_json::to_value(value)?;
2868            Ok((key.to_string(), value))
2869        }
2870
2871        let collections: BTreeMap<_, _> = collections
2872            .iter()
2873            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
2874            .collect();
2875
2876        let map = serde_json::Map::from_iter([
2877            field("id", id.to_string())?,
2878            field("collections", collections)?,
2879            field("epoch", epoch)?,
2880        ]);
2881        Ok(serde_json::Value::Object(map))
2882    }
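    // The JSON returned by `dump` has roughly this shape (values illustrative; the collection
    // values are `Debug`-formatted strings and therefore not stable):
    //
    //     {
    //         "id": "u1",
    //         "collections": { "u7": "ReplicaCollectionState { .. }" },
    //         "epoch": { .. }
    //     }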
2883}
2884
2885#[derive(Debug)]
2886struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
2887    /// The replica write frontier of this collection.
2888    ///
2889    /// See [`FrontiersResponse::write_frontier`].
2890    write_frontier: Antichain<T>,
2891    /// The replica input frontier of this collection.
2892    ///
2893    /// See [`FrontiersResponse::input_frontier`].
2894    input_frontier: Antichain<T>,
2895    /// The replica output frontier of this collection.
2896    ///
2897    /// See [`FrontiersResponse::output_frontier`].
2898    output_frontier: Antichain<T>,
2899
2900    /// Metrics tracked for this collection.
2901    ///
2902    /// If this is `None`, no metrics are collected.
2903    metrics: Option<ReplicaCollectionMetrics>,
2904    /// As-of frontier with which this collection was installed on the replica.
2905    as_of: Antichain<T>,
2906    /// Tracks introspection state for this collection.
2907    introspection: ReplicaCollectionIntrospection<T>,
2908    /// Read holds on storage inputs to this collection.
2909    ///
2910    /// These read holds are kept to ensure that the replica is able to read from storage inputs at
2911    /// all times it hasn't read yet. We only need to install read holds for storage inputs since
2912    /// compaction of compute inputs is implicitly held back by Timely/DD.
2913    input_read_holds: Vec<ReadHold<T>>,
2914
2915    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
2916    ///
2917    /// If this is `None`, wallclock lag is not tracked for this collection.
2918    wallclock_lag_max: Option<Duration>,
2919}
2920
2921impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
2922    fn new(
2923        metrics: Option<ReplicaCollectionMetrics>,
2924        as_of: Antichain<T>,
2925        introspection: ReplicaCollectionIntrospection<T>,
2926        input_read_holds: Vec<ReadHold<T>>,
2927    ) -> Self {
2928        Self {
2929            write_frontier: as_of.clone(),
2930            input_frontier: as_of.clone(),
2931            output_frontier: as_of.clone(),
2932            metrics,
2933            as_of,
2934            introspection,
2935            input_read_holds,
2936            wallclock_lag_max: Some(Duration::ZERO),
2937        }
2938    }
2939
2940    /// Returns whether this collection is hydrated.
2941    fn hydrated(&self) -> bool {
2942        // If the observed frontier is greater than the collection's as-of, the collection has
2943        // produced some output and is therefore hydrated.
2944        //
2945        // We need to consider the edge case where the as-of is the empty frontier. Such an as-of
2946        // is not useful for indexes, because they wouldn't be readable. For write-only
2947        // collections, an empty as-of means that the collection has been fully written and no new
2948        // dataflow needs to be created for it. Consequently, no hydration will happen either.
2949        //
2950        // Based on this, we could respond in two ways:
2951        //  * `false`, as in "the dataflow was never created"
2952        //  * `true`, as in "the dataflow completed immediately"
2953        //
2954        // Since hydration is often used as a measure of dataflow progress and we don't want to
2955        // give the impression that certain dataflows are somehow stuck when they are not, we go
2956        // with the second interpretation here.
2957        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
2958    }
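    // A worked example (timestamps hypothetical):
    //
    //     as_of = [5], output_frontier = [5]  =>  hydrated() == false
    //     as_of = [5], output_frontier = [6]  =>  hydrated() == true
    //     as_of = []                          =>  hydrated() == true  (by the convention above)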
2959
2960    /// Updates the replica write frontier of this collection.
2961    fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
2962        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
2963            soft_panic_or_log!(
2964                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
2965                self.write_frontier,
2966            );
2967            return;
2968        } else if new_frontier == self.write_frontier {
2969            return;
2970        }
2971
2972        self.write_frontier = new_frontier;
2973    }
2974
2975    /// Updates the replica input frontier of this collection.
2976    fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
2977        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
2978            soft_panic_or_log!(
2979                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
2980                self.input_frontier,
2981            );
2982            return;
2983        } else if new_frontier == self.input_frontier {
2984            return;
2985        }
2986
2987        self.input_frontier = new_frontier;
2988
2989        // Relax our read holds on collection inputs.
2990        for read_hold in &mut self.input_read_holds {
2991            let result = read_hold.try_downgrade(self.input_frontier.clone());
2992            soft_assert_or_log!(
2993                result.is_ok(),
2994                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
2995                self.input_frontier,
2996            );
2997        }
2998    }
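    // A minimal sketch of the read hold relaxation above (frontier value illustrative): once the
    // replica reports an input frontier of `[7]`, each hold on a storage input is downgraded to
    // `[7]`, allowing storage to compact those inputs for all times strictly below that frontier.
    //
    //     let _ = read_hold.try_downgrade(Antichain::from_elem(7.into()));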
2999
3000    /// Updates the replica output frontier of this collection.
3001    fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
3002        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3003            soft_panic_or_log!(
3004                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3005                self.output_frontier,
3006            );
3007            return;
3008        } else if new_frontier == self.output_frontier {
3009            return;
3010        }
3011
3012        self.output_frontier = new_frontier;
3013    }
3014}
3015
3016/// Maintains the introspection state for a given replica and collection, and ensures that reported
3017/// introspection data is retracted when the collection is dropped.
3018#[derive(Debug)]
3019struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
3020    /// The ID of the replica.
3021    replica_id: ReplicaId,
3022    /// The ID of the compute collection.
3023    collection_id: GlobalId,
3024    /// Operator-level hydration state.
3025    /// Maps `(lir_id, worker_id)` to whether that operator instance is hydrated.
3026    operators: BTreeMap<(LirId, usize), bool>,
3027    /// The collection's reported replica write frontier.
3028    write_frontier: Antichain<T>,
3029    /// A channel through which introspection updates are delivered.
3030    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3031}
3032
3033impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
3034    /// Create a new [`ReplicaCollectionIntrospection`] and report its initial introspection state.
3035    fn new(
3036        replica_id: ReplicaId,
3037        collection_id: GlobalId,
3038        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3039        as_of: Antichain<T>,
3040    ) -> Self {
3041        let self_ = Self {
3042            replica_id,
3043            collection_id,
3044            operators: Default::default(),
3045            write_frontier: as_of,
3046            introspection_tx,
3047        };
3048
3049        self_.report_initial_state();
3050        self_
3051    }
3052
3053    /// Reports the initial introspection state.
3054    fn report_initial_state(&self) {
3055        let row = self.write_frontier_row();
3056        let updates = vec![(row, Diff::ONE)];
3057        self.send(IntrospectionType::ReplicaFrontiers, updates);
3058    }
3059
3060    /// Update the hydration status of the given (lir_id, worker_id) pair.
3061    fn operator_hydrated(&mut self, lir_id: LirId, worker_id: usize, hydrated: bool) {
3062        let retraction = self.operator_hydration_row(lir_id, worker_id);
3063        self.operators.insert((lir_id, worker_id), hydrated);
3064        let insertion = self.operator_hydration_row(lir_id, worker_id);
3065
3066        if retraction == insertion {
3067            return; // no change
3068        }
3069
3070        let updates = retraction
3071            .map(|r| (r, Diff::MINUS_ONE))
3072            .into_iter()
3073            .chain(insertion.map(|r| (r, Diff::ONE)))
3074            .collect();
3075        self.send(IntrospectionType::ComputeOperatorHydrationStatus, updates);
3076    }
3077
3078    /// Return a `Row` reflecting the current hydration status of the identified operator.
3079    ///
3080    /// Returns `None` if the identified operator is not tracked.
3081    fn operator_hydration_row(&self, lir_id: LirId, worker_id: usize) -> Option<Row> {
3082        self.operators.get(&(lir_id, worker_id)).map(|hydrated| {
3083            Row::pack_slice(&[
3084                Datum::String(&self.collection_id.to_string()),
3085                Datum::UInt64(lir_id.into()),
3086                Datum::String(&self.replica_id.to_string()),
3087                Datum::UInt64(u64::cast_from(worker_id)),
3088                Datum::from(*hydrated),
3089            ])
3090        })
3091    }
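    // The packed hydration status row has this shape (values hypothetical):
    //
    //     (collection_id, lir_id, replica_id, worker_id, hydrated)
    //     ("u7",          42,     "u1",       0,         true)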
3092
3093    /// Observe the given current write frontier and update the introspection state as necessary.
3094    fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
3095        if self.write_frontier == *write_frontier {
3096            return; // no change
3097        }
3098
3099        let retraction = self.write_frontier_row();
3100        self.write_frontier.clone_from(write_frontier);
3101        let insertion = self.write_frontier_row();
3102
3103        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3104        self.send(IntrospectionType::ReplicaFrontiers, updates);
3105    }
3106
3107    /// Return a `Row` reflecting the current replica write frontier.
3108    fn write_frontier_row(&self) -> Row {
3109        let write_frontier = self
3110            .write_frontier
3111            .as_option()
3112            .map_or(Datum::Null, |ts| ts.clone().into());
3113        Row::pack_slice(&[
3114            Datum::String(&self.collection_id.to_string()),
3115            Datum::String(&self.replica_id.to_string()),
3116            write_frontier,
3117        ])
3118    }
3119
3120    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3121        // Failure to send means the `ComputeController` has been dropped and doesn't care about
3122        // introspection updates anymore.
3123        let _ = self.introspection_tx.send((introspection_type, updates));
3124    }
3125}
3126
3127impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
3128    fn drop(&mut self) {
3129        // Retract operator hydration status.
3130        let operators: Vec<_> = self.operators.keys().collect();
3131        let updates: Vec<_> = operators
3132            .into_iter()
3133            .flat_map(|(lir_id, worker_id)| self.operator_hydration_row(*lir_id, *worker_id))
3134            .map(|r| (r, Diff::MINUS_ONE))
3135            .collect();
3136        if !updates.is_empty() {
3137            self.send(IntrospectionType::ComputeOperatorHydrationStatus, updates);
3138        }
3139
3140        // Retract the write frontier.
3141        let row = self.write_frontier_row();
3142        let updates = vec![(row, Diff::MINUS_ONE)];
3143        self.send(IntrospectionType::ReplicaFrontiers, updates);
3144    }
3145}