1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
21use mz_compute_types::plan::render_plan::RenderPlan;
22use mz_compute_types::sinks::{
23 ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
24};
25use mz_compute_types::sources::SourceInstanceDesc;
26use mz_controller_types::dyncfgs::{
27 ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
28};
29use mz_dyncfg::ConfigSet;
30use mz_expr::RowSetFinishing;
31use mz_ore::cast::CastFrom;
32use mz_ore::channel::instrumented_unbounded_channel;
33use mz_ore::now::NowFn;
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{soft_assert_or_log, soft_panic_or_log};
36use mz_persist_types::PersistLocation;
37use mz_repr::adt::timestamp::CheckedTimestamp;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row, Timestamp};
40use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
41use mz_storage_types::read_holds::{self, ReadHold};
42use mz_storage_types::read_policy::ReadPolicy;
43use thiserror::Error;
44use timely::PartialOrder;
45use timely::progress::frontier::MutableAntichain;
46use timely::progress::{Antichain, ChangeBatch};
47use tokio::sync::{mpsc, oneshot};
48use uuid::Uuid;
49
50use crate::controller::error::{
51 CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
52};
53use crate::controller::instance_client::PeekError;
54use crate::controller::replica::{ReplicaClient, ReplicaConfig};
55use crate::controller::{
56 ComputeControllerResponse, IntrospectionUpdates, PeekNotification, ReplicaId,
57 StorageCollections,
58};
59use crate::logging::LogVariant;
60use crate::metrics::IntCounter;
61use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
62use crate::protocol::command::{
63 ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
64};
65use crate::protocol::history::ComputeCommandHistory;
66use crate::protocol::response::{
67 ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
68 SubscribeBatch, SubscribeResponse,
69};
70
/// Error returned when attempting to add a replica whose ID is already in use.
#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);
74
/// Error returned when an operation references a replica that does not exist.
#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);
78
/// Errors that can occur during dataflow creation.
#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    /// A collection referenced by the dataflow does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The replica targeted by the dataflow does not exist.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The dataflow description has no as-of frontier set.
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    /// Subscribe dataflows require a non-empty as-of.
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    /// Copy-to dataflows require a non-empty as-of.
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    /// The caller did not provide a read hold for an imported collection.
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    /// A provided read hold could not be downgraded to the dataflow as-of.
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}
96
97impl From<CollectionMissing> for DataflowCreationError {
98 fn from(error: CollectionMissing) -> Self {
99 Self::CollectionMissing(error.0)
100 }
101}
102
/// Errors that can occur when installing a read policy on a collection.
#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    /// The referenced collection does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// Read policies cannot be installed on write-only collections.
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}
110
111impl From<CollectionMissing> for ReadPolicyError {
112 fn from(error: CollectionMissing) -> Self {
113 Self::CollectionMissing(error.0)
114 }
115}
116
/// A command for an [`Instance`] task: a closure executed against the instance
/// state by its event loop (see `run`).
pub(super) type Command = Box<dyn FnOnce(&mut Instance) + Send>;

/// A response received from a replica: `(replica ID, epoch, response)`.
/// NOTE(review): the `u64` appears to be the replica epoch passed to
/// `ReplicaClient::spawn` — confirm.
pub(super) type ReplicaResponse = (ReplicaId, u64, ComputeResponse);
123
/// The state and event loop of a single compute instance.
///
/// NOTE(review): field descriptions below are derived from usage within this
/// file; confirm against the controller documentation.
pub(super) struct Instance {
    /// Build information, forwarded to replicas when they are spawned.
    build_info: &'static BuildInfo,
    /// Handle to storage collections, used for frontiers and metadata lookups.
    storage_collections: StorageCollections,
    /// Whether `InitializationComplete` has been sent to replicas.
    initialized: bool,
    /// Whether the instance runs in read-only mode; when set, wallclock lag
    /// recording is suppressed (see `maybe_record_wallclock_lag`).
    read_only: bool,
    /// Optional workload class, used as a label on wallclock lag histograms.
    workload_class: Option<String>,
    /// State for each provisioned replica, keyed by replica ID.
    replicas: BTreeMap<ReplicaId, ReplicaState>,
    /// State for each installed compute collection, keyed by collection ID.
    collections: BTreeMap<GlobalId, CollectionState>,
    /// Mapping from log variant to the ID of the collection exporting it.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Peeks that have been issued but not yet finished.
    peeks: BTreeMap<Uuid, PendingPeek>,
    /// Active subscribes, keyed by their collection ID.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe>,
    /// Collections with active COPY TO sinks.
    copy_tos: BTreeSet<GlobalId>,
    /// History of commands sent, replayed to hydrate newly added replicas.
    history: ComputeCommandHistory<UIntGauge>,
    /// Receiver for commands from the controller frontend.
    command_rx: mpsc::UnboundedReceiver<Command>,
    /// Sender for responses back to the controller frontend.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
    /// Sender for introspection updates (e.g. wallclock lag records).
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Metrics for this instance.
    metrics: InstanceMetrics,
    /// Dynamic configuration.
    dyncfg: Arc<ConfigSet>,

    /// Persist location used for stashing peek results.
    peek_stash_persist_location: PersistLocation,

    /// Clock producing wallclock timestamps.
    now: NowFn,
    /// Function computing the wallclock lag for a given timestamp.
    wallclock_lag: WallclockLagFn<Timestamp>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Channel for read-hold changes.
    read_hold_tx: read_holds::ChangeTx,
    /// Sender handed to replica tasks for their responses.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse, IntCounter>,
    /// Receiver for responses from replica tasks.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse, IntCounter>,
}
217
218impl Instance {
219 fn collection(&self, id: GlobalId) -> Result<&CollectionState, CollectionMissing> {
221 self.collections.get(&id).ok_or(CollectionMissing(id))
222 }
223
224 fn collection_mut(&mut self, id: GlobalId) -> Result<&mut CollectionState, CollectionMissing> {
226 self.collections.get_mut(&id).ok_or(CollectionMissing(id))
227 }
228
229 fn expect_collection(&self, id: GlobalId) -> &CollectionState {
235 self.collections.get(&id).expect("collection must exist")
236 }
237
238 fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
244 self.collections
245 .get_mut(&id)
246 .expect("collection must exist")
247 }
248
249 fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState)> {
250 self.collections.iter().map(|(id, coll)| (*id, coll))
251 }
252
253 fn replicas_hosting(
260 &self,
261 id: GlobalId,
262 ) -> Result<impl Iterator<Item = &ReplicaState>, CollectionMissing> {
263 let target = self.collection(id)?.target_replica;
264 Ok(self
265 .replicas
266 .values()
267 .filter(move |r| target.map_or(true, |t| t == r.id)))
268 }
269
270 fn add_collection(
276 &mut self,
277 id: GlobalId,
278 as_of: Antichain<Timestamp>,
279 shared: SharedCollectionState,
280 storage_dependencies: BTreeMap<GlobalId, ReadHold>,
281 compute_dependencies: BTreeMap<GlobalId, ReadHold>,
282 replica_input_read_holds: Vec<ReadHold>,
283 write_only: bool,
284 storage_sink: bool,
285 initial_as_of: Option<Antichain<Timestamp>>,
286 refresh_schedule: Option<RefreshSchedule>,
287 target_replica: Option<ReplicaId>,
288 ) {
289 let dependency_ids: Vec<GlobalId> = compute_dependencies
291 .keys()
292 .chain(storage_dependencies.keys())
293 .copied()
294 .collect();
295 let introspection = CollectionIntrospection::new(
296 id,
297 self.introspection_tx.clone(),
298 as_of.clone(),
299 storage_sink,
300 initial_as_of,
301 refresh_schedule,
302 dependency_ids,
303 );
304 let mut state = CollectionState::new(
305 id,
306 as_of.clone(),
307 shared,
308 storage_dependencies,
309 compute_dependencies,
310 Arc::clone(&self.read_hold_tx),
311 introspection,
312 );
313 state.target_replica = target_replica;
314 if write_only {
316 state.read_policy = None;
317 }
318
319 if let Some(previous) = self.collections.insert(id, state) {
320 panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
321 }
322
323 for replica in self.replicas.values_mut() {
325 if target_replica.is_some_and(|id| id != replica.id) {
326 continue;
327 }
328 replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
329 }
330 }
331
332 fn remove_collection(&mut self, id: GlobalId) {
333 for replica in self.replicas.values_mut() {
335 replica.remove_collection(id);
336 }
337
338 self.collections.remove(&id);
340 }
341
    /// Creates state for a newly added replica and installs on it every
    /// collection it should host.
    fn add_replica_state(
        &mut self,
        id: ReplicaId,
        client: ReplicaClient,
        config: ReplicaConfig,
        epoch: u64,
    ) {
        // IDs of the log collections this replica exports.
        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();

        let metrics = self.metrics.for_replica(id);
        let mut replica = ReplicaState::new(
            id,
            client,
            config,
            metrics,
            self.introspection_tx.clone(),
            epoch,
        );

        for (collection_id, collection) in &self.collections {
            // Skip log collections not exported by this replica, and
            // collections targeted at a different replica.
            if (collection.log_collection && !log_ids.contains(collection_id))
                || collection.target_replica.is_some_and(|rid| rid != id)
            {
                continue;
            }

            // Log collections start at the minimum time; other collections
            // start at the current global read frontier.
            let as_of = if collection.log_collection {
                Antichain::from_elem(Timestamp::MIN)
            } else {
                collection.read_frontier().to_owned()
            };

            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
            replica.add_collection(*collection_id, as_of, input_read_holds);
        }

        self.replicas.insert(id, replica);
    }
387
388 fn deliver_response(&self, response: ComputeControllerResponse) {
390 let _ = self.response_tx.send(response);
393 }
394
395 fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
397 let _ = self.introspection_tx.send((type_, updates));
400 }
401
402 fn replica_exists(&self, id: ReplicaId) -> bool {
404 self.replicas.contains_key(&id)
405 }
406
407 fn peeks_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = (Uuid, &PendingPeek)> {
409 self.peeks.iter().filter_map(move |(uuid, peek)| {
410 if peek.target_replica == Some(replica_id) {
411 Some((*uuid, peek))
412 } else {
413 None
414 }
415 })
416 }
417
418 fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
420 self.subscribes.keys().copied().filter(move |id| {
421 let collection = self.expect_collection(*id);
422 collection.target_replica == Some(replica_id)
423 })
424 }
425
426 fn update_frontier_introspection(&mut self) {
435 for collection in self.collections.values_mut() {
436 collection
437 .introspection
438 .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
439 }
440
441 for replica in self.replicas.values_mut() {
442 for collection in replica.collections.values_mut() {
443 collection
444 .introspection
445 .observe_frontier(&collection.write_frontier);
446 }
447 }
448 }
449
    /// Refreshes the gauge metrics describing the instance's current state
    /// (replica, collection, peek, subscribe, and copy-to counts).
    fn refresh_state_metrics(&self) {
        // Collections that have not yet been scheduled on a replica.
        let unscheduled_collections_count =
            self.collections.values().filter(|c| !c.scheduled).count();
        // Replicas whose client reports a live connection.
        let connected_replica_count = self
            .replicas
            .values()
            .filter(|r| r.client.is_connected())
            .count();

        self.metrics
            .replica_count
            .set(u64::cast_from(self.replicas.len()));
        self.metrics
            .collection_count
            .set(u64::cast_from(self.collections.len()));
        self.metrics
            .collection_unscheduled_count
            .set(u64::cast_from(unscheduled_collections_count));
        self.metrics
            .peek_count
            .set(u64::cast_from(self.peeks.len()));
        self.metrics
            .subscribe_count
            .set(u64::cast_from(self.subscribes.len()));
        self.metrics
            .copy_to_count
            .set(u64::cast_from(self.copy_tos.len()));
        self.metrics
            .connected_replica_count
            .set(u64::cast_from(connected_replica_count));
    }
489
    /// Refreshes wallclock lag state: updates per-collection histogram stashes,
    /// per-replica lag maxima, and lag metrics, then possibly records the
    /// accumulated values to introspection.
    fn refresh_wallclock_lag(&mut self) {
        // Lag of a frontier relative to the current wallclock time; an empty
        // frontier has zero lag.
        let frontier_lag = |frontier: &Antichain<Timestamp>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        // Storage collections whose read capabilities are strictly behind
        // their write frontier, i.e. that have readable data.
        let readable_storage_collections: BTreeSet<_> = self
            .collections
            .keys()
            .filter_map(|id| {
                let frontiers = self.storage_collections.collection_frontiers(*id).ok()?;
                PartialOrder::less_than(&frontiers.read_capabilities, &frontiers.write_frontier)
                    .then_some(*id)
            })
            .collect();

        for (id, collection) in &mut self.collections {
            let write_frontier = collection.write_frontier();
            // For storage-backed collections, consult the precomputed set;
            // otherwise compare the compute frontiers directly.
            let readable = if self.storage_collections.check_exists(*id).is_ok() {
                readable_storage_collections.contains(id)
            } else {
                PartialOrder::less_than(&collection.read_frontier(), &write_frontier)
            };

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                // Bucket lag into powers of two; unreadable collections get a
                // dedicated "undefined" bucket.
                let bucket = if readable {
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                } else {
                    WallclockLag::Undefined
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                let readable = readable_storage_collections.contains(id) || collection.hydrated();

                let lag = if readable {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                } else {
                    WallclockLag::Undefined
                };

                // Track the maximum lag observed since the last recording.
                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    // Undefined lag is reported as u64::MAX in metrics.
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                };
            }
        }

        self.maybe_record_wallclock_lag();
    }
588
    /// Records accumulated wallclock lag introspection (history and histogram)
    /// if the configured recording interval has elapsed since the last
    /// recording. Does nothing in read-only mode.
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        // Truncate a datetime down to a multiple of `interval`; `None` if the
        // interval cannot be represented.
        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        // Fall back to the default interval if the configured one is too
        // large to truncate by.
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        // Only record once per truncated interval boundary.
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        // Per-replica lag history: emit the max lag observed since the last
        // recording and reset the accumulator.
        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        // Per-collection lag histograms: drain the stashes accumulated by
        // `refresh_wallclock_lag` into introspection rows.
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }
673
674 #[mz_ore::instrument(level = "debug")]
680 pub fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, CollectionMissing> {
681 let mut hosting_replicas = self.replicas_hosting(collection_id)?.peekable();
682 if hosting_replicas.peek().is_none() {
683 return Ok(true);
684 }
685 for replica_state in hosting_replicas {
686 let collection_state = replica_state
687 .collections
688 .get(&collection_id)
689 .expect("hosting replica must have per-replica collection state");
690
691 if collection_state.hydrated() {
692 return Ok(true);
693 }
694 }
695
696 Ok(false)
697 }
698
699 #[mz_ore::instrument(level = "debug")]
705 pub fn collections_hydrated_on_replicas(
706 &self,
707 target_replica_ids: Option<Vec<ReplicaId>>,
708 exclude_collections: &BTreeSet<GlobalId>,
709 ) -> Result<bool, HydrationCheckBadTarget> {
710 if self.replicas.is_empty() {
711 return Ok(true);
712 }
713 let mut all_hydrated = true;
714 let target_replicas: BTreeSet<ReplicaId> = self
715 .replicas
716 .keys()
717 .filter_map(|id| match target_replica_ids {
718 None => Some(id.clone()),
719 Some(ref ids) if ids.contains(id) => Some(id.clone()),
720 Some(_) => None,
721 })
722 .collect();
723 if let Some(targets) = target_replica_ids {
724 if target_replicas.is_empty() {
725 return Err(HydrationCheckBadTarget(targets));
726 }
727 }
728
729 for (id, _collection) in self.collections_iter() {
730 if id.is_transient() || exclude_collections.contains(&id) {
731 continue;
732 }
733
734 let mut collection_hydrated = false;
735 for replica_state in self.replicas_hosting(id).expect("collection must exist") {
738 if !target_replicas.contains(&replica_state.id) {
739 continue;
740 }
741 let collection_state = replica_state
742 .collections
743 .get(&id)
744 .expect("hosting replica must have per-replica collection state");
745
746 if collection_state.hydrated() {
747 collection_hydrated = true;
748 break;
749 }
750 }
751
752 if !collection_hydrated {
753 tracing::info!("collection {id} is not hydrated on any replica");
754 all_hydrated = false;
755 }
758 }
759
760 Ok(all_hydrated)
761 }
762
763 fn cleanup_collections(&mut self) {
779 let to_remove: Vec<_> = self
780 .collections_iter()
781 .filter(|(id, collection)| {
782 collection.dropped
783 && collection.shared.lock_read_capabilities(|c| c.is_empty())
784 && self
785 .replicas
786 .values()
787 .all(|r| r.collection_frontiers_empty(*id))
788 })
789 .map(|(id, _collection)| id)
790 .collect();
791
792 for id in to_remove {
793 self.remove_collection(id);
794 }
795 }
796
    /// Renders the instance state as JSON for debugging.
    ///
    /// The exhaustive destructuring ensures this method is revisited whenever
    /// a field is added to or removed from `Instance`.
    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            build_info: _,
            storage_collections: _,
            peek_stash_persist_location: _,
            initialized,
            read_only,
            workload_class,
            replicas,
            collections,
            log_sources: _,
            peeks,
            subscribes,
            copy_tos,
            history: _,
            command_rx: _,
            response_tx: _,
            introspection_tx: _,
            metrics: _,
            dyncfg: _,
            now: _,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            read_hold_tx: _,
            replica_tx: _,
            replica_rx: _,
        } = self;

        // Stringify map keys and Debug-format values so everything is
        // JSON-representable.
        let replicas: BTreeMap<_, _> = replicas
            .iter()
            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
            .collect::<Result<_, anyhow::Error>>()?;
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();
        let peeks: BTreeMap<_, _> = peeks
            .iter()
            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
            .collect();
        let subscribes: BTreeMap<_, _> = subscribes
            .iter()
            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
            .collect();
        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");

        Ok(serde_json::json!({
            "initialized": initialized,
            "read_only": read_only,
            "workload_class": workload_class,
            "replicas": replicas,
            "collections": collections,
            "peeks": peeks,
            "subscribes": subscribes,
            "copy_tos": copy_tos,
            "wallclock_lag_last_recorded": wallclock_lag_last_recorded,
        }))
    }
866
867 pub(super) fn collection_write_frontier(
869 &self,
870 id: GlobalId,
871 ) -> Result<Antichain<Timestamp>, CollectionMissing> {
872 Ok(self.collection(id)?.write_frontier())
873 }
874}
875
876impl Instance {
    /// Creates a new `Instance`.
    ///
    /// `arranged_logs` describes the introspection log collections to
    /// pre-install; all other collections are added later via
    /// `create_dataflow`.
    pub(super) fn new(
        build_info: &'static BuildInfo,
        storage: StorageCollections,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<Timestamp>,
        dyncfg: Arc<ConfigSet>,
        command_rx: mpsc::UnboundedReceiver<Command>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
        read_hold_tx: read_holds::ChangeTx,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        read_only: bool,
    ) -> Self {
        // Pre-install the log collections.
        let mut collections = BTreeMap::new();
        let mut log_sources = BTreeMap::new();
        for (log, id, shared) in arranged_logs {
            let collection = CollectionState::new_log_collection(
                id,
                shared,
                Arc::clone(&read_hold_tx),
                introspection_tx.clone(),
            );
            collections.insert(id, collection);
            log_sources.insert(log, id);
        }

        let history = ComputeCommandHistory::new(metrics.for_history());

        // Instrumented channel over which replica tasks report responses.
        let send_count = metrics.response_send_count.clone();
        let recv_count = metrics.response_recv_count.clone();
        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            storage_collections: storage,
            peek_stash_persist_location,
            initialized: false,
            read_only,
            workload_class: None,
            replicas: Default::default(),
            collections,
            log_sources,
            peeks: Default::default(),
            subscribes: Default::default(),
            copy_tos: Default::default(),
            history,
            command_rx,
            response_tx,
            introspection_tx,
            metrics,
            dyncfg,
            now,
            wallclock_lag,
            // Start the recording clock at construction time.
            wallclock_lag_last_recorded: now_dt,
            read_hold_tx,
            replica_tx,
            replica_rx,
        }
    }
940
    /// Runs the instance's event loop: sends the initial `Hello` and
    /// `CreateInstance` commands, then processes frontend commands and replica
    /// responses until the command channel closes.
    pub(super) async fn run(mut self) {
        self.send(ComputeCommand::Hello {
            nonce: Uuid::default(),
        });

        let instance_config = InstanceConfig {
            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
            logging: Default::default(),
            expiration_offset: Default::default(),
        };

        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));

        loop {
            tokio::select! {
                // Commands from the controller frontend; a closed channel
                // means the instance was dropped and the loop should end.
                command = self.command_rx.recv() => match command {
                    Some(cmd) => cmd(&mut self),
                    None => break,
                },
                // Responses from replica tasks.
                response = self.replica_rx.recv() => match response {
                    Some(response) => self.handle_response(response),
                    None => unreachable!("self owns a sender side of the channel"),
                }
            }
        }
    }
971
972 #[mz_ore::instrument(level = "debug")]
974 pub fn update_configuration(&mut self, config_params: ComputeParameters) {
975 if let Some(workload_class) = &config_params.workload_class {
976 self.workload_class = workload_class.clone();
977 }
978
979 let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
980 self.send(command);
981 }
982
983 #[mz_ore::instrument(level = "debug")]
988 pub fn initialization_complete(&mut self) {
989 if !self.initialized {
991 self.send(ComputeCommand::InitializationComplete);
992 self.initialized = true;
993 }
994 }
995
996 #[mz_ore::instrument(level = "debug")]
1000 pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
1001 let collection = self.collection_mut(collection_id)?;
1002
1003 if !collection.read_only {
1005 return Ok(());
1006 }
1007
1008 let as_of = collection.read_frontier();
1010
1011 if as_of.is_empty() {
1014 return Ok(());
1015 }
1016
1017 collection.read_only = false;
1018 self.send(ComputeCommand::AllowWrites(collection_id));
1019
1020 Ok(())
1021 }
1022
1023 #[mz_ore::instrument(level = "debug")]
1033 pub fn shutdown(&mut self) {
1034 let (_tx, rx) = mpsc::unbounded_channel();
1036 self.command_rx = rx;
1037
1038 let stray_replicas: Vec<_> = self.replicas.keys().collect();
1039 soft_assert_or_log!(
1040 stray_replicas.is_empty(),
1041 "dropped instance still has provisioned replicas: {stray_replicas:?}",
1042 );
1043 }
1044
1045 #[mz_ore::instrument(level = "debug")]
1047 fn send(&mut self, cmd: ComputeCommand) {
1048 self.history.push(cmd.clone());
1050
1051 let target_replica = self.target_replica(&cmd);
1052
1053 if let Some(rid) = target_replica {
1054 if let Some(replica) = self.replicas.get_mut(&rid) {
1055 let _ = replica.client.send(cmd);
1056 }
1057 } else {
1058 for replica in self.replicas.values_mut() {
1059 let _ = replica.client.send(cmd.clone());
1060 }
1061 }
1062 }
1063
    /// Returns the replica a command should be delivered to, if the command
    /// targets a specific replica. `None` means broadcast to all replicas.
    fn target_replica(&self, cmd: &ComputeCommand) -> Option<ReplicaId> {
        match &cmd {
            ComputeCommand::Schedule(id)
            | ComputeCommand::AllowWrites(id)
            | ComputeCommand::AllowCompaction { id, .. } => {
                self.expect_collection(*id).target_replica
            }
            ComputeCommand::CreateDataflow(desc) => {
                // A dataflow can have multiple exports; if any of them names a
                // target replica, they must all agree on it.
                let mut target_replica = None;
                for id in desc.export_ids() {
                    if let Some(replica) = self.expect_collection(id).target_replica {
                        if target_replica.is_some() {
                            assert_eq!(target_replica, Some(replica));
                        }
                        target_replica = Some(replica);
                    }
                }
                target_replica
            }
            // These commands are never targeted. NOTE(review): `Peek`
            // broadcasting here appears intentional — per-replica peek
            // targeting seems to be tracked via `PendingPeek.target_replica`;
            // confirm.
            ComputeCommand::Peek(_)
            | ComputeCommand::Hello { .. }
            | ComputeCommand::CreateInstance(_)
            | ComputeCommand::InitializationComplete
            | ComputeCommand::UpdateConfiguration(_)
            | ComputeCommand::CancelPeek { .. } => None,
        }
    }
1099
    /// Adds a replica to this instance and hydrates it by replaying the
    /// command history.
    ///
    /// `epoch` defaults to 1 when not given (fresh replica); rehydration
    /// passes an incremented epoch.
    #[mz_ore::instrument(level = "debug")]
    pub fn add_replica(
        &mut self,
        id: ReplicaId,
        mut config: ReplicaConfig,
        epoch: Option<u64>,
    ) -> Result<(), ReplicaExists> {
        if self.replica_exists(id) {
            return Err(ReplicaExists(id));
        }

        // Every replica exports the instance's log sources.
        config.logging.index_logs = self.log_sources.clone();

        let epoch = epoch.unwrap_or(1);
        let metrics = self.metrics.for_replica(id);
        let client = ReplicaClient::spawn(
            id,
            self.build_info,
            config.clone(),
            epoch,
            metrics.clone(),
            Arc::clone(&self.dyncfg),
            self.replica_tx.clone(),
        );

        // Compact the history before replaying it.
        self.history.reduce();

        self.history.update_source_uppers(&self.storage_collections);

        // Replay the history to hydrate the new replica, skipping commands
        // that target a different replica.
        for command in self.history.iter() {
            if let Some(target_replica) = self.target_replica(command)
                && target_replica != id
            {
                continue;
            }

            if client.send(command.clone()).is_err() {
                // A send failure means the replica connection died; stop
                // replaying — the failure is handled via rehydration.
                tracing::warn!("Replica {:?} connection terminated during hydration", id);
                break;
            }
        }

        self.add_replica_state(id, client, config, epoch);

        Ok(())
    }
1154
    /// Removes a replica from this instance, failing any subscribes and peeks
    /// that targeted it.
    #[mz_ore::instrument(level = "debug")]
    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
        let replica = self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;

        // Sanity-check that every per-replica input read hold is still covered
        // by an at-least-as-strong global hold; warn otherwise.
        for (collection_id, replica_collection) in &replica.collections {
            let collection = self.collections.get(collection_id);
            for replica_hold in &replica_collection.input_read_holds {
                let input_id = replica_hold.id();
                let global_hold = collection.and_then(|c| c.storage_dependencies.get(&input_id));
                let unprotected = global_hold
                    .is_none_or(|h| PartialOrder::less_than(replica_hold.since(), h.since()));
                if unprotected {
                    tracing::warn!(
                        replica_id = %id,
                        %collection_id,
                        %input_id,
                        replica_hold_since = ?replica_hold.since(),
                        global_hold_since = ?global_hold.map(|h| h.since()),
                        "dropping per-replica read hold without equivalent global read hold",
                    );
                }
            }
        }
        // Explicitly drop the replica state (and its read holds).
        drop(replica);

        // Fail all subscribes that targeted the removed replica.
        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
        for subscribe_id in to_drop {
            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
            let response = ComputeControllerResponse::SubscribeResponse(
                subscribe_id,
                SubscribeBatch {
                    lower: subscribe.frontier.clone(),
                    upper: subscribe.frontier,
                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
                },
            );
            self.deliver_response(response);
        }

        // Fail all peeks that targeted the removed replica: first notify,
        // then finish each peek with an error response.
        let mut peek_responses = Vec::new();
        let mut to_drop = Vec::new();
        for (uuid, peek) in self.peeks_targeting(id) {
            peek_responses.push(ComputeControllerResponse::PeekNotification(
                uuid,
                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
                peek.otel_ctx.clone(),
            ));
            to_drop.push(uuid);
        }
        for response in peek_responses {
            self.deliver_response(response);
        }
        for uuid in to_drop {
            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
            self.finish_peek(uuid, response);
        }

        self.forward_implied_capabilities();

        Ok(())
    }
1233
1234 fn rehydrate_replica(&mut self, id: ReplicaId) {
1240 let config = self.replicas[&id].config.clone();
1241 let epoch = self.replicas[&id].epoch + 1;
1242
1243 self.remove_replica(id).expect("replica must exist");
1244 let result = self.add_replica(id, config, Some(epoch));
1245
1246 match result {
1247 Ok(()) => (),
1248 Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1249 }
1250 }
1251
1252 fn rehydrate_failed_replicas(&mut self) {
1254 let replicas = self.replicas.iter();
1255 let failed_replicas: Vec<_> = replicas
1256 .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1257 .collect();
1258
1259 for replica_id in failed_replicas {
1260 self.rehydrate_replica(replica_id);
1261 }
1262 }
1263
1264 #[mz_ore::instrument(level = "debug")]
1269 pub fn create_dataflow(
1270 &mut self,
1271 dataflow: DataflowDescription<mz_compute_types::plan::Plan, ()>,
1272 import_read_holds: Vec<ReadHold>,
1273 mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState>,
1274 target_replica: Option<ReplicaId>,
1275 ) -> Result<(), DataflowCreationError> {
1276 use DataflowCreationError::*;
1277
1278 if let Some(replica_id) = target_replica {
1282 if !self.replica_exists(replica_id) {
1283 return Err(ReplicaMissing(replica_id));
1284 }
1285 }
1286
1287 let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
1289 if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
1290 return Err(EmptyAsOfForSubscribe);
1291 }
1292 if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
1293 return Err(EmptyAsOfForCopyTo);
1294 }
1295
1296 let mut storage_dependencies = BTreeMap::new();
1298 let mut compute_dependencies = BTreeMap::new();
1299
1300 let mut replica_input_read_holds = Vec::new();
1305
1306 let mut import_read_holds: BTreeMap<_, _> =
1307 import_read_holds.into_iter().map(|r| (r.id(), r)).collect();
1308
1309 for &id in dataflow.source_imports.keys() {
1310 let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1311 replica_input_read_holds.push(read_hold.clone());
1312
1313 read_hold
1314 .try_downgrade(as_of.clone())
1315 .map_err(|_| ReadHoldInsufficient(id))?;
1316 storage_dependencies.insert(id, read_hold);
1317 }
1318
1319 for &id in dataflow.index_imports.keys() {
1320 let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1321 read_hold
1322 .try_downgrade(as_of.clone())
1323 .map_err(|_| ReadHoldInsufficient(id))?;
1324 compute_dependencies.insert(id, read_hold);
1325 }
1326
1327 if as_of.is_empty() {
1330 replica_input_read_holds = Default::default();
1331 }
1332
1333 for export_id in dataflow.export_ids() {
1335 let shared = shared_collection_state
1336 .remove(&export_id)
1337 .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
1338 let write_only = dataflow.sink_exports.contains_key(&export_id);
1339 let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);
1340
1341 self.add_collection(
1342 export_id,
1343 as_of.clone(),
1344 shared,
1345 storage_dependencies.clone(),
1346 compute_dependencies.clone(),
1347 replica_input_read_holds.clone(),
1348 write_only,
1349 storage_sink,
1350 dataflow.initial_storage_as_of.clone(),
1351 dataflow.refresh_schedule.clone(),
1352 target_replica,
1353 );
1354
1355 if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
1358 self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
1359 }
1360 }
1361
1362 for subscribe_id in dataflow.subscribe_ids() {
1364 self.subscribes
1365 .insert(subscribe_id, ActiveSubscribe::default());
1366 }
1367
1368 for copy_to_id in dataflow.copy_to_ids() {
1370 self.copy_tos.insert(copy_to_id);
1371 }
1372
1373 let mut source_imports = BTreeMap::new();
1376 for (id, import) in dataflow.source_imports {
1377 let frontiers = self
1378 .storage_collections
1379 .collection_frontiers(id)
1380 .expect("collection exists");
1381
1382 let collection_metadata = self
1383 .storage_collections
1384 .collection_metadata(id)
1385 .expect("we have a read hold on this collection");
1386
1387 let desc = SourceInstanceDesc {
1388 storage_metadata: collection_metadata.clone(),
1389 arguments: import.desc.arguments,
1390 typ: import.desc.typ.clone(),
1391 };
1392 source_imports.insert(
1393 id,
1394 mz_compute_types::dataflows::SourceImport {
1395 desc,
1396 monotonic: import.monotonic,
1397 with_snapshot: import.with_snapshot,
1398 upper: frontiers.write_frontier,
1399 },
1400 );
1401 }
1402
1403 let mut sink_exports = BTreeMap::new();
1404 for (id, se) in dataflow.sink_exports {
1405 let connection = match se.connection {
1406 ComputeSinkConnection::MaterializedView(conn) => {
1407 let metadata = self
1408 .storage_collections
1409 .collection_metadata(id)
1410 .map_err(|_| CollectionMissing(id))?
1411 .clone();
1412 let conn = MaterializedViewSinkConnection {
1413 value_desc: conn.value_desc,
1414 storage_metadata: metadata,
1415 };
1416 ComputeSinkConnection::MaterializedView(conn)
1417 }
1418 ComputeSinkConnection::ContinualTask(conn) => {
1419 let metadata = self
1420 .storage_collections
1421 .collection_metadata(id)
1422 .map_err(|_| DataflowCreationError::CollectionMissing(id))?
1423 .clone();
1424 let conn = ContinualTaskConnection {
1425 input_id: conn.input_id,
1426 storage_metadata: metadata,
1427 };
1428 ComputeSinkConnection::ContinualTask(conn)
1429 }
1430 ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
1431 ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1432 ComputeSinkConnection::CopyToS3Oneshot(conn)
1433 }
1434 };
1435 let desc = ComputeSinkDesc {
1436 from: se.from,
1437 from_desc: se.from_desc,
1438 connection,
1439 with_snapshot: se.with_snapshot,
1440 up_to: se.up_to,
1441 non_null_assertions: se.non_null_assertions,
1442 refresh_schedule: se.refresh_schedule,
1443 };
1444 sink_exports.insert(id, desc);
1445 }
1446
1447 let objects_to_build = dataflow
1449 .objects_to_build
1450 .into_iter()
1451 .map(|object| BuildDesc {
1452 id: object.id,
1453 plan: RenderPlan::try_from(object.plan).expect("valid plan"),
1454 })
1455 .collect();
1456
1457 let augmented_dataflow = DataflowDescription {
1458 source_imports,
1459 sink_exports,
1460 objects_to_build,
1461 index_imports: dataflow.index_imports,
1463 index_exports: dataflow.index_exports,
1464 as_of: dataflow.as_of.clone(),
1465 until: dataflow.until,
1466 initial_storage_as_of: dataflow.initial_storage_as_of,
1467 refresh_schedule: dataflow.refresh_schedule,
1468 debug_name: dataflow.debug_name,
1469 time_dependence: dataflow.time_dependence,
1470 };
1471
1472 if augmented_dataflow.is_transient() {
1473 tracing::debug!(
1474 name = %augmented_dataflow.debug_name,
1475 import_ids = %augmented_dataflow.display_import_ids(),
1476 export_ids = %augmented_dataflow.display_export_ids(),
1477 as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1478 until = ?augmented_dataflow.until.elements(),
1479 "creating dataflow",
1480 );
1481 } else {
1482 tracing::info!(
1483 name = %augmented_dataflow.debug_name,
1484 import_ids = %augmented_dataflow.display_import_ids(),
1485 export_ids = %augmented_dataflow.display_export_ids(),
1486 as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1487 until = ?augmented_dataflow.until.elements(),
1488 "creating dataflow",
1489 );
1490 }
1491
1492 if as_of.is_empty() {
1495 tracing::info!(
1496 name = %augmented_dataflow.debug_name,
1497 "not sending `CreateDataflow`, because of empty `as_of`",
1498 );
1499 } else {
1500 let collections: Vec<_> = augmented_dataflow.export_ids().collect();
1501 self.send(ComputeCommand::CreateDataflow(Box::new(augmented_dataflow)));
1502
1503 for id in collections {
1504 self.maybe_schedule_collection(id);
1505 }
1506 }
1507
1508 Ok(())
1509 }
1510
    /// Send a `Schedule` command for the identified collection, if it is ready to run.
    ///
    /// A non-transient collection is considered ready once all its (non-self) compute
    /// dependencies have been scheduled and all dependency write frontiers have advanced
    /// beyond the collection's as-of. Transient collections are scheduled immediately.
    fn maybe_schedule_collection(&mut self, id: GlobalId) {
        let collection = self.expect_collection(id);

        // Never schedule the same collection twice.
        if collection.scheduled {
            return;
        }

        let as_of = collection.read_frontier();

        // NOTE(review): an empty read frontier is treated as "nothing to schedule";
        // presumably the collection is already closed — confirm against callers.
        if as_of.is_empty() {
            return;
        }

        let ready = if id.is_transient() {
            // Transient collections are scheduled unconditionally.
            true
        } else {
            // Self-dependencies (e.g. continual tasks reading their own output) must not
            // block scheduling, so filter them out.
            let not_self_dep = |x: &GlobalId| *x != id;

            let mut deps_scheduled = true;

            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
            let mut compute_frontiers = Vec::new();
            for id in compute_deps {
                let dep = &self.expect_collection(id);
                deps_scheduled &= dep.scheduled;
                compute_frontiers.push(dep.write_frontier());
            }

            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(storage_deps.collect())
                .expect("must exist");
            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);

            // All inputs must have produced data beyond the as-of.
            let mut frontiers = compute_frontiers.into_iter().chain(storage_frontiers);
            let frontiers_ready =
                frontiers.all(|frontier| PartialOrder::less_than(&as_of, &frontier));

            deps_scheduled && frontiers_ready
        };

        if ready {
            self.send(ComputeCommand::Schedule(id));
            let collection = self.expect_collection_mut(id);
            collection.scheduled = true;
        }
    }
1583
1584 fn schedule_collections(&mut self) {
1586 let ids: Vec<_> = self.collections.keys().copied().collect();
1587 for id in ids {
1588 self.maybe_schedule_collection(id);
1589 }
1590 }
1591
1592 #[mz_ore::instrument(level = "debug")]
1595 pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1596 for id in &ids {
1597 let collection = self.collection_mut(*id)?;
1598
1599 collection.dropped = true;
1601
1602 collection.implied_read_hold.release();
1605 collection.warmup_read_hold.release();
1606
1607 self.subscribes.remove(id);
1610 self.copy_tos.remove(id);
1613 }
1614
1615 Ok(())
1616 }
1617
    /// Issue a peek against the given target at the given timestamp.
    ///
    /// Registers a `PendingPeek` so the eventual replica response can be routed back to
    /// the client through `peek_response_tx`, then sends a `Peek` command to the replicas.
    ///
    /// # Errors
    ///
    /// * `ReadHoldIdMismatch` if `read_hold` is for a different collection than
    ///   `peek_target`.
    /// * `ReadHoldInsufficient` if `read_hold` cannot be downgraded to `timestamp`.
    /// * `ReplicaMissing` if `target_replica` names an unknown replica.
    #[mz_ore::instrument(level = "debug")]
    pub fn peek(
        &mut self,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: Timestamp,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        mut read_hold: ReadHold,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let target_id = peek_target.id();

        // The provided read hold must be on the peeked collection and must allow reads at
        // the peek timestamp.
        if read_hold.id() != target_id {
            return Err(ReadHoldIdMismatch(read_hold.id()));
        }
        read_hold
            .try_downgrade(Antichain::from_elem(timestamp.clone()))
            .map_err(|_| ReadHoldInsufficient(target_id))?;

        if let Some(target) = target_replica {
            if !self.replica_exists(target) {
                return Err(ReplicaMissing(target));
            }
        }

        let otel_ctx = OpenTelemetryContext::obtain();

        // Register the peek before sending the command, so a response can always be
        // matched up. The pending peek keeps the read hold alive until the peek finishes.
        self.peeks.insert(
            uuid,
            PendingPeek {
                target_replica,
                otel_ctx: otel_ctx.clone(),
                requested_at: Instant::now(),
                read_hold,
                peek_response_tx,
                limit: finishing.limit.map(usize::cast_from),
                offset: finishing.offset,
            },
        );

        let peek = Peek {
            literal_constraints,
            uuid,
            timestamp,
            finishing,
            map_filter_project,
            otel_ctx,
            target: peek_target,
            result_desc,
        };
        self.send(ComputeCommand::Peek(Box::new(peek)));

        Ok(())
    }
1685
1686 #[mz_ore::instrument(level = "debug")]
1688 pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1689 let Some(peek) = self.peeks.get_mut(&uuid) else {
1690 tracing::warn!("did not find pending peek for {uuid}");
1691 return;
1692 };
1693
1694 let duration = peek.requested_at.elapsed();
1695 self.metrics
1696 .observe_peek_response(&PeekResponse::Canceled, duration);
1697
1698 let otel_ctx = peek.otel_ctx.clone();
1700 otel_ctx.attach_as_parent();
1701
1702 self.deliver_response(ComputeControllerResponse::PeekNotification(
1703 uuid,
1704 PeekNotification::Canceled,
1705 otel_ctx,
1706 ));
1707
1708 self.finish_peek(uuid, reason);
1711 }
1712
1713 #[mz_ore::instrument(level = "debug")]
1725 pub fn set_read_policy(
1726 &mut self,
1727 policies: Vec<(GlobalId, ReadPolicy)>,
1728 ) -> Result<(), ReadPolicyError> {
1729 for (id, _policy) in &policies {
1732 let collection = self.collection(*id)?;
1733 if collection.read_policy.is_none() {
1734 return Err(ReadPolicyError::WriteOnlyCollection(*id));
1735 }
1736 }
1737
1738 for (id, new_policy) in policies {
1739 let collection = self.expect_collection_mut(id);
1740 let new_since = new_policy.frontier(collection.write_frontier().borrow());
1741 let _ = collection.implied_read_hold.try_downgrade(new_since);
1742 collection.read_policy = Some(new_policy);
1743 }
1744
1745 Ok(())
1746 }
1747
    /// Update the tracked global write frontier of the identified collection, if
    /// `new_frontier` is a strict advancement over the current one.
    ///
    /// On advancement, also downgrades the collection's implied read hold and emits a
    /// `FrontierUpper` controller response.
    #[mz_ore::instrument(level = "debug")]
    fn maybe_update_global_write_frontier(
        &mut self,
        id: GlobalId,
        new_frontier: Antichain<Timestamp>,
    ) {
        let collection = self.expect_collection_mut(id);

        // Apply the new frontier only if it is strictly beyond the current one.
        let advanced = collection.shared.lock_write_frontier(|f| {
            let advanced = PartialOrder::less_than(f, &new_frontier);
            if advanced {
                f.clone_from(&new_frontier);
            }
            advanced
        });

        if !advanced {
            return;
        }

        // Derive the new since: from the read policy if there is one, otherwise (for
        // write-only collections) lag the write frontier by a single `step_back`.
        let new_since = match &collection.read_policy {
            Some(read_policy) => {
                read_policy.frontier(new_frontier.borrow())
            }
            None => {
                Antichain::from_iter(
                    new_frontier
                        .iter()
                        .map(|t| t.step_back().unwrap_or(Timestamp::MIN)),
                )
            }
        };
        let _ = collection.implied_read_hold.try_downgrade(new_since);

        self.deliver_response(ComputeControllerResponse::FrontierUpper {
            id,
            upper: new_frontier,
        });
    }
1806
    /// Apply a batch of read-hold changes to the identified collection.
    ///
    /// Validates that the changes neither drop any capability count below zero nor
    /// resurrect a capability behind the read frontier, applies them to the shared read
    /// capabilities, and — if the read frontier changed as a result — downgrades the holds
    /// on the collection's dependencies and allows compaction on the replicas.
    pub(super) fn apply_read_hold_change(
        &mut self,
        id: GlobalId,
        mut update: ChangeBatch<Timestamp>,
    ) {
        let Some(collection) = self.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "read hold change for absent collection (id={id}, changes={update:?})"
            );
            return;
        };

        let new_since = collection.shared.lock_read_capabilities(|caps| {
            // Sanity check the change batch before applying it.
            let read_frontier = caps.frontier();
            for (time, diff) in update.iter() {
                let count = caps.count_for(time) + diff;
                assert!(
                    count >= 0,
                    "invalid read capabilities update: negative capability \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
                assert!(
                    count == 0 || read_frontier.less_equal(time),
                    "invalid read capabilities update: frontier regression \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
            }

            let changes = caps.update_iter(update.drain());

            // Report the new frontier only if the update had any net effect.
            let changed = changes.count() > 0;
            changed.then(|| caps.frontier().to_owned())
        });

        let Some(new_since) = new_since else {
            return;
        };

        // Propagate the new since to the read holds on all dependencies. `expect` is
        // justified by the frontier-regression assertion above.
        for read_hold in collection.compute_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }
        for read_hold in collection.storage_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }

        self.send(ComputeCommand::AllowCompaction {
            id,
            frontier: new_since,
        });
    }
1868
1869 fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1878 let Some(peek) = self.peeks.remove(&uuid) else {
1879 return;
1880 };
1881
1882 let _ = peek.peek_response_tx.send(response);
1884
1885 self.send(ComputeCommand::CancelPeek { uuid });
1888
1889 drop(peek.read_hold);
1890 }
1891
1892 fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse) {
1895 if self
1897 .replicas
1898 .get(&replica_id)
1899 .filter(|replica| replica.epoch == epoch)
1900 .is_none()
1901 {
1902 return;
1903 }
1904
1905 match response {
1908 ComputeResponse::Frontiers(id, frontiers) => {
1909 self.handle_frontiers_response(id, frontiers, replica_id);
1910 }
1911 ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
1912 self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
1913 }
1914 ComputeResponse::CopyToResponse(id, response) => {
1915 self.handle_copy_to_response(id, response, replica_id);
1916 }
1917 ComputeResponse::SubscribeResponse(id, response) => {
1918 self.handle_subscribe_response(id, response, replica_id);
1919 }
1920 ComputeResponse::Status(response) => {
1921 self.handle_status_response(response, replica_id);
1922 }
1923 }
1924 }
1925
    /// Handle a `Frontiers` response from the given replica.
    ///
    /// Records the reported per-replica frontiers and, if a write frontier was reported,
    /// possibly advances the collection's global write frontier in turn.
    fn handle_frontiers_response(
        &mut self,
        id: GlobalId,
        frontiers: FrontiersResponse,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&id) {
            soft_panic_or_log!(
                "frontiers update for an unknown collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica \
                 (replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };

        // All frontier fields are optional; apply only the ones that were reported.
        if let Some(new_frontier) = frontiers.input_frontier {
            replica_collection.update_input_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.output_frontier {
            replica_collection.update_output_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.write_frontier {
            replica_collection.update_write_frontier(new_frontier.clone());
            self.maybe_update_global_write_frontier(id, new_frontier);
        }
    }
1967
    /// Handle a `PeekResponse` from the given replica.
    ///
    /// Responses for unknown peeks (already served or canceled), or from replicas other
    /// than the peek's target replica, are ignored.
    #[mz_ore::instrument(level = "debug")]
    fn handle_peek_response(
        &mut self,
        uuid: Uuid,
        response: PeekResponse,
        otel_ctx: OpenTelemetryContext,
        replica_id: ReplicaId,
    ) {
        // Continue the peek's tracing span.
        otel_ctx.attach_as_parent();

        let Some(peek) = self.peeks.get(&uuid) else {
            return;
        };

        // Without an explicit target, the first responding replica wins.
        let target_replica = peek.target_replica.unwrap_or(replica_id);
        if target_replica != replica_id {
            return;
        }

        let duration = peek.requested_at.elapsed();
        self.metrics.observe_peek_response(&response, duration);

        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
        self.deliver_response(ComputeControllerResponse::PeekNotification(
            uuid,
            notification,
            otel_ctx,
        ));

        self.finish_peek(uuid, response)
    }
2004
    /// Handle a `CopyToResponse` from the given replica.
    ///
    /// Marks the copy-to's frontiers as complete for the replica and, if this is the first
    /// response for the sink, reports the result to the client.
    fn handle_copy_to_response(
        &mut self,
        sink_id: GlobalId,
        response: CopyToResponse,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&sink_id) {
            soft_panic_or_log!(
                "received response for an unknown copy-to \
                 (sink_id={sink_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
            soft_panic_or_log!(
                "copy-to response for an unknown replica collection \
                 (sink_id={sink_id}, replica_id={replica_id})"
            );
            return;
        };

        // A response means the replica is done with the copy-to; advance all its tracked
        // frontiers to the empty frontier.
        replica_collection.update_write_frontier(Antichain::new());
        replica_collection.update_input_frontier(Antichain::new());
        replica_collection.update_output_frontier(Antichain::new());

        // Only the first replica to respond reports the result; later responses find the
        // sink already removed from `copy_tos` and bail.
        if !self.copy_tos.remove(&sink_id) {
            return;
        }

        let result = match response {
            CopyToResponse::RowCount(count) => Ok(count),
            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
            CopyToResponse::Dropped => {
                // `Dropped` for a copy-to we still track indicates a bug somewhere.
                tracing::error!(
                    %sink_id, %replica_id,
                    "received `Dropped` response for a tracked copy to",
                );
                return;
            }
        };

        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
    }
2061
    /// Handle a `SubscribeResponse` from the given replica.
    ///
    /// Updates per-replica and global frontier tracking and, if the subscribe is still
    /// tracked and the batch advances its reported frontier, forwards the new updates to
    /// the client.
    fn handle_subscribe_response(
        &mut self,
        subscribe_id: GlobalId,
        response: SubscribeResponse,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&subscribe_id) {
            soft_panic_or_log!(
                "received response for an unknown subscribe \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica (replica_id={replica_id})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica collection \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
            );
            return;
        };

        // `DroppedAt` means the replica's dataflow is gone; treat its frontiers as empty.
        let write_frontier = match &response {
            SubscribeResponse::Batch(batch) => batch.upper.clone(),
            SubscribeResponse::DroppedAt(_) => Antichain::new(),
        };

        replica_collection.update_write_frontier(write_frontier.clone());
        replica_collection.update_input_frontier(write_frontier.clone());
        replica_collection.update_output_frontier(write_frontier.clone());

        // If the subscribe is no longer tracked, there is nothing to report to the client.
        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
            return;
        };

        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);

        match response {
            SubscribeResponse::Batch(batch) => {
                let upper = batch.upper;
                let mut updates = batch.updates;

                // Only act if this batch advances the frontier we last reported; batches
                // from other replicas may lag behind.
                if PartialOrder::less_than(&subscribe.frontier, &upper) {
                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());

                    if upper.is_empty() {
                        // The subscribe has finished; stop tracking it.
                        self.subscribes.remove(&subscribe_id);
                    } else {
                        self.subscribes.insert(subscribe_id, subscribe);
                    }

                    // Drop updates at times before `lower`: those were already delivered
                    // as part of a previous batch.
                    if let Ok(updates) = updates.as_mut() {
                        updates.retain_mut(|updates| {
                            let offset = updates.times().partition_point(|t| {
                                !lower.less_equal(t)
                            });
                            let (_, past_lower) = std::mem::take(updates).split_at(offset);
                            *updates = past_lower;
                            updates.len() > 0
                        });
                    }
                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeBatch {
                            lower,
                            upper,
                            updates,
                        },
                    ));
                }
            }
            SubscribeResponse::DroppedAt(frontier) => {
                // `DroppedAt` for a subscribe we still track indicates a bug somewhere.
                tracing::error!(
                    %subscribe_id,
                    %replica_id,
                    frontier = ?frontier.elements(),
                    "received `DroppedAt` response for a tracked subscribe",
                );
                self.subscribes.remove(&subscribe_id);
            }
        }
    }
2171
2172 fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
2173 match response {
2174 StatusResponse::Placeholder => {}
2175 }
2176 }
2177
2178 fn dependency_write_frontiers<'b>(
2180 &'b self,
2181 collection: &'b CollectionState,
2182 ) -> impl Iterator<Item = Antichain<Timestamp>> + 'b {
2183 let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2184 let collection = self.collections.get(&dep_id);
2185 collection.map(|c| c.write_frontier())
2186 });
2187 let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2188 let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2189 frontiers.map(|f| f.write_frontier)
2190 });
2191
2192 compute_frontiers.chain(storage_frontiers)
2193 }
2194
2195 fn transitive_storage_dependency_write_frontiers<'b>(
2197 &'b self,
2198 collection: &'b CollectionState,
2199 ) -> impl Iterator<Item = Antichain<Timestamp>> + 'b {
2200 let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2201 let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2202 let mut done = BTreeSet::new();
2203
2204 while let Some(id) = todo.pop() {
2205 if done.contains(&id) {
2206 continue;
2207 }
2208 if let Some(dep) = self.collections.get(&id) {
2209 storage_ids.extend(dep.storage_dependency_ids());
2210 todo.extend(dep.compute_dependency_ids())
2211 }
2212 done.insert(id);
2213 }
2214
2215 let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2216 let frontiers = self.storage_collections.collection_frontiers(id).ok();
2217 frontiers.map(|f| f.write_frontier)
2218 });
2219
2220 storage_frontiers
2221 }
2222
    /// Downgrade the warmup read holds of all collections to (one step back from) the
    /// write frontiers of their direct dependencies.
    fn downgrade_warmup_capabilities(&mut self) {
        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            // Write-only collections whose write frontier is empty are done producing
            // output; their warmup hold can be released entirely.
            if collection.read_policy.is_none()
                && collection.shared.lock_write_frontier(|f| f.is_empty())
            {
                new_capabilities.insert(*id, Antichain::new());
                continue;
            }

            // Hold back to one step before each dependency's write frontier.
            let mut new_capability = Antichain::new();
            for frontier in self.dependency_write_frontiers(collection) {
                for time in frontier {
                    new_capability.insert(time.step_back().unwrap_or(time));
                }
            }

            new_capabilities.insert(*id, new_capability);
        }

        // Apply in a second pass: downgrading needs `&mut` access to the collections.
        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
        }
    }
2263
    /// Forward the implied read holds of collections while the instance has no replicas,
    /// based on the write frontiers of their transitive storage dependencies and their
    /// read policies.
    ///
    /// Gated behind the `ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE` dyncfg and only active
    /// when no replicas exist.
    fn forward_implied_capabilities(&mut self) {
        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
            return;
        }
        if !self.replicas.is_empty() {
            return;
        }

        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            // Collections without a read policy (write-only) are skipped.
            let Some(read_policy) = &collection.read_policy else {
                continue;
            };

            // Without replicas, progress is bounded by the write frontiers of the
            // transitive storage dependencies.
            let mut dep_frontier = Antichain::new();
            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
                dep_frontier.extend(frontier);
            }

            // Only ever move the implied hold forward, never backward.
            let new_capability = read_policy.frontier(dep_frontier.borrow());
            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
                new_capabilities.insert(*id, new_capability);
            }
        }

        // Apply in a second pass: downgrading needs `&mut` access to the collections.
        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.implied_read_hold.try_downgrade(new_capability);
        }
    }
2325
    /// Acquire a new read hold on the identified collection, at its current read frontier.
    ///
    /// # Errors
    ///
    /// Fails if the identified collection is unknown.
    pub(super) fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold, CollectionMissing> {
        let collection = self.collection(id)?;
        // Read the current since and bump the capability count at each of its times
        // inside a single critical section, so the frontier cannot advance between
        // observing it and installing the hold.
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });
        let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
        Ok(hold)
    }
2345
    /// Perform periodic maintenance work: replica rehydration, read-hold downgrades,
    /// collection scheduling and cleanup, and introspection/metrics refreshes.
    ///
    /// NOTE(review): the call order looks deliberate (capability downgrades before
    /// scheduling and cleanup) — confirm before reordering.
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        self.rehydrate_failed_replicas();
        self.downgrade_warmup_capabilities();
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
2362}
2363
/// Controller-side state tracked for a single compute collection.
#[derive(Debug)]
struct CollectionState {
    /// The replica this collection's dataflow targets, if any (see `create_dataflow`).
    target_replica: Option<ReplicaId>,
    /// Whether this is a log collection (set by `new_log_collection`).
    log_collection: bool,
    /// Whether the collection was dropped (set by `drop_collections`).
    dropped: bool,
    /// Whether a `Schedule` command was sent for this collection.
    scheduled: bool,

    /// NOTE(review): initialized to `true` in `new`; semantics not visible in this part of
    /// the file — presumably flipped when the instance leaves read-only mode.
    read_only: bool,

    /// Frontier state shared with other holders of this collection's state.
    shared: SharedCollectionState,

    /// The controller's implied read hold, downgraded according to `read_policy` (or the
    /// write frontier, for write-only collections).
    implied_read_hold: ReadHold,
    /// The warmup read hold, downgraded by `downgrade_warmup_capabilities` to track the
    /// dependencies' write frontiers.
    warmup_read_hold: ReadHold,
    /// The read policy; `None` for write-only collections (see `set_read_policy`).
    read_policy: Option<ReadPolicy>,

    /// Read holds on the collection's direct storage dependencies.
    storage_dependencies: BTreeMap<GlobalId, ReadHold>,
    /// Read holds on the collection's direct compute dependencies.
    compute_dependencies: BTreeMap<GlobalId, ReadHold>,

    /// Introspection reporting state for this collection.
    introspection: CollectionIntrospection,

    /// Accumulated wallclock-lag histogram updates; `None` for transient collections,
    /// which don't report lag histograms (see `CollectionState::new`).
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}
2443
impl CollectionState {
    /// Create a new `CollectionState`, with both the since and the write frontier
    /// initialized to the given `as_of`.
    fn new(
        collection_id: GlobalId,
        as_of: Antichain<Timestamp>,
        shared: SharedCollectionState,
        storage_dependencies: BTreeMap<GlobalId, ReadHold>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold>,
        read_hold_tx: read_holds::ChangeTx,
        introspection: CollectionIntrospection,
    ) -> Self {
        let since = as_of.clone();
        let upper = as_of;

        // The provided shared state must agree with the as-of.
        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
        assert!(shared.lock_write_frontier(|f| f == &upper));

        let implied_read_hold =
            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);

        // Account for the warmup hold in the shared read capabilities. (The implied
        // hold's capabilities match the ones `SharedCollectionState::new` seeds —
        // NOTE(review): confirm this pairing against `add_collection`.)
        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
        shared.lock_read_capabilities(|c| {
            c.update_iter(updates);
        });

        // Transient collections don't report wallclock-lag histograms.
        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
            true => None,
            false => Some(Default::default()),
        };

        Self {
            target_replica: None,
            log_collection: false,
            dropped: false,
            scheduled: false,
            read_only: true,
            shared,
            implied_read_hold,
            warmup_read_hold,
            read_policy: Some(ReadPolicy::ValidFrom(since)),
            storage_dependencies,
            compute_dependencies,
            introspection,
            wallclock_lag_histogram_stash,
        }
    }

    /// Create state for a log collection, with a since fixed at `Timestamp::MIN`, no
    /// dependencies, and marked as already scheduled.
    fn new_log_collection(
        id: GlobalId,
        shared: SharedCollectionState,
        read_hold_tx: read_holds::ChangeTx,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    ) -> Self {
        let since = Antichain::from_elem(Timestamp::MIN);
        let introspection = CollectionIntrospection::new(
            id,
            introspection_tx,
            since.clone(),
            false,
            None,
            None,
            Vec::new(),
        );
        let mut state = Self::new(
            id,
            since,
            shared,
            Default::default(),
            Default::default(),
            read_hold_tx,
            introspection,
        );
        state.log_collection = true;
        // Log collections are created by the replicas themselves; no `Schedule` needed.
        state.scheduled = true;
        state
    }

    /// Return the frontier of this collection's current read capabilities.
    fn read_frontier(&self) -> Antichain<Timestamp> {
        self.shared
            .lock_read_capabilities(|c| c.frontier().to_owned())
    }

    /// Return this collection's current write frontier.
    fn write_frontier(&self) -> Antichain<Timestamp> {
        self.shared.lock_write_frontier(|f| f.clone())
    }

    /// Iterate over the IDs of this collection's direct storage dependencies.
    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.storage_dependencies.keys().copied()
    }

    /// Iterate over the IDs of this collection's direct compute dependencies.
    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.compute_dependencies.keys().copied()
    }
}
2552
/// Collection frontier state that can be shared across threads; all fields live behind
/// `Arc<Mutex<_>>`, so clones observe the same underlying state.
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState {
    /// The accumulated read capabilities held on this collection.
    read_capabilities: Arc<Mutex<MutableAntichain<Timestamp>>>,
    /// The collection's current write frontier.
    write_frontier: Arc<Mutex<Antichain<Timestamp>>>,
}
2581
2582impl SharedCollectionState {
2583 pub fn new(as_of: Antichain<Timestamp>) -> Self {
2584 let since = as_of.clone();
2586 let upper = as_of;
2588
2589 let mut read_capabilities = MutableAntichain::new();
2593 read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2594
2595 Self {
2596 read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2597 write_frontier: Arc::new(Mutex::new(upper)),
2598 }
2599 }
2600
2601 pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2602 where
2603 F: FnOnce(&mut MutableAntichain<Timestamp>) -> R,
2604 {
2605 let mut caps = self.read_capabilities.lock().expect("poisoned");
2606 f(&mut *caps)
2607 }
2608
2609 pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2610 where
2611 F: FnOnce(&mut Antichain<Timestamp>) -> R,
2612 {
2613 let mut frontier = self.write_frontier.lock().expect("poisoned");
2614 f(&mut *frontier)
2615 }
2616}
2617
/// Helper that reports introspection updates for a single collection, and retracts them
/// again when dropped.
#[derive(Debug)]
struct CollectionIntrospection {
    /// The ID of the tracked collection.
    collection_id: GlobalId,
    /// Channel over which introspection updates are reported.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Frontier introspection state; `None` for storage sinks (see `new`).
    frontiers: Option<FrontiersIntrospectionState>,
    /// Refresh-schedule introspection state; `Some` only when the collection has both a
    /// refresh schedule and an initial as-of.
    refresh: Option<RefreshIntrospectionState>,
    /// IDs of the collection's dependencies, reported to `ComputeDependencies`.
    dependency_ids: Vec<GlobalId>,
}
2638
2639impl CollectionIntrospection {
2640 fn new(
2641 collection_id: GlobalId,
2642 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2643 as_of: Antichain<Timestamp>,
2644 storage_sink: bool,
2645 initial_as_of: Option<Antichain<Timestamp>>,
2646 refresh_schedule: Option<RefreshSchedule>,
2647 dependency_ids: Vec<GlobalId>,
2648 ) -> Self {
2649 let refresh =
2650 match (refresh_schedule, initial_as_of) {
2651 (Some(refresh_schedule), Some(initial_as_of)) => Some(
2652 RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2653 ),
2654 (refresh_schedule, _) => {
2655 soft_assert_or_log!(
2658 refresh_schedule.is_none(),
2659 "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2660 );
2661 None
2662 }
2663 };
2664 let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2665
2666 let self_ = Self {
2667 collection_id,
2668 introspection_tx,
2669 frontiers,
2670 refresh,
2671 dependency_ids,
2672 };
2673
2674 self_.report_initial_state();
2675 self_
2676 }
2677
2678 fn report_initial_state(&self) {
2680 if let Some(frontiers) = &self.frontiers {
2681 let row = frontiers.row_for_collection(self.collection_id);
2682 let updates = vec![(row, Diff::ONE)];
2683 self.send(IntrospectionType::Frontiers, updates);
2684 }
2685
2686 if let Some(refresh) = &self.refresh {
2687 let row = refresh.row_for_collection(self.collection_id);
2688 let updates = vec![(row, Diff::ONE)];
2689 self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2690 }
2691
2692 if !self.dependency_ids.is_empty() {
2693 let updates = self.dependency_rows(Diff::ONE);
2694 self.send(IntrospectionType::ComputeDependencies, updates);
2695 }
2696 }
2697
2698 fn dependency_rows(&self, diff: Diff) -> Vec<(Row, Diff)> {
2700 self.dependency_ids
2701 .iter()
2702 .map(|dependency_id| {
2703 let row = Row::pack_slice(&[
2704 Datum::String(&self.collection_id.to_string()),
2705 Datum::String(&dependency_id.to_string()),
2706 ]);
2707 (row, diff)
2708 })
2709 .collect()
2710 }
2711
2712 fn observe_frontiers(
2715 &mut self,
2716 read_frontier: &Antichain<Timestamp>,
2717 write_frontier: &Antichain<Timestamp>,
2718 ) {
2719 self.update_frontier_introspection(read_frontier, write_frontier);
2720 self.update_refresh_introspection(write_frontier);
2721 }
2722
2723 fn update_frontier_introspection(
2724 &mut self,
2725 read_frontier: &Antichain<Timestamp>,
2726 write_frontier: &Antichain<Timestamp>,
2727 ) {
2728 let Some(frontiers) = &mut self.frontiers else {
2729 return;
2730 };
2731
2732 if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2733 {
2734 return; };
2736
2737 let retraction = frontiers.row_for_collection(self.collection_id);
2738 frontiers.update(read_frontier, write_frontier);
2739 let insertion = frontiers.row_for_collection(self.collection_id);
2740 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2741 self.send(IntrospectionType::Frontiers, updates);
2742 }
2743
2744 fn update_refresh_introspection(&mut self, write_frontier: &Antichain<Timestamp>) {
2745 let Some(refresh) = &mut self.refresh else {
2746 return;
2747 };
2748
2749 let retraction = refresh.row_for_collection(self.collection_id);
2750 refresh.frontier_update(write_frontier);
2751 let insertion = refresh.row_for_collection(self.collection_id);
2752
2753 if retraction == insertion {
2754 return; }
2756
2757 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2758 self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2759 }
2760
    /// Forward the given introspection updates over the introspection channel.
    ///
    /// A send error is deliberately ignored: it means the receiver has gone
    /// away (presumably during shutdown — confirm against the receiver side),
    /// in which case the updates are moot.
    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
        let _ = self.introspection_tx.send((introspection_type, updates));
    }
2766}
2767
2768impl Drop for CollectionIntrospection {
2769 fn drop(&mut self) {
2770 if let Some(frontiers) = &self.frontiers {
2772 let row = frontiers.row_for_collection(self.collection_id);
2773 let updates = vec![(row, Diff::MINUS_ONE)];
2774 self.send(IntrospectionType::Frontiers, updates);
2775 }
2776
2777 if let Some(refresh) = &self.refresh {
2779 let retraction = refresh.row_for_collection(self.collection_id);
2780 let updates = vec![(retraction, Diff::MINUS_ONE)];
2781 self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2782 }
2783
2784 if !self.dependency_ids.is_empty() {
2786 let updates = self.dependency_rows(Diff::MINUS_ONE);
2787 self.send(IntrospectionType::ComputeDependencies, updates);
2788 }
2789 }
2790}
2791
/// State tracked for the `Frontiers` introspection of a compute collection.
#[derive(Debug)]
struct FrontiersIntrospectionState {
    /// The collection's current read frontier, as last reported.
    read_frontier: Antichain<Timestamp>,
    /// The collection's current write frontier, as last reported.
    write_frontier: Antichain<Timestamp>,
}
2797
2798impl FrontiersIntrospectionState {
2799 fn new(as_of: Antichain<Timestamp>) -> Self {
2800 Self {
2801 read_frontier: as_of.clone(),
2802 write_frontier: as_of,
2803 }
2804 }
2805
2806 fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2808 let read_frontier = self
2809 .read_frontier
2810 .as_option()
2811 .map_or(Datum::Null, |ts| ts.clone().into());
2812 let write_frontier = self
2813 .write_frontier
2814 .as_option()
2815 .map_or(Datum::Null, |ts| ts.clone().into());
2816 Row::pack_slice(&[
2817 Datum::String(&collection_id.to_string()),
2818 read_frontier,
2819 write_frontier,
2820 ])
2821 }
2822
2823 fn update(
2825 &mut self,
2826 read_frontier: &Antichain<Timestamp>,
2827 write_frontier: &Antichain<Timestamp>,
2828 ) {
2829 if read_frontier != &self.read_frontier {
2830 self.read_frontier.clone_from(read_frontier);
2831 }
2832 if write_frontier != &self.write_frontier {
2833 self.write_frontier.clone_from(write_frontier);
2834 }
2835 }
2836}
2837
2838#[derive(Debug)]
2841struct RefreshIntrospectionState {
2842 refresh_schedule: RefreshSchedule,
2844 initial_as_of: Antichain<Timestamp>,
2845 next_refresh: Datum<'static>, last_completed_refresh: Datum<'static>, }
2849
2850impl RefreshIntrospectionState {
2851 fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2853 Row::pack_slice(&[
2854 Datum::String(&collection_id.to_string()),
2855 self.last_completed_refresh,
2856 self.next_refresh,
2857 ])
2858 }
2859}
2860
2861impl RefreshIntrospectionState {
2862 fn new(
2865 refresh_schedule: RefreshSchedule,
2866 initial_as_of: Antichain<Timestamp>,
2867 upper: &Antichain<Timestamp>,
2868 ) -> Self {
2869 let mut self_ = Self {
2870 refresh_schedule: refresh_schedule.clone(),
2871 initial_as_of: initial_as_of.clone(),
2872 next_refresh: Datum::Null,
2873 last_completed_refresh: Datum::Null,
2874 };
2875 self_.frontier_update(upper);
2876 self_
2877 }
2878
2879 fn frontier_update(&mut self, write_frontier: &Antichain<Timestamp>) {
2882 if write_frontier.is_empty() {
2883 self.last_completed_refresh =
2884 if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2885 last_refresh.into()
2886 } else {
2887 Timestamp::MAX.into()
2890 };
2891 self.next_refresh = Datum::Null;
2892 } else {
2893 if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2894 self.last_completed_refresh = Datum::Null;
2896 let initial_as_of = self.initial_as_of.as_option().expect(
2897 "initial_as_of can't be [], because then there would be no refreshes at all",
2898 );
2899 let first_refresh = self
2900 .refresh_schedule
2901 .round_up_timestamp(*initial_as_of)
2902 .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2903 soft_assert_or_log!(
2904 first_refresh == *initial_as_of,
2905 "initial_as_of should be set to the first refresh"
2906 );
2907 self.next_refresh = first_refresh.into();
2908 } else {
2909 let write_frontier = write_frontier.as_option().expect("checked above");
2911 self.last_completed_refresh = self
2912 .refresh_schedule
2913 .round_down_timestamp_m1(*write_frontier)
2914 .map_or_else(
2915 || {
2916 soft_panic_or_log!(
2917 "rounding down should have returned the first refresh or later"
2918 );
2919 Datum::Null
2920 },
2921 |last_completed_refresh| last_completed_refresh.into(),
2922 );
2923 self.next_refresh = write_frontier.clone().into();
2924 }
2925 }
2926 }
2927}
2928
/// State tracked for a peek that was issued and is awaiting its response.
#[derive(Debug)]
struct PendingPeek {
    /// For replica-targeted peeks, the replica the peek is targeted at.
    target_replica: Option<ReplicaId>,
    /// The OpenTelemetry context carried along for tracing this peek.
    otel_ctx: OpenTelemetryContext,
    /// The time at which the peek was requested.
    // NOTE(review): presumably used to measure peek latency — confirm at the
    // usage site.
    requested_at: Instant,
    /// The read hold installed for this peek.
    read_hold: ReadHold,
    /// The channel over which the peek's response is delivered.
    peek_response_tx: oneshot::Sender<PeekResponse>,
    /// An optional limit on the peek's result size.
    limit: Option<usize>,
    /// The offset into the peek's result.
    offset: usize,
}
2951
/// State tracked for an active subscribe.
#[derive(Debug, Clone)]
struct ActiveSubscribe {
    /// The current frontier of the subscribe.
    frontier: Antichain<Timestamp>,
}
2957
2958impl Default for ActiveSubscribe {
2959 fn default() -> Self {
2960 Self {
2961 frontier: Antichain::from_elem(Timestamp::MIN),
2962 }
2963 }
2964}
2965
/// State maintained about an individual replica.
#[derive(Debug)]
struct ReplicaState {
    /// The ID of the replica.
    id: ReplicaId,
    /// Handle for communicating with the replica.
    client: ReplicaClient,
    /// The replica's configuration.
    config: ReplicaConfig,
    /// Metrics tracked for this replica.
    metrics: ReplicaMetrics,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Per-collection state, keyed by collection ID.
    collections: BTreeMap<GlobalId, ReplicaCollectionState>,
    /// The epoch of the replica.
    epoch: u64,
}
2984
2985impl ReplicaState {
2986 fn new(
2987 id: ReplicaId,
2988 client: ReplicaClient,
2989 config: ReplicaConfig,
2990 metrics: ReplicaMetrics,
2991 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2992 epoch: u64,
2993 ) -> Self {
2994 Self {
2995 id,
2996 client,
2997 config,
2998 metrics,
2999 introspection_tx,
3000 epoch,
3001 collections: Default::default(),
3002 }
3003 }
3004
3005 fn add_collection(
3011 &mut self,
3012 id: GlobalId,
3013 as_of: Antichain<Timestamp>,
3014 input_read_holds: Vec<ReadHold>,
3015 ) {
3016 let metrics = self.metrics.for_collection(id);
3017 let introspection = ReplicaCollectionIntrospection::new(
3018 self.id,
3019 id,
3020 self.introspection_tx.clone(),
3021 as_of.clone(),
3022 );
3023 let mut state =
3024 ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
3025
3026 if id.is_transient() {
3030 state.wallclock_lag_max = None;
3031 }
3032
3033 if let Some(previous) = self.collections.insert(id, state) {
3034 panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
3035 }
3036 }
3037
3038 fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState> {
3040 self.collections.remove(&id)
3041 }
3042
3043 fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
3045 self.collections.get(&id).map_or(true, |c| {
3046 c.write_frontier.is_empty()
3047 && c.input_frontier.is_empty()
3048 && c.output_frontier.is_empty()
3049 })
3050 }
3051
3052 #[mz_ore::instrument(level = "debug")]
3056 pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3057 let Self {
3064 id,
3065 client: _,
3066 config: _,
3067 metrics: _,
3068 introspection_tx: _,
3069 epoch,
3070 collections,
3071 } = self;
3072
3073 let collections: BTreeMap<_, _> = collections
3074 .iter()
3075 .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
3076 .collect();
3077
3078 Ok(serde_json::json!({
3079 "id": id.to_string(),
3080 "collections": collections,
3081 "epoch": epoch,
3082 }))
3083 }
3084}
3085
/// State maintained about a replica's view of an individual compute collection.
#[derive(Debug)]
struct ReplicaCollectionState {
    /// The replica's reported write frontier for this collection.
    write_frontier: Antichain<Timestamp>,
    /// The replica's reported input frontier for this collection.
    input_frontier: Antichain<Timestamp>,
    /// The replica's reported output frontier for this collection.
    output_frontier: Antichain<Timestamp>,

    /// Metrics tracked for this collection, if any are collected.
    metrics: Option<ReplicaCollectionMetrics>,
    /// The as-of frontier with which this collection was installed on the
    /// replica; used to decide hydration.
    as_of: Antichain<Timestamp>,
    /// Introspection state for this replica/collection pair.
    introspection: ReplicaCollectionIntrospection,
    /// Read holds on the collection's inputs; downgraded as the input
    /// frontier advances.
    input_read_holds: Vec<ReadHold>,

    /// The maximum wallclock lag observed for this collection.
    ///
    /// `None` disables tracking (set for transient collections).
    wallclock_lag_max: Option<WallclockLag>,
}
3121
3122impl ReplicaCollectionState {
3123 fn new(
3124 metrics: Option<ReplicaCollectionMetrics>,
3125 as_of: Antichain<Timestamp>,
3126 introspection: ReplicaCollectionIntrospection,
3127 input_read_holds: Vec<ReadHold>,
3128 ) -> Self {
3129 Self {
3130 write_frontier: as_of.clone(),
3131 input_frontier: as_of.clone(),
3132 output_frontier: as_of.clone(),
3133 metrics,
3134 as_of,
3135 introspection,
3136 input_read_holds,
3137 wallclock_lag_max: Some(WallclockLag::MIN),
3138 }
3139 }
3140
3141 fn hydrated(&self) -> bool {
3143 self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3159 }
3160
3161 fn update_write_frontier(&mut self, new_frontier: Antichain<Timestamp>) {
3163 if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3164 soft_panic_or_log!(
3165 "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3166 self.write_frontier,
3167 );
3168 return;
3169 } else if new_frontier == self.write_frontier {
3170 return;
3171 }
3172
3173 self.write_frontier = new_frontier;
3174 }
3175
3176 fn update_input_frontier(&mut self, new_frontier: Antichain<Timestamp>) {
3178 if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3179 soft_panic_or_log!(
3180 "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3181 self.input_frontier,
3182 );
3183 return;
3184 } else if new_frontier == self.input_frontier {
3185 return;
3186 }
3187
3188 self.input_frontier = new_frontier;
3189
3190 for read_hold in &mut self.input_read_holds {
3192 let result = read_hold.try_downgrade(self.input_frontier.clone());
3193 soft_assert_or_log!(
3194 result.is_ok(),
3195 "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3196 self.input_frontier,
3197 );
3198 }
3199 }
3200
3201 fn update_output_frontier(&mut self, new_frontier: Antichain<Timestamp>) {
3203 if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3204 soft_panic_or_log!(
3205 "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3206 self.output_frontier,
3207 );
3208 return;
3209 } else if new_frontier == self.output_frontier {
3210 return;
3211 }
3212
3213 self.output_frontier = new_frontier;
3214 }
3215}
3216
/// Introspection state tracked for a single replica/collection pair.
///
/// Reports a `ReplicaFrontiers` row on creation and retracts it on drop.
#[derive(Debug)]
struct ReplicaCollectionIntrospection {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// The collection's write frontier, as last reported by the replica.
    write_frontier: Antichain<Timestamp>,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
}
3230
3231impl ReplicaCollectionIntrospection {
3232 fn new(
3234 replica_id: ReplicaId,
3235 collection_id: GlobalId,
3236 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3237 as_of: Antichain<Timestamp>,
3238 ) -> Self {
3239 let self_ = Self {
3240 replica_id,
3241 collection_id,
3242 write_frontier: as_of,
3243 introspection_tx,
3244 };
3245
3246 self_.report_initial_state();
3247 self_
3248 }
3249
3250 fn report_initial_state(&self) {
3252 let row = self.write_frontier_row();
3253 let updates = vec![(row, Diff::ONE)];
3254 self.send(IntrospectionType::ReplicaFrontiers, updates);
3255 }
3256
3257 fn observe_frontier(&mut self, write_frontier: &Antichain<Timestamp>) {
3259 if self.write_frontier == *write_frontier {
3260 return; }
3262
3263 let retraction = self.write_frontier_row();
3264 self.write_frontier.clone_from(write_frontier);
3265 let insertion = self.write_frontier_row();
3266
3267 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3268 self.send(IntrospectionType::ReplicaFrontiers, updates);
3269 }
3270
3271 fn write_frontier_row(&self) -> Row {
3273 let write_frontier = self
3274 .write_frontier
3275 .as_option()
3276 .map_or(Datum::Null, |ts| ts.clone().into());
3277 Row::pack_slice(&[
3278 Datum::String(&self.collection_id.to_string()),
3279 Datum::String(&self.replica_id.to_string()),
3280 write_frontier,
3281 ])
3282 }
3283
3284 fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3285 let _ = self.introspection_tx.send((introspection_type, updates));
3288 }
3289}
3290
3291impl Drop for ReplicaCollectionIntrospection {
3292 fn drop(&mut self) {
3293 let row = self.write_frontier_row();
3295 let updates = vec![(row, Diff::MINUS_ONE)];
3296 self.send(IntrospectionType::ReplicaFrontiers, updates);
3297 }
3298}