use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use mz_build_info::BuildInfo;
use mz_cluster_client::WallclockLagFn;
use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_compute_types::sinks::{
    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
};
use mz_compute_types::sources::SourceInstanceDesc;
use mz_controller_types::dyncfgs::{
    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
use mz_ore::channel::instrumented_unbounded_channel;
use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, soft_panic_or_log};
use mz_persist_types::PersistLocation;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
use mz_storage_types::read_holds::{self, ReadHold};
use mz_storage_types::read_policy::ReadPolicy;
use thiserror::Error;
use timely::PartialOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use crate::controller::error::{
    CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
};
use crate::controller::instance_client::PeekError;
use crate::controller::replica::{ReplicaClient, ReplicaConfig};
use crate::controller::{
    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
    ReplicaId, StorageCollections,
};
use crate::logging::LogVariant;
use crate::metrics::IntCounter;
use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
use crate::protocol::command::{
    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
};
use crate::protocol::history::ComputeCommandHistory;
use crate::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
    SubscribeBatch, SubscribeResponse,
};

#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);

#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);

#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}

impl From<CollectionMissing> for DataflowCreationError {
    fn from(error: CollectionMissing) -> Self {
        Self::CollectionMissing(error.0)
    }
}

#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}

impl From<CollectionMissing> for ReadPolicyError {
    fn from(error: CollectionMissing) -> Self {
        Self::CollectionMissing(error.0)
    }
}

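/// A command sent to an [`Instance`] task.
///
/// Commands are boxed closures applied to the instance state. A minimal sketch of constructing
/// one (the `u64` timestamp type is an assumption for illustration):
///
/// ```ignore
/// let cmd: Command<u64> = Box::new(|instance| instance.maintain());
/// ```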
pub(super) type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;

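/// A response produced by a replica, annotated with the replica's ID and the epoch under which
/// the replica was running when it produced the response.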
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);

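/// The state we keep for a compute instance.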
pub(super) struct Instance<T: ComputeControllerTimestamp> {
    /// Build info for spawned replicas.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections<T>,
    /// Whether instance initialization has been completed.
    initialized: bool,
    /// Whether the instance is in read-only mode.
    read_only: bool,
    /// The workload class of this instance, used to label introspection data.
    workload_class: Option<String>,
    /// The replicas of this compute instance.
    replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
    /// Currently installed compute collections.
    collections: BTreeMap<GlobalId, CollectionState<T>>,
    /// IDs of log sources maintained by this compute instance.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Currently outstanding peeks.
    peeks: BTreeMap<Uuid, PendingPeek<T>>,
    /// Currently in-progress subscribes.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
    /// Currently in-progress `COPY TO`s.
    copy_tos: BTreeSet<GlobalId>,
    /// The command history, used when introducing new replicas or restarting existing ones.
    history: ComputeCommandHistory<UIntGauge, T>,
    /// Receiver for commands to be executed.
    command_rx: mpsc::UnboundedReceiver<Command<T>>,
    /// Sender for responses to be delivered.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Sender for introspection updates to be recorded.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Metrics tracked for this instance.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// The persist location where peek results can be stashed.
    peek_stash_persist_location: PersistLocation,

    /// A function that produces the current wallclock time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for changes to collection read holds.
    read_hold_tx: read_holds::ChangeTx<T>,
    /// Sender for responses produced by replicas.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// Receiver for responses produced by replicas.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
}

impl<T: ComputeControllerTimestamp> Instance<T> {
    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    fn collection_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut CollectionState<T>, CollectionMissing> {
        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
    }

    fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
        self.collections.get(&id).expect("collection must exist")
    }

    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
        self.collections.iter().map(|(id, coll)| (*id, coll))
    }

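    /// Add a collection to the instance state, installing it on all existing replicas and
    /// reporting its dependencies to introspection.
    ///
    /// # Panics
    ///
    /// Panics if a collection with the same ID already exists.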
    fn add_collection(
        &mut self,
        id: GlobalId,
        as_of: Antichain<T>,
        shared: SharedCollectionState<T>,
        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        replica_input_read_holds: Vec<ReadHold<T>>,
        write_only: bool,
        storage_sink: bool,
        initial_as_of: Option<Antichain<T>>,
        refresh_schedule: Option<RefreshSchedule>,
    ) {
        // Add global collection state.
        let introspection = CollectionIntrospection::new(
            id,
            self.introspection_tx.clone(),
            as_of.clone(),
            storage_sink,
            initial_as_of,
            refresh_schedule,
        );
        let mut state = CollectionState::new(
            id,
            as_of.clone(),
            shared,
            storage_dependencies,
            compute_dependencies,
            Arc::clone(&self.read_hold_tx),
            introspection,
        );
        // If the collection is write-only, clear its read policy to reflect that.
        if write_only {
            state.read_policy = None;
        }

        if let Some(previous) = self.collections.insert(id, state) {
            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
        }

        // Add per-replica collection state.
        for replica in self.replicas.values_mut() {
            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
        }

        // Update introspection.
        self.report_dependency_updates(id, Diff::ONE);
    }

    fn remove_collection(&mut self, id: GlobalId) {
        // Update introspection.
        self.report_dependency_updates(id, Diff::MINUS_ONE);

        // Remove per-replica collection state.
        for replica in self.replicas.values_mut() {
            replica.remove_collection(id);
        }

        // Remove global collection state.
        self.collections.remove(&id);
    }

    fn add_replica_state(
        &mut self,
        id: ReplicaId,
        client: ReplicaClient<T>,
        config: ReplicaConfig,
        epoch: u64,
    ) {
        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();

        let metrics = self.metrics.for_replica(id);
        let mut replica = ReplicaState::new(
            id,
            client,
            config,
            metrics,
            self.introspection_tx.clone(),
            epoch,
        );

        // Add per-replica collection state.
        for (collection_id, collection) in &self.collections {
            // Skip log collections not maintained by this replica.
            if collection.log_collection && !log_ids.contains(collection_id) {
                continue;
            }

            let as_of = if collection.log_collection {
                // Log collections are maintained by each replica from the beginning of its
                // lifetime, so they are installed with an `as_of` at the minimum time.
                Antichain::from_elem(T::minimum())
            } else {
                collection.read_frontier().to_owned()
            };

            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
            replica.add_collection(*collection_id, as_of, input_read_holds);
        }

        self.replicas.insert(id, replica);
    }

    /// Enqueue the given response for delivery to the controller clients.
    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
        // Delivery failure is only possible if the controller has been dropped, in which case
        // nobody cares about responses anymore.
        let _ = self.response_tx.send(response);
    }

    /// Enqueue the given introspection updates for recording.
    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
        // Delivery failure is only possible if the controller has been dropped, in which case
        // nobody cares about introspection updates anymore.
        let _ = self.introspection_tx.send((type_, updates));
    }

    /// Returns whether the identified replica exists.
    fn replica_exists(&self, id: ReplicaId) -> bool {
        self.replicas.contains_key(&id)
    }

    /// Returns the pending peeks targeting the specified replica.
    fn peeks_targeting(
        &self,
        replica_id: ReplicaId,
    ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
        self.peeks.iter().filter_map(move |(uuid, peek)| {
            if peek.target_replica == Some(replica_id) {
                Some((*uuid, peek))
            } else {
                None
            }
        })
    }

    /// Returns the IDs of in-progress subscribes targeting the specified replica.
    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
        self.subscribes.iter().filter_map(move |(id, subscribe)| {
            let targeting = subscribe.target_replica == Some(replica_id);
            targeting.then_some(*id)
        })
    }

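    /// Update the frontier introspection, both for global collection frontiers and for
    /// per-replica write frontiers.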
    fn update_frontier_introspection(&mut self) {
        for collection in self.collections.values_mut() {
            collection
                .introspection
                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
        }

        for replica in self.replicas.values_mut() {
            for collection in replica.collections.values_mut() {
                collection
                    .introspection
                    .observe_frontier(&collection.write_frontier);
            }
        }
    }

    /// Refresh the state metrics of this instance.
    fn refresh_state_metrics(&self) {
        let unscheduled_collections_count =
            self.collections.values().filter(|c| !c.scheduled).count();
        let connected_replica_count = self
            .replicas
            .values()
            .filter(|r| r.client.is_connected())
            .count();

        self.metrics
            .replica_count
            .set(u64::cast_from(self.replicas.len()));
        self.metrics
            .collection_count
            .set(u64::cast_from(self.collections.len()));
        self.metrics
            .collection_unscheduled_count
            .set(u64::cast_from(unscheduled_collections_count));
        self.metrics
            .peek_count
            .set(u64::cast_from(self.peeks.len()));
        self.metrics
            .subscribe_count
            .set(u64::cast_from(self.subscribes.len()));
        self.metrics
            .copy_to_count
            .set(u64::cast_from(self.copy_tos.len()));
        self.metrics
            .connected_replica_count
            .set(u64::cast_from(connected_replica_count));
    }

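    /// Refresh the wallclock lag histogram stashes, maxima, and metrics with the current lag
    /// values.
    ///
    /// This method is expected to be called periodically, as part of `maintain`.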
    fn refresh_wallclock_lag(&mut self) {
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        // Collect the set of collections that are currently unreadable, for which we report an
        // undefined lag instead of a duration.
        let mut unreadable_collections = BTreeSet::new();
        for (id, collection) in &mut self.collections {
            let read_frontier = match self.storage_collections.collection_frontiers(*id) {
                Ok(f) => f.read_capabilities,
                Err(_) => collection.read_frontier(),
            };
            let write_frontier = collection.write_frontier();
            // A collection is unreadable if its write frontier hasn't advanced past its read
            // frontier.
            let collection_unreadable = PartialOrder::less_equal(&write_frontier, &read_frontier);
            if collection_unreadable {
                unreadable_collections.insert(id);
            }

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                let bucket = if collection_unreadable {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        // Update the per-replica lag maxima and metrics.
        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                let lag = if unreadable_collections.contains(&id) {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                };

                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    // The lag metric has no way to report undefined lag, so we use the maximum
                    // value as a stand-in.
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                };
            }
        }

        self.maybe_record_wallclock_lag();
    }

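    /// Record the stashed wallclock lag introspection updates, if the recording interval has
    /// elapsed.
    ///
    /// Does nothing in read-only mode.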
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                // Record the maximum lag since the last recording, then reset it.
                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }

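    /// Report updates (inserts or retractions) to the identified collection's dependencies.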
    fn report_dependency_updates(&self, id: GlobalId, diff: Diff) {
        let collection = self.expect_collection(id);
        let dependencies = collection.dependency_ids();

        let updates = dependencies
            .map(|dependency_id| {
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&dependency_id.to_string()),
                ]);
                (row, diff)
            })
            .collect();

        self.deliver_introspection_updates(IntrospectionType::ComputeDependencies, updates);
    }

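    /// Returns whether the identified collection is hydrated on at least one replica.
    ///
    /// Also returns `true` if the instance has no replicas.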
    #[mz_ore::instrument(level = "debug")]
    pub fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, CollectionMissing> {
        if self.replicas.is_empty() {
            return Ok(true);
        }
        for replica_state in self.replicas.values() {
            let collection_state = replica_state
                .collections
                .get(&collection_id)
                .ok_or(CollectionMissing(collection_id))?;

            if collection_state.hydrated() {
                return Ok(true);
            }
        }

        Ok(false)
    }

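    /// Returns whether each non-transient, non-excluded collection is hydrated on at least one
    /// of the targeted replicas.
    ///
    /// Also returns `true` if the instance has no replicas.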
    #[mz_ore::instrument(level = "debug")]
    pub fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, HydrationCheckBadTarget> {
        if self.replicas.is_empty() {
            return Ok(true);
        }
        let mut all_hydrated = true;
        let target_replicas: BTreeSet<ReplicaId> = self
            .replicas
            .keys()
            .filter_map(|id| match target_replica_ids {
                None => Some(id.clone()),
                Some(ref ids) if ids.contains(id) => Some(id.clone()),
                Some(_) => None,
            })
            .collect();
        if let Some(targets) = target_replica_ids {
            if target_replicas.is_empty() {
                return Err(HydrationCheckBadTarget(targets));
            }
        }

        for (id, _collection) in self.collections_iter() {
            if id.is_transient() || exclude_collections.contains(&id) {
                continue;
            }

            let mut collection_hydrated = false;
            for replica_state in self.replicas.values() {
                if !target_replicas.contains(&replica_state.id) {
                    continue;
                }
                let collection_state = replica_state
                    .collections
                    .get(&id)
                    .expect("missing collection state");

                if collection_state.hydrated() {
                    collection_hydrated = true;
                    break;
                }
            }

            if !collection_hydrated {
                tracing::info!("collection {id} is not hydrated on any replica");
                all_hydrated = false;
                // We continue with the loop instead of breaking out early, so we log all
                // non-hydrated collections.
            }
        }

        Ok(all_hydrated)
    }

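    /// Clean up collection state that is not needed anymore.
    ///
    /// A collection is removed once it was dropped, all read holds on it have been released, and
    /// all replicas have advanced its frontiers to the empty antichain.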
    fn cleanup_collections(&mut self) {
        let to_remove: Vec<_> = self
            .collections_iter()
            .filter(|(id, collection)| {
                collection.dropped
                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
                    && self
                        .replicas
                        .values()
                        .all(|r| r.collection_frontiers_empty(*id))
            })
            .map(|(id, _collection)| id)
            .collect();

        for id in to_remove {
            self.remove_collection(id);
        }
    }

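    /// Returns the state of this instance, serialized as JSON.
    ///
    /// The returned JSON is meant for debugging and comes with no stability guarantees.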
    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` to ensure we don't forget to dump newly added fields.
        let Self {
            build_info: _,
            storage_collections: _,
            peek_stash_persist_location: _,
            initialized,
            read_only,
            workload_class,
            replicas,
            collections,
            log_sources: _,
            peeks,
            subscribes,
            copy_tos,
            history: _,
            command_rx: _,
            response_tx: _,
            introspection_tx: _,
            metrics: _,
            dyncfg: _,
            now: _,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            read_hold_tx: _,
            replica_tx: _,
            replica_rx: _,
        } = self;

        let replicas: BTreeMap<_, _> = replicas
            .iter()
            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
            .collect::<Result<_, anyhow::Error>>()?;
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();
        let peeks: BTreeMap<_, _> = peeks
            .iter()
            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
            .collect();
        let subscribes: BTreeMap<_, _> = subscribes
            .iter()
            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
            .collect();
        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");

        Ok(serde_json::json!({
            "initialized": initialized,
            "read_only": read_only,
            "workload_class": workload_class,
            "replicas": replicas,
            "collections": collections,
            "peeks": peeks,
            "subscribes": subscribes,
            "copy_tos": copy_tos,
            "wallclock_lag_last_recorded": wallclock_lag_last_recorded,
        }))
    }

    /// Returns the write frontier of the identified collection.
    pub(super) fn collection_write_frontier(
        &self,
        id: GlobalId,
    ) -> Result<Antichain<T>, CollectionMissing> {
        Ok(self.collection(id)?.write_frontier())
    }
}

impl<T> Instance<T>
where
    T: ComputeControllerTimestamp,
{
    /// Creates a new compute instance.
    pub(super) fn new(
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        command_rx: mpsc::UnboundedReceiver<Command<T>>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        read_only: bool,
    ) -> Self {
        let mut collections = BTreeMap::new();
        let mut log_sources = BTreeMap::new();
        for (log, id, shared) in arranged_logs {
            let collection = CollectionState::new_log_collection(
                id,
                shared,
                Arc::clone(&read_hold_tx),
                introspection_tx.clone(),
            );
            collections.insert(id, collection);
            log_sources.insert(log, id);
        }

        let history = ComputeCommandHistory::new(metrics.for_history());

        let send_count = metrics.response_send_count.clone();
        let recv_count = metrics.response_recv_count.clone();
        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            storage_collections: storage,
            peek_stash_persist_location,
            initialized: false,
            read_only,
            workload_class: None,
            replicas: Default::default(),
            collections,
            log_sources,
            peeks: Default::default(),
            subscribes: Default::default(),
            copy_tos: Default::default(),
            history,
            command_rx,
            response_tx,
            introspection_tx,
            metrics,
            dyncfg,
            now,
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            read_hold_tx,
            replica_tx,
            replica_rx,
        }
    }

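    /// Run this instance until its command channel disconnects.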
    pub(super) async fn run(mut self) {
        self.send(ComputeCommand::Hello {
            nonce: Uuid::default(),
        });

        let instance_config = InstanceConfig {
            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
            logging: Default::default(),
            expiration_offset: Default::default(),
        };

        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));

        loop {
            tokio::select! {
                command = self.command_rx.recv() => match command {
                    Some(cmd) => cmd(&mut self),
                    None => break,
                },
                response = self.replica_rx.recv() => match response {
                    Some(response) => self.handle_response(response),
                    None => unreachable!("self owns a sender side of the channel"),
                }
            }
        }
    }

    /// Update the instance configuration, forwarding the parameters to all replicas.
    #[mz_ore::instrument(level = "debug")]
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        if let Some(workload_class) = &config_params.workload_class {
            self.workload_class = workload_class.clone();
        }

        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
        self.send(command);
    }

    /// Marks the end of any initialization commands.
    ///
    /// Calling this method repeatedly has no effect.
    #[mz_ore::instrument(level = "debug")]
    pub fn initialization_complete(&mut self) {
        if !self.initialized {
            self.send(ComputeCommand::InitializationComplete);
            self.initialized = true;
        }
    }

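    /// Allows this instance to affect writes to external systems (e.g. persist) for the
    /// identified collection.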
    #[mz_ore::instrument(level = "debug")]
    pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
        let collection = self.collection_mut(collection_id)?;

        if !collection.read_only {
            // The collection is already in read-write mode, nothing to do.
            return Ok(());
        }

        let as_of = collection.read_frontier();

        if as_of.is_empty() {
            // The collection has advanced to the empty frontier, so allowing writes is moot.
            return Ok(());
        }

        collection.read_only = false;
        self.send(ComputeCommand::AllowWrites(collection_id));

        Ok(())
    }

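    /// Shut down this instance.
    ///
    /// This method runs assertions ensuring that no replicas or (non-log) collections remain
    /// installed, to surface bugs where an instance is dropped while still in use.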
    #[mz_ore::instrument(level = "debug")]
    pub fn shutdown(&mut self) {
        // Swap out the command channel, then drain and apply any still outstanding commands.
        let (_tx, rx) = mpsc::unbounded_channel();
        let mut command_rx = std::mem::replace(&mut self.command_rx, rx);

        while let Ok(cmd) = command_rx.try_recv() {
            cmd(self);
        }

        self.cleanup_collections();

        let stray_replicas: Vec<_> = self.replicas.keys().collect();
        soft_assert_or_log!(
            stray_replicas.is_empty(),
            "dropped instance still has provisioned replicas: {stray_replicas:?}",
        );

        let collections = self.collections.iter();
        let stray_collections: Vec<_> = collections
            .filter(|(_, c)| !c.log_collection)
            .map(|(id, _)| id)
            .collect();
        soft_assert_or_log!(
            stray_collections.is_empty(),
            "dropped instance still has installed collections: {stray_collections:?}",
        );
    }

    /// Sends a command to all replicas of this instance.
    #[mz_ore::instrument(level = "debug")]
    fn send(&mut self, cmd: ComputeCommand<T>) {
        // Record the command so that new replicas can be brought up to speed.
        self.history.push(cmd.clone());

        // Clone the command for each active replica.
        for replica in self.replicas.values_mut() {
            // Swallow send errors; a failed replica will be detected and rehydrated during
            // maintenance.
            let _ = replica.client.send(cmd.clone());
        }
    }

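    /// Add a new replica to this compute instance, replaying the command history so the new
    /// replica catches up with the current instance state.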
    #[mz_ore::instrument(level = "debug")]
    pub fn add_replica(
        &mut self,
        id: ReplicaId,
        mut config: ReplicaConfig,
        epoch: Option<u64>,
    ) -> Result<(), ReplicaExists> {
        if self.replica_exists(id) {
            return Err(ReplicaExists(id));
        }

        config.logging.index_logs = self.log_sources.clone();

        let epoch = epoch.unwrap_or(1);
        let metrics = self.metrics.for_replica(id);
        let client = ReplicaClient::spawn(
            id,
            self.build_info,
            config.clone(),
            epoch,
            metrics.clone(),
            Arc::clone(&self.dyncfg),
            self.replica_tx.clone(),
        );

        // Take this opportunity to clean up the history we should present.
        self.history.reduce();

        // Refresh the source uppers recorded in the history, so the replica observes up-to-date
        // input frontiers.
        self.history.update_source_uppers(&self.storage_collections);

        // Replay the commands at the new replica.
        for command in self.history.iter() {
            if client.send(command.clone()).is_err() {
                // Swallow the error here: the failure will be noticed again on the next send,
                // and the replica will be rehydrated then.
                tracing::warn!("Replica {:?} connection terminated during hydration", id);
                break;
            }
        }

        // Add replica state.
        self.add_replica_state(id, client, config, epoch);

        Ok(())
    }

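    /// Remove an existing replica from this compute instance, failing any peeks and subscribes
    /// that targeted it.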
    #[mz_ore::instrument(level = "debug")]
    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;

        // Subscribes targeting this replica either won't be served anymore (if the replica is
        // dropped) or might produce inconsistent output (if the target was an unorchestrated
        // replica). We produce an error to inform upstream.
        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
        for subscribe_id in to_drop {
            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
            let response = ComputeControllerResponse::SubscribeResponse(
                subscribe_id,
                SubscribeBatch {
                    lower: subscribe.frontier.clone(),
                    upper: subscribe.frontier,
                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
                },
            );
            self.deliver_response(response);
        }

        // Peeks targeting this replica won't be served anymore either, so we fail them and
        // inform upstream.
        let mut peek_responses = Vec::new();
        let mut to_drop = Vec::new();
        for (uuid, peek) in self.peeks_targeting(id) {
            peek_responses.push(ComputeControllerResponse::PeekNotification(
                uuid,
                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
                peek.otel_ctx.clone(),
            ));
            to_drop.push(uuid);
        }
        for response in peek_responses {
            self.deliver_response(response);
        }
        for uuid in to_drop {
            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
            self.finish_peek(uuid, response);
        }

        // Removing the last replica pauses the cluster, which may make implied read holds
        // eligible for forwarding.
        self.forward_implied_capabilities();

        Ok(())
    }

    /// Rehydrate the identified replica by dropping it and re-adding it with a bumped epoch.
    ///
    /// # Panics
    ///
    /// Panics if the identified replica does not exist.
    fn rehydrate_replica(&mut self, id: ReplicaId) {
        let config = self.replicas[&id].config.clone();
        let epoch = self.replicas[&id].epoch + 1;

        self.remove_replica(id).expect("replica must exist");
        let result = self.add_replica(id, config, Some(epoch));

        match result {
            Ok(()) => (),
            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
        }
    }

    /// Rehydrate any failed replicas of this instance.
    fn rehydrate_failed_replicas(&mut self) {
        let replicas = self.replicas.iter();
        let failed_replicas: Vec<_> = replicas
            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
            .collect();

        for replica_id in failed_replicas {
            self.rehydrate_replica(replica_id);
        }
    }

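    /// Create the described dataflow and initialize state for its exports.
    ///
    /// Expects the dataflow's `as_of` to be present and, for each import, a read hold that is
    /// valid at that `as_of`.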
    #[mz_ore::instrument(level = "debug")]
    pub fn create_dataflow(
        &mut self,
        dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        import_read_holds: Vec<ReadHold<T>>,
        subscribe_target_replica: Option<ReplicaId>,
        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        if let Some(replica_id) = subscribe_target_replica {
            if !self.replica_exists(replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        // Simple sanity checks around the `as_of`.
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        // Collect all dependencies of the dataflow, and read holds on them at the `as_of`.
        let mut storage_dependencies = BTreeMap::new();
        let mut compute_dependencies = BTreeMap::new();

        // When installing per-replica input read holds, we cannot use the `as_of` because of
        // reconciliation: existing slow replicas might still be reading from the inputs at times
        // before the `as_of`, and we would rather not crash them by allowing their inputs to
        // compact too far. So instead we take read holds at the least available time.
        let mut replica_input_read_holds = Vec::new();

        let mut import_read_holds: BTreeMap<_, _> =
            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();

        for &id in dataflow.source_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            replica_input_read_holds.push(read_hold.clone());

            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            storage_dependencies.insert(id, read_hold);
        }

        for &id in dataflow.index_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            compute_dependencies.insert(id, read_hold);
        }

        // If the `as_of` is empty, no dataflow will be created, so replicas won't read from the
        // inputs and no input read holds are required.
        if as_of.is_empty() {
            replica_input_read_holds = Default::default();
        }

        // Install collection state for each of the exports.
        for export_id in dataflow.export_ids() {
            let shared = shared_collection_state
                .remove(&export_id)
                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
            let write_only = dataflow.sink_exports.contains_key(&export_id);
            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);

            self.add_collection(
                export_id,
                as_of.clone(),
                shared,
                storage_dependencies.clone(),
                compute_dependencies.clone(),
                replica_input_read_holds.clone(),
                write_only,
                storage_sink,
                dataflow.initial_storage_as_of.clone(),
                dataflow.refresh_schedule.clone(),
            );

            // If the export targets a storage collection, sync the global write frontier with
            // the storage collection's write frontier.
            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
            }
        }

        // Initialize tracking of subscribes.
        for subscribe_id in dataflow.subscribe_ids() {
            self.subscribes
                .insert(subscribe_id, ActiveSubscribe::new(subscribe_target_replica));
        }

        // Initialize tracking of copy-tos.
        for copy_to_id in dataflow.copy_to_ids() {
            self.copy_tos.insert(copy_to_id);
        }

        // Augment all imported sources and exported sinks with the storage metadata the replicas
        // need to connect to the storage collections.
        let mut source_imports = BTreeMap::new();
        for (id, import) in dataflow.source_imports {
            let frontiers = self
                .storage_collections
                .collection_frontiers(id)
                .expect("collection exists");

            let collection_metadata = self
                .storage_collections
                .collection_metadata(id)
                .expect("we have a read hold on this collection");

            let desc = SourceInstanceDesc {
                storage_metadata: collection_metadata.clone(),
                arguments: import.desc.arguments,
                typ: import.desc.typ.clone(),
            };
            source_imports.insert(
                id,
                mz_compute_types::dataflows::SourceImport {
                    desc,
                    monotonic: import.monotonic,
                    with_snapshot: import.with_snapshot,
                    upper: frontiers.write_frontier,
                },
            );
        }

        let mut sink_exports = BTreeMap::new();
        for (id, se) in dataflow.sink_exports {
            let connection = match se.connection {
                ComputeSinkConnection::MaterializedView(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| CollectionMissing(id))?
                        .clone();
                    let conn = MaterializedViewSinkConnection {
                        value_desc: conn.value_desc,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::MaterializedView(conn)
                }
                ComputeSinkConnection::ContinualTask(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
                        .clone();
                    let conn = ContinualTaskConnection {
                        input_id: conn.input_id,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::ContinualTask(conn)
                }
                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                    ComputeSinkConnection::CopyToS3Oneshot(conn)
                }
            };
            let desc = ComputeSinkDesc {
                from: se.from,
                from_desc: se.from_desc,
                connection,
                with_snapshot: se.with_snapshot,
                up_to: se.up_to,
                non_null_assertions: se.non_null_assertions,
                refresh_schedule: se.refresh_schedule,
            };
            sink_exports.insert(id, desc);
        }

        // Lower the dataflow plans to render plans.
        let objects_to_build = dataflow
            .objects_to_build
            .into_iter()
            .map(|object| BuildDesc {
                id: object.id,
                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
            })
            .collect();

        let augmented_dataflow = DataflowDescription {
            source_imports,
            sink_exports,
            objects_to_build,
            // The rest of the fields are identical.
            index_imports: dataflow.index_imports,
            index_exports: dataflow.index_exports,
            as_of: dataflow.as_of.clone(),
            until: dataflow.until,
            initial_storage_as_of: dataflow.initial_storage_as_of,
            refresh_schedule: dataflow.refresh_schedule,
            debug_name: dataflow.debug_name,
            time_dependence: dataflow.time_dependence,
        };

        if augmented_dataflow.is_transient() {
            tracing::debug!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        } else {
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        }

        // A dataflow with an empty `as_of` is already sealed, so creating it would be useless
        // work for the replicas.
        if as_of.is_empty() {
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                "not sending `CreateDataflow`, because of empty `as_of`",
            );
        } else {
            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
            let dataflow = Box::new(augmented_dataflow);
            self.send(ComputeCommand::CreateDataflow(dataflow));

            for id in collections {
                self.maybe_schedule_collection(id);
            }
        }

        Ok(())
    }

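    /// Schedule the identified collection on the replicas, if it is ready.
    ///
    /// A collection is ready once all its dependencies are scheduled and their write frontiers
    /// have advanced beyond the collection's `as_of`, i.e., running the dataflow would not stall
    /// on unavailable inputs. Transient collections are always considered ready.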
    fn maybe_schedule_collection(&mut self, id: GlobalId) {
        let collection = self.expect_collection(id);

        // Don't schedule collections twice.
        if collection.scheduled {
            return;
        }

        let as_of = collection.read_frontier();

        // Don't schedule collections that have already advanced to the empty frontier.
        if as_of.is_empty() {
            return;
        }

        let ready = if id.is_transient() {
            // Always schedule transient collections immediately.
            true
        } else {
            // Ignore self-dependencies. Any self-dependencies do not need to be available at the
            // `as_of` for the dataflow to make progress. At the moment, only continual tasks
            // have self-dependencies.
            let not_self_dep = |x: &GlobalId| *x != id;

            // Check that all dependencies are scheduled and their write frontiers are beyond
            // the `as_of`.
            let mut deps_scheduled = true;

            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
            let mut compute_frontiers = Vec::new();
            for id in compute_deps {
                let dep = &self.expect_collection(id);
                deps_scheduled &= dep.scheduled;
                compute_frontiers.push(dep.write_frontier());
            }

            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(storage_deps.collect())
                .expect("must exist");
            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);

            let mut frontiers = compute_frontiers.into_iter().chain(storage_frontiers);
            let frontiers_ready =
                frontiers.all(|frontier| PartialOrder::less_than(&as_of, &frontier));

            deps_scheduled && frontiers_ready
        };

        if ready {
            self.send(ComputeCommand::Schedule(id));
            let collection = self.expect_collection_mut(id);
            collection.scheduled = true;
        }
    }

    /// Try to schedule all unscheduled collections.
    fn schedule_collections(&mut self) {
        let ids: Vec<_> = self.collections.keys().copied().collect();
        for id in ids {
            self.maybe_schedule_collection(id);
        }
    }

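    /// Drop the identified collections, releasing their controller-side read holds.
    ///
    /// The collection state is removed later, by `cleanup_collections`, once all remaining read
    /// holds are released and all replica frontiers have advanced to the empty antichain.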
    #[mz_ore::instrument(level = "debug")]
    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
        for id in &ids {
            let collection = self.collection_mut(*id)?;

            // Mark the collection as dropped.
            collection.dropped = true;

            // Drop the implied and warmup read holds.
            collection.implied_read_hold.release();
            collection.warmup_read_hold.release();

            // If the collection is a subscribe or copy-to, stop tracking it.
            self.subscribes.remove(id);
            self.copy_tos.remove(id);
        }

        Ok(())
    }

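    /// Initiate a peek request for the contents of `peek_target` at `timestamp`.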
    #[mz_ore::instrument(level = "debug")]
    pub fn peek(
        &mut self,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        mut read_hold: ReadHold<T>,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let target_id = peek_target.id();

        // Downgrade the provided read hold to the peek time.
        if read_hold.id() != target_id {
            return Err(ReadHoldIdMismatch(read_hold.id()));
        }
        read_hold
            .try_downgrade(Antichain::from_elem(timestamp.clone()))
            .map_err(|_| ReadHoldInsufficient(target_id))?;

        if let Some(target) = target_replica {
            if !self.replica_exists(target) {
                return Err(ReplicaMissing(target));
            }
        }

        // Obtain an `OpenTelemetryContext` from the thread-local tracing tree, to forward it to
        // the replicas.
        let otel_ctx = OpenTelemetryContext::obtain();

        self.peeks.insert(
            uuid,
            PendingPeek {
                target_replica,
                otel_ctx: otel_ctx.clone(),
                requested_at: Instant::now(),
                read_hold,
                peek_response_tx,
                limit: finishing.limit.map(usize::cast_from),
                offset: finishing.offset,
            },
        );

        let peek = Peek {
            literal_constraints,
            uuid,
            timestamp,
            finishing,
            map_filter_project,
            otel_ctx,
            target: peek_target,
            result_desc,
        };
        self.send(ComputeCommand::Peek(Box::new(peek)));

        Ok(())
    }

    /// Cancel an existing peek request.
    #[mz_ore::instrument(level = "debug")]
    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
        let Some(peek) = self.peeks.get_mut(&uuid) else {
            tracing::warn!("did not find pending peek for {uuid}");
            return;
        };

        let duration = peek.requested_at.elapsed();
        self.metrics
            .observe_peek_response(&PeekResponse::Canceled, duration);

        // Enqueue a notification for the cancellation.
        let otel_ctx = peek.otel_ctx.clone();
        otel_ctx.attach_as_parent();

        self.deliver_response(ComputeControllerResponse::PeekNotification(
            uuid,
            PeekNotification::Canceled,
            otel_ctx,
        ));

        // Finish the peek.
        // This also propagates the cancellation to the replicas.
        self.finish_peek(uuid, reason);
    }

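    /// Assign a read policy to the identified collections.
    ///
    /// Returns an error and leaves all policies unchanged if any of the identified collections
    /// is missing or write-only.
    ///
    /// A minimal usage sketch (`instance`, `id`, and `ts` are assumed to be in scope):
    ///
    /// ```ignore
    /// // Pin reads to be valid from a fixed frontier.
    /// let policy = ReadPolicy::ValidFrom(Antichain::from_elem(ts));
    /// instance.set_read_policy(vec![(id, policy)])?;
    /// ```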
    #[mz_ore::instrument(level = "debug")]
    pub fn set_read_policy(
        &mut self,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) -> Result<(), ReadPolicyError> {
        // Do error checking upfront, to avoid introducing inconsistencies between a collection's
        // `read_policy` and its `implied_read_hold`.
        for (id, _policy) in &policies {
            let collection = self.collection(*id)?;
            if collection.read_policy.is_none() {
                return Err(ReadPolicyError::WriteOnlyCollection(*id));
            }
        }

        for (id, new_policy) in policies {
            let collection = self.expect_collection_mut(id);
            let new_since = new_policy.frontier(collection.write_frontier().borrow());
            let _ = collection.implied_read_hold.try_downgrade(new_since);
            collection.read_policy = Some(new_policy);
        }

        Ok(())
    }

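    /// Advance the global write frontier of the identified collection, downgrading its implied
    /// read hold according to its read policy.
    ///
    /// Frontier regressions are ignored.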
    #[mz_ore::instrument(level = "debug")]
    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
        let collection = self.expect_collection_mut(id);

        let advanced = collection.shared.lock_write_frontier(|f| {
            let advanced = PartialOrder::less_than(f, &new_frontier);
            if advanced {
                f.clone_from(&new_frontier);
            }
            advanced
        });

        if !advanced {
            return;
        }

        // Relax the implied read hold according to the read policy.
        let new_since = match &collection.read_policy {
            Some(read_policy) => {
                // For readable collections, the implied read hold is driven by the read policy.
                read_policy.frontier(new_frontier.borrow())
            }
            None => {
                // Write-only collections cannot be read within the context of the compute
                // controller, so we can immediately advance their read frontier to the new write
                // frontier, stepped back by one so the most recent write remains readable
                // downstream.
                Antichain::from_iter(
                    new_frontier
                        .iter()
                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                )
            }
        };
        let _ = collection.implied_read_hold.try_downgrade(new_since);

        // Report the frontier advancement.
        self.deliver_response(ComputeControllerResponse::FrontierUpper {
            id,
            upper: new_frontier,
        });
    }

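    /// Apply a change to a collection's read capabilities, propagating the resulting read
    /// frontier downgrade to the collection's dependencies and to the replicas.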
    pub(super) fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
        let Some(collection) = self.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "read hold change for absent collection (id={id}, changes={update:?})"
            );
            return;
        };

        let new_since = collection.shared.lock_read_capabilities(|caps| {
            // Sanity check to prevent corrupted `read_capabilities`, which can cause
            // hard-to-debug issues (usually stuck read frontiers).
            let read_frontier = caps.frontier();
            for (time, diff) in update.iter() {
                let count = caps.count_for(time) + diff;
                assert!(
                    count >= 0,
                    "invalid read capabilities update: negative capability \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
                assert!(
                    count == 0 || read_frontier.less_equal(time),
                    "invalid read capabilities update: frontier regression \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
            }

            // Apply the update.
            let changes = caps.update_iter(update.drain());

            let changed = changes.count() > 0;
            changed.then(|| caps.frontier().to_owned())
        });

        let Some(new_since) = new_since else {
            return;
        };

        // Propagate the new read frontier to the collection's dependencies.
        for read_hold in collection.compute_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }
        for read_hold in collection.storage_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }

        // Produce an `AllowCompaction` command.
        self.send(ComputeCommand::AllowCompaction {
            id,
            frontier: new_since,
        });
    }

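    /// Fulfill a pending peek and remove it from tracking, canceling its dataflow on the
    /// replicas.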
    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
        let Some(peek) = self.peeks.remove(&uuid) else {
            return;
        };

        // The recipient might not be interested in the peek response anymore, which is fine.
        let _ = peek.peek_response_tx.send(response);

        // Send `CancelPeek` before dropping the peek's read hold, so the replicas can release
        // their own peek state before the inputs are allowed to compact.
        self.send(ComputeCommand::CancelPeek { uuid });

        drop(peek.read_hold);
    }

    /// Handle a response received from a replica, dispatching to the specific handler.
    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
        // Ignore responses from replicas we have dropped, or from stale incarnations of replicas
        // we have since restarted under a new epoch.
        if self
            .replicas
            .get(&replica_id)
            .filter(|replica| replica.epoch == epoch)
            .is_none()
        {
            return;
        }

        match response {
            ComputeResponse::Frontiers(id, frontiers) => {
                self.handle_frontiers_response(id, frontiers, replica_id);
            }
            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
            }
            ComputeResponse::CopyToResponse(id, response) => {
                self.handle_copy_to_response(id, response, replica_id);
            }
            ComputeResponse::SubscribeResponse(id, response) => {
                self.handle_subscribe_response(id, response, replica_id);
            }
            ComputeResponse::Status(response) => {
                self.handle_status_response(response, replica_id);
            }
        }
    }

    /// Handle a frontiers response from the given replica, updating the per-replica frontiers
    /// and possibly the global write frontier of the identified collection.
    fn handle_frontiers_response(
        &mut self,
        id: GlobalId,
        frontiers: FrontiersResponse<T>,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&id) {
            soft_panic_or_log!(
                "frontiers update for an unknown collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica \
                 (replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };

        if let Some(new_frontier) = frontiers.input_frontier {
            replica_collection.update_input_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.output_frontier {
            replica_collection.update_output_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.write_frontier {
            replica_collection.update_write_frontier(new_frontier.clone());
            self.maybe_update_global_write_frontier(id, new_frontier);
        }
    }

    /// Handle a peek response from a replica, delivering the result if the peek is still tracked
    /// and the response comes from the peek's target replica.
    #[mz_ore::instrument(level = "debug")]
    fn handle_peek_response(
        &mut self,
        uuid: Uuid,
        response: PeekResponse,
        otel_ctx: OpenTelemetryContext,
        replica_id: ReplicaId,
    ) {
        otel_ctx.attach_as_parent();

        // We might not be tracking this peek anymore, because we have already served a response
        // or the peek was canceled. If so, ignore the response.
        let Some(peek) = self.peeks.get(&uuid) else {
            return;
        };

        // If the peek targets a specific replica, ignore responses from other replicas.
        let target_replica = peek.target_replica.unwrap_or(replica_id);
        if target_replica != replica_id {
            return;
        }

        let duration = peek.requested_at.elapsed();
        self.metrics.observe_peek_response(&response, duration);

        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
        self.deliver_response(ComputeControllerResponse::PeekNotification(
            uuid,
            notification,
            otel_ctx,
        ));

        self.finish_peek(uuid, response)
    }

    fn handle_copy_to_response(
        &mut self,
        sink_id: GlobalId,
        response: CopyToResponse,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&sink_id) {
            soft_panic_or_log!(
                "received response for an unknown copy-to \
                 (sink_id={sink_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
            soft_panic_or_log!(
                "copy-to response for an unknown replica collection \
                 (sink_id={sink_id}, replica_id={replica_id})"
            );
            return;
        };

        // A replica that has produced a copy-to response is done with the collection, so we can
        // advance its tracked frontiers to the empty antichain.
        replica_collection.update_write_frontier(Antichain::new());
        replica_collection.update_input_frontier(Antichain::new());
        replica_collection.update_output_frontier(Antichain::new());

        // We might not be tracking this copy-to anymore, because another replica already
        // produced a response. If so, ignore this one.
        if !self.copy_tos.remove(&sink_id) {
            return;
        }

        let result = match response {
            CopyToResponse::RowCount(count) => Ok(count),
            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
            CopyToResponse::Dropped => {
                // We should never get here: replicas only drop copy-to collections when the
                // controller has allowed them to, at which point the copy-to is not tracked
                // anymore (see `drop_collections`).
                tracing::error!(
                    %sink_id, %replica_id,
                    "received `Dropped` response for a tracked copy to",
                );
                return;
            }
        };

        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
    }

    fn handle_subscribe_response(
        &mut self,
        subscribe_id: GlobalId,
        response: SubscribeResponse<T>,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&subscribe_id) {
            soft_panic_or_log!(
                "received response for an unknown subscribe \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica (replica_id={replica_id})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica collection \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
            );
            return;
        };

        // Always apply the write frontier update, even if the subscribe is not tracked anymore,
        // so replica frontier state stays consistent.
        let write_frontier = match &response {
            SubscribeResponse::Batch(batch) => batch.upper.clone(),
            SubscribeResponse::DroppedAt(_) => Antichain::new(),
        };

        // For subscribes we downgrade all replica frontiers based on the write frontier. This is
        // fine because the input and output frontiers of a subscribe track its write frontier.
        replica_collection.update_write_frontier(write_frontier.clone());
        replica_collection.update_input_frontier(write_frontier.clone());
        replica_collection.update_output_frontier(write_frontier.clone());

        // If the subscribe is not tracked anymore, or targets a different replica, there is
        // nothing to deliver.
        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
            return;
        };
        let replica_targeted = subscribe.target_replica.unwrap_or(replica_id) == replica_id;
        if !replica_targeted {
            return;
        }

        // Apply a global frontier update. For replica-targeted subscribes it is important to
        // advance the global frontier only based on responses from the targeted replica;
        // otherwise another replica could advance to the empty frontier and cause us to drop the
        // subscribe on the targeted replica prematurely.
        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);

        match response {
            SubscribeResponse::Batch(batch) => {
                let upper = batch.upper;
                let mut updates = batch.updates;

                // If this batch advances the subscribe's frontier, we emit all updates at times
                // greater or equal to the last frontier (to avoid emitting duplicate updates).
                if PartialOrder::less_than(&subscribe.frontier, &upper) {
                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());

                    if upper.is_empty() {
                        // This subscribe cannot produce more data. Stop tracking it.
                        self.subscribes.remove(&subscribe_id);
                    } else {
                        // This subscribe can produce more data. Update our tracking of it.
                        self.subscribes.insert(subscribe_id, subscribe);
                    }

                    if let Ok(updates) = updates.as_mut() {
                        updates.retain(|(time, _data, _diff)| lower.less_equal(time));
                    }
                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeBatch {
                            lower,
                            upper,
                            updates,
                        },
                    ));
                }
            }
            SubscribeResponse::DroppedAt(frontier) => {
                // We should never get here: replicas only drop subscribe collections when the
                // controller has allowed them to, at which point the subscribe is not tracked
                // anymore (see `drop_collections`).
                tracing::error!(
                    %subscribe_id,
                    %replica_id,
                    frontier = ?frontier.elements(),
                    "received `DroppedAt` response for a tracked subscribe",
                );
                self.subscribes.remove(&subscribe_id);
            }
        }
    }

    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
        match response {
            StatusResponse::Placeholder => {}
        }
    }

    /// Returns the write frontiers of the given collection's direct compute and storage
    /// dependencies.
    fn dependency_write_frontiers<'b>(
        &'b self,
        collection: &'b CollectionState<T>,
    ) -> impl Iterator<Item = Antichain<T>> + 'b {
        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
            let collection = self.collections.get(&dep_id);
            collection.map(|c| c.write_frontier())
        });
        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
            frontiers.map(|f| f.write_frontier)
        });

        compute_frontiers.chain(storage_frontiers)
    }

    /// Returns the write frontiers of the given collection's transitive storage dependencies,
    /// found by walking the compute dependency graph.
    fn transitive_storage_dependency_write_frontiers<'b>(
        &'b self,
        collection: &'b CollectionState<T>,
    ) -> impl Iterator<Item = Antichain<T>> + 'b {
        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
        let mut done = BTreeSet::new();

        while let Some(id) = todo.pop() {
            if done.contains(&id) {
                continue;
            }
            if let Some(dep) = self.collections.get(&id) {
                storage_ids.extend(dep.storage_dependency_ids());
                todo.extend(dep.compute_dependency_ids())
            }
            done.insert(id);
        }

        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
            let frontiers = self.storage_collections.collection_frontiers(id).ok();
            frontiers.map(|f| f.write_frontier)
        });

        storage_frontiers
    }

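    /// Downgrade the warmup capabilities of collections as much as possible.
    ///
    /// The only requirement on a collection's warmup capability is that it is for a time that is
    /// available in all of the collection's inputs. For each input, the latest such time is
    /// `write_frontier - 1`, so the farthest a warmup capability can be downgraded is the
    /// minimum of `write_frontier - 1` over all inputs.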
    fn downgrade_warmup_capabilities(&mut self) {
        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            // For write-only collections that have advanced to the empty frontier, we can drop
            // the warmup capability entirely: there is no reason to hydrate these collections
            // again, so being able to warm them up is not useful.
            if collection.read_policy.is_none()
                && collection.shared.lock_write_frontier(|f| f.is_empty())
            {
                new_capabilities.insert(*id, Antichain::new());
                continue;
            }

            let mut new_capability = Antichain::new();
            for frontier in self.dependency_write_frontiers(collection) {
                for time in frontier {
                    new_capability.insert(time.step_back().unwrap_or(time));
                }
            }

            new_capabilities.insert(*id, new_capability);
        }

        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
        }
    }

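    /// Forward the implied capabilities of collections on paused clusters (i.e. clusters without
    /// replicas), based on the write frontiers of their transitive storage inputs.
    ///
    /// Without replicas nothing advances collection write frontiers, so implied read holds would
    /// otherwise pin the read frontiers of the storage inputs indefinitely.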
    fn forward_implied_capabilities(&mut self) {
        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
            return;
        }
        // Only apply this behavior to paused clusters, i.e. clusters without replicas.
        if !self.replicas.is_empty() {
            return;
        }

        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            // Skip write-only collections, which have no read policy.
            let Some(read_policy) = &collection.read_policy else {
                continue;
            };

            // Determine the frontier up to which the collection could have advanced if replicas
            // were present, by consulting its transitive storage inputs.
            let mut dep_frontier = Antichain::new();
            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
                dep_frontier.extend(frontier);
            }

            let new_capability = read_policy.frontier(dep_frontier.borrow());
            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
                new_capabilities.insert(*id, new_capability);
            }
        }

        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.implied_read_hold.try_downgrade(new_capability);
        }
    }

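    /// Acquire a read hold on the identified compute collection, at its current read frontier.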
    pub(super) fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
        // We acquire read holds at the earliest possible time, rather than returning a copy of
        // the implied read hold. This way, `create_dataflow` can acquire read holds on compute
        // dependencies at frontiers that are held back by other read holds the caller has
        // previously taken.
        let collection = self.collection(id)?;
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });
        let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
        Ok(hold)
    }

    /// Run periodic maintenance work: rehydrating failed replicas, downgrading capabilities,
    /// scheduling and cleaning up collections, and refreshing introspection and metrics.
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        self.rehydrate_failed_replicas();
        self.downgrade_warmup_capabilities();
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
}

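/// State maintained about an individual compute collection.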
#[derive(Debug)]
struct CollectionState<T: ComputeControllerTimestamp> {
    /// Whether this collection is a log collection.
    log_collection: bool,
    /// Whether this collection has been dropped by a controller client.
    dropped: bool,
    /// Whether this collection has been scheduled, i.e., a `Schedule` command has been sent for
    /// it.
    scheduled: bool,

    /// Whether this collection is in read-only mode.
    read_only: bool,

    /// State shared with the `ComputeController`.
    shared: SharedCollectionState<T>,

    /// A read hold maintaining the implicit capability of the collection.
    implied_read_hold: ReadHold<T>,
    /// A read hold held to enable dataflow warmup.
    warmup_read_hold: ReadHold<T>,
    /// The policy that drives downgrades of the `implied_read_hold`.
    ///
    /// `None` for write-only collections (i.e. sinks).
    read_policy: Option<ReadPolicy<T>>,

    /// Storage collections on which this collection depends, and the read holds this collection
    /// requires on them.
    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
    /// Compute collections on which this collection depends, and the read holds this collection
    /// requires on them.
    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,

    /// Introspection state associated with this collection.
    introspection: CollectionIntrospection<T>,

    /// A stash of bucketed wallclock lag measurements, flushed to the `WallclockLagHistogram`
    /// introspection by `maybe_record_wallclock_lag`.
    ///
    /// `None` for transient collections, which are not tracked there.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}

impl<T: ComputeControllerTimestamp> CollectionState<T> {
    /// Creates state for a new collection, with the given dependencies.
    fn new(
        collection_id: GlobalId,
        as_of: Antichain<T>,
        shared: SharedCollectionState<T>,
        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection: CollectionIntrospection<T>,
    ) -> Self {
        // A collection is not readable before the `as_of`.
        let since = as_of.clone();
        // A collection won't produce updates for times before the `as_of`.
        let upper = as_of;

        // Ensure that the provided `shared` is valid for the given `as_of`.
        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
        assert!(shared.lock_write_frontier(|f| f == &upper));

        // Initialize the collection's read holds.
        // Note that the implied read hold was already reflected in the `read_capabilities` when
        // `shared` was constructed, so only the warmup read hold is added here.
        let implied_read_hold =
            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);

        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
        shared.lock_read_capabilities(|c| {
            c.update_iter(updates);
        });

        // Only track wallclock lag histograms for non-transient collections.
        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
            true => None,
            false => Some(Default::default()),
        };

        Self {
            log_collection: false,
            dropped: false,
            scheduled: false,
            read_only: true,
            shared,
            implied_read_hold,
            warmup_read_hold,
            read_policy: Some(ReadPolicy::ValidFrom(since)),
            storage_dependencies,
            compute_dependencies,
            introspection,
            wallclock_lag_histogram_stash,
        }
    }

    fn new_log_collection(
        id: GlobalId,
        shared: SharedCollectionState<T>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    ) -> Self {
        let since = Antichain::from_elem(T::minimum());
        let introspection =
            CollectionIntrospection::new(id, introspection_tx, since.clone(), false, None, None);
        let mut state = Self::new(
            id,
            since,
            shared,
            Default::default(),
            Default::default(),
            read_hold_tx,
            introspection,
        );
        state.log_collection = true;
        // Log collections are created and scheduled implicitly as part of
        // replica initialization.
        state.scheduled = true;
        state
    }

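    /// Report the current read frontier of this collection.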
    fn read_frontier(&self) -> Antichain<T> {
        self.shared
            .lock_read_capabilities(|c| c.frontier().to_owned())
    }

    /// Report the current write frontier of this collection.
    fn write_frontier(&self) -> Antichain<T> {
        self.shared.lock_write_frontier(|f| f.clone())
    }

    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.storage_dependencies.keys().copied()
    }

    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.compute_dependencies.keys().copied()
    }

    /// The IDs of all dependencies of this collection, compute and storage.
    fn dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.compute_dependency_ids()
            .chain(self.storage_dependency_ids())
    }
}

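/// Collection state that is shared with the `ComputeController` frontend,
/// allowing it to inspect read capabilities and write frontiers without a
/// round trip through the instance.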
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState<T> {
    /// The accumulated read capabilities of the collection.
    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
    /// The write frontier of the collection.
    write_frontier: Arc<Mutex<Antichain<T>>>,
}

impl<T: Timestamp> SharedCollectionState<T> {
    pub fn new(as_of: Antichain<T>) -> Self {
        let since = as_of.clone();
        let upper = as_of;

        // A new collection starts out with a single read capability at its
        // since frontier.
        let mut read_capabilities = MutableAntichain::new();
        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));

        Self {
            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
            write_frontier: Arc::new(Mutex::new(upper)),
        }
    }

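    /// Apply the given function to the contained read capabilities, while
    /// holding the lock.
    ///
    /// A minimal usage sketch (assuming a `SharedCollectionState<u64>` named
    /// `shared`):
    ///
    /// ```ignore
    /// // Read off the current since frontier without changing capabilities.
    /// let since = shared.lock_read_capabilities(|caps| caps.frontier().to_owned());
    /// ```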
    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut MutableAntichain<T>) -> R,
    {
        let mut caps = self.read_capabilities.lock().expect("poisoned");
        f(&mut *caps)
    }

    /// Apply the given function to the contained write frontier, while
    /// holding the lock.
    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Antichain<T>) -> R,
    {
        let mut frontier = self.write_frontier.lock().expect("poisoned");
        f(&mut *frontier)
    }
}

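/// Manages the introspection data associated with a compute collection, and
/// retracts any reported state when dropped.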
#[derive(Debug)]
struct CollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Frontier introspection state; `None` for storage sink collections,
    /// which are excluded from compute frontier introspection.
    frontiers: Option<FrontiersIntrospectionState<T>>,
    /// Refresh introspection state; `Some` only for materialized views with
    /// a refresh schedule.
    refresh: Option<RefreshIntrospectionState<T>>,
}

impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
    fn new(
        collection_id: GlobalId,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        as_of: Antichain<T>,
        storage_sink: bool,
        initial_as_of: Option<Antichain<T>>,
        refresh_schedule: Option<RefreshSchedule>,
    ) -> Self {
        let refresh =
            match (refresh_schedule, initial_as_of) {
                (Some(refresh_schedule), Some(initial_as_of)) => Some(
                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
                ),
                (refresh_schedule, _) => {
                    soft_assert_or_log!(
                        refresh_schedule.is_none(),
                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
                    );
                    None
                }
            };
        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));

        let self_ = Self {
            collection_id,
            introspection_tx,
            frontiers,
            refresh,
        };

        self_.report_initial_state();
        self_
    }

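    /// Report the initial introspection state for each enabled introspection
    /// relation.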
    fn report_initial_state(&self) {
        if let Some(frontiers) = &self.frontiers {
            let row = frontiers.row_for_collection(self.collection_id);
            let updates = vec![(row, Diff::ONE)];
            self.send(IntrospectionType::Frontiers, updates);
        }

        if let Some(refresh) = &self.refresh {
            let row = refresh.row_for_collection(self.collection_id);
            let updates = vec![(row, Diff::ONE)];
            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
        }
    }

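    /// Observe the given current collection frontiers and update the
    /// introspection state as necessary.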
    fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
        self.update_frontier_introspection(read_frontier, write_frontier);
        self.update_refresh_introspection(write_frontier);
    }

    fn update_frontier_introspection(
        &mut self,
        read_frontier: &Antichain<T>,
        write_frontier: &Antichain<T>,
    ) {
        let Some(frontiers) = &mut self.frontiers else {
            return;
        };

        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
        {
            return;
        }

        let retraction = frontiers.row_for_collection(self.collection_id);
        frontiers.update(read_frontier, write_frontier);
        let insertion = frontiers.row_for_collection(self.collection_id);
        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
        self.send(IntrospectionType::Frontiers, updates);
    }

    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
        let Some(refresh) = &mut self.refresh else {
            return;
        };

        let retraction = refresh.row_for_collection(self.collection_id);
        refresh.frontier_update(write_frontier);
        let insertion = refresh.row_for_collection(self.collection_id);

        if retraction == insertion {
            return;
        }

        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
    }

    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
        // Failure to send means the introspection receiver has shut down, in
        // which case the updates can be safely discarded.
        let _ = self.introspection_tx.send((introspection_type, updates));
    }
}

impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
    fn drop(&mut self) {
        if let Some(frontiers) = &self.frontiers {
            // Retract the previously reported frontiers.
            let row = frontiers.row_for_collection(self.collection_id);
            let updates = vec![(row, Diff::MINUS_ONE)];
            self.send(IntrospectionType::Frontiers, updates);
        }

        if let Some(refresh) = &self.refresh {
            // Retract the previously reported refresh state.
            let retraction = refresh.row_for_collection(self.collection_id);
            let updates = vec![(retraction, Diff::MINUS_ONE)];
            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
        }
    }
}

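/// The read and write frontiers of a collection, as reported to `Frontiers`
/// introspection.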
#[derive(Debug)]
struct FrontiersIntrospectionState<T> {
    read_frontier: Antichain<T>,
    write_frontier: Antichain<T>,
}

impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
    fn new(as_of: Antichain<T>) -> Self {
        Self {
            read_frontier: as_of.clone(),
            write_frontier: as_of,
        }
    }

    /// Return a `Row` reflecting the current frontier introspection state.
    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
        let read_frontier = self
            .read_frontier
            .as_option()
            .map_or(Datum::Null, |ts| ts.clone().into());
        let write_frontier = self
            .write_frontier
            .as_option()
            .map_or(Datum::Null, |ts| ts.clone().into());
        Row::pack_slice(&[
            Datum::String(&collection_id.to_string()),
            read_frontier,
            write_frontier,
        ])
    }

    /// Update the tracked frontiers with the given values.
    fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
        if read_frontier != &self.read_frontier {
            self.read_frontier.clone_from(read_frontier);
        }
        if write_frontier != &self.write_frontier {
            self.write_frontier.clone_from(write_frontier);
        }
    }
}

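/// Information about the refreshes of a materialized view with a refresh
/// schedule, kept up to date with the view's write frontier and reported to
/// `ComputeMaterializedViewRefreshes` introspection.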
#[derive(Debug)]
struct RefreshIntrospectionState<T> {
    refresh_schedule: RefreshSchedule,
    initial_as_of: Antichain<T>,
    next_refresh: Datum<'static>,
    last_completed_refresh: Datum<'static>,
}

impl<T> RefreshIntrospectionState<T> {
    /// Return a `Row` reflecting the current refresh introspection state.
    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
        Row::pack_slice(&[
            Datum::String(&collection_id.to_string()),
            self.last_completed_refresh,
            self.next_refresh,
        ])
    }
}

impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
    /// Construct a new `RefreshIntrospectionState`, initialized according to
    /// the given write frontier.
    fn new(
        refresh_schedule: RefreshSchedule,
        initial_as_of: Antichain<T>,
        upper: &Antichain<T>,
    ) -> Self {
        let mut self_ = Self {
            refresh_schedule: refresh_schedule.clone(),
            initial_as_of: initial_as_of.clone(),
            next_refresh: Datum::Null,
            last_completed_refresh: Datum::Null,
        };
        self_.frontier_update(upper);
        self_
    }

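    /// Update `last_completed_refresh` and `next_refresh` to reflect the
    /// given write frontier.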
    fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
        if write_frontier.is_empty() {
            // The collection is sealed: all scheduled refreshes have
            // completed.
            self.last_completed_refresh =
                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
                    last_refresh.into()
                } else {
                    // A schedule without a last refresh never ends, so the
                    // final refresh falls on the maximum possible timestamp.
                    T::maximum().into()
                };
            self.next_refresh = Datum::Null;
        } else if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
            // We are before the first refresh.
            self.last_completed_refresh = Datum::Null;
            let initial_as_of = self.initial_as_of.as_option().expect(
                "initial_as_of can't be [], because then there would be no refreshes at all",
            );
            let first_refresh = initial_as_of
                .round_up(&self.refresh_schedule)
                .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
            soft_assert_or_log!(
                first_refresh == *initial_as_of,
                "initial_as_of should be set to the first refresh"
            );
            self.next_refresh = first_refresh.into();
        } else {
            // The first refresh has already happened.
            let write_frontier = write_frontier.as_option().expect("checked above");
            self.last_completed_refresh = write_frontier
                .round_down_minus_1(&self.refresh_schedule)
                .map_or_else(
                    || {
                        soft_panic_or_log!(
                            "rounding down should have returned the first refresh or later"
                        );
                        Datum::Null
                    },
                    |last_completed_refresh| last_completed_refresh.into(),
                );
            self.next_refresh = write_frontier.clone().into();
        }
    }
}

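/// State tracked for a peek that is in progress.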
#[derive(Debug)]
struct PendingPeek<T: Timestamp> {
    /// For replica-targeted peeks, the replica whose response is accepted;
    /// responses from other replicas are ignored.
    target_replica: Option<ReplicaId>,
    /// The OpenTelemetry context for this peek.
    otel_ctx: OpenTelemetryContext,
    /// The time at which the peek was requested, used to track peek
    /// durations.
    requested_at: Instant,
    /// The read hold installed to serve this peek.
    read_hold: ReadHold<T>,
    /// The channel through which the peek response is delivered.
    peek_response_tx: oneshot::Sender<PeekResponse>,
    /// An optional limit on the number of rows returned.
    limit: Option<usize>,
    /// The number of rows to skip in the result.
    offset: usize,
}

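/// State tracked for an active subscribe.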
#[derive(Debug, Clone)]
struct ActiveSubscribe<T> {
    /// The current upper frontier of this subscribe.
    frontier: Antichain<T>,
    /// For replica-targeted subscribes, the replica whose responses are
    /// forwarded; responses from other replicas are ignored.
    target_replica: Option<ReplicaId>,
}

impl<T: ComputeControllerTimestamp> ActiveSubscribe<T> {
    fn new(target_replica: Option<ReplicaId>) -> Self {
        Self {
            frontier: Antichain::from_elem(T::minimum()),
            target_replica,
        }
    }
}

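/// State maintained about an individual replica.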
#[derive(Debug)]
struct ReplicaState<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    id: ReplicaId,
    /// Client for sending commands to the replica.
    client: ReplicaClient<T>,
    /// The replica configuration.
    config: ReplicaConfig,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Per-collection state for this replica.
    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
    /// The epoch of the replica.
    epoch: u64,
}

impl<T: ComputeControllerTimestamp> ReplicaState<T> {
    fn new(
        id: ReplicaId,
        client: ReplicaClient<T>,
        config: ReplicaConfig,
        metrics: ReplicaMetrics,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        epoch: u64,
    ) -> Self {
        Self {
            id,
            client,
            config,
            metrics,
            introspection_tx,
            epoch,
            collections: Default::default(),
        }
    }

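    /// Add a collection to the replica state.
    ///
    /// # Panics
    ///
    /// Panics if a collection with the same ID exists already.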
    fn add_collection(
        &mut self,
        id: GlobalId,
        as_of: Antichain<T>,
        input_read_holds: Vec<ReadHold<T>>,
    ) {
        let metrics = self.metrics.for_collection(id);
        let introspection = ReplicaCollectionIntrospection::new(
            self.id,
            id,
            self.introspection_tx.clone(),
            as_of.clone(),
        );
        let mut state =
            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);

        // Transient collections are excluded from wallclock lag reporting.
        if id.is_transient() {
            state.wallclock_lag_max = None;
        }

        if let Some(previous) = self.collections.insert(id, state) {
            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
        }
    }

    /// Remove state for a collection.
    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
        self.collections.remove(&id)
    }

    /// Returns whether all frontiers of the given collection are empty, which
    /// is also the case if the collection is unknown to this replica.
    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
        self.collections.get(&id).map_or(true, |c| {
            c.write_frontier.is_empty()
                && c.input_frontier.is_empty()
                && c.output_frontier.is_empty()
        })
    }

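    /// Return the state of this replica, formatted as JSON.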
    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` so the dump stays in sync with the contents of
        // the struct as fields are added or removed.
        let Self {
            id,
            client: _,
            config: _,
            metrics: _,
            introspection_tx: _,
            epoch,
            collections,
        } = self;

        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();

        Ok(serde_json::json!({
            "id": id.to_string(),
            "collections": collections,
            "epoch": epoch,
        }))
    }
}

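/// State maintained about the computation of a collection on a single
/// replica.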
2976#[derive(Debug)]
2977struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
2978 write_frontier: Antichain<T>,
2982 input_frontier: Antichain<T>,
2986 output_frontier: Antichain<T>,
2990
2991 metrics: Option<ReplicaCollectionMetrics>,
2995 as_of: Antichain<T>,
2997 introspection: ReplicaCollectionIntrospection<T>,
2999 input_read_holds: Vec<ReadHold<T>>,
3005
3006 wallclock_lag_max: Option<WallclockLag>,
3010}
3011
3012impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
3013 fn new(
3014 metrics: Option<ReplicaCollectionMetrics>,
3015 as_of: Antichain<T>,
3016 introspection: ReplicaCollectionIntrospection<T>,
3017 input_read_holds: Vec<ReadHold<T>>,
3018 ) -> Self {
3019 Self {
3020 write_frontier: as_of.clone(),
3021 input_frontier: as_of.clone(),
3022 output_frontier: as_of.clone(),
3023 metrics,
3024 as_of,
3025 introspection,
3026 input_read_holds,
3027 wallclock_lag_max: Some(WallclockLag::MIN),
3028 }
3029 }
3030
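    /// Returns whether this collection is hydrated: a collection with an
    /// empty as-of is considered hydrated immediately; otherwise it is
    /// hydrated once its output frontier has advanced past the as-of.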
    fn hydrated(&self) -> bool {
        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
    }

    /// Update the tracked write frontier, reporting a regression if one
    /// occurs.
    fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
            soft_panic_or_log!(
                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
                self.write_frontier,
            );
            return;
        } else if new_frontier == self.write_frontier {
            return;
        }

        self.write_frontier = new_frontier;
    }

    /// Update the tracked input frontier and downgrade the input read holds
    /// accordingly, reporting a regression if one occurs.
    fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
            soft_panic_or_log!(
                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
                self.input_frontier,
            );
            return;
        } else if new_frontier == self.input_frontier {
            return;
        }

        self.input_frontier = new_frontier;

        // The replica no longer needs its inputs at times before the new
        // input frontier, so the input read holds can be downgraded.
        for read_hold in &mut self.input_read_holds {
            let result = read_hold.try_downgrade(self.input_frontier.clone());
            soft_assert_or_log!(
                result.is_ok(),
                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
                self.input_frontier,
            );
        }
    }

    /// Update the tracked output frontier, reporting a regression if one
    /// occurs.
    fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
            soft_panic_or_log!(
                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
                self.output_frontier,
            );
            return;
        } else if new_frontier == self.output_frontier {
            return;
        }

        self.output_frontier = new_frontier;
    }
}

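/// Maintains the introspection state for a given replica and collection, and
/// ensures that the reported state is retracted when dropped.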
3107#[derive(Debug)]
3110struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
3111 replica_id: ReplicaId,
3113 collection_id: GlobalId,
3115 write_frontier: Antichain<T>,
3117 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3119}
3120
3121impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
3122 fn new(
3124 replica_id: ReplicaId,
3125 collection_id: GlobalId,
3126 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3127 as_of: Antichain<T>,
3128 ) -> Self {
3129 let self_ = Self {
3130 replica_id,
3131 collection_id,
3132 write_frontier: as_of,
3133 introspection_tx,
3134 };
3135
3136 self_.report_initial_state();
3137 self_
3138 }
3139
3140 fn report_initial_state(&self) {
3142 let row = self.write_frontier_row();
3143 let updates = vec![(row, Diff::ONE)];
3144 self.send(IntrospectionType::ReplicaFrontiers, updates);
3145 }
3146
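    /// Observe the given current write frontier and update the introspection
    /// state as necessary.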
    fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
        if self.write_frontier == *write_frontier {
            return;
        }

        let retraction = self.write_frontier_row();
        self.write_frontier.clone_from(write_frontier);
        let insertion = self.write_frontier_row();

        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
        self.send(IntrospectionType::ReplicaFrontiers, updates);
    }

    /// Return a `Row` reflecting the currently tracked write frontier.
    fn write_frontier_row(&self) -> Row {
        let write_frontier = self
            .write_frontier
            .as_option()
            .map_or(Datum::Null, |ts| ts.clone().into());
        Row::pack_slice(&[
            Datum::String(&self.collection_id.to_string()),
            Datum::String(&self.replica_id.to_string()),
            write_frontier,
        ])
    }

    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
        // Failure to send means the introspection receiver has shut down, in
        // which case the updates can be safely discarded.
        let _ = self.introspection_tx.send((introspection_type, updates));
    }
}

impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
    fn drop(&mut self) {
        // Retract the previously reported write frontier.
        let row = self.write_frontier_row();
        let updates = vec![(row, Diff::MINUS_ONE)];
        self.send(IntrospectionType::ReplicaFrontiers, updates);
    }
}