1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::ComputeInstanceId;
21use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
22use mz_compute_types::plan::render_plan::RenderPlan;
23use mz_compute_types::sinks::{
24    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
25};
26use mz_compute_types::sources::SourceInstanceDesc;
27use mz_controller_types::dyncfgs::{
28    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
29};
30use mz_dyncfg::ConfigSet;
31use mz_expr::RowSetFinishing;
32use mz_ore::cast::CastFrom;
33use mz_ore::channel::instrumented_unbounded_channel;
34use mz_ore::now::NowFn;
35use mz_ore::tracing::OpenTelemetryContext;
36use mz_ore::{soft_assert_or_log, soft_panic_or_log};
37use mz_persist_types::PersistLocation;
38use mz_repr::adt::timestamp::CheckedTimestamp;
39use mz_repr::refresh_schedule::RefreshSchedule;
40use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
41use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
42use mz_storage_types::read_holds::{self, ReadHold};
43use mz_storage_types::read_policy::ReadPolicy;
44use serde::Serialize;
45use thiserror::Error;
46use timely::PartialOrder;
47use timely::progress::frontier::MutableAntichain;
48use timely::progress::{Antichain, ChangeBatch, Timestamp};
49use tokio::sync::mpsc::error::SendError;
50use tokio::sync::{mpsc, oneshot};
51use uuid::Uuid;
52
53use crate::controller::error::{
54    CollectionLookupError, CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
55};
56use crate::controller::replica::{ReplicaClient, ReplicaConfig};
57use crate::controller::{
58    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
59    ReplicaId, StorageCollections,
60};
61use crate::logging::LogVariant;
62use crate::metrics::IntCounter;
63use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
64use crate::protocol::command::{
65    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
66};
67use crate::protocol::history::ComputeCommandHistory;
68use crate::protocol::response::{
69    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
70    SubscribeBatch, SubscribeResponse,
71};
72
/// Error reporting that a replica with the contained ID exists already.
#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);
76
/// Error reporting that no replica with the contained ID exists.
#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);
80
/// Errors that can be reported when creating a dataflow.
#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    /// A referenced collection does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// A referenced replica does not exist.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The dataflow definition has no `as_of` set.
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    /// A subscribe dataflow was given an empty `as_of`.
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    /// A copy-to dataflow was given an empty `as_of`.
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    /// No read hold was provided for the given dataflow import.
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    /// The read hold provided for the given dataflow import is insufficient.
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}
98
99impl From<CollectionMissing> for DataflowCreationError {
100    fn from(error: CollectionMissing) -> Self {
101        Self::CollectionMissing(error.0)
102    }
103}
104
/// Errors that can be reported for a peek request.
#[derive(Error, Debug)]
pub(super) enum PeekError {
    /// The referenced replica does not exist.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The ID of the provided read hold does not match the peeked collection.
    #[error("read hold ID does not match peeked collection: {0}")]
    ReadHoldIdMismatch(GlobalId),
    /// The provided read hold is insufficient.
    #[error("insufficient read hold provided: {0}")]
    ReadHoldInsufficient(GlobalId),
}
114
/// Errors that can be reported when working with a collection's read policy.
#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    /// The referenced collection does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The referenced collection is write-only.
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}
122
123impl From<CollectionMissing> for ReadPolicyError {
124    fn from(error: CollectionMissing) -> Self {
125        Self::CollectionMissing(error.0)
126    }
127}
128
/// A command for an [`Instance`]: a boxed, sendable, one-shot closure that is
/// invoked with mutable access to the instance.
pub type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;
131
/// A cloneable client for an [`Instance`] task.
///
/// Holds the sending halves through which commands and read hold changes are
/// submitted to the instance.
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub(super) struct Client<T: ComputeControllerTimestamp> {
    /// Sender for [`Command`]s to be run by the instance.
    command_tx: mpsc::UnboundedSender<Command<T>>,
    /// Handle for submitting read hold changes; excluded from `Debug` output.
    #[derivative(Debug = "ignore")]
    read_hold_tx: read_holds::ChangeTx<T>,
}
142
143impl<T: ComputeControllerTimestamp> Client<T> {
144    pub fn send(&self, command: Command<T>) -> Result<(), SendError<Command<T>>> {
145        self.command_tx.send(command)
146    }
147
148    pub fn read_hold_tx(&self) -> read_holds::ChangeTx<T> {
149        Arc::clone(&self.read_hold_tx)
150    }
151}
152
153impl<T> Client<T>
154where
155    T: ComputeControllerTimestamp,
156{
157    pub fn spawn(
158        id: ComputeInstanceId,
159        build_info: &'static BuildInfo,
160        storage: StorageCollections<T>,
161        peek_stash_persist_location: PersistLocation,
162        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
163        metrics: InstanceMetrics,
164        now: NowFn,
165        wallclock_lag: WallclockLagFn<T>,
166        dyncfg: Arc<ConfigSet>,
167        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
168        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
169    ) -> Self {
170        let (command_tx, command_rx) = mpsc::unbounded_channel();
171
172        let read_hold_tx: read_holds::ChangeTx<_> = {
173            let command_tx = command_tx.clone();
174            Arc::new(move |id, change: ChangeBatch<_>| {
175                let cmd: Command<_> = {
176                    let change = change.clone();
177                    Box::new(move |i| i.apply_read_hold_change(id, change.clone()))
178                };
179                command_tx.send(cmd).map_err(|_| SendError((id, change)))
180            })
181        };
182
183        mz_ore::task::spawn(
184            || format!("compute-instance-{id}"),
185            Instance::new(
186                build_info,
187                storage,
188                peek_stash_persist_location,
189                arranged_logs,
190                metrics,
191                now,
192                wallclock_lag,
193                dyncfg,
194                command_rx,
195                response_tx,
196                Arc::clone(&read_hold_tx),
197                introspection_tx,
198            )
199            .run(),
200        );
201
202        Self {
203            command_tx,
204            read_hold_tx,
205        }
206    }
207}
208
/// A response received from a replica, tagged with the ID of the replica and
/// a `u64` (presumably the replica's epoch -- TODO confirm).
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);
212
/// The state maintained for a compute instance.
pub(super) struct Instance<T: ComputeControllerTimestamp> {
    /// Build information supplied at construction.
    build_info: &'static BuildInfo,
    /// Handle to the storage collections; consulted for collection frontiers
    /// when refreshing wallclock lag.
    storage_collections: StorageCollections<T>,
    /// Whether `InitializationComplete` has been sent already.
    initialized: bool,
    /// Whether the instance is in read-only mode. Starts out `true` and is
    /// cleared by `allow_writes`.
    read_only: bool,
    /// The workload class, if any; updated by `update_configuration` and
    /// attached as a label to wallclock lag histogram entries.
    workload_class: Option<String>,
    /// The state of the replicas of this instance, by replica ID.
    replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
    /// The state of the collections installed on this instance, by ID. This
    /// includes the log collections registered at construction.
    collections: BTreeMap<GlobalId, CollectionState<T>>,
    /// Maps each arranged log variant to the ID of its collection.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Pending peeks, by UUID.
    peeks: BTreeMap<Uuid, PendingPeek<T>>,
    /// Active subscribes, by ID.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
    /// IDs of tracked copy-tos.
    copy_tos: BTreeSet<GlobalId>,
    /// The history of compute commands pushed by `send`.
    history: ComputeCommandHistory<UIntGauge, T>,
    /// Receiver for commands to run against this instance.
    command_rx: mpsc::UnboundedReceiver<Command<T>>,
    /// Sender for responses delivered to the controller.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Sender for introspection updates.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Metrics of this instance.
    metrics: InstanceMetrics,
    /// Dynamic configuration.
    dyncfg: Arc<ConfigSet>,

    /// Persist location included in the `InstanceConfig` sent with
    /// `CreateInstance`.
    peek_stash_persist_location: PersistLocation,

    /// Function returning the current time; its result is used as epoch
    /// milliseconds.
    now: NowFn,
    /// Function computing the wallclock lag for a given timestamp.
    wallclock_lag: WallclockLagFn<T>,
    /// The (interval-truncated) time at which wallclock lag was last recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Handle for submitting read hold changes to this instance; passed on to
    /// newly created collection state.
    read_hold_tx: read_holds::ChangeTx<T>,
    /// Sending half of the replica response channel.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// Receiving half of the replica response channel; drained by `run`.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
}
307
308impl<T: ComputeControllerTimestamp> Instance<T> {
309    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
311        self.collections.get(&id).ok_or(CollectionMissing(id))
312    }
313
314    fn collection_mut(
316        &mut self,
317        id: GlobalId,
318    ) -> Result<&mut CollectionState<T>, CollectionMissing> {
319        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
320    }
321
322    fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
328        self.collections.get(&id).expect("collection must exist")
329    }
330
331    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
337        self.collections
338            .get_mut(&id)
339            .expect("collection must exist")
340    }
341
342    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
343        self.collections.iter().map(|(id, coll)| (*id, coll))
344    }
345
346    fn add_collection(
352        &mut self,
353        id: GlobalId,
354        as_of: Antichain<T>,
355        shared: SharedCollectionState<T>,
356        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
357        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
358        replica_input_read_holds: Vec<ReadHold<T>>,
359        write_only: bool,
360        storage_sink: bool,
361        initial_as_of: Option<Antichain<T>>,
362        refresh_schedule: Option<RefreshSchedule>,
363    ) {
364        let introspection = CollectionIntrospection::new(
366            id,
367            self.introspection_tx.clone(),
368            as_of.clone(),
369            storage_sink,
370            initial_as_of,
371            refresh_schedule,
372        );
373        let mut state = CollectionState::new(
374            id,
375            as_of.clone(),
376            shared,
377            storage_dependencies,
378            compute_dependencies,
379            Arc::clone(&self.read_hold_tx),
380            introspection,
381        );
382        if write_only {
384            state.read_policy = None;
385        }
386
387        if let Some(previous) = self.collections.insert(id, state) {
388            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
389        }
390
391        for replica in self.replicas.values_mut() {
393            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
394        }
395
396        self.report_dependency_updates(id, Diff::ONE);
398    }
399
400    fn remove_collection(&mut self, id: GlobalId) {
401        self.report_dependency_updates(id, Diff::MINUS_ONE);
403
404        for replica in self.replicas.values_mut() {
406            replica.remove_collection(id);
407        }
408
409        self.collections.remove(&id);
411    }
412
413    fn add_replica_state(
414        &mut self,
415        id: ReplicaId,
416        client: ReplicaClient<T>,
417        config: ReplicaConfig,
418        epoch: u64,
419    ) {
420        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();
421
422        let metrics = self.metrics.for_replica(id);
423        let mut replica = ReplicaState::new(
424            id,
425            client,
426            config,
427            metrics,
428            self.introspection_tx.clone(),
429            epoch,
430        );
431
432        for (collection_id, collection) in &self.collections {
434            if collection.log_collection && !log_ids.contains(collection_id) {
436                continue;
437            }
438
439            let as_of = if collection.log_collection {
440                Antichain::from_elem(T::minimum())
445            } else {
446                collection.read_frontier().to_owned()
447            };
448
449            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
450            replica.add_collection(*collection_id, as_of, input_read_holds);
451        }
452
453        self.replicas.insert(id, replica);
454    }
455
456    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
458        let _ = self.response_tx.send(response);
461    }
462
463    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
465        let _ = self.introspection_tx.send((type_, updates));
468    }
469
470    fn replica_exists(&self, id: ReplicaId) -> bool {
472        self.replicas.contains_key(&id)
473    }
474
475    fn peeks_targeting(
477        &self,
478        replica_id: ReplicaId,
479    ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
480        self.peeks.iter().filter_map(move |(uuid, peek)| {
481            if peek.target_replica == Some(replica_id) {
482                Some((*uuid, peek))
483            } else {
484                None
485            }
486        })
487    }
488
489    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
491        self.subscribes.iter().filter_map(move |(id, subscribe)| {
492            let targeting = subscribe.target_replica == Some(replica_id);
493            targeting.then_some(*id)
494        })
495    }
496
497    fn update_frontier_introspection(&mut self) {
506        for collection in self.collections.values_mut() {
507            collection
508                .introspection
509                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
510        }
511
512        for replica in self.replicas.values_mut() {
513            for collection in replica.collections.values_mut() {
514                collection
515                    .introspection
516                    .observe_frontier(&collection.write_frontier);
517            }
518        }
519    }
520
521    fn refresh_state_metrics(&self) {
530        let unscheduled_collections_count =
531            self.collections.values().filter(|c| !c.scheduled).count();
532        let connected_replica_count = self
533            .replicas
534            .values()
535            .filter(|r| r.client.is_connected())
536            .count();
537
538        self.metrics
539            .replica_count
540            .set(u64::cast_from(self.replicas.len()));
541        self.metrics
542            .collection_count
543            .set(u64::cast_from(self.collections.len()));
544        self.metrics
545            .collection_unscheduled_count
546            .set(u64::cast_from(unscheduled_collections_count));
547        self.metrics
548            .peek_count
549            .set(u64::cast_from(self.peeks.len()));
550        self.metrics
551            .subscribe_count
552            .set(u64::cast_from(self.subscribes.len()));
553        self.metrics
554            .copy_to_count
555            .set(u64::cast_from(self.copy_tos.len()));
556        self.metrics
557            .connected_replica_count
558            .set(u64::cast_from(connected_replica_count));
559    }
560
    /// Refreshes the wallclock lag measurements of all collections.
    ///
    /// Updates the per-collection histogram stashes, the per-replica maximum
    /// lags, and the per-replica lag metrics, then invokes
    /// `maybe_record_wallclock_lag`.
    fn refresh_wallclock_lag(&mut self) {
        // Lag of a frontier's timestamp according to `wallclock_lag`. A
        // frontier without a timestamp is treated as having zero lag.
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        // The workload class, if set, is the only histogram label.
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        // IDs of collections whose write frontier is not beyond their read
        // frontier.
        let mut unreadable_collections = BTreeSet::new();
        for (id, collection) in &mut self.collections {
            // Prefer the read capabilities known to the storage collections,
            // falling back to the collection's own read frontier if the
            // lookup fails.
            let read_frontier = match self.storage_collections.collection_frontiers(*id) {
                Ok(f) => f.read_capabilities,
                Err(_) => collection.read_frontier(),
            };
            let write_frontier = collection.write_frontier();
            let collection_unreadable = PartialOrder::less_equal(&write_frontier, &read_frontier);
            if collection_unreadable {
                unreadable_collections.insert(id);
            }

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                // Unreadable collections are counted with an undefined lag;
                // otherwise the bucket is the lag in seconds, rounded up to
                // the next power of two.
                let bucket = if collection_unreadable {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                // Per-replica lag is based on the replica's own write
                // frontier, and undefined for unreadable collections.
                let lag = if unreadable_collections.contains(&id) {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                };

                // Track the maximum lag observed since it was last reset.
                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    // `u64::MAX` is the fallback passed for lags without a
                    // seconds value (presumably the undefined lag -- TODO
                    // confirm).
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                };
            }
        }

        self.maybe_record_wallclock_lag();
    }
648
    /// Emits the collected wallclock lag measurements as introspection
    /// updates, if a new recording interval has started since the last
    /// recording.
    ///
    /// Does nothing while the instance is in read-only mode.
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        // Truncates `datetime` to a multiple of `interval`; `None` if the
        // interval cannot be converted or the truncation fails.
        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        // If truncating with the configured interval fails, fall back to the
        // default interval.
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        // Only record once per interval.
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        // One history row per replica collection that tracks a maximum lag.
        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                // Take the maximum and reset it for the next interval.
                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        // One histogram row per stashed (period, lag, labels) entry; the
        // stashes are emptied in the process.
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }
733
734    fn report_dependency_updates(&self, id: GlobalId, diff: Diff) {
740        let collection = self.expect_collection(id);
741        let dependencies = collection.dependency_ids();
742
743        let updates = dependencies
744            .map(|dependency_id| {
745                let row = Row::pack_slice(&[
746                    Datum::String(&id.to_string()),
747                    Datum::String(&dependency_id.to_string()),
748                ]);
749                (row, diff)
750            })
751            .collect();
752
753        self.deliver_introspection_updates(IntrospectionType::ComputeDependencies, updates);
754    }
755
756    #[mz_ore::instrument(level = "debug")]
762    pub fn collection_hydrated(
763        &self,
764        collection_id: GlobalId,
765    ) -> Result<bool, CollectionLookupError> {
766        if self.replicas.is_empty() {
767            return Ok(true);
768        }
769
770        for replica_state in self.replicas.values() {
771            let collection_state = replica_state
772                .collections
773                .get(&collection_id)
774                .ok_or(CollectionLookupError::CollectionMissing(collection_id))?;
775
776            if collection_state.hydrated() {
777                return Ok(true);
778            }
779        }
780
781        Ok(false)
782    }
783
784    #[mz_ore::instrument(level = "debug")]
790    pub fn collections_hydrated_on_replicas(
791        &self,
792        target_replica_ids: Option<Vec<ReplicaId>>,
793        exclude_collections: &BTreeSet<GlobalId>,
794    ) -> Result<bool, HydrationCheckBadTarget> {
795        if self.replicas.is_empty() {
796            return Ok(true);
797        }
798        let mut all_hydrated = true;
799        let target_replicas: BTreeSet<ReplicaId> = self
800            .replicas
801            .keys()
802            .filter_map(|id| match target_replica_ids {
803                None => Some(id.clone()),
804                Some(ref ids) if ids.contains(id) => Some(id.clone()),
805                Some(_) => None,
806            })
807            .collect();
808        if let Some(targets) = target_replica_ids {
809            if target_replicas.is_empty() {
810                return Err(HydrationCheckBadTarget(targets));
811            }
812        }
813
814        for (id, _collection) in self.collections_iter() {
815            if id.is_transient() || exclude_collections.contains(&id) {
816                continue;
817            }
818
819            let mut collection_hydrated = false;
820            for replica_state in self.replicas.values() {
821                if !target_replicas.contains(&replica_state.id) {
822                    continue;
823                }
824                let collection_state = replica_state
825                    .collections
826                    .get(&id)
827                    .expect("missing collection state");
828
829                if collection_state.hydrated() {
830                    collection_hydrated = true;
831                    break;
832                }
833            }
834
835            if !collection_hydrated {
836                tracing::info!("collection {id} is not hydrated on any replica");
837                all_hydrated = false;
838                }
841        }
842
843        Ok(all_hydrated)
844    }
845
846    #[mz_ore::instrument(level = "debug")]
852    pub fn collections_hydrated(&self, exclude_collections: &BTreeSet<GlobalId>) -> bool {
853        self.collections_hydrated_on_replicas(None, exclude_collections)
854            .expect("Cannot error if target_replica_ids is None")
855    }
856
857    fn cleanup_collections(&mut self) {
873        let to_remove: Vec<_> = self
874            .collections_iter()
875            .filter(|(id, collection)| {
876                collection.dropped
877                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
878                    && self
879                        .replicas
880                        .values()
881                        .all(|r| r.collection_frontiers_empty(*id))
882            })
883            .map(|(id, _collection)| id)
884            .collect();
885
886        for id in to_remove {
887            self.remove_collection(id);
888        }
889    }
890
891    #[mz_ore::instrument(level = "debug")]
895    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
896        let Self {
903            build_info: _,
904            storage_collections: _,
905            peek_stash_persist_location: _,
906            initialized,
907            read_only,
908            workload_class,
909            replicas,
910            collections,
911            log_sources: _,
912            peeks,
913            subscribes,
914            copy_tos,
915            history: _,
916            command_rx: _,
917            response_tx: _,
918            introspection_tx: _,
919            metrics: _,
920            dyncfg: _,
921            now: _,
922            wallclock_lag: _,
923            wallclock_lag_last_recorded,
924            read_hold_tx: _,
925            replica_tx: _,
926            replica_rx: _,
927        } = self;
928
929        fn field(
930            key: &str,
931            value: impl Serialize,
932        ) -> Result<(String, serde_json::Value), anyhow::Error> {
933            let value = serde_json::to_value(value)?;
934            Ok((key.to_string(), value))
935        }
936
937        let replicas: BTreeMap<_, _> = replicas
938            .iter()
939            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
940            .collect::<Result<_, anyhow::Error>>()?;
941        let collections: BTreeMap<_, _> = collections
942            .iter()
943            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
944            .collect();
945        let peeks: BTreeMap<_, _> = peeks
946            .iter()
947            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
948            .collect();
949        let subscribes: BTreeMap<_, _> = subscribes
950            .iter()
951            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
952            .collect();
953        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
954        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");
955
956        let map = serde_json::Map::from_iter([
957            field("initialized", initialized)?,
958            field("read_only", read_only)?,
959            field("workload_class", workload_class)?,
960            field("replicas", replicas)?,
961            field("collections", collections)?,
962            field("peeks", peeks)?,
963            field("subscribes", subscribes)?,
964            field("copy_tos", copy_tos)?,
965            field("wallclock_lag_last_recorded", wallclock_lag_last_recorded)?,
966        ]);
967        Ok(serde_json::Value::Object(map))
968    }
969}
970
971impl<T> Instance<T>
972where
973    T: ComputeControllerTimestamp,
974{
975    fn new(
976        build_info: &'static BuildInfo,
977        storage: StorageCollections<T>,
978        peek_stash_persist_location: PersistLocation,
979        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
980        metrics: InstanceMetrics,
981        now: NowFn,
982        wallclock_lag: WallclockLagFn<T>,
983        dyncfg: Arc<ConfigSet>,
984        command_rx: mpsc::UnboundedReceiver<Command<T>>,
985        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
986        read_hold_tx: read_holds::ChangeTx<T>,
987        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
988    ) -> Self {
989        let mut collections = BTreeMap::new();
990        let mut log_sources = BTreeMap::new();
991        for (log, id, shared) in arranged_logs {
992            let collection = CollectionState::new_log_collection(
993                id,
994                shared,
995                Arc::clone(&read_hold_tx),
996                introspection_tx.clone(),
997            );
998            collections.insert(id, collection);
999            log_sources.insert(log, id);
1000        }
1001
1002        let history = ComputeCommandHistory::new(metrics.for_history());
1003
1004        let send_count = metrics.response_send_count.clone();
1005        let recv_count = metrics.response_recv_count.clone();
1006        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);
1007
1008        let now_dt = mz_ore::now::to_datetime(now());
1009
1010        Self {
1011            build_info,
1012            storage_collections: storage,
1013            peek_stash_persist_location,
1014            initialized: false,
1015            read_only: true,
1016            workload_class: None,
1017            replicas: Default::default(),
1018            collections,
1019            log_sources,
1020            peeks: Default::default(),
1021            subscribes: Default::default(),
1022            copy_tos: Default::default(),
1023            history,
1024            command_rx,
1025            response_tx,
1026            introspection_tx,
1027            metrics,
1028            dyncfg,
1029            now,
1030            wallclock_lag,
1031            wallclock_lag_last_recorded: now_dt,
1032            read_hold_tx,
1033            replica_tx,
1034            replica_rx,
1035        }
1036    }
1037
    /// Runs the instance until its command channel is closed.
    ///
    /// Sends the initial `Hello` and `CreateInstance` commands, then
    /// alternates between executing received commands and handling replica
    /// responses.
    async fn run(mut self) {
        self.send(ComputeCommand::Hello {
            // A default nonce is sent here (presumably replaced before the
            // command reaches a replica -- TODO confirm).
            nonce: Uuid::default(),
        });

        let instance_config = InstanceConfig {
            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
            // The remaining settings are left at their defaults.
            logging: Default::default(),
            expiration_offset: Default::default(),
        };

        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));

        loop {
            tokio::select! {
                command = self.command_rx.recv() => match command {
                    Some(cmd) => cmd(&mut self),
                    // All command senders are gone: stop running.
                    None => break,
                },
                response = self.replica_rx.recv() => match response {
                    Some(response) => self.handle_response(response),
                    None => unreachable!("self owns a sender side of the channel"),
                }
            }
        }
    }
1068
1069    #[mz_ore::instrument(level = "debug")]
1071    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
1072        if let Some(workload_class) = &config_params.workload_class {
1073            self.workload_class = workload_class.clone();
1074        }
1075
1076        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
1077        self.send(command);
1078    }
1079
1080    #[mz_ore::instrument(level = "debug")]
1085    pub fn initialization_complete(&mut self) {
1086        if !self.initialized {
1088            self.send(ComputeCommand::InitializationComplete);
1089            self.initialized = true;
1090        }
1091    }
1092
1093    #[mz_ore::instrument(level = "debug")]
1097    pub fn allow_writes(&mut self) {
1098        if self.read_only {
1099            self.read_only = false;
1100            self.send(ComputeCommand::AllowWrites);
1101        }
1102    }
1103
1104    #[mz_ore::instrument(level = "debug")]
1115    pub fn shutdown(&mut self) {
1116        let (_tx, rx) = mpsc::unbounded_channel();
1118        let mut command_rx = std::mem::replace(&mut self.command_rx, rx);
1119
1120        while let Ok(cmd) = command_rx.try_recv() {
1126            cmd(self);
1127        }
1128
1129        self.cleanup_collections();
1131
1132        let stray_replicas: Vec<_> = self.replicas.keys().collect();
1133        soft_assert_or_log!(
1134            stray_replicas.is_empty(),
1135            "dropped instance still has provisioned replicas: {stray_replicas:?}",
1136        );
1137
1138        let collections = self.collections.iter();
1139        let stray_collections: Vec<_> = collections
1140            .filter(|(_, c)| !c.log_collection)
1141            .map(|(id, _)| id)
1142            .collect();
1143        soft_assert_or_log!(
1144            stray_collections.is_empty(),
1145            "dropped instance still has installed collections: {stray_collections:?}",
1146        );
1147    }
1148
1149    #[mz_ore::instrument(level = "debug")]
1151    fn send(&mut self, cmd: ComputeCommand<T>) {
1152        self.history.push(cmd.clone());
1154
1155        for replica in self.replicas.values_mut() {
1157            let _ = replica.client.send(cmd.clone());
1159        }
1160    }
1161
1162    #[mz_ore::instrument(level = "debug")]
1164    pub fn add_replica(
1165        &mut self,
1166        id: ReplicaId,
1167        mut config: ReplicaConfig,
1168        epoch: Option<u64>,
1169    ) -> Result<(), ReplicaExists> {
1170        if self.replica_exists(id) {
1171            return Err(ReplicaExists(id));
1172        }
1173
1174        config.logging.index_logs = self.log_sources.clone();
1175
1176        let epoch = epoch.unwrap_or(1);
1177        let metrics = self.metrics.for_replica(id);
1178        let client = ReplicaClient::spawn(
1179            id,
1180            self.build_info,
1181            config.clone(),
1182            epoch,
1183            metrics.clone(),
1184            Arc::clone(&self.dyncfg),
1185            self.replica_tx.clone(),
1186        );
1187
1188        self.history.reduce();
1190
1191        self.history.update_source_uppers(&self.storage_collections);
1193
1194        for command in self.history.iter() {
1196            if client.send(command.clone()).is_err() {
1197                tracing::warn!("Replica {:?} connection terminated during hydration", id);
1200                break;
1201            }
1202        }
1203
1204        self.add_replica_state(id, client, config, epoch);
1206
1207        Ok(())
1208    }
1209
1210    #[mz_ore::instrument(level = "debug")]
1212    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
1213        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;
1214
1215        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
1219        for subscribe_id in to_drop {
1220            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
1221            let response = ComputeControllerResponse::SubscribeResponse(
1222                subscribe_id,
1223                SubscribeBatch {
1224                    lower: subscribe.frontier.clone(),
1225                    upper: subscribe.frontier,
1226                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
1227                },
1228            );
1229            self.deliver_response(response);
1230        }
1231
1232        let mut peek_responses = Vec::new();
1237        let mut to_drop = Vec::new();
1238        for (uuid, peek) in self.peeks_targeting(id) {
1239            peek_responses.push(ComputeControllerResponse::PeekNotification(
1240                uuid,
1241                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
1242                peek.otel_ctx.clone(),
1243            ));
1244            to_drop.push(uuid);
1245        }
1246        for response in peek_responses {
1247            self.deliver_response(response);
1248        }
1249        for uuid in to_drop {
1250            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
1251            self.finish_peek(uuid, response);
1252        }
1253
1254        self.forward_implied_capabilities();
1257
1258        Ok(())
1259    }
1260
1261    fn rehydrate_replica(&mut self, id: ReplicaId) {
1267        let config = self.replicas[&id].config.clone();
1268        let epoch = self.replicas[&id].epoch + 1;
1269
1270        self.remove_replica(id).expect("replica must exist");
1271        let result = self.add_replica(id, config, Some(epoch));
1272
1273        match result {
1274            Ok(()) => (),
1275            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1276        }
1277    }
1278
1279    fn rehydrate_failed_replicas(&mut self) {
1281        let replicas = self.replicas.iter();
1282        let failed_replicas: Vec<_> = replicas
1283            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1284            .collect();
1285
1286        for replica_id in failed_replicas {
1287            self.rehydrate_replica(replica_id);
1288        }
1289    }
1290
1291    #[mz_ore::instrument(level = "debug")]
    /// Creates the described dataflow and installs collection state for its exports.
    ///
    /// `import_read_holds` must contain a read hold for each source and index import of the
    /// dataflow; each of them is downgraded to the dataflow's `as_of`. If
    /// `subscribe_target_replica` is given, that replica must exist. Entries of
    /// `shared_collection_state` are used for the exports with matching IDs; all other exports
    /// get fresh shared state initialized from the `as_of`.
    ///
    /// If the `as_of` is the empty frontier, collection state is still installed but no
    /// `CreateDataflow` command is sent.
    ///
    /// # Errors
    ///
    /// Returns an error if
    ///  * `subscribe_target_replica` names a replica that does not exist,
    ///  * the dataflow has no `as_of`,
    ///  * the `as_of` is empty and the dataflow exports a subscribe or a copy-to,
    ///  * a read hold for an import is missing or cannot be downgraded to the `as_of`, or
    ///  * storage metadata for a materialized view or continual task sink cannot be found.
    ///
    /// # Panics
    ///
    /// Panics if storage frontiers or metadata are unavailable for a source import, or if an
    /// object's plan cannot be converted into a `RenderPlan`.
    pub fn create_dataflow(
        &mut self,
        dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        import_read_holds: Vec<ReadHold<T>>,
        subscribe_target_replica: Option<ReplicaId>,
        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        // A targeted replica must exist.
        if let Some(replica_id) = subscribe_target_replica {
            if !self.replica_exists(replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // The `as_of` is required, and it must not be empty for subscribes and copy-tos.
        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        // Read holds on the dataflow's imports, downgraded to the `as_of`.
        let mut storage_dependencies = BTreeMap::new();
        let mut compute_dependencies = BTreeMap::new();

        // Read holds on the source imports, cloned before they are downgraded to the `as_of`.
        let mut replica_input_read_holds = Vec::new();

        // Index the provided read holds by collection ID.
        let mut import_read_holds: BTreeMap<_, _> =
            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();

        for &id in dataflow.source_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            replica_input_read_holds.push(read_hold.clone());

            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            storage_dependencies.insert(id, read_hold);
        }

        for &id in dataflow.index_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            compute_dependencies.insert(id, read_hold);
        }

        // With an empty `as_of` no dataflow is sent to the replicas (see below), so the
        // per-replica input read holds are discarded.
        if as_of.is_empty() {
            replica_input_read_holds = Default::default();
        }

        // Install collection state for each export.
        for export_id in dataflow.export_ids() {
            let shared = shared_collection_state
                .remove(&export_id)
                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
            let write_only = dataflow.sink_exports.contains_key(&export_id);
            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);

            self.add_collection(
                export_id,
                as_of.clone(),
                shared,
                storage_dependencies.clone(),
                compute_dependencies.clone(),
                replica_input_read_holds.clone(),
                write_only,
                storage_sink,
                dataflow.initial_storage_as_of.clone(),
                dataflow.refresh_schedule.clone(),
            );

            // If storage knows a collection with the export's ID, adopt its write frontier.
            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
            }
        }

        // Start tracking subscribes.
        for subscribe_id in dataflow.subscribe_ids() {
            self.subscribes
                .insert(subscribe_id, ActiveSubscribe::new(subscribe_target_replica));
        }

        // Start tracking copy-tos.
        for copy_to_id in dataflow.copy_to_ids() {
            self.copy_tos.insert(copy_to_id);
        }

        // Augment the source imports with storage metadata and the current write frontier; the
        // upper carried by the incoming description is discarded.
        let mut source_imports = BTreeMap::new();
        for (id, (si, monotonic, _upper)) in dataflow.source_imports {
            let frontiers = self
                .storage_collections
                .collection_frontiers(id)
                .expect("collection exists");

            let collection_metadata = self
                .storage_collections
                .collection_metadata(id)
                .expect("we have a read hold on this collection");

            let desc = SourceInstanceDesc {
                storage_metadata: collection_metadata.clone(),
                arguments: si.arguments,
                typ: si.typ.clone(),
            };
            source_imports.insert(id, (desc, monotonic, frontiers.write_frontier));
        }

        // Augment the sink exports with storage metadata where the connection requires it.
        // NOTE(review): an error returned from this loop happens after collection state has
        // been installed above — confirm that callers handle this.
        let mut sink_exports = BTreeMap::new();
        for (id, se) in dataflow.sink_exports {
            let connection = match se.connection {
                ComputeSinkConnection::MaterializedView(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| CollectionMissing(id))?
                        .clone();
                    let conn = MaterializedViewSinkConnection {
                        value_desc: conn.value_desc,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::MaterializedView(conn)
                }
                ComputeSinkConnection::ContinualTask(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
                        .clone();
                    let conn = ContinualTaskConnection {
                        input_id: conn.input_id,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::ContinualTask(conn)
                }
                // These connections are passed through unchanged.
                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                    ComputeSinkConnection::CopyToS3Oneshot(conn)
                }
            };
            let desc = ComputeSinkDesc {
                from: se.from,
                from_desc: se.from_desc,
                connection,
                with_snapshot: se.with_snapshot,
                up_to: se.up_to,
                non_null_assertions: se.non_null_assertions,
                refresh_schedule: se.refresh_schedule,
            };
            sink_exports.insert(id, desc);
        }

        // Convert each object's plan into a `RenderPlan`.
        let objects_to_build = dataflow
            .objects_to_build
            .into_iter()
            .map(|object| BuildDesc {
                id: object.id,
                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
            })
            .collect();

        let augmented_dataflow = DataflowDescription {
            source_imports,
            sink_exports,
            objects_to_build,
            index_imports: dataflow.index_imports,
            index_exports: dataflow.index_exports,
            as_of: dataflow.as_of.clone(),
            until: dataflow.until,
            initial_storage_as_of: dataflow.initial_storage_as_of,
            refresh_schedule: dataflow.refresh_schedule,
            debug_name: dataflow.debug_name,
            time_dependence: dataflow.time_dependence,
        };

        // Transient dataflows are logged at debug level, all others at info level.
        if augmented_dataflow.is_transient() {
            tracing::debug!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        } else {
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        }

        if as_of.is_empty() {
            // Nothing is sent to the replicas for a dataflow with an empty `as_of`.
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                "not sending `CreateDataflow`, because of empty `as_of`",
            );
        } else {
            // Collect the export IDs before the description is moved into the command.
            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
            let dataflow = Box::new(augmented_dataflow);
            self.send(ComputeCommand::CreateDataflow(dataflow));

            for id in collections {
                self.maybe_schedule_collection(id);
            }
        }

        Ok(())
    }
1530
1531    fn maybe_schedule_collection(&mut self, id: GlobalId) {
1537        let collection = self.expect_collection(id);
1538
1539        if collection.scheduled {
1541            return;
1542        }
1543
1544        let as_of = collection.read_frontier();
1545
1546        if as_of.is_empty() {
1549            return;
1550        }
1551
1552        let ready = if id.is_transient() {
1553            true
1559        } else {
1560            let not_self_dep = |x: &GlobalId| *x != id;
1566
1567            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
1572            let compute_frontiers = compute_deps.map(|id| {
1573                let dep = &self.expect_collection(id);
1574                dep.write_frontier()
1575            });
1576
1577            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
1578            let storage_frontiers = self
1579                .storage_collections
1580                .collections_frontiers(storage_deps.collect())
1581                .expect("must exist");
1582            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);
1583
1584            let ready = compute_frontiers
1585                .chain(storage_frontiers)
1586                .all(|frontier| PartialOrder::less_than(&as_of, &frontier));
1587
1588            ready
1589        };
1590
1591        if ready {
1592            self.send(ComputeCommand::Schedule(id));
1593            let collection = self.expect_collection_mut(id);
1594            collection.scheduled = true;
1595        }
1596    }
1597
1598    fn schedule_collections(&mut self) {
1600        let ids: Vec<_> = self.collections.keys().copied().collect();
1601        for id in ids {
1602            self.maybe_schedule_collection(id);
1603        }
1604    }
1605
1606    #[mz_ore::instrument(level = "debug")]
1609    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1610        for id in &ids {
1611            let collection = self.collection_mut(*id)?;
1612
1613            collection.dropped = true;
1615
1616            collection.implied_read_hold.release();
1619            collection.warmup_read_hold.release();
1620
1621            self.subscribes.remove(id);
1624            self.copy_tos.remove(id);
1627        }
1628
1629        Ok(())
1630    }
1631
1632    #[mz_ore::instrument(level = "debug")]
1634    pub fn peek(
1635        &mut self,
1636        peek_target: PeekTarget,
1637        literal_constraints: Option<Vec<Row>>,
1638        uuid: Uuid,
1639        timestamp: T,
1640        result_desc: RelationDesc,
1641        finishing: RowSetFinishing,
1642        map_filter_project: mz_expr::SafeMfpPlan,
1643        mut read_hold: ReadHold<T>,
1644        target_replica: Option<ReplicaId>,
1645        peek_response_tx: oneshot::Sender<PeekResponse>,
1646    ) -> Result<(), PeekError> {
1647        use PeekError::*;
1648
1649        let target_id = peek_target.id();
1651        if read_hold.id() != target_id {
1652            return Err(ReadHoldIdMismatch(read_hold.id()));
1653        }
1654        read_hold
1655            .try_downgrade(Antichain::from_elem(timestamp.clone()))
1656            .map_err(|_| ReadHoldInsufficient(target_id))?;
1657
1658        if let Some(target) = target_replica {
1659            if !self.replica_exists(target) {
1660                return Err(ReplicaMissing(target));
1661            }
1662        }
1663
1664        let otel_ctx = OpenTelemetryContext::obtain();
1665
1666        self.peeks.insert(
1667            uuid,
1668            PendingPeek {
1669                target_replica,
1670                otel_ctx: otel_ctx.clone(),
1672                requested_at: Instant::now(),
1673                read_hold,
1674                peek_response_tx,
1675                limit: finishing.limit.map(usize::cast_from),
1676                offset: finishing.offset,
1677            },
1678        );
1679
1680        let peek = Peek {
1681            literal_constraints,
1682            uuid,
1683            timestamp,
1684            finishing,
1685            map_filter_project,
1686            otel_ctx,
1689            target: peek_target,
1690            result_desc,
1691        };
1692        self.send(ComputeCommand::Peek(Box::new(peek)));
1693
1694        Ok(())
1695    }
1696
1697    #[mz_ore::instrument(level = "debug")]
1699    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1700        let Some(peek) = self.peeks.get_mut(&uuid) else {
1701            tracing::warn!("did not find pending peek for {uuid}");
1702            return;
1703        };
1704
1705        let duration = peek.requested_at.elapsed();
1706        self.metrics
1707            .observe_peek_response(&PeekResponse::Canceled, duration);
1708
1709        let otel_ctx = peek.otel_ctx.clone();
1711        otel_ctx.attach_as_parent();
1712
1713        self.deliver_response(ComputeControllerResponse::PeekNotification(
1714            uuid,
1715            PeekNotification::Canceled,
1716            otel_ctx,
1717        ));
1718
1719        self.finish_peek(uuid, reason);
1722    }
1723
1724    #[mz_ore::instrument(level = "debug")]
1736    pub fn set_read_policy(
1737        &mut self,
1738        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1739    ) -> Result<(), ReadPolicyError> {
1740        for (id, _policy) in &policies {
1743            let collection = self.collection(*id)?;
1744            if collection.read_policy.is_none() {
1745                return Err(ReadPolicyError::WriteOnlyCollection(*id));
1746            }
1747        }
1748
1749        for (id, new_policy) in policies {
1750            let collection = self.expect_collection_mut(id);
1751            let new_since = new_policy.frontier(collection.write_frontier().borrow());
1752            let _ = collection.implied_read_hold.try_downgrade(new_since);
1753            collection.read_policy = Some(new_policy);
1754        }
1755
1756        Ok(())
1757    }
1758
1759    #[mz_ore::instrument(level = "debug")]
1767    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
1768        let collection = self.expect_collection_mut(id);
1769
1770        let advanced = collection.shared.lock_write_frontier(|f| {
1771            let advanced = PartialOrder::less_than(f, &new_frontier);
1772            if advanced {
1773                f.clone_from(&new_frontier);
1774            }
1775            advanced
1776        });
1777
1778        if !advanced {
1779            return;
1780        }
1781
1782        let new_since = match &collection.read_policy {
1784            Some(read_policy) => {
1785                read_policy.frontier(new_frontier.borrow())
1788            }
1789            None => {
1790                Antichain::from_iter(
1799                    new_frontier
1800                        .iter()
1801                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
1802                )
1803            }
1804        };
1805        let _ = collection.implied_read_hold.try_downgrade(new_since);
1806
1807        self.deliver_response(ComputeControllerResponse::FrontierUpper {
1809            id,
1810            upper: new_frontier,
1811        });
1812    }
1813
    /// Applies a change to the read capabilities of the identified collection.
    ///
    /// If the change moves the collection's read frontier, the read holds on the collection's
    /// compute and storage dependencies are downgraded to the new frontier and an
    /// `AllowCompaction` command is sent. Soft-panics (or logs) and returns if the collection
    /// does not exist.
    ///
    /// # Panics
    ///
    /// Panics if the update would make a capability count negative, if it would leave a
    /// non-zero count at a time the current read frontier is not less than or equal to, or if
    /// downgrading a dependency read hold fails.
    fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
        let Some(collection) = self.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "read hold change for absent collection (id={id}, changes={update:?})"
            );
            return;
        };

        let new_since = collection.shared.lock_read_capabilities(|caps| {
            // Validate the update against the current capabilities before applying it.
            let read_frontier = caps.frontier();
            for (time, diff) in update.iter() {
                let count = caps.count_for(time) + diff;
                assert!(
                    count >= 0,
                    "invalid read capabilities update: negative capability \
             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
                assert!(
                    count == 0 || read_frontier.less_equal(time),
                    "invalid read capabilities update: frontier regression \
             (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
            }

            // Apply the update; `changes` yields the resulting changes to the read frontier.
            let changes = caps.update_iter(update.drain());

            // Yield the new frontier only if the update actually changed it.
            let changed = changes.count() > 0;
            changed.then(|| caps.frontier().to_owned())
        });

        // Nothing more to do if the read frontier did not change.
        let Some(new_since) = new_since else {
            return; };

        // Propagate the new read frontier to the read holds on the dependencies.
        for read_hold in collection.compute_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }
        for read_hold in collection.storage_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }

        self.send(ComputeCommand::AllowCompaction {
            id,
            frontier: new_since,
        });
    }
1871
    /// Finishes the pending peek identified by `uuid`, if it is still tracked.
    ///
    /// Removes the peek from `self.peeks`, sends `response` on the peek's response channel,
    /// sends a `CancelPeek` command, and finally drops the peek's read hold. Does nothing if no
    /// peek with the given `uuid` is tracked.
    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
        let Some(peek) = self.peeks.remove(&uuid) else {
            return;
        };

        // A failed send is deliberately ignored here.
        let _ = peek.peek_response_tx.send(response);

        // NOTE(review): the `CancelPeek` command is sent before the read hold is dropped below;
        // this ordering looks deliberate — confirm before reordering.
        self.send(ComputeCommand::CancelPeek { uuid });

        drop(peek.read_hold);
    }
1894
1895    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
1898        if self
1900            .replicas
1901            .get(&replica_id)
1902            .filter(|replica| replica.epoch == epoch)
1903            .is_none()
1904        {
1905            return;
1906        }
1907
1908        match response {
1911            ComputeResponse::Frontiers(id, frontiers) => {
1912                self.handle_frontiers_response(id, frontiers, replica_id);
1913            }
1914            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
1915                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
1916            }
1917            ComputeResponse::CopyToResponse(id, response) => {
1918                self.handle_copy_to_response(id, response, replica_id);
1919            }
1920            ComputeResponse::SubscribeResponse(id, response) => {
1921                self.handle_subscribe_response(id, response, replica_id);
1922            }
1923            ComputeResponse::Status(response) => {
1924                self.handle_status_response(response, replica_id);
1925            }
1926        }
1927    }
1928
1929    fn handle_frontiers_response(
1932        &mut self,
1933        id: GlobalId,
1934        frontiers: FrontiersResponse<T>,
1935        replica_id: ReplicaId,
1936    ) {
1937        if !self.collections.contains_key(&id) {
1938            soft_panic_or_log!(
1939                "frontiers update for an unknown collection \
1940                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1941            );
1942            return;
1943        }
1944        let Some(replica) = self.replicas.get_mut(&replica_id) else {
1945            soft_panic_or_log!(
1946                "frontiers update for an unknown replica \
1947                 (replica_id={replica_id}, frontiers={frontiers:?})"
1948            );
1949            return;
1950        };
1951        let Some(replica_collection) = replica.collections.get_mut(&id) else {
1952            soft_panic_or_log!(
1953                "frontiers update for an unknown replica collection \
1954                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1955            );
1956            return;
1957        };
1958
1959        if let Some(new_frontier) = frontiers.input_frontier {
1960            replica_collection.update_input_frontier(new_frontier.clone());
1961        }
1962        if let Some(new_frontier) = frontiers.output_frontier {
1963            replica_collection.update_output_frontier(new_frontier.clone());
1964        }
1965        if let Some(new_frontier) = frontiers.write_frontier {
1966            replica_collection.update_write_frontier(new_frontier.clone());
1967            self.maybe_update_global_write_frontier(id, new_frontier);
1968        }
1969    }
1970
1971    #[mz_ore::instrument(level = "debug")]
1972    fn handle_peek_response(
1973        &mut self,
1974        uuid: Uuid,
1975        response: PeekResponse,
1976        otel_ctx: OpenTelemetryContext,
1977        replica_id: ReplicaId,
1978    ) {
1979        otel_ctx.attach_as_parent();
1980
1981        let Some(peek) = self.peeks.get(&uuid) else {
1984            return;
1985        };
1986
1987        let target_replica = peek.target_replica.unwrap_or(replica_id);
1989        if target_replica != replica_id {
1990            return;
1991        }
1992
1993        let duration = peek.requested_at.elapsed();
1994        self.metrics.observe_peek_response(&response, duration);
1995
1996        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
1997        self.deliver_response(ComputeControllerResponse::PeekNotification(
2000            uuid,
2001            notification,
2002            otel_ctx,
2003        ));
2004
2005        self.finish_peek(uuid, response)
2006    }
2007
2008    fn handle_copy_to_response(
2009        &mut self,
2010        sink_id: GlobalId,
2011        response: CopyToResponse,
2012        replica_id: ReplicaId,
2013    ) {
2014        if !self.collections.contains_key(&sink_id) {
2015            soft_panic_or_log!(
2016                "received response for an unknown copy-to \
2017                 (sink_id={sink_id}, replica_id={replica_id})",
2018            );
2019            return;
2020        }
2021        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2022            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
2023            return;
2024        };
2025        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
2026            soft_panic_or_log!(
2027                "copy-to response for an unknown replica collection \
2028                 (sink_id={sink_id}, replica_id={replica_id})"
2029            );
2030            return;
2031        };
2032
2033        replica_collection.update_write_frontier(Antichain::new());
2037        replica_collection.update_input_frontier(Antichain::new());
2038        replica_collection.update_output_frontier(Antichain::new());
2039
2040        if !self.copy_tos.remove(&sink_id) {
2043            return;
2044        }
2045
2046        let result = match response {
2047            CopyToResponse::RowCount(count) => Ok(count),
2048            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
2049            CopyToResponse::Dropped => {
2054                tracing::error!(
2055                    %sink_id, %replica_id,
2056                    "received `Dropped` response for a tracked copy to",
2057                );
2058                return;
2059            }
2060        };
2061
2062        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
2063    }
2064
2065    fn handle_subscribe_response(
2066        &mut self,
2067        subscribe_id: GlobalId,
2068        response: SubscribeResponse<T>,
2069        replica_id: ReplicaId,
2070    ) {
2071        if !self.collections.contains_key(&subscribe_id) {
2072            soft_panic_or_log!(
2073                "received response for an unknown subscribe \
2074                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
2075            );
2076            return;
2077        }
2078        let Some(replica) = self.replicas.get_mut(&replica_id) else {
2079            soft_panic_or_log!(
2080                "subscribe response for an unknown replica (replica_id={replica_id})"
2081            );
2082            return;
2083        };
2084        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
2085            soft_panic_or_log!(
2086                "subscribe response for an unknown replica collection \
2087                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
2088            );
2089            return;
2090        };
2091
2092        let write_frontier = match &response {
2096            SubscribeResponse::Batch(batch) => batch.upper.clone(),
2097            SubscribeResponse::DroppedAt(_) => Antichain::new(),
2098        };
2099
2100        replica_collection.update_write_frontier(write_frontier.clone());
2104        replica_collection.update_input_frontier(write_frontier.clone());
2105        replica_collection.update_output_frontier(write_frontier.clone());
2106
2107        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
2109            return;
2110        };
2111        let replica_targeted = subscribe.target_replica.unwrap_or(replica_id) == replica_id;
2112        if !replica_targeted {
2113            return;
2114        }
2115
2116        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);
2122
2123        match response {
2124            SubscribeResponse::Batch(batch) => {
2125                let upper = batch.upper;
2126                let mut updates = batch.updates;
2127
2128                if PartialOrder::less_than(&subscribe.frontier, &upper) {
2131                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());
2132
2133                    if upper.is_empty() {
2134                        self.subscribes.remove(&subscribe_id);
2136                    } else {
2137                        self.subscribes.insert(subscribe_id, subscribe);
2139                    }
2140
2141                    if let Ok(updates) = updates.as_mut() {
2142                        updates.retain(|(time, _data, _diff)| lower.less_equal(time));
2143                    }
2144                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
2145                        subscribe_id,
2146                        SubscribeBatch {
2147                            lower,
2148                            upper,
2149                            updates,
2150                        },
2151                    ));
2152                }
2153            }
2154            SubscribeResponse::DroppedAt(frontier) => {
2155                tracing::error!(
2160                    %subscribe_id,
2161                    %replica_id,
2162                    frontier = ?frontier.elements(),
2163                    "received `DroppedAt` response for a tracked subscribe",
2164                );
2165                self.subscribes.remove(&subscribe_id);
2166            }
2167        }
2168    }
2169
2170    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
2171        match response {
2172            StatusResponse::Placeholder => {}
2173        }
2174    }
2175
2176    fn dependency_write_frontiers<'b>(
2178        &'b self,
2179        collection: &'b CollectionState<T>,
2180    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2181        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2182            let collection = self.collections.get(&dep_id);
2183            collection.map(|c| c.write_frontier())
2184        });
2185        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2186            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2187            frontiers.map(|f| f.write_frontier)
2188        });
2189
2190        compute_frontiers.chain(storage_frontiers)
2191    }
2192
2193    fn transitive_storage_dependency_write_frontiers<'b>(
2195        &'b self,
2196        collection: &'b CollectionState<T>,
2197    ) -> impl Iterator<Item = Antichain<T>> + 'b {
2198        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2199        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2200        let mut done = BTreeSet::new();
2201
2202        while let Some(id) = todo.pop() {
2203            if done.contains(&id) {
2204                continue;
2205            }
2206            if let Some(dep) = self.collections.get(&id) {
2207                storage_ids.extend(dep.storage_dependency_ids());
2208                todo.extend(dep.compute_dependency_ids())
2209            }
2210            done.insert(id);
2211        }
2212
2213        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2214            let frontiers = self.storage_collections.collection_frontiers(id).ok();
2215            frontiers.map(|f| f.write_frontier)
2216        });
2217
2218        storage_frontiers
2219    }
2220
2221    fn downgrade_warmup_capabilities(&mut self) {
2234        let mut new_capabilities = BTreeMap::new();
2235        for (id, collection) in &self.collections {
2236            if collection.read_policy.is_none()
2240                && collection.shared.lock_write_frontier(|f| f.is_empty())
2241            {
2242                new_capabilities.insert(*id, Antichain::new());
2243                continue;
2244            }
2245
2246            let mut new_capability = Antichain::new();
2247            for frontier in self.dependency_write_frontiers(collection) {
2248                for time in frontier {
2249                    new_capability.insert(time.step_back().unwrap_or(time));
2250                }
2251            }
2252
2253            new_capabilities.insert(*id, new_capability);
2254        }
2255
2256        for (id, new_capability) in new_capabilities {
2257            let collection = self.expect_collection_mut(id);
2258            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
2259        }
2260    }
2261
2262    fn forward_implied_capabilities(&mut self) {
2290        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
2291            return;
2292        }
2293        if !self.replicas.is_empty() {
2294            return;
2295        }
2296
2297        let mut new_capabilities = BTreeMap::new();
2298        for (id, collection) in &self.collections {
2299            let Some(read_policy) = &collection.read_policy else {
2300                continue;
2302            };
2303
2304            let mut dep_frontier = Antichain::new();
2308            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
2309                dep_frontier.extend(frontier);
2310            }
2311
2312            let new_capability = read_policy.frontier(dep_frontier.borrow());
2313            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
2314                new_capabilities.insert(*id, new_capability);
2315            }
2316        }
2317
2318        for (id, new_capability) in new_capabilities {
2319            let collection = self.expect_collection_mut(id);
2320            let _ = collection.implied_read_hold.try_downgrade(new_capability);
2321        }
2322    }
2323
    #[mz_ore::instrument(level = "debug")]
    /// Runs the instance's maintenance steps, one after another.
    ///
    /// As named by the called methods, the steps are: rehydrating failed replicas, downgrading
    /// warmup capabilities, forwarding implied capabilities, scheduling collections, cleaning up
    /// collections, and refreshing frontier introspection, state metrics, and wallclock lag.
    ///
    /// NOTE(review): the steps mutate shared instance state, so their relative order may
    /// matter; confirm before reordering.
    pub fn maintain(&mut self) {
        self.rehydrate_failed_replicas();
        self.downgrade_warmup_capabilities();
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
2340}
2341
/// State the instance maintains for a single compute collection.
#[derive(Debug)]
struct CollectionState<T: ComputeControllerTimestamp> {
    /// Whether this is a log collection. Only `new_log_collection` sets this to `true`.
    log_collection: bool,
    /// Initialized to `false`.
    ///
    /// NOTE(review): presumably set once the collection has been dropped; confirm against the
    /// code that writes this flag.
    dropped: bool,
    /// Initialized to `false` by `new` and to `true` for log collections.
    ///
    /// NOTE(review): presumably tracks whether the collection has been scheduled; confirm
    /// against `schedule_collections`.
    scheduled: bool,

    /// Read capabilities and write frontier of the collection, kept behind shared locks.
    shared: SharedCollectionState<T>,

    /// Read hold created at the collection's `as_of`. `forward_implied_capabilities` downgrades
    /// it according to `read_policy`.
    implied_read_hold: ReadHold<T>,
    /// Read hold created at the collection's `as_of`. `downgrade_warmup_capabilities` downgrades
    /// it based on the write frontiers of the collection's dependencies.
    warmup_read_hold: ReadHold<T>,
    /// Policy used to derive the implied read capability. `new` initializes it to
    /// `ReadPolicy::ValidFrom` of the `as_of`.
    ///
    /// Collections where this is `None` are skipped by `forward_implied_capabilities`.
    read_policy: Option<ReadPolicy<T>>,

    /// Read holds on storage collections this collection depends on, keyed by dependency ID.
    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
    /// Read holds on compute collections this collection depends on, keyed by dependency ID.
    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,

    /// Introspection state reported for this collection.
    introspection: CollectionIntrospection<T>,

    /// Stash of wallclock lag histogram counts, keyed by period, lag, and labels.
    ///
    /// `new` sets this to `None` for transient collections.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}
2414
2415impl<T: ComputeControllerTimestamp> CollectionState<T> {
2416    fn new(
2418        collection_id: GlobalId,
2419        as_of: Antichain<T>,
2420        shared: SharedCollectionState<T>,
2421        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2422        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2423        read_hold_tx: read_holds::ChangeTx<T>,
2424        introspection: CollectionIntrospection<T>,
2425    ) -> Self {
2426        let since = as_of.clone();
2428        let upper = as_of;
2430
2431        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
2433        assert!(shared.lock_write_frontier(|f| f == &upper));
2434
2435        let implied_read_hold =
2439            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
2440        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);
2441
2442        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
2443        shared.lock_read_capabilities(|c| {
2444            c.update_iter(updates);
2445        });
2446
2447        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
2451            true => None,
2452            false => Some(Default::default()),
2453        };
2454
2455        Self {
2456            log_collection: false,
2457            dropped: false,
2458            scheduled: false,
2459            shared,
2460            implied_read_hold,
2461            warmup_read_hold,
2462            read_policy: Some(ReadPolicy::ValidFrom(since)),
2463            storage_dependencies,
2464            compute_dependencies,
2465            introspection,
2466            wallclock_lag_histogram_stash,
2467        }
2468    }
2469
2470    fn new_log_collection(
2472        id: GlobalId,
2473        shared: SharedCollectionState<T>,
2474        read_hold_tx: read_holds::ChangeTx<T>,
2475        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2476    ) -> Self {
2477        let since = Antichain::from_elem(T::minimum());
2478        let introspection =
2479            CollectionIntrospection::new(id, introspection_tx, since.clone(), false, None, None);
2480        let mut state = Self::new(
2481            id,
2482            since,
2483            shared,
2484            Default::default(),
2485            Default::default(),
2486            read_hold_tx,
2487            introspection,
2488        );
2489        state.log_collection = true;
2490        state.scheduled = true;
2492        state
2493    }
2494
2495    fn read_frontier(&self) -> Antichain<T> {
2497        self.shared
2498            .lock_read_capabilities(|c| c.frontier().to_owned())
2499    }
2500
2501    fn write_frontier(&self) -> Antichain<T> {
2503        self.shared.lock_write_frontier(|f| f.clone())
2504    }
2505
2506    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2507        self.storage_dependencies.keys().copied()
2508    }
2509
2510    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2511        self.compute_dependencies.keys().copied()
2512    }
2513
2514    fn dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2516        self.compute_dependency_ids()
2517            .chain(self.storage_dependency_ids())
2518    }
2519}
2520
/// Collection state that is kept behind reference-counted locks.
///
/// Cloning this type clones the `Arc`s, so all clones observe the same underlying state.
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState<T> {
    /// Accumulation of the read capabilities held on the collection. Its frontier is what
    /// `CollectionState::read_frontier` reports.
    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
    /// The write frontier of the collection.
    write_frontier: Arc<Mutex<Antichain<T>>>,
}
2548
2549impl<T: Timestamp> SharedCollectionState<T> {
2550    pub fn new(as_of: Antichain<T>) -> Self {
2551        let since = as_of.clone();
2553        let upper = as_of;
2555
2556        let mut read_capabilities = MutableAntichain::new();
2560        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2561
2562        Self {
2563            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2564            write_frontier: Arc::new(Mutex::new(upper)),
2565        }
2566    }
2567
2568    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2569    where
2570        F: FnOnce(&mut MutableAntichain<T>) -> R,
2571    {
2572        let mut caps = self.read_capabilities.lock().expect("poisoned");
2573        f(&mut *caps)
2574    }
2575
2576    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2577    where
2578        F: FnOnce(&mut Antichain<T>) -> R,
2579    {
2580        let mut frontier = self.write_frontier.lock().expect("poisoned");
2581        f(&mut *frontier)
2582    }
2583}
2584
/// Manages the introspection rows reported for a single collection.
///
/// Rows are inserted on creation, replaced when the observed state changes, and retracted on
/// drop.
#[derive(Debug)]
struct CollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the collection the rows are reported for.
    collection_id: GlobalId,
    /// Channel through which introspection updates are sent.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// State for `IntrospectionType::Frontiers`. `None` for storage sinks, for which no
    /// frontier rows are reported.
    frontiers: Option<FrontiersIntrospectionState<T>>,
    /// State for `IntrospectionType::ComputeMaterializedViewRefreshes`. Only `Some` if both a
    /// refresh schedule and an initial as-of were provided at creation.
    refresh: Option<RefreshIntrospectionState<T>>,
}
2605
2606impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
2607    fn new(
2608        collection_id: GlobalId,
2609        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2610        as_of: Antichain<T>,
2611        storage_sink: bool,
2612        initial_as_of: Option<Antichain<T>>,
2613        refresh_schedule: Option<RefreshSchedule>,
2614    ) -> Self {
2615        let refresh =
2616            match (refresh_schedule, initial_as_of) {
2617                (Some(refresh_schedule), Some(initial_as_of)) => Some(
2618                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2619                ),
2620                (refresh_schedule, _) => {
2621                    soft_assert_or_log!(
2624                        refresh_schedule.is_none(),
2625                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2626                    );
2627                    None
2628                }
2629            };
2630        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2631
2632        let self_ = Self {
2633            collection_id,
2634            introspection_tx,
2635            frontiers,
2636            refresh,
2637        };
2638
2639        self_.report_initial_state();
2640        self_
2641    }
2642
2643    fn report_initial_state(&self) {
2645        if let Some(frontiers) = &self.frontiers {
2646            let row = frontiers.row_for_collection(self.collection_id);
2647            let updates = vec![(row, Diff::ONE)];
2648            self.send(IntrospectionType::Frontiers, updates);
2649        }
2650
2651        if let Some(refresh) = &self.refresh {
2652            let row = refresh.row_for_collection(self.collection_id);
2653            let updates = vec![(row, Diff::ONE)];
2654            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2655        }
2656    }
2657
2658    fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2661        self.update_frontier_introspection(read_frontier, write_frontier);
2662        self.update_refresh_introspection(write_frontier);
2663    }
2664
2665    fn update_frontier_introspection(
2666        &mut self,
2667        read_frontier: &Antichain<T>,
2668        write_frontier: &Antichain<T>,
2669    ) {
2670        let Some(frontiers) = &mut self.frontiers else {
2671            return;
2672        };
2673
2674        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2675        {
2676            return; };
2678
2679        let retraction = frontiers.row_for_collection(self.collection_id);
2680        frontiers.update(read_frontier, write_frontier);
2681        let insertion = frontiers.row_for_collection(self.collection_id);
2682        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2683        self.send(IntrospectionType::Frontiers, updates);
2684    }
2685
2686    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
2687        let Some(refresh) = &mut self.refresh else {
2688            return;
2689        };
2690
2691        let retraction = refresh.row_for_collection(self.collection_id);
2692        refresh.frontier_update(write_frontier);
2693        let insertion = refresh.row_for_collection(self.collection_id);
2694
2695        if retraction == insertion {
2696            return; }
2698
2699        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2700        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2701    }
2702
2703    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2704        let _ = self.introspection_tx.send((introspection_type, updates));
2707    }
2708}
2709
2710impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
2711    fn drop(&mut self) {
2712        if let Some(frontiers) = &self.frontiers {
2714            let row = frontiers.row_for_collection(self.collection_id);
2715            let updates = vec![(row, Diff::MINUS_ONE)];
2716            self.send(IntrospectionType::Frontiers, updates);
2717        }
2718
2719        if let Some(refresh) = &self.refresh {
2721            let retraction = refresh.row_for_collection(self.collection_id);
2722            let updates = vec![(retraction, Diff::MINUS_ONE)];
2723            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2724        }
2725    }
2726}
2727
/// The frontiers most recently reported for a collection through frontier introspection.
#[derive(Debug)]
struct FrontiersIntrospectionState<T> {
    /// The last reported read frontier.
    read_frontier: Antichain<T>,
    /// The last reported write frontier.
    write_frontier: Antichain<T>,
}
2733
2734impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
2735    fn new(as_of: Antichain<T>) -> Self {
2736        Self {
2737            read_frontier: as_of.clone(),
2738            write_frontier: as_of,
2739        }
2740    }
2741
2742    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2744        let read_frontier = self
2745            .read_frontier
2746            .as_option()
2747            .map_or(Datum::Null, |ts| ts.clone().into());
2748        let write_frontier = self
2749            .write_frontier
2750            .as_option()
2751            .map_or(Datum::Null, |ts| ts.clone().into());
2752        Row::pack_slice(&[
2753            Datum::String(&collection_id.to_string()),
2754            read_frontier,
2755            write_frontier,
2756        ])
2757    }
2758
2759    fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2761        if read_frontier != &self.read_frontier {
2762            self.read_frontier.clone_from(read_frontier);
2763        }
2764        if write_frontier != &self.write_frontier {
2765            self.write_frontier.clone_from(write_frontier);
2766        }
2767    }
2768}
2769
2770#[derive(Debug)]
2773struct RefreshIntrospectionState<T> {
2774    refresh_schedule: RefreshSchedule,
2776    initial_as_of: Antichain<T>,
2777    next_refresh: Datum<'static>,           last_completed_refresh: Datum<'static>, }
2781
2782impl<T> RefreshIntrospectionState<T> {
2783    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2785        Row::pack_slice(&[
2786            Datum::String(&collection_id.to_string()),
2787            self.last_completed_refresh,
2788            self.next_refresh,
2789        ])
2790    }
2791}
2792
2793impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
2794    fn new(
2797        refresh_schedule: RefreshSchedule,
2798        initial_as_of: Antichain<T>,
2799        upper: &Antichain<T>,
2800    ) -> Self {
2801        let mut self_ = Self {
2802            refresh_schedule: refresh_schedule.clone(),
2803            initial_as_of: initial_as_of.clone(),
2804            next_refresh: Datum::Null,
2805            last_completed_refresh: Datum::Null,
2806        };
2807        self_.frontier_update(upper);
2808        self_
2809    }
2810
2811    fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
2814        if write_frontier.is_empty() {
2815            self.last_completed_refresh =
2816                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2817                    last_refresh.into()
2818                } else {
2819                    T::maximum().into()
2822                };
2823            self.next_refresh = Datum::Null;
2824        } else {
2825            if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2826                self.last_completed_refresh = Datum::Null;
2828                let initial_as_of = self.initial_as_of.as_option().expect(
2829                    "initial_as_of can't be [], because then there would be no refreshes at all",
2830                );
2831                let first_refresh = initial_as_of
2832                    .round_up(&self.refresh_schedule)
2833                    .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2834                soft_assert_or_log!(
2835                    first_refresh == *initial_as_of,
2836                    "initial_as_of should be set to the first refresh"
2837                );
2838                self.next_refresh = first_refresh.into();
2839            } else {
2840                let write_frontier = write_frontier.as_option().expect("checked above");
2842                self.last_completed_refresh = write_frontier
2843                    .round_down_minus_1(&self.refresh_schedule)
2844                    .map_or_else(
2845                        || {
2846                            soft_panic_or_log!(
2847                                "rounding down should have returned the first refresh or later"
2848                            );
2849                            Datum::Null
2850                        },
2851                        |last_completed_refresh| last_completed_refresh.into(),
2852                    );
2853                self.next_refresh = write_frontier.clone().into();
2854            }
2855        }
2856    }
2857}
2858
/// State tracked for a peek.
///
/// NOTE(review): the field descriptions below are derived from names and types only; the code
/// that uses them is outside this section. Confirm against the peek handling code.
#[derive(Debug)]
struct PendingPeek<T: Timestamp> {
    /// The replica the peek is targeted at, if any.
    target_replica: Option<ReplicaId>,
    /// The OpenTelemetry context associated with the peek.
    otel_ctx: OpenTelemetryContext,
    /// The instant at which the peek was requested; presumably used to measure peek duration.
    requested_at: Instant,
    /// A read hold; presumably on the peeked collection.
    read_hold: ReadHold<T>,
    /// One-shot channel on which the `PeekResponse` is delivered.
    peek_response_tx: oneshot::Sender<PeekResponse>,
    /// An optional limit; presumably on the number of result rows.
    limit: Option<usize>,
    /// An offset; presumably into the result rows.
    offset: usize,
}
2881
/// State tracked for a subscribe that is still being served.
#[derive(Debug, Clone)]
struct ActiveSubscribe<T> {
    /// The frontier up to which batches have been delivered. Starts at the minimum timestamp
    /// and is advanced to the `upper` of each delivered batch.
    frontier: Antichain<T>,
    /// If set, responses from all other replicas are ignored for this subscribe.
    target_replica: Option<ReplicaId>,
}
2891
2892impl<T: ComputeControllerTimestamp> ActiveSubscribe<T> {
2893    fn new(target_replica: Option<ReplicaId>) -> Self {
2894        Self {
2895            frontier: Antichain::from_elem(T::minimum()),
2896            target_replica,
2897        }
2898    }
2899}
2900
/// State the instance maintains for a single replica.
#[derive(Debug)]
struct ReplicaState<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    id: ReplicaId,
    /// The client for the replica.
    client: ReplicaClient<T>,
    /// The configuration of the replica.
    config: ReplicaConfig,
    /// Replica metrics; also used to derive per-collection metrics in `add_collection`.
    metrics: ReplicaMetrics,
    /// Channel for introspection updates; cloned into the introspection state of each
    /// collection added to the replica.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Per-collection state tracked for this replica, keyed by collection ID.
    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
    /// The replica's epoch, as passed to `new`.
    ///
    /// NOTE(review): how the epoch is assigned is not visible in this section.
    epoch: u64,
}
2919
2920impl<T: ComputeControllerTimestamp> ReplicaState<T> {
2921    fn new(
2922        id: ReplicaId,
2923        client: ReplicaClient<T>,
2924        config: ReplicaConfig,
2925        metrics: ReplicaMetrics,
2926        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2927        epoch: u64,
2928    ) -> Self {
2929        Self {
2930            id,
2931            client,
2932            config,
2933            metrics,
2934            introspection_tx,
2935            epoch,
2936            collections: Default::default(),
2937        }
2938    }
2939
2940    fn add_collection(
2946        &mut self,
2947        id: GlobalId,
2948        as_of: Antichain<T>,
2949        input_read_holds: Vec<ReadHold<T>>,
2950    ) {
2951        let metrics = self.metrics.for_collection(id);
2952        let introspection = ReplicaCollectionIntrospection::new(
2953            self.id,
2954            id,
2955            self.introspection_tx.clone(),
2956            as_of.clone(),
2957        );
2958        let mut state =
2959            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
2960
2961        if id.is_transient() {
2965            state.wallclock_lag_max = None;
2966        }
2967
2968        if let Some(previous) = self.collections.insert(id, state) {
2969            panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
2970        }
2971    }
2972
2973    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
2975        self.collections.remove(&id)
2976    }
2977
2978    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
2980        self.collections.get(&id).map_or(true, |c| {
2981            c.write_frontier.is_empty()
2982                && c.input_frontier.is_empty()
2983                && c.output_frontier.is_empty()
2984        })
2985    }
2986
2987    #[mz_ore::instrument(level = "debug")]
2991    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
2992        let Self {
2999            id,
3000            client: _,
3001            config: _,
3002            metrics: _,
3003            introspection_tx: _,
3004            epoch,
3005            collections,
3006        } = self;
3007
3008        fn field(
3009            key: &str,
3010            value: impl Serialize,
3011        ) -> Result<(String, serde_json::Value), anyhow::Error> {
3012            let value = serde_json::to_value(value)?;
3013            Ok((key.to_string(), value))
3014        }
3015
3016        let collections: BTreeMap<_, _> = collections
3017            .iter()
3018            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
3019            .collect();
3020
3021        let map = serde_json::Map::from_iter([
3022            field("id", id.to_string())?,
3023            field("collections", collections)?,
3024            field("epoch", epoch)?,
3025        ]);
3026        Ok(serde_json::Value::Object(map))
3027    }
3028}
3029
/// State tracked for a single collection on a single replica.
#[derive(Debug)]
struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
    /// The write frontier reported by the replica. Initialized to the `as_of`.
    write_frontier: Antichain<T>,
    /// The input frontier reported by the replica. Initialized to the `as_of`; advancing it
    /// also downgrades `input_read_holds`.
    input_frontier: Antichain<T>,
    /// The output frontier reported by the replica. Initialized to the `as_of`; compared
    /// against `as_of` by `hydrated`.
    output_frontier: Antichain<T>,

    /// Metrics for this replica collection, if any.
    metrics: Option<ReplicaCollectionMetrics>,
    /// The as-of the collection was created with on this replica.
    as_of: Antichain<T>,
    /// Introspection state reported for this replica collection.
    introspection: ReplicaCollectionIntrospection<T>,
    /// Read holds on the collection's inputs, downgraded as the input frontier advances.
    input_read_holds: Vec<ReadHold<T>>,

    /// Initialized to `Some(WallclockLag::MIN)`; `ReplicaState::add_collection` resets it to
    /// `None` for transient collections.
    ///
    /// NOTE(review): presumably the maximum wallclock lag observed since the last recording;
    /// confirm against `refresh_wallclock_lag`.
    wallclock_lag_max: Option<WallclockLag>,
}
3065
3066impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
3067    fn new(
3068        metrics: Option<ReplicaCollectionMetrics>,
3069        as_of: Antichain<T>,
3070        introspection: ReplicaCollectionIntrospection<T>,
3071        input_read_holds: Vec<ReadHold<T>>,
3072    ) -> Self {
3073        Self {
3074            write_frontier: as_of.clone(),
3075            input_frontier: as_of.clone(),
3076            output_frontier: as_of.clone(),
3077            metrics,
3078            as_of,
3079            introspection,
3080            input_read_holds,
3081            wallclock_lag_max: Some(WallclockLag::MIN),
3082        }
3083    }
3084
3085    fn hydrated(&self) -> bool {
3087        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3103    }
3104
3105    fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
3107        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3108            soft_panic_or_log!(
3109                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3110                self.write_frontier,
3111            );
3112            return;
3113        } else if new_frontier == self.write_frontier {
3114            return;
3115        }
3116
3117        self.write_frontier = new_frontier;
3118    }
3119
3120    fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
3122        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3123            soft_panic_or_log!(
3124                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3125                self.input_frontier,
3126            );
3127            return;
3128        } else if new_frontier == self.input_frontier {
3129            return;
3130        }
3131
3132        self.input_frontier = new_frontier;
3133
3134        for read_hold in &mut self.input_read_holds {
3136            let result = read_hold.try_downgrade(self.input_frontier.clone());
3137            soft_assert_or_log!(
3138                result.is_ok(),
3139                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3140                self.input_frontier,
3141            );
3142        }
3143    }
3144
3145    fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
3147        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3148            soft_panic_or_log!(
3149                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3150                self.output_frontier,
3151            );
3152            return;
3153        } else if new_frontier == self.output_frontier {
3154            return;
3155        }
3156
3157        self.output_frontier = new_frontier;
3158    }
3159}
3160
/// Manages the `IntrospectionType::ReplicaFrontiers` row of a single replica collection.
///
/// The row is inserted on creation, replaced when the write frontier changes, and retracted
/// on drop.
#[derive(Debug)]
struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// The ID of the collection.
    collection_id: GlobalId,
    /// The write frontier that is currently reported.
    write_frontier: Antichain<T>,
    /// Channel through which introspection updates are sent.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
}
3174
3175impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
3176    fn new(
3178        replica_id: ReplicaId,
3179        collection_id: GlobalId,
3180        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3181        as_of: Antichain<T>,
3182    ) -> Self {
3183        let self_ = Self {
3184            replica_id,
3185            collection_id,
3186            write_frontier: as_of,
3187            introspection_tx,
3188        };
3189
3190        self_.report_initial_state();
3191        self_
3192    }
3193
3194    fn report_initial_state(&self) {
3196        let row = self.write_frontier_row();
3197        let updates = vec![(row, Diff::ONE)];
3198        self.send(IntrospectionType::ReplicaFrontiers, updates);
3199    }
3200
3201    fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
3203        if self.write_frontier == *write_frontier {
3204            return; }
3206
3207        let retraction = self.write_frontier_row();
3208        self.write_frontier.clone_from(write_frontier);
3209        let insertion = self.write_frontier_row();
3210
3211        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3212        self.send(IntrospectionType::ReplicaFrontiers, updates);
3213    }
3214
3215    fn write_frontier_row(&self) -> Row {
3217        let write_frontier = self
3218            .write_frontier
3219            .as_option()
3220            .map_or(Datum::Null, |ts| ts.clone().into());
3221        Row::pack_slice(&[
3222            Datum::String(&self.collection_id.to_string()),
3223            Datum::String(&self.replica_id.to_string()),
3224            write_frontier,
3225        ])
3226    }
3227
3228    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3229        let _ = self.introspection_tx.send((introspection_type, updates));
3232    }
3233}
3234
3235impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
3236    fn drop(&mut self) {
3237        let row = self.write_frontier_row();
3239        let updates = vec![(row, Diff::MINUS_ONE)];
3240        self.send(IntrospectionType::ReplicaFrontiers, updates);
3241    }
3242}