1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
21use mz_compute_types::plan::render_plan::RenderPlan;
22use mz_compute_types::sinks::{
23 ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
24};
25use mz_compute_types::sources::SourceInstanceDesc;
26use mz_controller_types::dyncfgs::{
27 ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
28};
29use mz_dyncfg::{ConfigSet, ConfigUpdates};
30use mz_expr::RowSetFinishing;
31use mz_ore::cast::CastFrom;
32use mz_ore::channel::instrumented_unbounded_channel;
33use mz_ore::now::NowFn;
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{soft_assert_or_log, soft_panic_or_log};
36use mz_persist_types::PersistLocation;
37use mz_repr::adt::timestamp::CheckedTimestamp;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row, Timestamp};
40use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
41use mz_storage_types::read_holds::{self, ReadHold};
42use mz_storage_types::read_policy::ReadPolicy;
43use thiserror::Error;
44use timely::PartialOrder;
45use timely::progress::frontier::MutableAntichain;
46use timely::progress::{Antichain, ChangeBatch};
47use tokio::sync::{mpsc, oneshot};
48use uuid::Uuid;
49
50use crate::controller::error::{
51 CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
52};
53use crate::controller::instance_client::PeekError;
54use crate::controller::replica::{ReplicaClient, ReplicaConfig};
55use crate::controller::{
56 ComputeControllerResponse, IntrospectionUpdates, PeekNotification, ReplicaId,
57 StorageCollections,
58};
59use crate::logging::LogVariant;
60use crate::metrics::IntCounter;
61use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
62use crate::protocol::command::{
63 ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
64};
65use crate::protocol::history::ComputeCommandHistory;
66use crate::protocol::response::{
67 ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
68 SubscribeBatch, SubscribeResponse,
69};
70
/// Error reporting that a replica with the given ID already exists.
///
/// Returned by `Instance::add_replica` when the requested ID is already tracked.
#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);
74
/// Error reporting that no replica with the given ID exists.
///
/// Returned by `Instance::remove_replica` when the requested ID is not tracked.
#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);
78
/// Errors that `Instance::create_dataflow` can return.
#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    /// A referenced collection is not known to the instance.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The requested target replica is not known to the instance.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The dataflow description has no `as_of` set.
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    /// The dataflow exports a subscribe but its `as_of` is the empty frontier.
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    /// The dataflow exports a copy-to but its `as_of` is the empty frontier.
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    /// The caller did not supply a read hold for one of the dataflow's imports.
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    /// The supplied read hold for an import could not be downgraded to the `as_of`.
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}
96
97impl From<CollectionMissing> for DataflowCreationError {
98 fn from(error: CollectionMissing) -> Self {
99 Self::CollectionMissing(error.0)
100 }
101}
102
/// Errors related to read policies on collections.
///
/// NOTE(review): the code producing these errors is outside this view;
/// presumably they are returned when setting a read policy — confirm.
#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    /// The referenced collection is not known to the instance.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The referenced collection is write-only.
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}
110
111impl From<CollectionMissing> for ReadPolicyError {
112 fn from(error: CollectionMissing) -> Self {
113 Self::CollectionMissing(error.0)
114 }
115}
116
/// A command sent to an [`Instance`]: a boxed, sendable closure that is invoked
/// once with mutable access to the instance (see `Instance::run`).
pub(super) type Command = Box<dyn FnOnce(&mut Instance) + Send>;
119
/// A response received from a replica, tagged with the ID of the replica that
/// produced it.
///
/// NOTE(review): the `u64` is presumably the replica epoch passed to
/// `ReplicaClient::spawn` — confirm against the replica task.
pub(super) type ReplicaResponse = (ReplicaId, u64, ComputeResponse);
123
/// The state the compute controller maintains for a single compute instance.
pub(super) struct Instance {
    /// Build info handed to replica clients when they are spawned.
    build_info: &'static BuildInfo,
    /// Handle used to query storage collections (existence, frontiers).
    storage_collections: StorageCollections,
    /// Whether `InitializationComplete` has already been sent.
    initialized: bool,
    /// Whether the instance is in read-only mode; wallclock lag is not recorded
    /// to introspection while this is set.
    read_only: bool,
    /// The workload class of the instance, if any; used as a label on wallclock
    /// lag histogram updates.
    workload_class: Option<String>,
    /// State of the replicas of this instance, by replica ID.
    replicas: BTreeMap<ReplicaId, ReplicaState>,
    /// Per-replica dyncfg overrides, merged into `UpdateConfiguration` commands
    /// sent to the respective replica.
    replica_dyncfg_overrides: BTreeMap<ReplicaId, ConfigUpdates>,
    /// State of the collections installed on this instance, by collection ID.
    collections: BTreeMap<GlobalId, CollectionState>,
    /// IDs of the log collections, by log variant; copied into each replica's
    /// logging config when the replica is added.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Pending peeks, by peek UUID.
    peeks: BTreeMap<Uuid, PendingPeek>,
    /// Active subscribes, by collection ID.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe>,
    /// IDs of tracked copy-tos.
    copy_tos: BTreeSet<GlobalId>,
    /// History of commands sent so far; replayed to newly added replicas.
    history: ComputeCommandHistory<UIntGauge>,
    /// Receiver for commands to execute against this instance.
    command_rx: mpsc::UnboundedReceiver<Command>,
    /// Sender for responses delivered to the controller.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
    /// Sender for introspection updates.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Metrics tracked for this instance.
    metrics: InstanceMetrics,
    /// Dynamic configuration shared with the rest of the controller.
    dyncfg: Arc<ConfigSet>,

    /// Persist location included in the `InstanceConfig` sent to replicas.
    peek_stash_persist_location: PersistLocation,

    /// Function returning the current wallclock time.
    now: NowFn,
    /// Function computing the wallclock lag of a given timestamp.
    wallclock_lag: WallclockLagFn<Timestamp>,
    /// The (truncated) time at which wallclock lag was last recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for read hold changes; cloned into each collection's state.
    read_hold_tx: read_holds::ChangeTx,
    /// Sender side of the replica response channel; cloned into replica clients.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse, IntCounter>,
    /// Receiver side of the replica response channel, drained by `run`.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse, IntCounter>,
}
224
225impl Instance {
226 fn collection(&self, id: GlobalId) -> Result<&CollectionState, CollectionMissing> {
228 self.collections.get(&id).ok_or(CollectionMissing(id))
229 }
230
231 fn collection_mut(&mut self, id: GlobalId) -> Result<&mut CollectionState, CollectionMissing> {
233 self.collections.get_mut(&id).ok_or(CollectionMissing(id))
234 }
235
236 fn expect_collection(&self, id: GlobalId) -> &CollectionState {
242 self.collections.get(&id).expect("collection must exist")
243 }
244
245 fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
251 self.collections
252 .get_mut(&id)
253 .expect("collection must exist")
254 }
255
256 fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState)> {
257 self.collections.iter().map(|(id, coll)| (*id, coll))
258 }
259
260 fn replicas_hosting(
267 &self,
268 id: GlobalId,
269 ) -> Result<impl Iterator<Item = &ReplicaState>, CollectionMissing> {
270 let target = self.collection(id)?.target_replica;
271 Ok(self
272 .replicas
273 .values()
274 .filter(move |r| target.map_or(true, |t| t == r.id)))
275 }
276
277 fn add_collection(
283 &mut self,
284 id: GlobalId,
285 as_of: Antichain<Timestamp>,
286 shared: SharedCollectionState,
287 storage_dependencies: BTreeMap<GlobalId, ReadHold>,
288 compute_dependencies: BTreeMap<GlobalId, ReadHold>,
289 replica_input_read_holds: Vec<ReadHold>,
290 write_only: bool,
291 storage_sink: bool,
292 initial_as_of: Option<Antichain<Timestamp>>,
293 refresh_schedule: Option<RefreshSchedule>,
294 target_replica: Option<ReplicaId>,
295 ) {
296 let dependency_ids: Vec<GlobalId> = compute_dependencies
298 .keys()
299 .chain(storage_dependencies.keys())
300 .copied()
301 .collect();
302 let introspection = CollectionIntrospection::new(
303 id,
304 self.introspection_tx.clone(),
305 as_of.clone(),
306 storage_sink,
307 initial_as_of,
308 refresh_schedule,
309 dependency_ids,
310 );
311 let mut state = CollectionState::new(
312 id,
313 as_of.clone(),
314 shared,
315 storage_dependencies,
316 compute_dependencies,
317 Arc::clone(&self.read_hold_tx),
318 introspection,
319 );
320 state.target_replica = target_replica;
321 if write_only {
323 state.read_policy = None;
324 }
325
326 if let Some(previous) = self.collections.insert(id, state) {
327 panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
328 }
329
330 for replica in self.replicas.values_mut() {
332 if target_replica.is_some_and(|id| id != replica.id) {
333 continue;
334 }
335 replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
336 }
337 }
338
339 fn remove_collection(&mut self, id: GlobalId) {
340 for replica in self.replicas.values_mut() {
342 replica.remove_collection(id);
343 }
344
345 self.collections.remove(&id);
347 }
348
349 fn add_replica_state(
350 &mut self,
351 id: ReplicaId,
352 client: ReplicaClient,
353 config: ReplicaConfig,
354 epoch: u64,
355 ) -> Result<(), read_holds::ReadHoldIssuerHungUp> {
356 let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();
357
358 let metrics = self.metrics.for_replica(id);
359 let mut replica = ReplicaState::new(
360 id,
361 client,
362 config,
363 metrics,
364 self.introspection_tx.clone(),
365 epoch,
366 );
367
368 let mut shutdown_input = None;
370 for (collection_id, collection) in &self.collections {
371 if (collection.log_collection && !log_ids.contains(collection_id))
374 || collection.target_replica.is_some_and(|rid| rid != id)
375 {
376 continue;
377 }
378
379 let as_of = if collection.log_collection {
380 Antichain::from_elem(Timestamp::MIN)
385 } else {
386 collection.read_frontier().to_owned()
387 };
388
389 let mut input_read_holds = Vec::with_capacity(collection.storage_dependencies.len());
397 let mut hung_up = Vec::new();
398 for hold in collection.storage_dependencies.values() {
399 match hold.try_clone() {
400 Ok(hold) => input_read_holds.push(hold),
401 Err(read_holds::ReadHoldIssuerHungUp(input_id)) => hung_up.push(input_id),
402 }
403 }
404 if !hung_up.is_empty() {
405 tracing::error!(
406 replica_id = %id,
407 %collection_id,
408 ?hung_up,
409 "giving up on adding replica collections: storage read hold issuers hung \
410 up, the process is shutting down",
411 );
412 shutdown_input = hung_up.into_iter().next();
413 break;
414 }
415
416 replica.add_collection(*collection_id, as_of, input_read_holds);
417 }
418
419 self.replicas.insert(id, replica);
420
421 match shutdown_input {
422 Some(input_id) => Err(read_holds::ReadHoldIssuerHungUp(input_id)),
423 None => Ok(()),
424 }
425 }
426
427 fn deliver_response(&self, response: ComputeControllerResponse) {
429 let _ = self.response_tx.send(response);
432 }
433
434 fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
436 let _ = self.introspection_tx.send((type_, updates));
439 }
440
441 fn replica_exists(&self, id: ReplicaId) -> bool {
443 self.replicas.contains_key(&id)
444 }
445
446 fn peeks_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = (Uuid, &PendingPeek)> {
448 self.peeks.iter().filter_map(move |(uuid, peek)| {
449 if peek.target_replica == Some(replica_id) {
450 Some((*uuid, peek))
451 } else {
452 None
453 }
454 })
455 }
456
457 fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
459 self.subscribes.keys().copied().filter(move |id| {
460 let collection = self.expect_collection(*id);
461 collection.target_replica == Some(replica_id)
462 })
463 }
464
465 fn update_frontier_introspection(&mut self) {
474 for collection in self.collections.values_mut() {
475 collection
476 .introspection
477 .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
478 }
479
480 for replica in self.replicas.values_mut() {
481 for collection in replica.collections.values_mut() {
482 collection
483 .introspection
484 .observe_frontier(&collection.write_frontier);
485 }
486 }
487 }
488
489 fn refresh_state_metrics(&self) {
498 let unscheduled_collections_count =
499 self.collections.values().filter(|c| !c.scheduled).count();
500 let connected_replica_count = self
501 .replicas
502 .values()
503 .filter(|r| r.client.is_connected())
504 .count();
505
506 self.metrics
507 .replica_count
508 .set(u64::cast_from(self.replicas.len()));
509 self.metrics
510 .collection_count
511 .set(u64::cast_from(self.collections.len()));
512 self.metrics
513 .collection_unscheduled_count
514 .set(u64::cast_from(unscheduled_collections_count));
515 self.metrics
516 .peek_count
517 .set(u64::cast_from(self.peeks.len()));
518 self.metrics
519 .subscribe_count
520 .set(u64::cast_from(self.subscribes.len()));
521 self.metrics
522 .copy_to_count
523 .set(u64::cast_from(self.copy_tos.len()));
524 self.metrics
525 .connected_replica_count
526 .set(u64::cast_from(connected_replica_count));
527 }
528
529 fn refresh_wallclock_lag(&mut self) {
548 let frontier_lag = |frontier: &Antichain<Timestamp>| match frontier.as_option() {
549 Some(ts) => (self.wallclock_lag)(ts.clone()),
550 None => Duration::ZERO,
551 };
552
553 let now_ms = (self.now)();
554 let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
555 let histogram_labels = match &self.workload_class {
556 Some(wc) => [("workload_class", wc.clone())].into(),
557 None => BTreeMap::new(),
558 };
559
560 let readable_storage_collections: BTreeSet<_> = self
563 .collections
564 .keys()
565 .filter_map(|id| {
566 let frontiers = self.storage_collections.collection_frontiers(*id).ok()?;
567 PartialOrder::less_than(&frontiers.read_capabilities, &frontiers.write_frontier)
568 .then_some(*id)
569 })
570 .collect();
571
572 for (id, collection) in &mut self.collections {
574 let write_frontier = collection.write_frontier();
575 let readable = if self.storage_collections.check_exists(*id).is_ok() {
576 readable_storage_collections.contains(id)
577 } else {
578 PartialOrder::less_than(&collection.read_frontier(), &write_frontier)
579 };
580
581 if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
582 let bucket = if readable {
583 let lag = frontier_lag(&write_frontier);
584 let lag = lag.as_secs().next_power_of_two();
585 WallclockLag::Seconds(lag)
586 } else {
587 WallclockLag::Undefined
588 };
589
590 let key = (histogram_period, bucket, histogram_labels.clone());
591 *stash.entry(key).or_default() += Diff::ONE;
592 }
593 }
594
595 for replica in self.replicas.values_mut() {
597 for (id, collection) in &mut replica.collections {
598 let readable = readable_storage_collections.contains(id) || collection.hydrated();
603
604 let lag = if readable {
605 let lag = frontier_lag(&collection.write_frontier);
606 WallclockLag::Seconds(lag.as_secs())
607 } else {
608 WallclockLag::Undefined
609 };
610
611 if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
612 *wallclock_lag_max = (*wallclock_lag_max).max(lag);
613 }
614
615 if let Some(metrics) = &mut collection.metrics {
616 let secs = lag.unwrap_seconds_or(u64::MAX);
619 metrics.wallclock_lag.observe(secs);
620 };
621 }
622 }
623
624 self.maybe_record_wallclock_lag();
626 }
627
628 fn maybe_record_wallclock_lag(&mut self) {
636 if self.read_only {
637 return;
638 }
639
640 let duration_trunc = |datetime: DateTime<_>, interval| {
641 let td = TimeDelta::from_std(interval).ok()?;
642 datetime.duration_trunc(td).ok()
643 };
644
645 let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
646 let now_dt = mz_ore::now::to_datetime((self.now)());
647 let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
648 soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
649 let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
650 duration_trunc(now_dt, *default).unwrap()
651 });
652 if now_trunc <= self.wallclock_lag_last_recorded {
653 return;
654 }
655
656 let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");
657
658 let mut history_updates = Vec::new();
659 for (replica_id, replica) in &mut self.replicas {
660 for (collection_id, collection) in &mut replica.collections {
661 let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
662 continue;
663 };
664
665 let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
666 let row = Row::pack_slice(&[
667 Datum::String(&collection_id.to_string()),
668 Datum::String(&replica_id.to_string()),
669 max_lag.into_interval_datum(),
670 Datum::TimestampTz(now_ts),
671 ]);
672 history_updates.push((row, Diff::ONE));
673 }
674 }
675 if !history_updates.is_empty() {
676 self.deliver_introspection_updates(
677 IntrospectionType::WallclockLagHistory,
678 history_updates,
679 );
680 }
681
682 let mut histogram_updates = Vec::new();
683 let mut row_buf = Row::default();
684 for (collection_id, collection) in &mut self.collections {
685 let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
686 continue;
687 };
688
689 for ((period, lag, labels), count) in std::mem::take(stash) {
690 let mut packer = row_buf.packer();
691 packer.extend([
692 Datum::TimestampTz(period.start),
693 Datum::TimestampTz(period.end),
694 Datum::String(&collection_id.to_string()),
695 lag.into_uint64_datum(),
696 ]);
697 let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
698 packer.push_dict(labels);
699
700 histogram_updates.push((row_buf.clone(), count));
701 }
702 }
703 if !histogram_updates.is_empty() {
704 self.deliver_introspection_updates(
705 IntrospectionType::WallclockLagHistogram,
706 histogram_updates,
707 );
708 }
709
710 self.wallclock_lag_last_recorded = now_trunc;
711 }
712
713 #[mz_ore::instrument(level = "debug")]
719 pub fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, CollectionMissing> {
720 let mut hosting_replicas = self.replicas_hosting(collection_id)?.peekable();
721 if hosting_replicas.peek().is_none() {
722 return Ok(true);
723 }
724 for replica_state in hosting_replicas {
725 let collection_state = replica_state
726 .collections
727 .get(&collection_id)
728 .expect("hosting replica must have per-replica collection state");
729
730 if collection_state.hydrated() {
731 return Ok(true);
732 }
733 }
734
735 Ok(false)
736 }
737
738 #[mz_ore::instrument(level = "debug")]
744 pub fn collections_hydrated_on_replicas(
745 &self,
746 target_replica_ids: Option<Vec<ReplicaId>>,
747 exclude_collections: &BTreeSet<GlobalId>,
748 ) -> Result<bool, HydrationCheckBadTarget> {
749 if self.replicas.is_empty() {
750 return Ok(true);
751 }
752 let mut all_hydrated = true;
753 let target_replicas: BTreeSet<ReplicaId> = self
754 .replicas
755 .keys()
756 .filter_map(|id| match target_replica_ids {
757 None => Some(id.clone()),
758 Some(ref ids) if ids.contains(id) => Some(id.clone()),
759 Some(_) => None,
760 })
761 .collect();
762 if let Some(targets) = target_replica_ids {
763 if target_replicas.is_empty() {
764 return Err(HydrationCheckBadTarget(targets));
765 }
766 }
767
768 for (id, _collection) in self.collections_iter() {
769 if id.is_transient() || exclude_collections.contains(&id) {
770 continue;
771 }
772
773 let mut collection_hydrated = false;
774 for replica_state in self.replicas_hosting(id).expect("collection must exist") {
777 if !target_replicas.contains(&replica_state.id) {
778 continue;
779 }
780 let collection_state = replica_state
781 .collections
782 .get(&id)
783 .expect("hosting replica must have per-replica collection state");
784
785 if collection_state.hydrated() {
786 collection_hydrated = true;
787 break;
788 }
789 }
790
791 if !collection_hydrated {
792 tracing::info!("collection {id} is not hydrated on any replica");
793 all_hydrated = false;
794 }
797 }
798
799 Ok(all_hydrated)
800 }
801
802 fn cleanup_collections(&mut self) {
818 let to_remove: Vec<_> = self
819 .collections_iter()
820 .filter(|(id, collection)| {
821 collection.dropped
822 && collection.shared.lock_read_capabilities(|c| c.is_empty())
823 && self
824 .replicas
825 .values()
826 .all(|r| r.collection_frontiers_empty(*id))
827 })
828 .map(|(id, _collection)| id)
829 .collect();
830
831 for id in to_remove {
832 self.remove_collection(id);
833 }
834 }
835
836 #[mz_ore::instrument(level = "debug")]
840 pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
841 let Self {
848 build_info: _,
849 storage_collections: _,
850 peek_stash_persist_location: _,
851 initialized,
852 read_only,
853 workload_class,
854 replicas,
855 replica_dyncfg_overrides: _,
856 collections,
857 log_sources: _,
858 peeks,
859 subscribes,
860 copy_tos,
861 history: _,
862 command_rx: _,
863 response_tx: _,
864 introspection_tx: _,
865 metrics: _,
866 dyncfg: _,
867 now: _,
868 wallclock_lag: _,
869 wallclock_lag_last_recorded,
870 read_hold_tx: _,
871 replica_tx: _,
872 replica_rx: _,
873 } = self;
874
875 let replicas: BTreeMap<_, _> = replicas
876 .iter()
877 .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
878 .collect::<Result<_, anyhow::Error>>()?;
879 let collections: BTreeMap<_, _> = collections
880 .iter()
881 .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
882 .collect();
883 let peeks: BTreeMap<_, _> = peeks
884 .iter()
885 .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
886 .collect();
887 let subscribes: BTreeMap<_, _> = subscribes
888 .iter()
889 .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
890 .collect();
891 let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
892 let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");
893
894 Ok(serde_json::json!({
895 "initialized": initialized,
896 "read_only": read_only,
897 "workload_class": workload_class,
898 "replicas": replicas,
899 "collections": collections,
900 "peeks": peeks,
901 "subscribes": subscribes,
902 "copy_tos": copy_tos,
903 "wallclock_lag_last_recorded": wallclock_lag_last_recorded,
904 }))
905 }
906
907 pub(super) fn collection_write_frontier(
909 &self,
910 id: GlobalId,
911 ) -> Result<Antichain<Timestamp>, CollectionMissing> {
912 Ok(self.collection(id)?.write_frontier())
913 }
914}
915
916impl Instance {
917 pub(super) fn new(
918 build_info: &'static BuildInfo,
919 storage: StorageCollections,
920 peek_stash_persist_location: PersistLocation,
921 arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState)>,
922 metrics: InstanceMetrics,
923 now: NowFn,
924 wallclock_lag: WallclockLagFn<Timestamp>,
925 dyncfg: Arc<ConfigSet>,
926 command_rx: mpsc::UnboundedReceiver<Command>,
927 response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
928 read_hold_tx: read_holds::ChangeTx,
929 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
930 read_only: bool,
931 ) -> Self {
932 let mut collections = BTreeMap::new();
933 let mut log_sources = BTreeMap::new();
934 for (log, id, shared) in arranged_logs {
935 let collection = CollectionState::new_log_collection(
936 id,
937 shared,
938 Arc::clone(&read_hold_tx),
939 introspection_tx.clone(),
940 );
941 collections.insert(id, collection);
942 log_sources.insert(log, id);
943 }
944
945 let history = ComputeCommandHistory::new(metrics.for_history());
946
947 let send_count = metrics.response_send_count.clone();
948 let recv_count = metrics.response_recv_count.clone();
949 let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);
950
951 let now_dt = mz_ore::now::to_datetime(now());
952
953 Self {
954 build_info,
955 storage_collections: storage,
956 peek_stash_persist_location,
957 initialized: false,
958 read_only,
959 workload_class: None,
960 replicas: Default::default(),
961 replica_dyncfg_overrides: Default::default(),
962 collections,
963 log_sources,
964 peeks: Default::default(),
965 subscribes: Default::default(),
966 copy_tos: Default::default(),
967 history,
968 command_rx,
969 response_tx,
970 introspection_tx,
971 metrics,
972 dyncfg,
973 now,
974 wallclock_lag,
975 wallclock_lag_last_recorded: now_dt,
976 read_hold_tx,
977 replica_tx,
978 replica_rx,
979 }
980 }
981
982 pub(super) async fn run(mut self) {
983 self.send(ComputeCommand::Hello {
984 nonce: Uuid::default(),
987 });
988
989 let instance_config = InstanceConfig {
990 peek_stash_persist_location: self.peek_stash_persist_location.clone(),
991 logging: Default::default(),
994 expiration_offset: Default::default(),
995 arrangement_dictionary_compression: Default::default(),
996 };
997
998 self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));
999
1000 loop {
1001 tokio::select! {
1002 command = self.command_rx.recv() => match command {
1003 Some(cmd) => cmd(&mut self),
1004 None => break,
1005 },
1006 response = self.replica_rx.recv() => match response {
1007 Some(response) => self.handle_response(response),
1008 None => unreachable!("self owns a sender side of the channel"),
1009 }
1010 }
1011 }
1012 }
1013
1014 #[mz_ore::instrument(level = "debug")]
1016 pub fn update_configuration(&mut self, config_params: ComputeParameters) {
1017 if let Some(workload_class) = &config_params.workload_class {
1018 self.workload_class = workload_class.clone();
1019 }
1020
1021 let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
1022 self.send(command);
1023 }
1024
1025 #[mz_ore::instrument(level = "debug")]
1030 pub fn initialization_complete(&mut self) {
1031 if !self.initialized {
1033 self.send(ComputeCommand::InitializationComplete);
1034 self.initialized = true;
1035 }
1036 }
1037
1038 #[mz_ore::instrument(level = "debug")]
1042 pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
1043 let collection = self.collection_mut(collection_id)?;
1044
1045 if !collection.read_only {
1047 return Ok(());
1048 }
1049
1050 let as_of = collection.read_frontier();
1052
1053 if as_of.is_empty() {
1056 return Ok(());
1057 }
1058
1059 collection.read_only = false;
1060 self.send(ComputeCommand::AllowWrites(collection_id));
1061
1062 Ok(())
1063 }
1064
1065 #[mz_ore::instrument(level = "debug")]
1075 pub fn shutdown(&mut self) {
1076 let (_tx, rx) = mpsc::unbounded_channel();
1078 self.command_rx = rx;
1079
1080 let stray_replicas: Vec<_> = self.replicas.keys().collect();
1081 soft_assert_or_log!(
1082 stray_replicas.is_empty(),
1083 "dropped instance still has provisioned replicas: {stray_replicas:?}",
1084 );
1085 }
1086
1087 fn initiate_shutdown(&mut self) {
1093 let (_tx, rx) = mpsc::unbounded_channel();
1096 self.command_rx = rx;
1097 }
1098
1099 #[mz_ore::instrument(level = "debug")]
1101 fn send(&mut self, cmd: ComputeCommand) {
1102 self.history.push(cmd.clone());
1107
1108 let target_replica = self.target_replica(&cmd);
1109
1110 let overrides = &self.replica_dyncfg_overrides;
1113
1114 if let Some(rid) = target_replica {
1115 if let Some(replica) = self.replicas.get_mut(&rid) {
1116 let cmd = Self::specialize_command_for_replica(cmd, rid, overrides);
1117 let _ = replica.client.send(cmd);
1118 }
1119 } else {
1120 for (rid, replica) in self.replicas.iter_mut() {
1121 let cmd = Self::specialize_command_for_replica(cmd.clone(), *rid, overrides);
1122 let _ = replica.client.send(cmd);
1123 }
1124 }
1125 }
1126
1127 fn specialize_command_for_replica(
1131 mut cmd: ComputeCommand,
1132 replica_id: ReplicaId,
1133 overrides: &BTreeMap<ReplicaId, ConfigUpdates>,
1134 ) -> ComputeCommand {
1135 if let ComputeCommand::UpdateConfiguration(params) = &mut cmd
1136 && let Some(over) = overrides.get(&replica_id)
1137 && !over.updates.is_empty()
1138 {
1139 params.dyncfg_updates.extend(over.clone());
1140 }
1141 cmd
1142 }
1143
1144 pub(super) fn update_replica_dyncfg_overrides(
1148 &mut self,
1149 overrides: BTreeMap<ReplicaId, ConfigUpdates>,
1150 ) {
1151 self.replica_dyncfg_overrides = overrides;
1152 }
1153
1154 fn target_replica(&self, cmd: &ComputeCommand) -> Option<ReplicaId> {
1162 match &cmd {
1163 ComputeCommand::Schedule(id)
1164 | ComputeCommand::AllowWrites(id)
1165 | ComputeCommand::AllowCompaction { id, .. } => {
1166 self.expect_collection(*id).target_replica
1167 }
1168 ComputeCommand::CreateDataflow(desc) => {
1169 let mut target_replica = None;
1170 for id in desc.export_ids() {
1171 if let Some(replica) = self.expect_collection(id).target_replica {
1172 if target_replica.is_some() {
1173 assert_eq!(target_replica, Some(replica));
1174 }
1175 target_replica = Some(replica);
1176 }
1177 }
1178 target_replica
1179 }
1180 ComputeCommand::Peek(_)
1182 | ComputeCommand::Hello { .. }
1183 | ComputeCommand::CreateInstance(_)
1184 | ComputeCommand::InitializationComplete
1185 | ComputeCommand::UpdateConfiguration(_)
1186 | ComputeCommand::CancelPeek { .. } => None,
1187 }
1188 }
1189
1190 #[mz_ore::instrument(level = "debug")]
1192 pub fn add_replica(
1193 &mut self,
1194 id: ReplicaId,
1195 mut config: ReplicaConfig,
1196 epoch: Option<u64>,
1197 ) -> Result<(), ReplicaExists> {
1198 if self.replica_exists(id) {
1199 return Err(ReplicaExists(id));
1200 }
1201
1202 config.logging.index_logs = self.log_sources.clone();
1203
1204 let epoch = epoch.unwrap_or(1);
1205 let metrics = self.metrics.for_replica(id);
1206 let client = ReplicaClient::spawn(
1207 id,
1208 self.build_info,
1209 config.clone(),
1210 epoch,
1211 metrics.clone(),
1212 Arc::clone(&self.dyncfg),
1213 self.replica_tx.clone(),
1214 );
1215
1216 self.history.reduce();
1218
1219 self.history.update_source_uppers(&self.storage_collections);
1221
1222 for command in self.history.iter() {
1224 if let Some(target_replica) = self.target_replica(command)
1226 && target_replica != id
1227 {
1228 continue;
1229 }
1230
1231 let command = Self::specialize_command_for_replica(
1233 command.clone(),
1234 id,
1235 &self.replica_dyncfg_overrides,
1236 );
1237 if client.send(command).is_err() {
1238 tracing::warn!("Replica {:?} connection terminated during hydration", id);
1241 break;
1242 }
1243 }
1244
1245 if self.add_replica_state(id, client, config, epoch).is_err() {
1247 self.initiate_shutdown();
1253 }
1254
1255 Ok(())
1256 }
1257
1258 #[mz_ore::instrument(level = "debug")]
1260 pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
1261 let replica = self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;
1262
1263 for (collection_id, replica_collection) in &replica.collections {
1271 let collection = self.collections.get(collection_id);
1272 for replica_hold in &replica_collection.input_read_holds {
1273 let input_id = replica_hold.id();
1274 let global_hold = collection.and_then(|c| c.storage_dependencies.get(&input_id));
1275 let unprotected = global_hold
1276 .is_none_or(|h| PartialOrder::less_than(replica_hold.since(), h.since()));
1277 if unprotected {
1278 tracing::warn!(
1279 replica_id = %id,
1280 %collection_id,
1281 %input_id,
1282 replica_hold_since = ?replica_hold.since(),
1283 global_hold_since = ?global_hold.map(|h| h.since()),
1284 "dropping per-replica read hold without equivalent global read hold",
1285 );
1286 }
1287 }
1288 }
1289 drop(replica);
1290
1291 let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
1295 for subscribe_id in to_drop {
1296 let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
1297 let response = ComputeControllerResponse::SubscribeResponse(
1298 subscribe_id,
1299 SubscribeBatch {
1300 lower: subscribe.frontier.clone(),
1301 upper: subscribe.frontier,
1302 updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
1303 },
1304 );
1305 self.deliver_response(response);
1306 }
1307
1308 let mut peek_responses = Vec::new();
1313 let mut to_drop = Vec::new();
1314 for (uuid, peek) in self.peeks_targeting(id) {
1315 peek_responses.push(ComputeControllerResponse::PeekNotification(
1316 uuid,
1317 PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
1318 peek.otel_ctx.clone(),
1319 ));
1320 to_drop.push(uuid);
1321 }
1322 for response in peek_responses {
1323 self.deliver_response(response);
1324 }
1325 for uuid in to_drop {
1326 let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
1327 self.finish_peek(uuid, response);
1328 }
1329
1330 self.forward_implied_capabilities();
1333
1334 Ok(())
1335 }
1336
1337 fn rehydrate_replica(&mut self, id: ReplicaId) {
1343 let config = self.replicas[&id].config.clone();
1344 let epoch = self.replicas[&id].epoch + 1;
1345
1346 self.remove_replica(id).expect("replica must exist");
1347 let result = self.add_replica(id, config, Some(epoch));
1348
1349 match result {
1350 Ok(()) => (),
1351 Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1352 }
1353 }
1354
1355 fn rehydrate_failed_replicas(&mut self) {
1357 let replicas = self.replicas.iter();
1358 let failed_replicas: Vec<_> = replicas
1359 .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1360 .collect();
1361
1362 for replica_id in failed_replicas {
1363 self.rehydrate_replica(replica_id);
1364 }
1365 }
1366
    /// Creates the described dataflow and installs controller state for each of its exports.
    ///
    /// The provided `import_read_holds` must contain one read hold per source and index import of
    /// the dataflow; each is downgraded to the dataflow's `as_of` and kept as a dependency of the
    /// exported collections. Entries of `shared_collection_state` are used for the matching
    /// exports; exports without an entry get freshly created shared state at the `as_of`.
    ///
    /// If the `as_of` is the empty frontier, collection state is still installed, but no
    /// `CreateDataflow` command is sent.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    ///  * `target_replica` is given but no such replica exists,
    ///  * the dataflow has no `as_of`,
    ///  * the `as_of` is empty and the dataflow exports a subscribe or a copy-to,
    ///  * a read hold for an import is missing or cannot be downgraded to the `as_of`,
    ///  * storage metadata for a materialized view sink cannot be looked up.
    ///
    /// # Panics
    ///
    /// Panics if frontiers or metadata of a source import cannot be looked up, or if a plan
    /// cannot be converted into a `RenderPlan`.
    #[mz_ore::instrument(level = "debug")]
    pub fn create_dataflow(
        &mut self,
        dataflow: DataflowDescription<mz_compute_types::plan::Plan, ()>,
        import_read_holds: Vec<ReadHold>,
        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState>,
        target_replica: Option<ReplicaId>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        // A targeted dataflow requires its target replica to exist.
        if let Some(replica_id) = target_replica {
            if !self.replica_exists(replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // Sanity checks around the `as_of`.
        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        // Read holds on the dataflow's imports, downgraded to the `as_of`.
        let mut storage_dependencies = BTreeMap::new();
        let mut compute_dependencies = BTreeMap::new();

        // Read holds on storage inputs captured before the downgrade to the `as_of`; these are
        // handed to `add_collection` separately from the dependency holds.
        let mut replica_input_read_holds = Vec::new();

        // Index the provided read holds by collection ID.
        let mut import_read_holds: BTreeMap<_, _> =
            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();

        for &id in dataflow.source_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            // Clone before downgrading, so the replica input hold keeps the original since.
            replica_input_read_holds.push(read_hold.clone());

            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            storage_dependencies.insert(id, read_hold);
        }

        for &id in dataflow.index_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            compute_dependencies.insert(id, read_hold);
        }

        // With an empty `as_of` no dataflow is sent to the replicas (see below), so no replica
        // input read holds are kept.
        if as_of.is_empty() {
            replica_input_read_holds = Default::default();
        }

        // Install collection state for each export.
        for export_id in dataflow.export_ids() {
            let shared = shared_collection_state
                .remove(&export_id)
                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
            let write_only = dataflow.sink_exports.contains_key(&export_id);
            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);

            self.add_collection(
                export_id,
                as_of.clone(),
                shared,
                storage_dependencies.clone(),
                compute_dependencies.clone(),
                replica_input_read_holds.clone(),
                write_only,
                storage_sink,
                dataflow.initial_storage_as_of.clone(),
                dataflow.refresh_schedule.clone(),
                target_replica,
            );

            // If storage knows a collection with the export's ID, adopt its write frontier as
            // the export's global write frontier (if that advances it).
            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
            }
        }

        // Start tracking subscribes.
        for subscribe_id in dataflow.subscribe_ids() {
            self.subscribes
                .insert(subscribe_id, ActiveSubscribe::default());
        }

        // Start tracking copy-tos.
        for copy_to_id in dataflow.copy_to_ids() {
            self.copy_tos.insert(copy_to_id);
        }

        // Augment the source imports with storage metadata and the current write frontier.
        let mut source_imports = BTreeMap::new();
        for (id, import) in dataflow.source_imports {
            let frontiers = self
                .storage_collections
                .collection_frontiers(id)
                .expect("collection exists");

            let collection_metadata = self
                .storage_collections
                .collection_metadata(id)
                .expect("we have a read hold on this collection");

            let desc = SourceInstanceDesc {
                storage_metadata: collection_metadata.clone(),
                arguments: import.desc.arguments,
                typ: import.desc.typ.clone(),
            };
            source_imports.insert(
                id,
                mz_compute_types::dataflows::SourceImport {
                    desc,
                    monotonic: import.monotonic,
                    with_snapshot: import.with_snapshot,
                    upper: frontiers.write_frontier,
                },
            );
        }

        // Augment the sink exports; only materialized view sinks need storage metadata.
        let mut sink_exports = BTreeMap::new();
        for (id, se) in dataflow.sink_exports {
            let connection = match se.connection {
                ComputeSinkConnection::MaterializedView(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| CollectionMissing(id))?
                        .clone();
                    let conn = MaterializedViewSinkConnection {
                        value_desc: conn.value_desc,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::MaterializedView(conn)
                }
                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                    ComputeSinkConnection::CopyToS3Oneshot(conn)
                }
            };
            let desc = ComputeSinkDesc {
                from: se.from,
                from_desc: se.from_desc,
                connection,
                with_snapshot: se.with_snapshot,
                up_to: se.up_to,
                non_null_assertions: se.non_null_assertions,
                refresh_schedule: se.refresh_schedule,
            };
            sink_exports.insert(id, desc);
        }

        // Convert the plans into the representation sent to replicas.
        let objects_to_build = dataflow
            .objects_to_build
            .into_iter()
            .map(|object| BuildDesc {
                id: object.id,
                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
            })
            .collect();

        let augmented_dataflow = DataflowDescription {
            source_imports,
            sink_exports,
            objects_to_build,
            index_imports: dataflow.index_imports,
            index_exports: dataflow.index_exports,
            // Cloned because `as_of` above still borrows `dataflow.as_of`.
            as_of: dataflow.as_of.clone(),
            until: dataflow.until,
            initial_storage_as_of: dataflow.initial_storage_as_of,
            refresh_schedule: dataflow.refresh_schedule,
            debug_name: dataflow.debug_name,
            time_dependence: dataflow.time_dependence,
        };

        // Transient dataflows are logged at `debug` level, all others at `info`.
        if augmented_dataflow.is_transient() {
            tracing::debug!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        } else {
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        }

        // Skip the actual dataflow creation for an empty `as_of`.
        if as_of.is_empty() {
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                "not sending `CreateDataflow`, because of empty `as_of`",
            );
        } else {
            // Collect the export IDs before the dataflow is moved into the command.
            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
            self.send(ComputeCommand::CreateDataflow(Box::new(augmented_dataflow)));

            for id in collections {
                self.maybe_schedule_collection(id);
            }
        }

        Ok(())
    }
1601
1602 fn maybe_schedule_collection(&mut self, id: GlobalId) {
1608 let collection = self.expect_collection(id);
1609
1610 if collection.scheduled {
1612 return;
1613 }
1614
1615 let as_of = collection.read_frontier();
1616
1617 if as_of.is_empty() {
1620 return;
1621 }
1622
1623 let ready = if id.is_transient() {
1624 true
1630 } else {
1631 let not_self_dep = |x: &GlobalId| *x != id;
1637
1638 let mut deps_scheduled = true;
1641
1642 let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
1647 let mut compute_frontiers = Vec::new();
1648 for id in compute_deps {
1649 let dep = &self.expect_collection(id);
1650 deps_scheduled &= dep.scheduled;
1651 compute_frontiers.push(dep.write_frontier());
1652 }
1653
1654 let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
1655 let storage_frontiers = self
1656 .storage_collections
1657 .collections_frontiers(storage_deps.collect())
1658 .expect("must exist");
1659 let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);
1660
1661 let mut frontiers = compute_frontiers.into_iter().chain(storage_frontiers);
1662 let frontiers_ready =
1663 frontiers.all(|frontier| PartialOrder::less_than(&as_of, &frontier));
1664
1665 deps_scheduled && frontiers_ready
1666 };
1667
1668 if ready {
1669 self.send(ComputeCommand::Schedule(id));
1670 let collection = self.expect_collection_mut(id);
1671 collection.scheduled = true;
1672 }
1673 }
1674
1675 fn schedule_collections(&mut self) {
1677 let ids: Vec<_> = self.collections.keys().copied().collect();
1678 for id in ids {
1679 self.maybe_schedule_collection(id);
1680 }
1681 }
1682
1683 #[mz_ore::instrument(level = "debug")]
1686 pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1687 for id in &ids {
1688 let collection = self.collection_mut(*id)?;
1689
1690 collection.dropped = true;
1692
1693 collection.implied_read_hold.release();
1696 collection.warmup_read_hold.release();
1697
1698 self.subscribes.remove(id);
1701 self.copy_tos.remove(id);
1704 }
1705
1706 Ok(())
1707 }
1708
1709 #[mz_ore::instrument(level = "debug")]
1713 pub fn peek(
1714 &mut self,
1715 peek_target: PeekTarget,
1716 literal_constraints: Option<Vec<Row>>,
1717 uuid: Uuid,
1718 timestamp: Timestamp,
1719 result_desc: RelationDesc,
1720 finishing: RowSetFinishing,
1721 map_filter_project: mz_expr::SafeMfpPlan,
1722 mut read_hold: ReadHold,
1723 target_replica: Option<ReplicaId>,
1724 peek_response_tx: oneshot::Sender<PeekResponse>,
1725 ) -> Result<(), PeekError> {
1726 use PeekError::*;
1727
1728 let target_id = peek_target.id();
1729
1730 if read_hold.id() != target_id {
1732 return Err(ReadHoldIdMismatch(read_hold.id()));
1733 }
1734 read_hold
1735 .try_downgrade(Antichain::from_elem(timestamp.clone()))
1736 .map_err(|_| ReadHoldInsufficient(target_id))?;
1737
1738 if let Some(target) = target_replica {
1739 if !self.replica_exists(target) {
1740 return Err(ReplicaMissing(target));
1741 }
1742 }
1743
1744 let otel_ctx = OpenTelemetryContext::obtain();
1745
1746 self.peeks.insert(
1747 uuid,
1748 PendingPeek {
1749 target_replica,
1750 otel_ctx: otel_ctx.clone(),
1752 requested_at: Instant::now(),
1753 read_hold,
1754 peek_response_tx,
1755 limit: finishing.limit.map(usize::cast_from),
1756 offset: finishing.offset,
1757 },
1758 );
1759
1760 let peek = Peek {
1761 literal_constraints,
1762 uuid,
1763 timestamp,
1764 finishing,
1765 map_filter_project,
1766 otel_ctx,
1769 target: peek_target,
1770 result_desc,
1771 };
1772 self.send(ComputeCommand::Peek(Box::new(peek)));
1773
1774 Ok(())
1775 }
1776
1777 #[mz_ore::instrument(level = "debug")]
1779 pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1780 let Some(peek) = self.peeks.get_mut(&uuid) else {
1781 tracing::warn!("did not find pending peek for {uuid}");
1782 return;
1783 };
1784
1785 let duration = peek.requested_at.elapsed();
1786 self.metrics
1787 .observe_peek_response(&PeekResponse::Canceled, duration);
1788
1789 let otel_ctx = peek.otel_ctx.clone();
1791 otel_ctx.attach_as_parent();
1792
1793 self.deliver_response(ComputeControllerResponse::PeekNotification(
1794 uuid,
1795 PeekNotification::Canceled,
1796 otel_ctx,
1797 ));
1798
1799 self.finish_peek(uuid, reason);
1802 }
1803
1804 #[mz_ore::instrument(level = "debug")]
1816 pub fn set_read_policy(
1817 &mut self,
1818 policies: Vec<(GlobalId, ReadPolicy)>,
1819 ) -> Result<(), ReadPolicyError> {
1820 for (id, _policy) in &policies {
1823 let collection = self.collection(*id)?;
1824 if collection.read_policy.is_none() {
1825 return Err(ReadPolicyError::WriteOnlyCollection(*id));
1826 }
1827 }
1828
1829 for (id, new_policy) in policies {
1830 let collection = self.expect_collection_mut(id);
1831 let new_since = new_policy.frontier(collection.write_frontier().borrow());
1832 let _ = collection.implied_read_hold.try_downgrade(new_since);
1833 collection.read_policy = Some(new_policy);
1834 }
1835
1836 Ok(())
1837 }
1838
1839 #[mz_ore::instrument(level = "debug")]
1847 fn maybe_update_global_write_frontier(
1848 &mut self,
1849 id: GlobalId,
1850 new_frontier: Antichain<Timestamp>,
1851 ) {
1852 let collection = self.expect_collection_mut(id);
1853
1854 let advanced = collection.shared.lock_write_frontier(|f| {
1855 let advanced = PartialOrder::less_than(f, &new_frontier);
1856 if advanced {
1857 f.clone_from(&new_frontier);
1858 }
1859 advanced
1860 });
1861
1862 if !advanced {
1863 return;
1864 }
1865
1866 let new_since = match &collection.read_policy {
1868 Some(read_policy) => {
1869 read_policy.frontier(new_frontier.borrow())
1872 }
1873 None => {
1874 Antichain::from_iter(
1883 new_frontier
1884 .iter()
1885 .map(|t| t.step_back().unwrap_or(Timestamp::MIN)),
1886 )
1887 }
1888 };
1889 let _ = collection.implied_read_hold.try_downgrade(new_since);
1890
1891 self.deliver_response(ComputeControllerResponse::FrontierUpper {
1893 id,
1894 upper: new_frontier,
1895 });
1896 }
1897
1898 pub(super) fn apply_read_hold_change(
1900 &mut self,
1901 id: GlobalId,
1902 mut update: ChangeBatch<Timestamp>,
1903 ) {
1904 let Some(collection) = self.collections.get_mut(&id) else {
1905 soft_panic_or_log!(
1906 "read hold change for absent collection (id={id}, changes={update:?})"
1907 );
1908 return;
1909 };
1910
1911 let new_since = collection.shared.lock_read_capabilities(|caps| {
1912 let read_frontier = caps.frontier();
1915 for (time, diff) in update.iter() {
1916 let count = caps.count_for(time) + diff;
1917 assert!(
1918 count >= 0,
1919 "invalid read capabilities update: negative capability \
1920 (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1921 );
1922 assert!(
1923 count == 0 || read_frontier.less_equal(time),
1924 "invalid read capabilities update: frontier regression \
1925 (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1926 );
1927 }
1928
1929 let changes = caps.update_iter(update.drain());
1932
1933 let changed = changes.count() > 0;
1934 changed.then(|| caps.frontier().to_owned())
1935 });
1936
1937 let Some(new_since) = new_since else {
1938 return; };
1940
1941 for read_hold in collection.compute_dependencies.values_mut() {
1943 read_hold
1944 .try_downgrade(new_since.clone())
1945 .expect("frontiers don't regress");
1946 }
1947 for read_hold in collection.storage_dependencies.values_mut() {
1948 read_hold
1949 .try_downgrade(new_since.clone())
1950 .expect("frontiers don't regress");
1951 }
1952
1953 self.send(ComputeCommand::AllowCompaction {
1955 id,
1956 frontier: new_since,
1957 });
1958 }
1959
1960 fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1969 let Some(peek) = self.peeks.remove(&uuid) else {
1970 return;
1971 };
1972
1973 let _ = peek.peek_response_tx.send(response);
1975
1976 self.send(ComputeCommand::CancelPeek { uuid });
1979
1980 drop(peek.read_hold);
1981 }
1982
1983 fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse) {
1986 if self
1988 .replicas
1989 .get(&replica_id)
1990 .filter(|replica| replica.epoch == epoch)
1991 .is_none()
1992 {
1993 return;
1994 }
1995
1996 match response {
1999 ComputeResponse::Frontiers(id, frontiers) => {
2000 self.handle_frontiers_response(id, frontiers, replica_id);
2001 }
2002 ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
2003 self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
2004 }
2005 ComputeResponse::CopyToResponse(id, response) => {
2006 self.handle_copy_to_response(id, response, replica_id);
2007 }
2008 ComputeResponse::SubscribeResponse(id, response) => {
2009 self.handle_subscribe_response(id, response, replica_id);
2010 }
2011 ComputeResponse::Status(response) => {
2012 self.handle_status_response(response, replica_id);
2013 }
2014 }
2015 }
2016
2017 fn handle_frontiers_response(
2020 &mut self,
2021 id: GlobalId,
2022 frontiers: FrontiersResponse,
2023 replica_id: ReplicaId,
2024 ) {
2025 if !self.collections.contains_key(&id) {
2026 soft_panic_or_log!(
2027 "frontiers update for an unknown collection \
2028 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
2029 );
2030 return;
2031 }
2032 let Some(replica) = self.replicas.get_mut(&replica_id) else {
2033 soft_panic_or_log!(
2034 "frontiers update for an unknown replica \
2035 (replica_id={replica_id}, frontiers={frontiers:?})"
2036 );
2037 return;
2038 };
2039 let Some(replica_collection) = replica.collections.get_mut(&id) else {
2040 soft_panic_or_log!(
2041 "frontiers update for an unknown replica collection \
2042 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
2043 );
2044 return;
2045 };
2046
2047 if let Some(new_frontier) = frontiers.input_frontier {
2048 replica_collection.update_input_frontier(new_frontier.clone());
2049 }
2050 if let Some(new_frontier) = frontiers.output_frontier {
2051 replica_collection.update_output_frontier(new_frontier.clone());
2052 }
2053 if let Some(new_frontier) = frontiers.write_frontier {
2054 replica_collection.update_write_frontier(new_frontier.clone());
2055 self.maybe_update_global_write_frontier(id, new_frontier);
2056 }
2057 }
2058
2059 #[mz_ore::instrument(level = "debug")]
2060 fn handle_peek_response(
2061 &mut self,
2062 uuid: Uuid,
2063 response: PeekResponse,
2064 otel_ctx: OpenTelemetryContext,
2065 replica_id: ReplicaId,
2066 ) {
2067 otel_ctx.attach_as_parent();
2068
2069 let Some(peek) = self.peeks.get(&uuid) else {
2072 return;
2073 };
2074
2075 let target_replica = peek.target_replica.unwrap_or(replica_id);
2077 if target_replica != replica_id {
2078 return;
2079 }
2080
2081 let duration = peek.requested_at.elapsed();
2082 self.metrics.observe_peek_response(&response, duration);
2083
2084 let notification = PeekNotification::new(&response, peek.offset, peek.limit);
2085 self.deliver_response(ComputeControllerResponse::PeekNotification(
2088 uuid,
2089 notification,
2090 otel_ctx,
2091 ));
2092
2093 self.finish_peek(uuid, response)
2094 }
2095
2096 fn handle_copy_to_response(
2097 &mut self,
2098 sink_id: GlobalId,
2099 response: CopyToResponse,
2100 replica_id: ReplicaId,
2101 ) {
2102 if !self.collections.contains_key(&sink_id) {
2103 soft_panic_or_log!(
2104 "received response for an unknown copy-to \
2105 (sink_id={sink_id}, replica_id={replica_id})",
2106 );
2107 return;
2108 }
2109 let Some(replica) = self.replicas.get_mut(&replica_id) else {
2110 soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
2111 return;
2112 };
2113 let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
2114 soft_panic_or_log!(
2115 "copy-to response for an unknown replica collection \
2116 (sink_id={sink_id}, replica_id={replica_id})"
2117 );
2118 return;
2119 };
2120
2121 replica_collection.update_write_frontier(Antichain::new());
2125 replica_collection.update_input_frontier(Antichain::new());
2126 replica_collection.update_output_frontier(Antichain::new());
2127
2128 if !self.copy_tos.remove(&sink_id) {
2131 return;
2132 }
2133
2134 let result = match response {
2135 CopyToResponse::RowCount(count) => Ok(count),
2136 CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
2137 CopyToResponse::Dropped => {
2142 tracing::error!(
2143 %sink_id, %replica_id,
2144 "received `Dropped` response for a tracked copy to",
2145 );
2146 return;
2147 }
2148 };
2149
2150 self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
2151 }
2152
    /// Handles a subscribe response from a replica.
    ///
    /// The replica's frontiers for the subscribe are always updated. If the subscribe is still
    /// tracked, the global write frontier is advanced and, for batches that advance the
    /// subscribe's frontier, a `SubscribeResponse` restricted to not-yet-reported times is
    /// delivered.
    ///
    /// Responses for unknown collections, replicas, or replica collections are reported through
    /// `soft_panic_or_log!` and otherwise ignored.
    fn handle_subscribe_response(
        &mut self,
        subscribe_id: GlobalId,
        response: SubscribeResponse,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&subscribe_id) {
            soft_panic_or_log!(
                "received response for an unknown subscribe \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica (replica_id={replica_id})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica collection \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
            );
            return;
        };

        // The write frontier implied by the response: the batch's upper, or the empty frontier
        // if the subscribe was dropped.
        let write_frontier = match &response {
            SubscribeResponse::Batch(batch) => batch.upper.clone(),
            SubscribeResponse::DroppedAt(_) => Antichain::new(),
        };

        // All replica frontiers of a subscribe are driven by its write frontier. This happens
        // even if the subscribe is not tracked anymore (see the early return below).
        replica_collection.update_write_frontier(write_frontier.clone());
        replica_collection.update_input_frontier(write_frontier.clone());
        replica_collection.update_output_frontier(write_frontier.clone());

        // Work on a copy of the tracked state; it is written back below if it changes.
        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
            // The subscribe is not tracked, so there is nothing to report.
            return;
        };

        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);

        match response {
            SubscribeResponse::Batch(batch) => {
                let upper = batch.upper;
                let mut updates = batch.updates;

                // Only batches that advance the subscribe's frontier are reported; others are
                // dropped.
                if PartialOrder::less_than(&subscribe.frontier, &upper) {
                    // The previously reported frontier becomes the lower bound of this batch.
                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());

                    if upper.is_empty() {
                        // The subscribe is complete; stop tracking it.
                        self.subscribes.remove(&subscribe_id);
                    } else {
                        // Persist the advanced frontier.
                        self.subscribes.insert(subscribe_id, subscribe);
                    }

                    if let Ok(updates) = updates.as_mut() {
                        // Trim each update collection to the times at or beyond `lower`, and
                        // drop collections that end up empty.
                        updates.retain_mut(|updates| {
                            // NOTE(review): `partition_point` presumably relies on `times()`
                            // being ordered so that times not beyond `lower` come first; confirm.
                            let offset = updates.times().partition_point(|t| {
                                !lower.less_equal(t)
                            });
                            let (_, past_lower) = std::mem::take(updates).split_at(offset);
                            *updates = past_lower;
                            updates.len() > 0
                        });
                    }
                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeBatch {
                            lower,
                            upper,
                            updates,
                        },
                    ));
                }
            }
            SubscribeResponse::DroppedAt(frontier) => {
                // Logged as an error for a still-tracked subscribe; tracking then stops.
                tracing::error!(
                    %subscribe_id,
                    %replica_id,
                    frontier = ?frontier.elements(),
                    "received `DroppedAt` response for a tracked subscribe",
                );
                self.subscribes.remove(&subscribe_id);
            }
        }
    }
2262
2263 fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
2264 match response {
2265 StatusResponse::Placeholder => {}
2266 }
2267 }
2268
2269 fn dependency_write_frontiers<'b>(
2271 &'b self,
2272 collection: &'b CollectionState,
2273 ) -> impl Iterator<Item = Antichain<Timestamp>> + 'b {
2274 let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2275 let collection = self.collections.get(&dep_id);
2276 collection.map(|c| c.write_frontier())
2277 });
2278 let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2279 let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2280 frontiers.map(|f| f.write_frontier)
2281 });
2282
2283 compute_frontiers.chain(storage_frontiers)
2284 }
2285
2286 fn transitive_storage_dependency_write_frontiers<'b>(
2288 &'b self,
2289 collection: &'b CollectionState,
2290 ) -> impl Iterator<Item = Antichain<Timestamp>> + 'b {
2291 let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2292 let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2293 let mut done = BTreeSet::new();
2294
2295 while let Some(id) = todo.pop() {
2296 if done.contains(&id) {
2297 continue;
2298 }
2299 if let Some(dep) = self.collections.get(&id) {
2300 storage_ids.extend(dep.storage_dependency_ids());
2301 todo.extend(dep.compute_dependency_ids())
2302 }
2303 done.insert(id);
2304 }
2305
2306 let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2307 let frontiers = self.storage_collections.collection_frontiers(id).ok();
2308 frontiers.map(|f| f.write_frontier)
2309 });
2310
2311 storage_frontiers
2312 }
2313
2314 fn downgrade_warmup_capabilities(&mut self) {
2327 let mut new_capabilities = BTreeMap::new();
2328 for (id, collection) in &self.collections {
2329 if collection.read_policy.is_none()
2333 && collection.shared.lock_write_frontier(|f| f.is_empty())
2334 {
2335 new_capabilities.insert(*id, Antichain::new());
2336 continue;
2337 }
2338
2339 let mut new_capability = Antichain::new();
2340 for frontier in self.dependency_write_frontiers(collection) {
2341 for time in frontier {
2342 new_capability.insert(time.step_back().unwrap_or(time));
2343 }
2344 }
2345
2346 new_capabilities.insert(*id, new_capability);
2347 }
2348
2349 for (id, new_capability) in new_capabilities {
2350 let collection = self.expect_collection_mut(id);
2351 let _ = collection.warmup_read_hold.try_downgrade(new_capability);
2352 }
2353 }
2354
2355 fn forward_implied_capabilities(&mut self) {
2383 if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
2384 return;
2385 }
2386 if !self.replicas.is_empty() {
2387 return;
2388 }
2389
2390 let mut new_capabilities = BTreeMap::new();
2391 for (id, collection) in &self.collections {
2392 let Some(read_policy) = &collection.read_policy else {
2393 continue;
2395 };
2396
2397 let mut dep_frontier = Antichain::new();
2401 for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
2402 dep_frontier.extend(frontier);
2403 }
2404
2405 let new_capability = read_policy.frontier(dep_frontier.borrow());
2406 if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
2407 new_capabilities.insert(*id, new_capability);
2408 }
2409 }
2410
2411 for (id, new_capability) in new_capabilities {
2412 let collection = self.expect_collection_mut(id);
2413 let _ = collection.implied_read_hold.try_downgrade(new_capability);
2414 }
2415 }
2416
2417 pub(super) fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold, CollectionMissing> {
2422 let collection = self.collection(id)?;
2428 let since = collection.shared.lock_read_capabilities(|caps| {
2429 let since = caps.frontier().to_owned();
2430 caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
2431 since
2432 });
2433 let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
2434 Ok(hold)
2435 }
2436
    /// Runs the instance's maintenance steps, in a fixed order.
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        // Replace failed replicas first.
        self.rehydrate_failed_replicas();
        // Read hold maintenance.
        self.downgrade_warmup_capabilities();
        self.forward_implied_capabilities();
        // Collection scheduling and cleanup.
        self.schedule_collections();
        self.cleanup_collections();
        // Introspection and metrics.
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
2453}
2454
/// State the controller maintains for a single compute collection.
#[derive(Debug)]
struct CollectionState {
    /// The replica this collection is targeted at, if any.
    target_replica: Option<ReplicaId>,
    /// Whether this is a log collection; set by `new_log_collection`.
    log_collection: bool,
    /// Whether the collection has been dropped; set by `drop_collections`.
    dropped: bool,
    /// Whether a `Schedule` command has been sent for this collection. Log collections start
    /// out scheduled.
    scheduled: bool,

    /// Read-only flag; initialized to `true` in `new`.
    // NOTE(review): the code consuming this flag is outside this part of the file.
    read_only: bool,

    /// Frontier state (read capabilities and write frontier) that can be shared.
    shared: SharedCollectionState,

    /// Read hold that is downgraded according to the collection's read policy as the write
    /// frontier advances.
    implied_read_hold: ReadHold,
    /// Read hold that is downgraded based on the write frontiers of the collection's direct
    /// dependencies.
    warmup_read_hold: ReadHold,
    /// The policy used to derive the implied read hold from the write frontier.
    ///
    /// `None` marks a write-only collection, for which no read policy can be set.
    read_policy: Option<ReadPolicy>,

    /// Read holds on the storage collections this collection depends on, keyed by ID.
    storage_dependencies: BTreeMap<GlobalId, ReadHold>,
    /// Read holds on the compute collections this collection depends on, keyed by ID.
    compute_dependencies: BTreeMap<GlobalId, ReadHold>,

    /// Introspection state of this collection.
    introspection: CollectionIntrospection,

    /// Stash of wallclock lag histogram data, keyed by period, lag, and labels.
    ///
    /// `None` for transient collections.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}
2534
2535impl CollectionState {
2536 fn new(
2538 collection_id: GlobalId,
2539 as_of: Antichain<Timestamp>,
2540 shared: SharedCollectionState,
2541 storage_dependencies: BTreeMap<GlobalId, ReadHold>,
2542 compute_dependencies: BTreeMap<GlobalId, ReadHold>,
2543 read_hold_tx: read_holds::ChangeTx,
2544 introspection: CollectionIntrospection,
2545 ) -> Self {
2546 let since = as_of.clone();
2548 let upper = as_of;
2550
2551 assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
2553 assert!(shared.lock_write_frontier(|f| f == &upper));
2554
2555 let implied_read_hold =
2559 ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
2560 let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);
2561
2562 let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
2563 shared.lock_read_capabilities(|c| {
2564 c.update_iter(updates);
2565 });
2566
2567 let wallclock_lag_histogram_stash = match collection_id.is_transient() {
2571 true => None,
2572 false => Some(Default::default()),
2573 };
2574
2575 Self {
2576 target_replica: None,
2577 log_collection: false,
2578 dropped: false,
2579 scheduled: false,
2580 read_only: true,
2581 shared,
2582 implied_read_hold,
2583 warmup_read_hold,
2584 read_policy: Some(ReadPolicy::ValidFrom(since)),
2585 storage_dependencies,
2586 compute_dependencies,
2587 introspection,
2588 wallclock_lag_histogram_stash,
2589 }
2590 }
2591
2592 fn new_log_collection(
2594 id: GlobalId,
2595 shared: SharedCollectionState,
2596 read_hold_tx: read_holds::ChangeTx,
2597 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2598 ) -> Self {
2599 let since = Antichain::from_elem(Timestamp::MIN);
2600 let introspection = CollectionIntrospection::new(
2601 id,
2602 introspection_tx,
2603 since.clone(),
2604 false,
2605 None,
2606 None,
2607 Vec::new(),
2608 );
2609 let mut state = Self::new(
2610 id,
2611 since,
2612 shared,
2613 Default::default(),
2614 Default::default(),
2615 read_hold_tx,
2616 introspection,
2617 );
2618 state.log_collection = true;
2619 state.scheduled = true;
2621 state
2622 }
2623
2624 fn read_frontier(&self) -> Antichain<Timestamp> {
2626 self.shared
2627 .lock_read_capabilities(|c| c.frontier().to_owned())
2628 }
2629
2630 fn write_frontier(&self) -> Antichain<Timestamp> {
2632 self.shared.lock_write_frontier(|f| f.clone())
2633 }
2634
2635 fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2636 self.storage_dependencies.keys().copied()
2637 }
2638
2639 fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2640 self.compute_dependencies.keys().copied()
2641 }
2642}
2643
/// Frontier state of a collection, held behind `Arc<Mutex<_>>` so that clones of this struct
/// refer to the same underlying state.
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState {
    /// The accumulated read capabilities of the collection; their frontier is the collection's
    /// read frontier.
    read_capabilities: Arc<Mutex<MutableAntichain<Timestamp>>>,
    /// The write frontier of the collection.
    write_frontier: Arc<Mutex<Antichain<Timestamp>>>,
}
2672
2673impl SharedCollectionState {
2674 pub fn new(as_of: Antichain<Timestamp>) -> Self {
2675 let since = as_of.clone();
2677 let upper = as_of;
2679
2680 let mut read_capabilities = MutableAntichain::new();
2684 read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2685
2686 Self {
2687 read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2688 write_frontier: Arc::new(Mutex::new(upper)),
2689 }
2690 }
2691
2692 pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2693 where
2694 F: FnOnce(&mut MutableAntichain<Timestamp>) -> R,
2695 {
2696 let mut caps = self.read_capabilities.lock().expect("poisoned");
2697 f(&mut *caps)
2698 }
2699
2700 pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2701 where
2702 F: FnOnce(&mut Antichain<Timestamp>) -> R,
2703 {
2704 let mut frontier = self.write_frontier.lock().expect("poisoned");
2705 f(&mut *frontier)
2706 }
2707}
2708
/// Manages the introspection data reported for a single collection.
#[derive(Debug)]
struct CollectionIntrospection {
    /// The ID of the collection the introspection data is about.
    collection_id: GlobalId,
    /// Channel through which introspection updates are sent.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Frontier introspection state.
    ///
    /// `None` for collections created as storage sinks.
    frontiers: Option<FrontiersIntrospectionState>,
    /// Refresh introspection state.
    ///
    /// Only present if both a refresh schedule and an initial as-of were provided.
    refresh: Option<RefreshIntrospectionState>,
    /// The IDs reported as the collection's dependencies.
    dependency_ids: Vec<GlobalId>,
}
2729
2730impl CollectionIntrospection {
2731 fn new(
2732 collection_id: GlobalId,
2733 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2734 as_of: Antichain<Timestamp>,
2735 storage_sink: bool,
2736 initial_as_of: Option<Antichain<Timestamp>>,
2737 refresh_schedule: Option<RefreshSchedule>,
2738 dependency_ids: Vec<GlobalId>,
2739 ) -> Self {
2740 let refresh =
2741 match (refresh_schedule, initial_as_of) {
2742 (Some(refresh_schedule), Some(initial_as_of)) => Some(
2743 RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2744 ),
2745 (refresh_schedule, _) => {
2746 soft_assert_or_log!(
2749 refresh_schedule.is_none(),
2750 "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2751 );
2752 None
2753 }
2754 };
2755 let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2756
2757 let self_ = Self {
2758 collection_id,
2759 introspection_tx,
2760 frontiers,
2761 refresh,
2762 dependency_ids,
2763 };
2764
2765 self_.report_initial_state();
2766 self_
2767 }
2768
2769 fn report_initial_state(&self) {
2771 if let Some(frontiers) = &self.frontiers {
2772 let row = frontiers.row_for_collection(self.collection_id);
2773 let updates = vec![(row, Diff::ONE)];
2774 self.send(IntrospectionType::Frontiers, updates);
2775 }
2776
2777 if let Some(refresh) = &self.refresh {
2778 let row = refresh.row_for_collection(self.collection_id);
2779 let updates = vec![(row, Diff::ONE)];
2780 self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2781 }
2782
2783 if !self.dependency_ids.is_empty() {
2784 let updates = self.dependency_rows(Diff::ONE);
2785 self.send(IntrospectionType::ComputeDependencies, updates);
2786 }
2787 }
2788
2789 fn dependency_rows(&self, diff: Diff) -> Vec<(Row, Diff)> {
2791 self.dependency_ids
2792 .iter()
2793 .map(|dependency_id| {
2794 let row = Row::pack_slice(&[
2795 Datum::String(&self.collection_id.to_string()),
2796 Datum::String(&dependency_id.to_string()),
2797 ]);
2798 (row, diff)
2799 })
2800 .collect()
2801 }
2802
2803 fn observe_frontiers(
2806 &mut self,
2807 read_frontier: &Antichain<Timestamp>,
2808 write_frontier: &Antichain<Timestamp>,
2809 ) {
2810 self.update_frontier_introspection(read_frontier, write_frontier);
2811 self.update_refresh_introspection(write_frontier);
2812 }
2813
2814 fn update_frontier_introspection(
2815 &mut self,
2816 read_frontier: &Antichain<Timestamp>,
2817 write_frontier: &Antichain<Timestamp>,
2818 ) {
2819 let Some(frontiers) = &mut self.frontiers else {
2820 return;
2821 };
2822
2823 if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2824 {
2825 return; };
2827
2828 let retraction = frontiers.row_for_collection(self.collection_id);
2829 frontiers.update(read_frontier, write_frontier);
2830 let insertion = frontiers.row_for_collection(self.collection_id);
2831 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2832 self.send(IntrospectionType::Frontiers, updates);
2833 }
2834
2835 fn update_refresh_introspection(&mut self, write_frontier: &Antichain<Timestamp>) {
2836 let Some(refresh) = &mut self.refresh else {
2837 return;
2838 };
2839
2840 let retraction = refresh.row_for_collection(self.collection_id);
2841 refresh.frontier_update(write_frontier);
2842 let insertion = refresh.row_for_collection(self.collection_id);
2843
2844 if retraction == insertion {
2845 return; }
2847
2848 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2849 self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2850 }
2851
2852 fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2853 let _ = self.introspection_tx.send((introspection_type, updates));
2856 }
2857}
2858
2859impl Drop for CollectionIntrospection {
2860 fn drop(&mut self) {
2861 if let Some(frontiers) = &self.frontiers {
2863 let row = frontiers.row_for_collection(self.collection_id);
2864 let updates = vec![(row, Diff::MINUS_ONE)];
2865 self.send(IntrospectionType::Frontiers, updates);
2866 }
2867
2868 if let Some(refresh) = &self.refresh {
2870 let retraction = refresh.row_for_collection(self.collection_id);
2871 let updates = vec![(retraction, Diff::MINUS_ONE)];
2872 self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2873 }
2874
2875 if !self.dependency_ids.is_empty() {
2877 let updates = self.dependency_rows(Diff::MINUS_ONE);
2878 self.send(IntrospectionType::ComputeDependencies, updates);
2879 }
2880 }
2881}
2882
2883#[derive(Debug)]
2884struct FrontiersIntrospectionState {
2885 read_frontier: Antichain<Timestamp>,
2886 write_frontier: Antichain<Timestamp>,
2887}
2888
2889impl FrontiersIntrospectionState {
2890 fn new(as_of: Antichain<Timestamp>) -> Self {
2891 Self {
2892 read_frontier: as_of.clone(),
2893 write_frontier: as_of,
2894 }
2895 }
2896
2897 fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2899 let read_frontier = self
2900 .read_frontier
2901 .as_option()
2902 .map_or(Datum::Null, |ts| ts.clone().into());
2903 let write_frontier = self
2904 .write_frontier
2905 .as_option()
2906 .map_or(Datum::Null, |ts| ts.clone().into());
2907 Row::pack_slice(&[
2908 Datum::String(&collection_id.to_string()),
2909 read_frontier,
2910 write_frontier,
2911 ])
2912 }
2913
2914 fn update(
2916 &mut self,
2917 read_frontier: &Antichain<Timestamp>,
2918 write_frontier: &Antichain<Timestamp>,
2919 ) {
2920 if read_frontier != &self.read_frontier {
2921 self.read_frontier.clone_from(read_frontier);
2922 }
2923 if write_frontier != &self.write_frontier {
2924 self.write_frontier.clone_from(write_frontier);
2925 }
2926 }
2927}
2928
2929#[derive(Debug)]
2932struct RefreshIntrospectionState {
2933 refresh_schedule: RefreshSchedule,
2935 initial_as_of: Antichain<Timestamp>,
2936 next_refresh: Datum<'static>, last_completed_refresh: Datum<'static>, }
2940
2941impl RefreshIntrospectionState {
2942 fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2944 Row::pack_slice(&[
2945 Datum::String(&collection_id.to_string()),
2946 self.last_completed_refresh,
2947 self.next_refresh,
2948 ])
2949 }
2950}
2951
2952impl RefreshIntrospectionState {
2953 fn new(
2956 refresh_schedule: RefreshSchedule,
2957 initial_as_of: Antichain<Timestamp>,
2958 upper: &Antichain<Timestamp>,
2959 ) -> Self {
2960 let mut self_ = Self {
2961 refresh_schedule: refresh_schedule.clone(),
2962 initial_as_of: initial_as_of.clone(),
2963 next_refresh: Datum::Null,
2964 last_completed_refresh: Datum::Null,
2965 };
2966 self_.frontier_update(upper);
2967 self_
2968 }
2969
2970 fn frontier_update(&mut self, write_frontier: &Antichain<Timestamp>) {
2973 if write_frontier.is_empty() {
2974 self.last_completed_refresh =
2975 if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2976 last_refresh.into()
2977 } else {
2978 Timestamp::MAX.into()
2981 };
2982 self.next_refresh = Datum::Null;
2983 } else {
2984 if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2985 self.last_completed_refresh = Datum::Null;
2987 let initial_as_of = self.initial_as_of.as_option().expect(
2988 "initial_as_of can't be [], because then there would be no refreshes at all",
2989 );
2990 let first_refresh = self
2991 .refresh_schedule
2992 .round_up_timestamp(*initial_as_of)
2993 .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2994 soft_assert_or_log!(
2995 first_refresh == *initial_as_of,
2996 "initial_as_of should be set to the first refresh"
2997 );
2998 self.next_refresh = first_refresh.into();
2999 } else {
3000 let write_frontier = write_frontier.as_option().expect("checked above");
3002 self.last_completed_refresh = self
3003 .refresh_schedule
3004 .round_down_timestamp_m1(*write_frontier)
3005 .map_or_else(
3006 || {
3007 soft_panic_or_log!(
3008 "rounding down should have returned the first refresh or later"
3009 );
3010 Datum::Null
3011 },
3012 |last_completed_refresh| last_completed_refresh.into(),
3013 );
3014 self.next_refresh = write_frontier.clone().into();
3015 }
3016 }
3017 }
3018}
3019
3020#[derive(Debug)]
3022struct PendingPeek {
3023 target_replica: Option<ReplicaId>,
3027 otel_ctx: OpenTelemetryContext,
3029 requested_at: Instant,
3033 read_hold: ReadHold,
3035 peek_response_tx: oneshot::Sender<PeekResponse>,
3037 limit: Option<usize>,
3039 offset: usize,
3041}
3042
3043#[derive(Debug, Clone)]
3044struct ActiveSubscribe {
3045 frontier: Antichain<Timestamp>,
3047}
3048
3049impl Default for ActiveSubscribe {
3050 fn default() -> Self {
3051 Self {
3052 frontier: Antichain::from_elem(Timestamp::MIN),
3053 }
3054 }
3055}
3056
3057#[derive(Debug)]
3059struct ReplicaState {
3060 id: ReplicaId,
3062 client: ReplicaClient,
3064 config: ReplicaConfig,
3066 metrics: ReplicaMetrics,
3068 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3070 collections: BTreeMap<GlobalId, ReplicaCollectionState>,
3072 epoch: u64,
3074}
3075
3076impl ReplicaState {
3077 fn new(
3078 id: ReplicaId,
3079 client: ReplicaClient,
3080 config: ReplicaConfig,
3081 metrics: ReplicaMetrics,
3082 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3083 epoch: u64,
3084 ) -> Self {
3085 Self {
3086 id,
3087 client,
3088 config,
3089 metrics,
3090 introspection_tx,
3091 epoch,
3092 collections: Default::default(),
3093 }
3094 }
3095
3096 fn add_collection(
3102 &mut self,
3103 id: GlobalId,
3104 as_of: Antichain<Timestamp>,
3105 input_read_holds: Vec<ReadHold>,
3106 ) {
3107 let metrics = self.metrics.for_collection(id);
3108 let introspection = ReplicaCollectionIntrospection::new(
3109 self.id,
3110 id,
3111 self.introspection_tx.clone(),
3112 as_of.clone(),
3113 );
3114 let mut state =
3115 ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
3116
3117 if id.is_transient() {
3121 state.wallclock_lag_max = None;
3122 }
3123
3124 if let Some(previous) = self.collections.insert(id, state) {
3125 panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
3126 }
3127 }
3128
3129 fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState> {
3131 self.collections.remove(&id)
3132 }
3133
3134 fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
3136 self.collections.get(&id).map_or(true, |c| {
3137 c.write_frontier.is_empty()
3138 && c.input_frontier.is_empty()
3139 && c.output_frontier.is_empty()
3140 })
3141 }
3142
3143 #[mz_ore::instrument(level = "debug")]
3147 pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3148 let Self {
3155 id,
3156 client: _,
3157 config: _,
3158 metrics: _,
3159 introspection_tx: _,
3160 epoch,
3161 collections,
3162 } = self;
3163
3164 let collections: BTreeMap<_, _> = collections
3165 .iter()
3166 .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
3167 .collect();
3168
3169 Ok(serde_json::json!({
3170 "id": id.to_string(),
3171 "collections": collections,
3172 "epoch": epoch,
3173 }))
3174 }
3175}
3176
3177#[derive(Debug)]
3178struct ReplicaCollectionState {
3179 write_frontier: Antichain<Timestamp>,
3183 input_frontier: Antichain<Timestamp>,
3187 output_frontier: Antichain<Timestamp>,
3191
3192 metrics: Option<ReplicaCollectionMetrics>,
3196 as_of: Antichain<Timestamp>,
3198 introspection: ReplicaCollectionIntrospection,
3200 input_read_holds: Vec<ReadHold>,
3206
3207 wallclock_lag_max: Option<WallclockLag>,
3211}
3212
3213impl ReplicaCollectionState {
3214 fn new(
3215 metrics: Option<ReplicaCollectionMetrics>,
3216 as_of: Antichain<Timestamp>,
3217 introspection: ReplicaCollectionIntrospection,
3218 input_read_holds: Vec<ReadHold>,
3219 ) -> Self {
3220 Self {
3221 write_frontier: as_of.clone(),
3222 input_frontier: as_of.clone(),
3223 output_frontier: as_of.clone(),
3224 metrics,
3225 as_of,
3226 introspection,
3227 input_read_holds,
3228 wallclock_lag_max: Some(WallclockLag::MIN),
3229 }
3230 }
3231
3232 fn hydrated(&self) -> bool {
3234 self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3250 }
3251
3252 fn update_write_frontier(&mut self, new_frontier: Antichain<Timestamp>) {
3254 if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3255 soft_panic_or_log!(
3256 "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3257 self.write_frontier,
3258 );
3259 return;
3260 } else if new_frontier == self.write_frontier {
3261 return;
3262 }
3263
3264 self.write_frontier = new_frontier;
3265 }
3266
3267 fn update_input_frontier(&mut self, new_frontier: Antichain<Timestamp>) {
3269 if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3270 soft_panic_or_log!(
3271 "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3272 self.input_frontier,
3273 );
3274 return;
3275 } else if new_frontier == self.input_frontier {
3276 return;
3277 }
3278
3279 self.input_frontier = new_frontier;
3280
3281 for read_hold in &mut self.input_read_holds {
3283 let result = read_hold.try_downgrade(self.input_frontier.clone());
3284 soft_assert_or_log!(
3285 result.is_ok(),
3286 "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3287 self.input_frontier,
3288 );
3289 }
3290 }
3291
3292 fn update_output_frontier(&mut self, new_frontier: Antichain<Timestamp>) {
3294 if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3295 soft_panic_or_log!(
3296 "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3297 self.output_frontier,
3298 );
3299 return;
3300 } else if new_frontier == self.output_frontier {
3301 return;
3302 }
3303
3304 self.output_frontier = new_frontier;
3305 }
3306}
3307
3308#[derive(Debug)]
3311struct ReplicaCollectionIntrospection {
3312 replica_id: ReplicaId,
3314 collection_id: GlobalId,
3316 write_frontier: Antichain<Timestamp>,
3318 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3320}
3321
3322impl ReplicaCollectionIntrospection {
3323 fn new(
3325 replica_id: ReplicaId,
3326 collection_id: GlobalId,
3327 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3328 as_of: Antichain<Timestamp>,
3329 ) -> Self {
3330 let self_ = Self {
3331 replica_id,
3332 collection_id,
3333 write_frontier: as_of,
3334 introspection_tx,
3335 };
3336
3337 self_.report_initial_state();
3338 self_
3339 }
3340
3341 fn report_initial_state(&self) {
3343 let row = self.write_frontier_row();
3344 let updates = vec![(row, Diff::ONE)];
3345 self.send(IntrospectionType::ReplicaFrontiers, updates);
3346 }
3347
3348 fn observe_frontier(&mut self, write_frontier: &Antichain<Timestamp>) {
3350 if self.write_frontier == *write_frontier {
3351 return; }
3353
3354 let retraction = self.write_frontier_row();
3355 self.write_frontier.clone_from(write_frontier);
3356 let insertion = self.write_frontier_row();
3357
3358 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3359 self.send(IntrospectionType::ReplicaFrontiers, updates);
3360 }
3361
3362 fn write_frontier_row(&self) -> Row {
3364 let write_frontier = self
3365 .write_frontier
3366 .as_option()
3367 .map_or(Datum::Null, |ts| ts.clone().into());
3368 Row::pack_slice(&[
3369 Datum::String(&self.collection_id.to_string()),
3370 Datum::String(&self.replica_id.to_string()),
3371 write_frontier,
3372 ])
3373 }
3374
3375 fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3376 let _ = self.introspection_tx.send((introspection_type, updates));
3379 }
3380}
3381
3382impl Drop for ReplicaCollectionIntrospection {
3383 fn drop(&mut self) {
3384 let row = self.write_frontier_row();
3386 let updates = vec![(row, Diff::MINUS_ONE)];
3387 self.send(IntrospectionType::ReplicaFrontiers, updates);
3388 }
3389}