use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use mz_build_info::BuildInfo;
use mz_cluster_client::WallclockLagFn;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_compute_types::sinks::{
    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
};
use mz_compute_types::sources::SourceInstanceDesc;
use mz_controller_types::dyncfgs::{
    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
use mz_ore::channel::instrumented_unbounded_channel;
use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, soft_panic_or_log};
use mz_persist_types::PersistLocation;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
use mz_storage_types::read_holds::{self, ReadHold};
use mz_storage_types::read_policy::ReadPolicy;
use serde::Serialize;
use thiserror::Error;
use timely::PartialOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use crate::controller::error::{
    CollectionLookupError, CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
};
use crate::controller::replica::{ReplicaClient, ReplicaConfig};
use crate::controller::{
    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
    ReplicaId, StorageCollections,
};
use crate::logging::LogVariant;
use crate::metrics::{
    InstanceMetrics, IntCounter, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge,
};
use crate::protocol::command::{
    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
};
use crate::protocol::history::ComputeCommandHistory;
use crate::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
    SubscribeBatch, SubscribeResponse,
};

#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);

#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);

#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}

impl From<CollectionMissing> for DataflowCreationError {
    fn from(error: CollectionMissing) -> Self {
        Self::CollectionMissing(error.0)
    }
}

#[derive(Error, Debug)]
pub(super) enum PeekError {
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    #[error("read hold ID does not match peeked collection: {0}")]
    ReadHoldIdMismatch(GlobalId),
    #[error("insufficient read hold provided: {0}")]
    ReadHoldInsufficient(GlobalId),
}

#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}

impl From<CollectionMissing> for ReadPolicyError {
    fn from(error: CollectionMissing) -> Self {
        Self::CollectionMissing(error.0)
    }
}

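/// A command sent to an [`Instance`] task.
///
/// Commands are boxed closures that the instance task applies to its
/// [`Instance`] state, which lets callers invoke arbitrary instance methods
/// remotely. A sketch (assuming a suitable timestamp type `T`):
///
/// ```ignore
/// let cmd: Command<T> = Box::new(|instance| instance.allow_writes());
/// ```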
pub type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;

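/// A client to an [`Instance`] task.
///
/// `command_tx` carries [`Command`]s to the instance task; `read_hold_tx`
/// routes read hold changes to the same task (internally it wraps
/// `command_tx`; see [`Client::spawn`]).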
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub(super) struct Client<T: ComputeControllerTimestamp> {
    command_tx: mpsc::UnboundedSender<Command<T>>,
    #[derivative(Debug = "ignore")]
    read_hold_tx: read_holds::ChangeTx<T>,
}

impl<T: ComputeControllerTimestamp> Client<T> {
    pub fn send(&self, command: Command<T>) -> Result<(), SendError<Command<T>>> {
        self.command_tx.send(command)
    }

    pub fn read_hold_tx(&self) -> read_holds::ChangeTx<T> {
        Arc::clone(&self.read_hold_tx)
    }
}

impl<T> Client<T>
where
    T: ComputeControllerTimestamp,
{
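    /// Spawns a new [`Instance`] task and returns a [`Client`] connected to
    /// it. A minimal usage sketch (setup of the arguments elided):
    ///
    /// ```ignore
    /// let client = Client::spawn(id, build_info, storage, /* ... */);
    /// // Run a maintenance pass on the instance task.
    /// let _ = client.send(Box::new(|instance| instance.maintain()));
    /// ```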
    pub fn spawn(
        id: ComputeInstanceId,
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    ) -> Self {
        let (command_tx, command_rx) = mpsc::unbounded_channel();

        let read_hold_tx: read_holds::ChangeTx<_> = {
            let command_tx = command_tx.clone();
            Arc::new(move |id, change: ChangeBatch<_>| {
                let cmd: Command<_> = {
                    let change = change.clone();
                    Box::new(move |i| i.apply_read_hold_change(id, change.clone()))
                };
                command_tx.send(cmd).map_err(|_| SendError((id, change)))
            })
        };

        mz_ore::task::spawn(
            || format!("compute-instance-{id}"),
            Instance::new(
                build_info,
                storage,
                peek_stash_persist_location,
                arranged_logs,
                metrics,
                now,
                wallclock_lag,
                dyncfg,
                command_rx,
                response_tx,
                Arc::clone(&read_hold_tx),
                introspection_tx,
            )
            .run(),
        );

        Self {
            command_tx,
            read_hold_tx,
        }
    }
}

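/// A response from a replica, tagged with the ID and epoch of the sending
/// replica. Responses carrying a stale epoch are discarded (see
/// `Instance::handle_response`).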
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);
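
/// The state kept for a single compute instance: its replicas, the
/// collections installed on it, in-flight peeks, subscribes, and copy-tos,
/// plus the channels and metrics used to communicate with the rest of the
/// controller. An `Instance` runs as an asynchronous task (see
/// [`Client::spawn`]) and is driven by [`Command`]s sent through its
/// [`Client`].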
212
213pub(super) struct Instance<T: ComputeControllerTimestamp> {
215 build_info: &'static BuildInfo,
217 storage_collections: StorageCollections<T>,
219 initialized: bool,
221 read_only: bool,
227 workload_class: Option<String>,
231 replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
233 collections: BTreeMap<GlobalId, CollectionState<T>>,
241 log_sources: BTreeMap<LogVariant, GlobalId>,
243 peeks: BTreeMap<Uuid, PendingPeek<T>>,
252 subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
266 copy_tos: BTreeSet<GlobalId>,
274 history: ComputeCommandHistory<UIntGauge, T>,
276 command_rx: mpsc::UnboundedReceiver<Command<T>>,
278 response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
280 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
282 metrics: InstanceMetrics,
284 dyncfg: Arc<ConfigSet>,
286
287 peek_stash_persist_location: PersistLocation,
289
290 now: NowFn,
292 wallclock_lag: WallclockLagFn<T>,
294 wallclock_lag_last_recorded: DateTime<Utc>,
296
297 read_hold_tx: read_holds::ChangeTx<T>,
302 replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
304 replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
306}
307
308impl<T: ComputeControllerTimestamp> Instance<T> {
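    /// Returns a reference to the state of the identified collection, or an
    /// error if no such collection exists.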
    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    fn collection_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut CollectionState<T>, CollectionMissing> {
        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
    }

    fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
        self.collections.get(&id).expect("collection must exist")
    }

    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
        self.collections.iter().map(|(id, coll)| (*id, coll))
    }

    fn add_collection(
        &mut self,
        id: GlobalId,
        as_of: Antichain<T>,
        shared: SharedCollectionState<T>,
        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        replica_input_read_holds: Vec<ReadHold<T>>,
        write_only: bool,
        storage_sink: bool,
        initial_as_of: Option<Antichain<T>>,
        refresh_schedule: Option<RefreshSchedule>,
    ) {
        let introspection = CollectionIntrospection::new(
            id,
            self.introspection_tx.clone(),
            as_of.clone(),
            storage_sink,
            initial_as_of,
            refresh_schedule,
        );
        let mut state = CollectionState::new(
            id,
            as_of.clone(),
            shared,
            storage_dependencies,
            compute_dependencies,
            Arc::clone(&self.read_hold_tx),
            introspection,
        );
        if write_only {
            state.read_policy = None;
        }

        if let Some(previous) = self.collections.insert(id, state) {
            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
        }

        for replica in self.replicas.values_mut() {
            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
        }

        self.report_dependency_updates(id, Diff::ONE);
    }

    fn remove_collection(&mut self, id: GlobalId) {
        self.report_dependency_updates(id, Diff::MINUS_ONE);

        for replica in self.replicas.values_mut() {
            replica.remove_collection(id);
        }

        self.collections.remove(&id);
    }

    fn add_replica_state(
        &mut self,
        id: ReplicaId,
        client: ReplicaClient<T>,
        config: ReplicaConfig,
        epoch: u64,
    ) {
        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();

        let metrics = self.metrics.for_replica(id);
        let mut replica = ReplicaState::new(
            id,
            client,
            config,
            metrics,
            self.introspection_tx.clone(),
            epoch,
        );

        for (collection_id, collection) in &self.collections {
            if collection.log_collection && !log_ids.contains(collection_id) {
                continue;
            }

            let as_of = if collection.log_collection {
                Antichain::from_elem(T::minimum())
            } else {
                collection.read_frontier().to_owned()
            };

            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
            replica.add_collection(*collection_id, as_of, input_read_holds);
        }

        self.replicas.insert(id, replica);
    }

    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
        let _ = self.response_tx.send(response);
    }

    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
        let _ = self.introspection_tx.send((type_, updates));
    }

    fn replica_exists(&self, id: ReplicaId) -> bool {
        self.replicas.contains_key(&id)
    }

    fn peeks_targeting(
        &self,
        replica_id: ReplicaId,
    ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
        self.peeks.iter().filter_map(move |(uuid, peek)| {
            if peek.target_replica == Some(replica_id) {
                Some((*uuid, peek))
            } else {
                None
            }
        })
    }

    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
        self.subscribes.iter().filter_map(move |(id, subscribe)| {
            let targeting = subscribe.target_replica == Some(replica_id);
            targeting.then_some(*id)
        })
    }

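    /// Updates introspection data with the current read and write frontiers
    /// of all collections, both at the instance level and per replica.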
    fn update_frontier_introspection(&mut self) {
        for collection in self.collections.values_mut() {
            collection
                .introspection
                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
        }

        for replica in self.replicas.values_mut() {
            for collection in replica.collections.values_mut() {
                collection
                    .introspection
                    .observe_frontier(&collection.write_frontier);
            }
        }
    }

    fn refresh_state_metrics(&self) {
        let unscheduled_collections_count =
            self.collections.values().filter(|c| !c.scheduled).count();
        let connected_replica_count = self
            .replicas
            .values()
            .filter(|r| r.client.is_connected())
            .count();

        self.metrics
            .replica_count
            .set(u64::cast_from(self.replicas.len()));
        self.metrics
            .collection_count
            .set(u64::cast_from(self.collections.len()));
        self.metrics
            .collection_unscheduled_count
            .set(u64::cast_from(unscheduled_collections_count));
        self.metrics
            .peek_count
            .set(u64::cast_from(self.peeks.len()));
        self.metrics
            .subscribe_count
            .set(u64::cast_from(self.subscribes.len()));
        self.metrics
            .copy_to_count
            .set(u64::cast_from(self.copy_tos.len()));
        self.metrics
            .connected_replica_count
            .set(u64::cast_from(connected_replica_count));
    }

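    /// Refreshes the per-collection wallclock lag introspection stashes and
    /// metrics, based on the current time.
    ///
    /// Histogram lags are bucketed by rounding the lag in seconds up to the
    /// next power of two; for example, a lag of 5s lands in the 8s bucket.
    /// Collections whose write frontier does not exceed their read frontier
    /// are unreadable and recorded as `WallclockLag::Undefined`.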
    fn refresh_wallclock_lag(&mut self) {
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        let mut unreadable_collections = BTreeSet::new();
        for (id, collection) in &mut self.collections {
            let read_frontier = match self.storage_collections.collection_frontiers(*id) {
                Ok(f) => f.read_capabilities,
                Err(_) => collection.read_frontier(),
            };
            let write_frontier = collection.write_frontier();
            let collection_unreadable = PartialOrder::less_equal(&write_frontier, &read_frontier);
            if collection_unreadable {
                unreadable_collections.insert(id);
            }

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                let bucket = if collection_unreadable {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                let lag = if unreadable_collections.contains(&id) {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                };

                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                }
            }
        }

        self.maybe_record_wallclock_lag();
    }

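    /// Records `WallclockLagHistory` introspection updates and flushes the
    /// histogram stashes, if the recording interval has elapsed.
    ///
    /// Recording times are truncated to multiples of the recording interval,
    /// so e.g. with a 60s interval a recording at 12:00:59 is attributed to
    /// 12:00:00 and the next one can happen at 12:01:00 or later. Nothing is
    /// recorded while the instance is read-only.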
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }

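    /// Reports updates (additions or retractions) of the identified
    /// collection's dependencies to the `ComputeDependencies` introspection
    /// relation.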
    fn report_dependency_updates(&self, id: GlobalId, diff: Diff) {
        let collection = self.expect_collection(id);
        let dependencies = collection.dependency_ids();

        let updates = dependencies
            .map(|dependency_id| {
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&dependency_id.to_string()),
                ]);
                (row, diff)
            })
            .collect();

        self.deliver_introspection_updates(IntrospectionType::ComputeDependencies, updates);
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, CollectionLookupError> {
        if self.replicas.is_empty() {
            return Ok(true);
        }

        for replica_state in self.replicas.values() {
            let collection_state = replica_state
                .collections
                .get(&collection_id)
                .ok_or(CollectionLookupError::CollectionMissing(collection_id))?;

            if collection_state.hydrated() {
                return Ok(true);
            }
        }

        Ok(false)
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, HydrationCheckBadTarget> {
        if self.replicas.is_empty() {
            return Ok(true);
        }
        let mut all_hydrated = true;
        let target_replicas: BTreeSet<ReplicaId> = self
            .replicas
            .keys()
            .filter_map(|id| match target_replica_ids {
                None => Some(id.clone()),
                Some(ref ids) if ids.contains(id) => Some(id.clone()),
                Some(_) => None,
            })
            .collect();
        if let Some(targets) = target_replica_ids {
            if target_replicas.is_empty() {
                return Err(HydrationCheckBadTarget(targets));
            }
        }

        for (id, _collection) in self.collections_iter() {
            if id.is_transient() || exclude_collections.contains(&id) {
                continue;
            }

            let mut collection_hydrated = false;
            for replica_state in self.replicas.values() {
                if !target_replicas.contains(&replica_state.id) {
                    continue;
                }
                let collection_state = replica_state
                    .collections
                    .get(&id)
                    .expect("missing collection state");

                if collection_state.hydrated() {
                    collection_hydrated = true;
                    break;
                }
            }

            if !collection_hydrated {
                tracing::info!("collection {id} is not hydrated on any replica");
                all_hydrated = false;
            }
        }

        Ok(all_hydrated)
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn collections_hydrated(&self, exclude_collections: &BTreeSet<GlobalId>) -> bool {
        self.collections_hydrated_on_replicas(None, exclude_collections)
            .expect("cannot error if target_replica_ids is None")
    }

    fn cleanup_collections(&mut self) {
        let to_remove: Vec<_> = self
            .collections_iter()
            .filter(|(id, collection)| {
                collection.dropped
                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
                    && self
                        .replicas
                        .values()
                        .all(|r| r.collection_frontiers_empty(*id))
            })
            .map(|(id, _collection)| id)
            .collect();

        for id in to_remove {
            self.remove_collection(id);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            build_info: _,
            storage_collections: _,
            peek_stash_persist_location: _,
            initialized,
            read_only,
            workload_class,
            replicas,
            collections,
            log_sources: _,
            peeks,
            subscribes,
            copy_tos,
            history: _,
            command_rx: _,
            response_tx: _,
            introspection_tx: _,
            metrics: _,
            dyncfg: _,
            now: _,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            read_hold_tx: _,
            replica_tx: _,
            replica_rx: _,
        } = self;

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let replicas: BTreeMap<_, _> = replicas
            .iter()
            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
            .collect::<Result<_, anyhow::Error>>()?;
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();
        let peeks: BTreeMap<_, _> = peeks
            .iter()
            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
            .collect();
        let subscribes: BTreeMap<_, _> = subscribes
            .iter()
            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
            .collect();
        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");

        let map = serde_json::Map::from_iter([
            field("initialized", initialized)?,
            field("read_only", read_only)?,
            field("workload_class", workload_class)?,
            field("replicas", replicas)?,
            field("collections", collections)?,
            field("peeks", peeks)?,
            field("subscribes", subscribes)?,
            field("copy_tos", copy_tos)?,
            field("wallclock_lag_last_recorded", wallclock_lag_last_recorded)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

impl<T> Instance<T>
where
    T: ComputeControllerTimestamp,
{
    fn new(
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        command_rx: mpsc::UnboundedReceiver<Command<T>>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    ) -> Self {
        let mut collections = BTreeMap::new();
        let mut log_sources = BTreeMap::new();
        for (log, id, shared) in arranged_logs {
            let collection = CollectionState::new_log_collection(
                id,
                shared,
                Arc::clone(&read_hold_tx),
                introspection_tx.clone(),
            );
            collections.insert(id, collection);
            log_sources.insert(log, id);
        }

        let history = ComputeCommandHistory::new(metrics.for_history());

        let send_count = metrics.response_send_count.clone();
        let recv_count = metrics.response_recv_count.clone();
        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            storage_collections: storage,
            peek_stash_persist_location,
            initialized: false,
            read_only: true,
            workload_class: None,
            replicas: Default::default(),
            collections,
            log_sources,
            peeks: Default::default(),
            subscribes: Default::default(),
            copy_tos: Default::default(),
            history,
            command_rx,
            response_tx,
            introspection_tx,
            metrics,
            dyncfg,
            now,
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            read_hold_tx,
            replica_tx,
            replica_rx,
        }
    }

    async fn run(mut self) {
        self.send(ComputeCommand::Hello {
            nonce: Uuid::default(),
        });

        let instance_config = InstanceConfig {
            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
            logging: Default::default(),
            expiration_offset: Default::default(),
        };

        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));

        loop {
            tokio::select! {
                command = self.command_rx.recv() => match command {
                    Some(cmd) => cmd(&mut self),
                    None => break,
                },
                response = self.replica_rx.recv() => match response {
                    Some(response) => self.handle_response(response),
                    None => unreachable!("self owns a sender side of the channel"),
                }
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        if let Some(workload_class) = &config_params.workload_class {
            self.workload_class = workload_class.clone();
        }

        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
        self.send(command);
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn initialization_complete(&mut self) {
        if !self.initialized {
            self.send(ComputeCommand::InitializationComplete);
            self.initialized = true;
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn allow_writes(&mut self) {
        if self.read_only {
            self.read_only = false;
            self.send(ComputeCommand::AllowWrites);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn shutdown(&mut self) {
        let (_tx, rx) = mpsc::unbounded_channel();
        let mut command_rx = std::mem::replace(&mut self.command_rx, rx);

        while let Ok(cmd) = command_rx.try_recv() {
            cmd(self);
        }

        self.cleanup_collections();

        let stray_replicas: Vec<_> = self.replicas.keys().collect();
        soft_assert_or_log!(
            stray_replicas.is_empty(),
            "dropped instance still has provisioned replicas: {stray_replicas:?}",
        );

        let collections = self.collections.iter();
        let stray_collections: Vec<_> = collections
            .filter(|(_, c)| !c.log_collection)
            .map(|(id, _)| id)
            .collect();
        soft_assert_or_log!(
            stray_collections.is_empty(),
            "dropped instance still has installed collections: {stray_collections:?}",
        );
    }

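    /// Broadcasts the given command to all replicas of this instance, after
    /// recording it in the command history so it can be replayed to replicas
    /// added later.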
    #[mz_ore::instrument(level = "debug")]
    fn send(&mut self, cmd: ComputeCommand<T>) {
        self.history.push(cmd.clone());

        for replica in self.replicas.values_mut() {
            let _ = replica.client.send(cmd.clone());
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn add_replica(
        &mut self,
        id: ReplicaId,
        mut config: ReplicaConfig,
        epoch: Option<u64>,
    ) -> Result<(), ReplicaExists> {
        if self.replica_exists(id) {
            return Err(ReplicaExists(id));
        }

        config.logging.index_logs = self.log_sources.clone();

        let epoch = epoch.unwrap_or(1);
        let metrics = self.metrics.for_replica(id);
        let client = ReplicaClient::spawn(
            id,
            self.build_info,
            config.clone(),
            epoch,
            metrics.clone(),
            Arc::clone(&self.dyncfg),
            self.replica_tx.clone(),
        );

        self.history.reduce();

        self.history.update_source_uppers(&self.storage_collections);

        for command in self.history.iter() {
            if client.send(command.clone()).is_err() {
                tracing::warn!("Replica {:?} connection terminated during hydration", id);
                break;
            }
        }

        self.add_replica_state(id, client, config, epoch);

        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;

        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
        for subscribe_id in to_drop {
            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
            let response = ComputeControllerResponse::SubscribeResponse(
                subscribe_id,
                SubscribeBatch {
                    lower: subscribe.frontier.clone(),
                    upper: subscribe.frontier,
                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
                },
            );
            self.deliver_response(response);
        }

        let mut peek_responses = Vec::new();
        let mut to_drop = Vec::new();
        for (uuid, peek) in self.peeks_targeting(id) {
            peek_responses.push(ComputeControllerResponse::PeekNotification(
                uuid,
                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
                peek.otel_ctx.clone(),
            ));
            to_drop.push(uuid);
        }
        for response in peek_responses {
            self.deliver_response(response);
        }
        for uuid in to_drop {
            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
            self.finish_peek(uuid, response);
        }

        self.forward_implied_capabilities();

        Ok(())
    }

    fn rehydrate_replica(&mut self, id: ReplicaId) {
        let config = self.replicas[&id].config.clone();
        let epoch = self.replicas[&id].epoch + 1;

        self.remove_replica(id).expect("replica must exist");
        let result = self.add_replica(id, config, Some(epoch));

        match result {
            Ok(()) => (),
            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
        }
    }

    fn rehydrate_failed_replicas(&mut self) {
        let replicas = self.replicas.iter();
        let failed_replicas: Vec<_> = replicas
            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
            .collect();

        for replica_id in failed_replicas {
            self.rehydrate_replica(replica_id);
        }
    }

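    /// Creates the described dataflow and initializes state for the
    /// collections it exports.
    ///
    /// Expects a dataflow with an `as_of` frontier set, as well as a read
    /// hold on each imported collection that is valid at least until that
    /// `as_of`.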
    #[mz_ore::instrument(level = "debug")]
    pub fn create_dataflow(
        &mut self,
        dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        import_read_holds: Vec<ReadHold<T>>,
        subscribe_target_replica: Option<ReplicaId>,
        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        if let Some(replica_id) = subscribe_target_replica {
            if !self.replica_exists(replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        let mut storage_dependencies = BTreeMap::new();
        let mut compute_dependencies = BTreeMap::new();

        let mut replica_input_read_holds = Vec::new();

        let mut import_read_holds: BTreeMap<_, _> =
            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();

        for &id in dataflow.source_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            replica_input_read_holds.push(read_hold.clone());

            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            storage_dependencies.insert(id, read_hold);
        }

        for &id in dataflow.index_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            compute_dependencies.insert(id, read_hold);
        }

        if as_of.is_empty() {
            replica_input_read_holds = Default::default();
        }

        for export_id in dataflow.export_ids() {
            let shared = shared_collection_state
                .remove(&export_id)
                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
            let write_only = dataflow.sink_exports.contains_key(&export_id);
            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);

            self.add_collection(
                export_id,
                as_of.clone(),
                shared,
                storage_dependencies.clone(),
                compute_dependencies.clone(),
                replica_input_read_holds.clone(),
                write_only,
                storage_sink,
                dataflow.initial_storage_as_of.clone(),
                dataflow.refresh_schedule.clone(),
            );

            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
            }
        }

        for subscribe_id in dataflow.subscribe_ids() {
            self.subscribes
                .insert(subscribe_id, ActiveSubscribe::new(subscribe_target_replica));
        }

        for copy_to_id in dataflow.copy_to_ids() {
            self.copy_tos.insert(copy_to_id);
        }

        let mut source_imports = BTreeMap::new();
        for (id, (si, monotonic, _upper)) in dataflow.source_imports {
            let frontiers = self
                .storage_collections
                .collection_frontiers(id)
                .expect("collection exists");

            let collection_metadata = self
                .storage_collections
                .collection_metadata(id)
                .expect("we have a read hold on this collection");

            let desc = SourceInstanceDesc {
                storage_metadata: collection_metadata.clone(),
                arguments: si.arguments,
                typ: si.typ.clone(),
            };
            source_imports.insert(id, (desc, monotonic, frontiers.write_frontier));
        }

        let mut sink_exports = BTreeMap::new();
        for (id, se) in dataflow.sink_exports {
            let connection = match se.connection {
                ComputeSinkConnection::MaterializedView(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| CollectionMissing(id))?
                        .clone();
                    let conn = MaterializedViewSinkConnection {
                        value_desc: conn.value_desc,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::MaterializedView(conn)
                }
                ComputeSinkConnection::ContinualTask(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
                        .clone();
                    let conn = ContinualTaskConnection {
                        input_id: conn.input_id,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::ContinualTask(conn)
                }
                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                    ComputeSinkConnection::CopyToS3Oneshot(conn)
                }
            };
            let desc = ComputeSinkDesc {
                from: se.from,
                from_desc: se.from_desc,
                connection,
                with_snapshot: se.with_snapshot,
                up_to: se.up_to,
                non_null_assertions: se.non_null_assertions,
                refresh_schedule: se.refresh_schedule,
            };
            sink_exports.insert(id, desc);
        }

        let objects_to_build = dataflow
            .objects_to_build
            .into_iter()
            .map(|object| BuildDesc {
                id: object.id,
                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
            })
            .collect();

        let augmented_dataflow = DataflowDescription {
            source_imports,
            sink_exports,
            objects_to_build,
            index_imports: dataflow.index_imports,
            index_exports: dataflow.index_exports,
            as_of: dataflow.as_of.clone(),
            until: dataflow.until,
            initial_storage_as_of: dataflow.initial_storage_as_of,
            refresh_schedule: dataflow.refresh_schedule,
            debug_name: dataflow.debug_name,
            time_dependence: dataflow.time_dependence,
        };

        if augmented_dataflow.is_transient() {
            tracing::debug!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        } else {
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        }

        if as_of.is_empty() {
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                "not sending `CreateDataflow`, because of empty `as_of`",
            );
        } else {
            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
            let dataflow = Box::new(augmented_dataflow);
            self.send(ComputeCommand::CreateDataflow(dataflow));

            for id in collections {
                self.maybe_schedule_collection(id);
            }
        }

        Ok(())
    }

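    /// Schedules the identified collection on all replicas, if it has not
    /// been scheduled yet and all of its inputs have advanced beyond its
    /// as-of, i.e. the dataflow can start computing immediately. Collections
    /// with transient IDs are scheduled unconditionally.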
    fn maybe_schedule_collection(&mut self, id: GlobalId) {
        let collection = self.expect_collection(id);

        if collection.scheduled {
            return;
        }

        let as_of = collection.read_frontier();

        if as_of.is_empty() {
            return;
        }

        let ready = if id.is_transient() {
            true
        } else {
            let not_self_dep = |x: &GlobalId| *x != id;

            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
            let compute_frontiers = compute_deps.map(|id| {
                let dep = self.expect_collection(id);
                dep.write_frontier()
            });

            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(storage_deps.collect())
                .expect("must exist");
            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);

            compute_frontiers
                .chain(storage_frontiers)
                .all(|frontier| PartialOrder::less_than(&as_of, &frontier))
        };

        if ready {
            self.send(ComputeCommand::Schedule(id));
            let collection = self.expect_collection_mut(id);
            collection.scheduled = true;
        }
    }

    fn schedule_collections(&mut self) {
        let ids: Vec<_> = self.collections.keys().copied().collect();
        for id in ids {
            self.maybe_schedule_collection(id);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
        for id in &ids {
            let collection = self.collection_mut(*id)?;

            collection.dropped = true;

            collection.implied_read_hold.release();
            collection.warmup_read_hold.release();

            self.subscribes.remove(id);
            self.copy_tos.remove(id);
        }

        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn peek(
        &mut self,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        mut read_hold: ReadHold<T>,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let target_id = peek_target.id();
        if read_hold.id() != target_id {
            return Err(ReadHoldIdMismatch(read_hold.id()));
        }
        read_hold
            .try_downgrade(Antichain::from_elem(timestamp.clone()))
            .map_err(|_| ReadHoldInsufficient(target_id))?;

        if let Some(target) = target_replica {
            if !self.replica_exists(target) {
                return Err(ReplicaMissing(target));
            }
        }

        let otel_ctx = OpenTelemetryContext::obtain();

        self.peeks.insert(
            uuid,
            PendingPeek {
                target_replica,
                otel_ctx: otel_ctx.clone(),
                requested_at: Instant::now(),
                read_hold,
                peek_response_tx,
                limit: finishing.limit.map(usize::cast_from),
                offset: finishing.offset,
            },
        );

        let peek = Peek {
            literal_constraints,
            uuid,
            timestamp,
            finishing,
            map_filter_project,
            otel_ctx,
            target: peek_target,
            result_desc,
        };
        self.send(ComputeCommand::Peek(Box::new(peek)));

        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
        let Some(peek) = self.peeks.get_mut(&uuid) else {
            tracing::warn!("did not find pending peek for {uuid}");
            return;
        };

        let duration = peek.requested_at.elapsed();
        self.metrics
            .observe_peek_response(&PeekResponse::Canceled, duration);

        let otel_ctx = peek.otel_ctx.clone();
        otel_ctx.attach_as_parent();

        self.deliver_response(ComputeControllerResponse::PeekNotification(
            uuid,
            PeekNotification::Canceled,
            otel_ctx,
        ));

        self.finish_peek(uuid, reason);
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn set_read_policy(
        &mut self,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) -> Result<(), ReadPolicyError> {
        for (id, _policy) in &policies {
            let collection = self.collection(*id)?;
            if collection.read_policy.is_none() {
                return Err(ReadPolicyError::WriteOnlyCollection(*id));
            }
        }

        for (id, new_policy) in policies {
            let collection = self.expect_collection_mut(id);
            let new_since = new_policy.frontier(collection.write_frontier().borrow());
            let _ = collection.implied_read_hold.try_downgrade(new_since);
            collection.read_policy = Some(new_policy);
        }

        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
        let collection = self.expect_collection_mut(id);

        let advanced = collection.shared.lock_write_frontier(|f| {
            let advanced = PartialOrder::less_than(f, &new_frontier);
            if advanced {
                f.clone_from(&new_frontier);
            }
            advanced
        });

        if !advanced {
            return;
        }

        let new_since = match &collection.read_policy {
            Some(read_policy) => read_policy.frontier(new_frontier.borrow()),
            None => Antichain::from_iter(
                new_frontier
                    .iter()
                    .map(|t| t.step_back().unwrap_or_else(T::minimum)),
            ),
        };
        let _ = collection.implied_read_hold.try_downgrade(new_since);

        self.deliver_response(ComputeControllerResponse::FrontierUpper {
            id,
            upper: new_frontier,
        });
    }

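    /// Applies a batch of read hold changes to the identified collection.
    ///
    /// If the collection's read frontier advances as a result, read holds on
    /// its dependencies are downgraded accordingly and replicas are allowed
    /// to compact the collection up to the new frontier.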
    fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
        let Some(collection) = self.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "read hold change for absent collection (id={id}, changes={update:?})"
            );
            return;
        };

        let new_since = collection.shared.lock_read_capabilities(|caps| {
            let read_frontier = caps.frontier();
            for (time, diff) in update.iter() {
                let count = caps.count_for(time) + diff;
                assert!(
                    count >= 0,
                    "invalid read capabilities update: negative capability \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
                assert!(
                    count == 0 || read_frontier.less_equal(time),
                    "invalid read capabilities update: frontier regression \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
            }

            let changes = caps.update_iter(update.drain());

            let changed = changes.count() > 0;
            changed.then(|| caps.frontier().to_owned())
        });

        let Some(new_since) = new_since else {
            return;
        };

        for read_hold in collection.compute_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }
        for read_hold in collection.storage_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }

        self.send(ComputeCommand::AllowCompaction {
            id,
            frontier: new_since,
        });
    }

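    /// Finishes a pending peek by delivering its response to the waiting
    /// client, instructing replicas to cancel the peek, and releasing its
    /// read hold. Emitting a controller notification is the caller's
    /// responsibility.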
    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
        let Some(peek) = self.peeks.remove(&uuid) else {
            return;
        };

        let _ = peek.peek_response_tx.send(response);

        self.send(ComputeCommand::CancelPeek { uuid });

        drop(peek.read_hold);
    }

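    /// Handles a response received from a replica, discarding it if the
    /// replica has been removed or restarted with a new epoch in the
    /// meantime.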
    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
        if self
            .replicas
            .get(&replica_id)
            .filter(|replica| replica.epoch == epoch)
            .is_none()
        {
            return;
        }

        match response {
            ComputeResponse::Frontiers(id, frontiers) => {
                self.handle_frontiers_response(id, frontiers, replica_id);
            }
            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
            }
            ComputeResponse::CopyToResponse(id, response) => {
                self.handle_copy_to_response(id, response, replica_id);
            }
            ComputeResponse::SubscribeResponse(id, response) => {
                self.handle_subscribe_response(id, response, replica_id);
            }
            ComputeResponse::Status(response) => {
                self.handle_status_response(response, replica_id);
            }
        }
    }

    fn handle_frontiers_response(
        &mut self,
        id: GlobalId,
        frontiers: FrontiersResponse<T>,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&id) {
            soft_panic_or_log!(
                "frontiers update for an unknown collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica \
                 (replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };

        if let Some(new_frontier) = frontiers.input_frontier {
            replica_collection.update_input_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.output_frontier {
            replica_collection.update_output_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.write_frontier {
            replica_collection.update_write_frontier(new_frontier.clone());
            self.maybe_update_global_write_frontier(id, new_frontier);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn handle_peek_response(
        &mut self,
        uuid: Uuid,
        response: PeekResponse,
        otel_ctx: OpenTelemetryContext,
        replica_id: ReplicaId,
    ) {
        otel_ctx.attach_as_parent();

        let Some(peek) = self.peeks.get(&uuid) else {
            return;
        };

        let target_replica = peek.target_replica.unwrap_or(replica_id);
        if target_replica != replica_id {
            return;
        }

        let duration = peek.requested_at.elapsed();
        self.metrics.observe_peek_response(&response, duration);

        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
        self.deliver_response(ComputeControllerResponse::PeekNotification(
            uuid,
            notification,
            otel_ctx,
        ));

        self.finish_peek(uuid, response)
    }

    fn handle_copy_to_response(
        &mut self,
        sink_id: GlobalId,
        response: CopyToResponse,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&sink_id) {
            soft_panic_or_log!(
                "received response for an unknown copy-to \
                 (sink_id={sink_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
            soft_panic_or_log!(
                "copy-to response for an unknown replica collection \
                 (sink_id={sink_id}, replica_id={replica_id})"
            );
            return;
        };

        replica_collection.update_write_frontier(Antichain::new());
        replica_collection.update_input_frontier(Antichain::new());
        replica_collection.update_output_frontier(Antichain::new());

        if !self.copy_tos.remove(&sink_id) {
            return;
        }

        let result = match response {
            CopyToResponse::RowCount(count) => Ok(count),
            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
            CopyToResponse::Dropped => {
                tracing::error!(
                    %sink_id, %replica_id,
                    "received `Dropped` response for a tracked copy to",
                );
                return;
            }
        };

        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
    }

    fn handle_subscribe_response(
        &mut self,
        subscribe_id: GlobalId,
        response: SubscribeResponse<T>,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&subscribe_id) {
            soft_panic_or_log!(
                "received response for an unknown subscribe \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica (replica_id={replica_id})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica collection \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
            );
            return;
        };

        let write_frontier = match &response {
            SubscribeResponse::Batch(batch) => batch.upper.clone(),
            SubscribeResponse::DroppedAt(_) => Antichain::new(),
        };

        replica_collection.update_write_frontier(write_frontier.clone());
        replica_collection.update_input_frontier(write_frontier.clone());
        replica_collection.update_output_frontier(write_frontier.clone());

        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
            return;
        };
        let replica_targeted = subscribe.target_replica.unwrap_or(replica_id) == replica_id;
        if !replica_targeted {
            return;
        }

        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);

        match response {
            SubscribeResponse::Batch(batch) => {
                let upper = batch.upper;
                let mut updates = batch.updates;

                if PartialOrder::less_than(&subscribe.frontier, &upper) {
                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());

                    if upper.is_empty() {
                        self.subscribes.remove(&subscribe_id);
                    } else {
                        self.subscribes.insert(subscribe_id, subscribe);
                    }

                    if let Ok(updates) = updates.as_mut() {
                        updates.retain(|(time, _data, _diff)| lower.less_equal(time));
                    }
                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeBatch {
                            lower,
                            upper,
                            updates,
                        },
                    ));
                }
            }
            SubscribeResponse::DroppedAt(frontier) => {
                tracing::error!(
                    %subscribe_id,
                    %replica_id,
                    frontier = ?frontier.elements(),
                    "received `DroppedAt` response for a tracked subscribe",
                );
                self.subscribes.remove(&subscribe_id);
            }
        }
    }

    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
        match response {
            StatusResponse::Placeholder => {}
        }
    }

    fn dependency_write_frontiers<'b>(
        &'b self,
        collection: &'b CollectionState<T>,
    ) -> impl Iterator<Item = Antichain<T>> + 'b {
        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
            let collection = self.collections.get(&dep_id);
            collection.map(|c| c.write_frontier())
        });
        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
            frontiers.map(|f| f.write_frontier)
        });

        compute_frontiers.chain(storage_frontiers)
    }

    fn transitive_storage_dependency_write_frontiers<'b>(
        &'b self,
        collection: &'b CollectionState<T>,
    ) -> impl Iterator<Item = Antichain<T>> + 'b {
        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
        let mut done = BTreeSet::new();

        while let Some(id) = todo.pop() {
            if done.contains(&id) {
                continue;
            }
            if let Some(dep) = self.collections.get(&id) {
                storage_ids.extend(dep.storage_dependency_ids());
                todo.extend(dep.compute_dependency_ids());
            }
            done.insert(id);
        }

        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
            let frontiers = self.storage_collections.collection_frontiers(id).ok();
            frontiers.map(|f| f.write_frontier)
        });

        storage_frontiers
    }

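    /// Downgrades the warmup read holds of all collections to just before the
    /// write frontiers of their dependencies, keeping inputs readable at the
    /// times a newly hydrating dataflow would need while still letting
    /// compaction progress.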
    fn downgrade_warmup_capabilities(&mut self) {
        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            if collection.read_policy.is_none()
                && collection.shared.lock_write_frontier(|f| f.is_empty())
            {
                new_capabilities.insert(*id, Antichain::new());
                continue;
            }

            let mut new_capability = Antichain::new();
            for frontier in self.dependency_write_frontiers(collection) {
                for time in frontier {
                    new_capability.insert(time.step_back().unwrap_or(time));
                }
            }

            new_capabilities.insert(*id, new_capability);
        }

        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
        }
    }

    fn forward_implied_capabilities(&mut self) {
        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
            return;
        }
        if !self.replicas.is_empty() {
            return;
        }

        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            let Some(read_policy) = &collection.read_policy else {
                continue;
            };

            let mut dep_frontier = Antichain::new();
            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
                dep_frontier.extend(frontier);
            }

            let new_capability = read_policy.frontier(dep_frontier.borrow());
            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
                new_capabilities.insert(*id, new_capability);
            }
        }

        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.implied_read_hold.try_downgrade(new_capability);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        self.rehydrate_failed_replicas();
        self.downgrade_warmup_capabilities();
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
}

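/// The state the instance controller maintains for each compute collection
/// installed on the instance.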
#[derive(Debug)]
struct CollectionState<T: ComputeControllerTimestamp> {
    /// Whether this collection is a log collection.
    log_collection: bool,
    /// Whether this collection has been dropped by a controller client.
    dropped: bool,
    /// Whether this collection has been scheduled, i.e., the controller has
    /// instructed the replicas to start computing it.
    scheduled: bool,

    /// State shared with other parts of the controller.
    shared: SharedCollectionState<T>,

    /// A read hold maintaining the implicit capability of the collection.
    implied_read_hold: ReadHold<T>,
    /// A read hold that keeps the collection readable at recent times (see
    /// `downgrade_warmup_capabilities`), so that dependent dataflows can be
    /// installed with a recent as-of and hydrate immediately.
    warmup_read_hold: ReadHold<T>,
    /// The policy by which `implied_read_hold` is downgraded, or `None` for
    /// write-only collections (sinks).
    read_policy: Option<ReadPolicy<T>>,

    /// Read holds on the storage collections this collection depends on.
    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
    /// Read holds on the compute collections this collection depends on.
    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,

    /// Introspection state associated with this collection.
    introspection: CollectionIntrospection<T>,

    /// Accumulated wallclock lag histogram updates, stashed until the next
    /// introspection flush. `None` for transient collections, which are not
    /// tracked in the lag histogram.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}

impl<T: ComputeControllerTimestamp> CollectionState<T> {
    fn new(
        collection_id: GlobalId,
        as_of: Antichain<T>,
        shared: SharedCollectionState<T>,
        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection: CollectionIntrospection<T>,
    ) -> Self {
        // A new collection is read and written at its as-of.
        let since = as_of.clone();
        let upper = as_of;

        // The provided `shared` must be initialized consistently with the
        // as-of.
        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
        assert!(shared.lock_write_frontier(|f| f == &upper));

        let implied_read_hold =
            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);

        // `shared` starts out with a single capability at the since (the
        // implied read hold); account for the warmup read hold as well.
        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
        shared.lock_read_capabilities(|c| {
            c.update_iter(updates);
        });

        // Transient collections are not tracked in the wallclock lag
        // histogram.
        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
            true => None,
            false => Some(Default::default()),
        };

        Self {
            log_collection: false,
            dropped: false,
            scheduled: false,
            shared,
            implied_read_hold,
            warmup_read_hold,
            read_policy: Some(ReadPolicy::ValidFrom(since)),
            storage_dependencies,
            compute_dependencies,
            introspection,
            wallclock_lag_histogram_stash,
        }
    }

    /// Create state for a new log collection.
    fn new_log_collection(
        id: GlobalId,
        shared: SharedCollectionState<T>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    ) -> Self {
        let since = Antichain::from_elem(T::minimum());
        let introspection =
            CollectionIntrospection::new(id, introspection_tx, since.clone(), false, None, None);
        let mut state = Self::new(
            id,
            since,
            shared,
            Default::default(),
            Default::default(),
            read_hold_tx,
            introspection,
        );
        state.log_collection = true;
        // Log collections are created by the replicas, so they are scheduled
        // from the start.
        state.scheduled = true;
        state
    }

    /// The current read frontier of the collection.
    fn read_frontier(&self) -> Antichain<T> {
        self.shared
            .lock_read_capabilities(|c| c.frontier().to_owned())
    }

    /// The current write frontier of the collection.
    fn write_frontier(&self) -> Antichain<T> {
        self.shared.lock_write_frontier(|f| f.clone())
    }

    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.storage_dependencies.keys().copied()
    }

    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.compute_dependencies.keys().copied()
    }

    /// The IDs of all collections this collection depends on.
    fn dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.compute_dependency_ids()
            .chain(self.storage_dependency_ids())
    }
}

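/// Collection state shared across threads (note the `Arc<Mutex<_>>` fields),
/// accessed through closure-taking lock helpers that keep critical sections
/// small.
///
/// A minimal usage sketch (hypothetical caller, `u64` timestamps):
///
/// ```ignore
/// let shared = SharedCollectionState::new(Antichain::from_elem(0u64));
/// let since = shared.lock_read_capabilities(|c| c.frontier().to_owned());
/// let upper = shared.lock_write_frontier(|f| f.clone());
/// assert_eq!(since, upper);
/// ```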
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState<T> {
    /// The read capabilities installed for the collection, including the
    /// implied and warmup capabilities.
    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
    /// The write frontier of the collection.
    write_frontier: Arc<Mutex<Antichain<T>>>,
}

impl<T: Timestamp> SharedCollectionState<T> {
    pub fn new(as_of: Antichain<T>) -> Self {
        let since = as_of.clone();
        let upper = as_of;

        // A new collection starts out with a single read capability at the
        // since.
        let mut read_capabilities = MutableAntichain::new();
        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));

        Self {
            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
            write_frontier: Arc::new(Mutex::new(upper)),
        }
    }

    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut MutableAntichain<T>) -> R,
    {
        let mut caps = self.read_capabilities.lock().expect("poisoned");
        f(&mut *caps)
    }

    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Antichain<T>) -> R,
    {
        let mut frontier = self.write_frontier.lock().expect("poisoned");
        f(&mut *frontier)
    }
}

#[derive(Debug)]
struct CollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the collection.
    collection_id: GlobalId,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// State reported to the `Frontiers` introspection relation, or `None` if
    /// the collection is a storage sink, in which case frontier reporting is
    /// left to the storage side.
    frontiers: Option<FrontiersIntrospectionState<T>>,
    /// State reported to the `ComputeMaterializedViewRefreshes` introspection
    /// relation; `Some` only for materialized views with a refresh schedule.
    refresh: Option<RefreshIntrospectionState<T>>,
}

impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
    fn new(
        collection_id: GlobalId,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        as_of: Antichain<T>,
        storage_sink: bool,
        initial_as_of: Option<Antichain<T>>,
        refresh_schedule: Option<RefreshSchedule>,
    ) -> Self {
        let refresh =
            match (refresh_schedule, initial_as_of) {
                (Some(refresh_schedule), Some(initial_as_of)) => Some(
                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
                ),
                (refresh_schedule, _) => {
                    soft_assert_or_log!(
                        refresh_schedule.is_none(),
                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
                    );
                    None
                }
            };
        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));

        let self_ = Self {
            collection_id,
            introspection_tx,
            frontiers,
            refresh,
        };

        self_.report_initial_state();
        self_
    }

    /// Report the initial introspection state: an insertion for each enabled
    /// introspection relation.
    fn report_initial_state(&self) {
        if let Some(frontiers) = &self.frontiers {
            let row = frontiers.row_for_collection(self.collection_id);
            let updates = vec![(row, Diff::ONE)];
            self.send(IntrospectionType::Frontiers, updates);
        }

        if let Some(refresh) = &self.refresh {
            let row = refresh.row_for_collection(self.collection_id);
            let updates = vec![(row, Diff::ONE)];
            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
        }
    }

    /// Observe the given current collection frontiers and update the
    /// introspection state as necessary.
    fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
        self.update_frontier_introspection(read_frontier, write_frontier);
        self.update_refresh_introspection(write_frontier);
    }

    fn update_frontier_introspection(
        &mut self,
        read_frontier: &Antichain<T>,
        write_frontier: &Antichain<T>,
    ) {
        let Some(frontiers) = &mut self.frontiers else {
            return;
        };

        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
        {
            return;
        }

        let retraction = frontiers.row_for_collection(self.collection_id);
        frontiers.update(read_frontier, write_frontier);
        let insertion = frontiers.row_for_collection(self.collection_id);
        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
        self.send(IntrospectionType::Frontiers, updates);
    }

    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
        let Some(refresh) = &mut self.refresh else {
            return;
        };

        let retraction = refresh.row_for_collection(self.collection_id);
        refresh.frontier_update(write_frontier);
        let insertion = refresh.row_for_collection(self.collection_id);

        if retraction == insertion {
            return;
        }

        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
    }

    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
        // Delivery failure means the receiving end has hung up, which can
        // happen during shutdown; dropping the updates is fine then.
        let _ = self.introspection_tx.send((introspection_type, updates));
    }
}

impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
    fn drop(&mut self) {
        // Retract the current `Frontiers` state, if any.
        if let Some(frontiers) = &self.frontiers {
            let row = frontiers.row_for_collection(self.collection_id);
            let updates = vec![(row, Diff::MINUS_ONE)];
            self.send(IntrospectionType::Frontiers, updates);
        }

        // Retract the current `ComputeMaterializedViewRefreshes` state, if
        // any.
        if let Some(refresh) = &self.refresh {
            let retraction = refresh.row_for_collection(self.collection_id);
            let updates = vec![(retraction, Diff::MINUS_ONE)];
            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
        }
    }
}

#[derive(Debug)]
struct FrontiersIntrospectionState<T> {
    read_frontier: Antichain<T>,
    write_frontier: Antichain<T>,
}

impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
    fn new(as_of: Antichain<T>) -> Self {
        Self {
            read_frontier: as_of.clone(),
            write_frontier: as_of,
        }
    }

    /// Return a `Row` reflecting the current state, for the `Frontiers`
    /// introspection relation.
    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
        let read_frontier = self
            .read_frontier
            .as_option()
            .map_or(Datum::Null, |ts| ts.clone().into());
        let write_frontier = self
            .write_frontier
            .as_option()
            .map_or(Datum::Null, |ts| ts.clone().into());
        Row::pack_slice(&[
            Datum::String(&collection_id.to_string()),
            read_frontier,
            write_frontier,
        ])
    }

    /// Update with the given new frontiers.
    fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
        if read_frontier != &self.read_frontier {
            self.read_frontier.clone_from(read_frontier);
        }
        if write_frontier != &self.write_frontier {
            self.write_frontier.clone_from(write_frontier);
        }
    }
}

/// Introspection state for a `REFRESH` materialized view: the datums currently
/// reported to the `ComputeMaterializedViewRefreshes` introspection relation,
/// derived from the refresh schedule and the collection's write frontier.
#[derive(Debug)]
struct RefreshIntrospectionState<T> {
    // Immutable inputs, fixed at construction.
    refresh_schedule: RefreshSchedule,
    initial_as_of: Antichain<T>,
    // Datums recomputed on write frontier updates.
    next_refresh: Datum<'static>,
    last_completed_refresh: Datum<'static>,
}

impl<T> RefreshIntrospectionState<T> {
    /// Return a `Row` reflecting the current state, for the
    /// `ComputeMaterializedViewRefreshes` introspection relation.
    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
        Row::pack_slice(&[
            Datum::String(&collection_id.to_string()),
            self.last_completed_refresh,
            self.next_refresh,
        ])
    }
}

impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
    /// Construct a new `RefreshIntrospectionState`, initialized from the
    /// given write frontier `upper`.
    fn new(
        refresh_schedule: RefreshSchedule,
        initial_as_of: Antichain<T>,
        upper: &Antichain<T>,
    ) -> Self {
        let mut self_ = Self {
            refresh_schedule: refresh_schedule.clone(),
            initial_as_of: initial_as_of.clone(),
            next_refresh: Datum::Null,
            last_completed_refresh: Datum::Null,
        };
        self_.frontier_update(upper);
        self_
    }

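    /// Recompute the introspection datums from a new write frontier.
    ///
    /// The write frontier of a refresh materialized view stands at its next
    /// scheduled refresh, so (sketch, assuming integer timestamps and
    /// refreshes scheduled at 100, 200, and 300):
    ///
    /// ```ignore
    /// // write_frontier = [200] => last_completed_refresh = 100, next_refresh = 200
    /// // write_frontier = []    => last_completed_refresh = 300, next_refresh = NULL
    /// ```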
    fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
        if write_frontier.is_empty() {
            // The collection is sealed: all refreshes have happened.
            self.last_completed_refresh =
                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
                    last_refresh.into()
                } else {
                    // A schedule without a last refresh never stops
                    // refreshing, so a sealed collection must have refreshed
                    // up to the maximum timestamp.
                    T::maximum().into()
                };
            self.next_refresh = Datum::Null;
        } else {
            if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
                // We are before the first refresh.
                self.last_completed_refresh = Datum::Null;
                let initial_as_of = self.initial_as_of.as_option().expect(
                    "initial_as_of can't be [], because then there would be no refreshes at all",
                );
                let first_refresh = initial_as_of
                    .round_up(&self.refresh_schedule)
                    .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
                soft_assert_or_log!(
                    first_refresh == *initial_as_of,
                    "initial_as_of should be set to the first refresh"
                );
                self.next_refresh = first_refresh.into();
            } else {
                // The write frontier stands at the next refresh; round down
                // to find the last completed one.
                let write_frontier = write_frontier.as_option().expect("checked above");
                self.last_completed_refresh = write_frontier
                    .round_down_minus_1(&self.refresh_schedule)
                    .map_or_else(
                        || {
                            soft_panic_or_log!(
                                "rounding down should have returned the first refresh or later"
                            );
                            Datum::Null
                        },
                        |last_completed_refresh| last_completed_refresh.into(),
                    );
                self.next_refresh = write_frontier.clone().into();
            }
        }
    }
}

/// Tracks an in-progress peek and the state needed to handle its response.
#[derive(Debug)]
struct PendingPeek<T: Timestamp> {
    /// For replica-targeted peeks, the replica whose response to pass on;
    /// otherwise the first response received is passed on.
    target_replica: Option<ReplicaId>,
    /// The OpenTelemetry context to report trace events to.
    otel_ctx: OpenTelemetryContext,
    /// The time at which the peek was requested, for duration metrics.
    requested_at: Instant,
    /// The read hold installed to serve the peek.
    read_hold: ReadHold<T>,
    /// The channel on which the peek response is delivered.
    peek_response_tx: oneshot::Sender<PeekResponse>,
    /// The `LIMIT`, if any, to apply to the peek results.
    limit: Option<usize>,
    /// The `OFFSET` to apply to the peek results.
    offset: usize,
}

/// State tracking an active subscribe.
#[derive(Debug, Clone)]
struct ActiveSubscribe<T> {
    /// Current upper frontier of this subscribe.
    frontier: Antichain<T>,
    /// For replica-targeted subscribes, the replica whose responses to pass
    /// on; otherwise responses from any replica may be passed on.
    target_replica: Option<ReplicaId>,
}

impl<T: ComputeControllerTimestamp> ActiveSubscribe<T> {
    fn new(target_replica: Option<ReplicaId>) -> Self {
        Self {
            frontier: Antichain::from_elem(T::minimum()),
            target_replica,
        }
    }
}

/// The state the compute controller maintains per replica.
#[derive(Debug)]
struct ReplicaState<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    id: ReplicaId,
    /// A client for sending commands to the replica.
    client: ReplicaClient<T>,
    /// The replica's configuration.
    config: ReplicaConfig,
    /// Replica-level metrics.
    metrics: ReplicaMetrics,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Per-collection state specific to this replica.
    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
    /// The epoch of the replica.
    epoch: u64,
}

impl<T: ComputeControllerTimestamp> ReplicaState<T> {
    fn new(
        id: ReplicaId,
        client: ReplicaClient<T>,
        config: ReplicaConfig,
        metrics: ReplicaMetrics,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        epoch: u64,
    ) -> Self {
        Self {
            id,
            client,
            config,
            metrics,
            introspection_tx,
            epoch,
            collections: Default::default(),
        }
    }

    /// Add a collection to the replica state.
    ///
    /// # Panics
    ///
    /// Panics if a collection with the same ID exists already.
    fn add_collection(
        &mut self,
        id: GlobalId,
        as_of: Antichain<T>,
        input_read_holds: Vec<ReadHold<T>>,
    ) {
        let metrics = self.metrics.for_collection(id);
        let introspection = ReplicaCollectionIntrospection::new(
            self.id,
            id,
            self.introspection_tx.clone(),
            as_of.clone(),
        );
        let mut state =
            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);

        // Wallclock lag is not tracked for transient collections.
        if id.is_transient() {
            state.wallclock_lag_max = None;
        }

        if let Some(previous) = self.collections.insert(id, state) {
            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
        }
    }

    /// Remove state for a collection, returning it if it was present.
    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
        self.collections.remove(&id)
    }

    /// Whether all tracked frontiers of the given collection are empty, i.e.,
    /// the replica has finished processing it. Returns `true` for collections
    /// the replica doesn't know about.
    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
        self.collections.get(&id).map_or(true, |c| {
            c.write_frontier.is_empty()
                && c.input_frontier.is_empty()
                && c.output_frontier.is_empty()
        })
    }

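    /// Return the state of this replica as a JSON value suitable for
    /// debugging.
    ///
    /// The output is shaped roughly like this (sketch; IDs and contents are
    /// illustrative):
    ///
    /// ```ignore
    /// {
    ///     "id": "u1",
    ///     "collections": { "u42": "ReplicaCollectionState { .. }" },
    ///     "epoch": 3
    /// }
    /// ```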
    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` so that adding or removing fields causes a
        // compile error here, keeping the dump in sync with the struct.
        let Self {
            id,
            client: _,
            config: _,
            metrics: _,
            introspection_tx: _,
            epoch,
            collections,
        } = self;

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();

        let map = serde_json::Map::from_iter([
            field("id", id.to_string())?,
            field("collections", collections)?,
            field("epoch", epoch)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

#[derive(Debug)]
struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
    /// The write frontier reported by the replica for this collection.
    write_frontier: Antichain<T>,
    /// The input frontier reported by the replica for this collection.
    input_frontier: Antichain<T>,
    /// The output frontier reported by the replica for this collection.
    output_frontier: Antichain<T>,

    /// Metrics tracked for this collection, or `None` if none are tracked.
    metrics: Option<ReplicaCollectionMetrics>,
    /// The as-of frontier with which this collection was installed on the
    /// replica.
    as_of: Antichain<T>,
    /// Introspection state associated with this collection on this replica.
    introspection: ReplicaCollectionIntrospection<T>,
    /// Read holds on the collection's inputs, downgraded as the replica's
    /// reported input frontier advances.
    input_read_holds: Vec<ReadHold<T>>,

    /// The maximum frontier wallclock lag observed since the last
    /// introspection flush, or `None` for collections for which lag is not
    /// tracked (transient collections).
    wallclock_lag_max: Option<WallclockLag>,
}

impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
    fn new(
        metrics: Option<ReplicaCollectionMetrics>,
        as_of: Antichain<T>,
        introspection: ReplicaCollectionIntrospection<T>,
        input_read_holds: Vec<ReadHold<T>>,
    ) -> Self {
        Self {
            write_frontier: as_of.clone(),
            input_frontier: as_of.clone(),
            output_frontier: as_of.clone(),
            metrics,
            as_of,
            introspection,
            input_read_holds,
            wallclock_lag_max: Some(WallclockLag::MIN),
        }
    }

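    /// Whether the collection is hydrated on this replica, i.e., the replica
    /// has produced output beyond the collection's as-of.
    ///
    /// A sketch of the condition (hypothetical integer timestamps):
    ///
    /// ```ignore
    /// // as_of = [10], output_frontier = [10] => not hydrated
    /// // as_of = [10], output_frontier = [11] => hydrated
    /// // as_of = []  (empty as-of)            => trivially hydrated
    /// ```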
    fn hydrated(&self) -> bool {
        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
    }

    /// Update the replica's reported write frontier for this collection.
    fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
            soft_panic_or_log!(
                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
                self.write_frontier,
            );
            return;
        } else if new_frontier == self.write_frontier {
            return;
        }

        self.write_frontier = new_frontier;
    }

    /// Update the replica's reported input frontier for this collection, and
    /// downgrade the input read holds accordingly.
    fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
            soft_panic_or_log!(
                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
                self.input_frontier,
            );
            return;
        } else if new_frontier == self.input_frontier {
            return;
        }

        self.input_frontier = new_frontier;

        // The replica no longer needs its inputs readable at times before the
        // input frontier, so the read holds can be downgraded.
        for read_hold in &mut self.input_read_holds {
            let result = read_hold.try_downgrade(self.input_frontier.clone());
            soft_assert_or_log!(
                result.is_ok(),
                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
                self.input_frontier,
            );
        }
    }

    /// Update the replica's reported output frontier for this collection.
    fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
            soft_panic_or_log!(
                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
                self.output_frontier,
            );
            return;
        } else if new_frontier == self.output_frontier {
            return;
        }

        self.output_frontier = new_frontier;
    }
}

/// Maintains the introspection state of a collection on a replica, reported
/// to the `ReplicaFrontiers` introspection relation.
#[derive(Debug)]
struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// The ID of the collection.
    collection_id: GlobalId,
    /// The write frontier currently reported to introspection.
    write_frontier: Antichain<T>,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
}

impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
    fn new(
        replica_id: ReplicaId,
        collection_id: GlobalId,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        as_of: Antichain<T>,
    ) -> Self {
        let self_ = Self {
            replica_id,
            collection_id,
            write_frontier: as_of,
            introspection_tx,
        };

        self_.report_initial_state();
        self_
    }

    /// Report the initial introspection state: an insertion for the initial
    /// write frontier.
    fn report_initial_state(&self) {
        let row = self.write_frontier_row();
        let updates = vec![(row, Diff::ONE)];
        self.send(IntrospectionType::ReplicaFrontiers, updates);
    }

    /// Observe the given current write frontier and update the introspection
    /// state as necessary.
    fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
        if self.write_frontier == *write_frontier {
            return;
        }

        let retraction = self.write_frontier_row();
        self.write_frontier.clone_from(write_frontier);
        let insertion = self.write_frontier_row();

        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
        self.send(IntrospectionType::ReplicaFrontiers, updates);
    }

    /// Return a `Row` reflecting the current write frontier, for the
    /// `ReplicaFrontiers` introspection relation.
    fn write_frontier_row(&self) -> Row {
        let write_frontier = self
            .write_frontier
            .as_option()
            .map_or(Datum::Null, |ts| ts.clone().into());
        Row::pack_slice(&[
            Datum::String(&self.collection_id.to_string()),
            Datum::String(&self.replica_id.to_string()),
            write_frontier,
        ])
    }

    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
        // Delivery failure means the receiving end has hung up, which can
        // happen during shutdown; dropping the updates is fine then.
        let _ = self.introspection_tx.send((introspection_type, updates));
    }
}

impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
    fn drop(&mut self) {
        // Retract the currently reported write frontier.
        let row = self.write_frontier_row();
        let updates = vec![(row, Diff::MINUS_ONE)];
        self.send(IntrospectionType::ReplicaFrontiers, updates);
    }
}