1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
21use mz_compute_types::plan::render_plan::RenderPlan;
22use mz_compute_types::sinks::{
23 ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
24};
25use mz_compute_types::sources::SourceInstanceDesc;
26use mz_controller_types::dyncfgs::{
27 ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
28};
29use mz_dyncfg::ConfigSet;
30use mz_expr::RowSetFinishing;
31use mz_ore::cast::CastFrom;
32use mz_ore::channel::instrumented_unbounded_channel;
33use mz_ore::now::NowFn;
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{soft_assert_or_log, soft_panic_or_log};
36use mz_persist_types::PersistLocation;
37use mz_repr::adt::timestamp::CheckedTimestamp;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
40use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
41use mz_storage_types::read_holds::{self, ReadHold};
42use mz_storage_types::read_policy::ReadPolicy;
43use thiserror::Error;
44use timely::PartialOrder;
45use timely::progress::frontier::MutableAntichain;
46use timely::progress::{Antichain, ChangeBatch, Timestamp};
47use tokio::sync::{mpsc, oneshot};
48use uuid::Uuid;
49
50use crate::controller::error::{
51 CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
52};
53use crate::controller::instance_client::PeekError;
54use crate::controller::replica::{ReplicaClient, ReplicaConfig};
55use crate::controller::{
56 ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
57 ReplicaId, StorageCollections,
58};
59use crate::logging::LogVariant;
60use crate::metrics::IntCounter;
61use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
62use crate::protocol::command::{
63 ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
64};
65use crate::protocol::history::ComputeCommandHistory;
66use crate::protocol::response::{
67 ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
68 SubscribeBatch, SubscribeResponse,
69};
70
/// Error returned when adding a replica whose ID is already in use.
#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);
74
/// Error returned when referencing a replica that is not known to the instance.
#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);
78
/// Errors arising while creating a dataflow on a compute instance.
#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    /// A dependency or export of the dataflow is not a known collection.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The dataflow targets a replica that does not exist.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The dataflow description has no as-of frontier set.
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    /// Subscribe dataflows must have a non-empty as-of.
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    /// `COPY TO` dataflows must have a non-empty as-of.
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    /// The caller did not provide a read hold for one of the dataflow imports.
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    /// A provided read hold could not be downgraded to the dataflow as-of.
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}
96
97impl From<CollectionMissing> for DataflowCreationError {
98 fn from(error: CollectionMissing) -> Self {
99 Self::CollectionMissing(error.0)
100 }
101}
102
/// Errors arising while installing a read policy on a collection.
#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    /// The targeted collection is not known to the instance.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// Read policies cannot be installed on write-only collections.
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}
110
111impl From<CollectionMissing> for ReadPolicyError {
112 fn from(error: CollectionMissing) -> Self {
113 Self::CollectionMissing(error.0)
114 }
115}
116
/// A command sent to an [`Instance`] task, applied as a closure over the instance state.
pub(super) type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;

/// A response received from a replica task, tagged with the replica's ID and
/// a `u64` — presumably the replica epoch passed at spawn time; confirm against
/// `ReplicaClient::spawn`.
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);
123
/// The state of a compute instance, as tracked by the compute controller.
pub(super) struct Instance<T: ComputeControllerTimestamp> {
    /// Build info for this process, passed to spawned replica clients.
    build_info: &'static BuildInfo,
    /// Handle to the storage collections serving as dataflow inputs/outputs.
    storage_collections: StorageCollections<T>,
    /// Whether `InitializationComplete` has been sent yet.
    initialized: bool,
    /// Whether this instance runs in read-only mode. While read-only, wallclock
    /// lag introspection is not recorded.
    read_only: bool,
    /// The workload class of this instance, used to label wallclock lag histograms.
    workload_class: Option<String>,
    /// State of the replicas provisioned for this instance, by replica ID.
    replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
    /// State of the compute collections installed on this instance, by ID.
    collections: BTreeMap<GlobalId, CollectionState<T>>,
    /// Mapping from log variants to the IDs of their arranged log collections.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Peeks that have been issued but not yet finished.
    peeks: BTreeMap<Uuid, PendingPeek<T>>,
    /// Active subscribes, by sink ID.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
    /// Active `COPY TO` sinks, by sink ID.
    copy_tos: BTreeSet<GlobalId>,
    /// The command history, replayed to newly added replicas.
    history: ComputeCommandHistory<UIntGauge, T>,
    /// Receiver for commands from the controller.
    command_rx: mpsc::UnboundedReceiver<Command<T>>,
    /// Sender for responses back to the controller.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Sender for introspection updates.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Metrics for this instance.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// The persist location used for the peek stash.
    peek_stash_persist_location: PersistLocation,

    /// A function returning the current wallclock time.
    now: NowFn,
    /// A function computing the wallclock lag of a given timestamp.
    wallclock_lag: WallclockLagFn<T>,
    /// The (truncated) time at which wallclock lag introspection was last recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for changes to collection read holds.
    read_hold_tx: read_holds::ChangeTx<T>,
    /// Sender cloned into replica tasks for delivering their responses.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// Receiver for responses from replica tasks.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
}
217
218impl<T: ComputeControllerTimestamp> Instance<T> {
219 fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
221 self.collections.get(&id).ok_or(CollectionMissing(id))
222 }
223
224 fn collection_mut(
226 &mut self,
227 id: GlobalId,
228 ) -> Result<&mut CollectionState<T>, CollectionMissing> {
229 self.collections.get_mut(&id).ok_or(CollectionMissing(id))
230 }
231
232 fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
238 self.collections.get(&id).expect("collection must exist")
239 }
240
241 fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
247 self.collections
248 .get_mut(&id)
249 .expect("collection must exist")
250 }
251
252 fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
253 self.collections.iter().map(|(id, coll)| (*id, coll))
254 }
255
256 fn replicas_hosting(
263 &self,
264 id: GlobalId,
265 ) -> Result<impl Iterator<Item = &ReplicaState<T>>, CollectionMissing> {
266 let target = self.collection(id)?.target_replica;
267 Ok(self
268 .replicas
269 .values()
270 .filter(move |r| target.map_or(true, |t| t == r.id)))
271 }
272
273 fn add_collection(
279 &mut self,
280 id: GlobalId,
281 as_of: Antichain<T>,
282 shared: SharedCollectionState<T>,
283 storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
284 compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
285 replica_input_read_holds: Vec<ReadHold<T>>,
286 write_only: bool,
287 storage_sink: bool,
288 initial_as_of: Option<Antichain<T>>,
289 refresh_schedule: Option<RefreshSchedule>,
290 target_replica: Option<ReplicaId>,
291 ) {
292 let dependency_ids: Vec<GlobalId> = compute_dependencies
294 .keys()
295 .chain(storage_dependencies.keys())
296 .copied()
297 .collect();
298 let introspection = CollectionIntrospection::new(
299 id,
300 self.introspection_tx.clone(),
301 as_of.clone(),
302 storage_sink,
303 initial_as_of,
304 refresh_schedule,
305 dependency_ids,
306 );
307 let mut state = CollectionState::new(
308 id,
309 as_of.clone(),
310 shared,
311 storage_dependencies,
312 compute_dependencies,
313 Arc::clone(&self.read_hold_tx),
314 introspection,
315 );
316 state.target_replica = target_replica;
317 if write_only {
319 state.read_policy = None;
320 }
321
322 if let Some(previous) = self.collections.insert(id, state) {
323 panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
324 }
325
326 for replica in self.replicas.values_mut() {
328 if target_replica.is_some_and(|id| id != replica.id) {
329 continue;
330 }
331 replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
332 }
333 }
334
335 fn remove_collection(&mut self, id: GlobalId) {
336 for replica in self.replicas.values_mut() {
338 replica.remove_collection(id);
339 }
340
341 self.collections.remove(&id);
343 }
344
    /// Builds and registers the internal state for a newly added replica.
    ///
    /// Installs per-replica collection state for every existing collection the
    /// replica will host, skipping log collections that are not part of the
    /// replica's logging config and collections targeted at a different replica.
    fn add_replica_state(
        &mut self,
        id: ReplicaId,
        client: ReplicaClient<T>,
        config: ReplicaConfig,
        epoch: u64,
    ) {
        // IDs of the arranged log collections this replica is configured to export.
        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();

        let metrics = self.metrics.for_replica(id);
        let mut replica = ReplicaState::new(
            id,
            client,
            config,
            metrics,
            self.introspection_tx.clone(),
            epoch,
        );

        for (collection_id, collection) in &self.collections {
            // Skip log collections the replica doesn't maintain and collections
            // targeted at another replica.
            if (collection.log_collection && !log_ids.contains(collection_id))
                || collection.target_replica.is_some_and(|rid| rid != id)
            {
                continue;
            }

            // Log collections start at the minimum time; all others at the
            // collection's current read frontier.
            let as_of = if collection.log_collection {
                Antichain::from_elem(T::minimum())
            } else {
                collection.read_frontier().to_owned()
            };

            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
            replica.add_collection(*collection_id, as_of, input_read_holds);
        }

        self.replicas.insert(id, replica);
    }
390
391 fn deliver_response(&self, response: ComputeControllerResponse<T>) {
393 let _ = self.response_tx.send(response);
396 }
397
398 fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
400 let _ = self.introspection_tx.send((type_, updates));
403 }
404
405 fn replica_exists(&self, id: ReplicaId) -> bool {
407 self.replicas.contains_key(&id)
408 }
409
410 fn peeks_targeting(
412 &self,
413 replica_id: ReplicaId,
414 ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
415 self.peeks.iter().filter_map(move |(uuid, peek)| {
416 if peek.target_replica == Some(replica_id) {
417 Some((*uuid, peek))
418 } else {
419 None
420 }
421 })
422 }
423
424 fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
426 self.subscribes.keys().copied().filter(move |id| {
427 let collection = self.expect_collection(*id);
428 collection.target_replica == Some(replica_id)
429 })
430 }
431
432 fn update_frontier_introspection(&mut self) {
441 for collection in self.collections.values_mut() {
442 collection
443 .introspection
444 .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
445 }
446
447 for replica in self.replicas.values_mut() {
448 for collection in replica.collections.values_mut() {
449 collection
450 .introspection
451 .observe_frontier(&collection.write_frontier);
452 }
453 }
454 }
455
456 fn refresh_state_metrics(&self) {
465 let unscheduled_collections_count =
466 self.collections.values().filter(|c| !c.scheduled).count();
467 let connected_replica_count = self
468 .replicas
469 .values()
470 .filter(|r| r.client.is_connected())
471 .count();
472
473 self.metrics
474 .replica_count
475 .set(u64::cast_from(self.replicas.len()));
476 self.metrics
477 .collection_count
478 .set(u64::cast_from(self.collections.len()));
479 self.metrics
480 .collection_unscheduled_count
481 .set(u64::cast_from(unscheduled_collections_count));
482 self.metrics
483 .peek_count
484 .set(u64::cast_from(self.peeks.len()));
485 self.metrics
486 .subscribe_count
487 .set(u64::cast_from(self.subscribes.len()));
488 self.metrics
489 .copy_to_count
490 .set(u64::cast_from(self.copy_tos.len()));
491 self.metrics
492 .connected_replica_count
493 .set(u64::cast_from(connected_replica_count));
494 }
495
    /// Refreshes the tracked wallclock lag state of all collections.
    ///
    /// For each collection, computes the lag of its write frontier behind the
    /// wallclock, feeds it into the per-collection histogram stash, the
    /// per-replica lag maximum, and the lag metric. Collections without
    /// readable times record an `Undefined` lag instead of a numeric one.
    /// Finally triggers a recording of the accumulated state, if due.
    fn refresh_wallclock_lag(&mut self) {
        // Lag of a frontier behind the wallclock; an empty frontier maps to zero.
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        // Histogram entries are labeled with the workload class, when one is set.
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        // Storage collections whose read capabilities are strictly behind their
        // write frontier, i.e. that contain readable times.
        let readable_storage_collections: BTreeSet<_> = self
            .collections
            .keys()
            .filter_map(|id| {
                let frontiers = self.storage_collections.collection_frontiers(*id).ok()?;
                PartialOrder::less_than(&frontiers.read_capabilities, &frontiers.write_frontier)
                    .then_some(*id)
            })
            .collect();

        for (id, collection) in &mut self.collections {
            let write_frontier = collection.write_frontier();
            // Storage-backed collections defer to the storage frontiers;
            // compute-only collections compare their own frontiers.
            let readable = if self.storage_collections.check_exists(*id).is_ok() {
                readable_storage_collections.contains(id)
            } else {
                PartialOrder::less_than(&collection.read_frontier(), &write_frontier)
            };

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                // Bucket lags by rounding seconds up to the next power of two.
                let bucket = if readable {
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                } else {
                    WallclockLag::Undefined
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                // NOTE(review): a per-replica collection also counts as readable
                // once hydrated, even without readable storage times — confirm.
                let readable = readable_storage_collections.contains(id) || collection.hydrated();

                let lag = if readable {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                } else {
                    WallclockLag::Undefined
                };

                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    // An undefined lag is reported as `u64::MAX` on the metric.
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                };
            }
        }

        self.maybe_record_wallclock_lag();
    }
594
    /// Records wallclock lag introspection, at most once per recording interval.
    ///
    /// Flushes the per-replica lag maxima into `WallclockLagHistory` updates and
    /// the per-collection histogram stashes into `WallclockLagHistogram` updates,
    /// resetting both for the next period. Does nothing while in read-only mode,
    /// or while the (interval-truncated) current time has not advanced past the
    /// last recording.
    fn maybe_record_wallclock_lag(&mut self) {
        // Read-only instances don't write introspection.
        if self.read_only {
            return;
        }

        // Truncate a datetime down to a multiple of the given interval;
        // `None` if the interval cannot be represented/truncated to.
        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        // Fall back to the default interval if the configured one is unusable.
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        // Flush each replica collection's lag maximum, resetting it for the
        // next recording period.
        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        // Drain each collection's histogram stash into histogram updates,
        // reusing a single row buffer across rows.
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }
679
680 #[mz_ore::instrument(level = "debug")]
686 pub fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, CollectionMissing> {
687 let mut hosting_replicas = self.replicas_hosting(collection_id)?.peekable();
688 if hosting_replicas.peek().is_none() {
689 return Ok(true);
690 }
691 for replica_state in hosting_replicas {
692 let collection_state = replica_state
693 .collections
694 .get(&collection_id)
695 .expect("hosting replica must have per-replica collection state");
696
697 if collection_state.hydrated() {
698 return Ok(true);
699 }
700 }
701
702 Ok(false)
703 }
704
    /// Reports whether all non-transient, non-excluded collections are hydrated
    /// on at least one of the targeted replicas.
    ///
    /// With `target_replica_ids == None`, every replica counts as a target.
    /// Returns an error if none of the requested replicas exist. An instance
    /// without replicas trivially counts as fully hydrated.
    #[mz_ore::instrument(level = "debug")]
    pub fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, HydrationCheckBadTarget> {
        if self.replicas.is_empty() {
            return Ok(true);
        }
        let mut all_hydrated = true;
        // Restrict to the requested replicas that actually exist.
        let target_replicas: BTreeSet<ReplicaId> = self
            .replicas
            .keys()
            .filter_map(|id| match target_replica_ids {
                None => Some(id.clone()),
                Some(ref ids) if ids.contains(id) => Some(id.clone()),
                Some(_) => None,
            })
            .collect();
        // Requested replicas, none of which exist, are a caller error.
        if let Some(targets) = target_replica_ids {
            if target_replicas.is_empty() {
                return Err(HydrationCheckBadTarget(targets));
            }
        }

        for (id, _collection) in self.collections_iter() {
            // Transient and explicitly excluded collections don't count.
            if id.is_transient() || exclude_collections.contains(&id) {
                continue;
            }

            // Hydrated if any targeted hosting replica reports it hydrated.
            let mut collection_hydrated = false;
            for replica_state in self.replicas_hosting(id).expect("collection must exist") {
                if !target_replicas.contains(&replica_state.id) {
                    continue;
                }
                let collection_state = replica_state
                    .collections
                    .get(&id)
                    .expect("hosting replica must have per-replica collection state");

                if collection_state.hydrated() {
                    collection_hydrated = true;
                    break;
                }
            }

            if !collection_hydrated {
                tracing::info!("collection {id} is not hydrated on any replica");
                all_hydrated = false;
            }
        }

        Ok(all_hydrated)
    }
768
    /// Removes collections that are fully released.
    ///
    /// A collection can be cleaned up once it has been dropped, no read
    /// capabilities on it remain, and no replica reports frontiers for it
    /// anymore.
    fn cleanup_collections(&mut self) {
        let to_remove: Vec<_> = self
            .collections_iter()
            .filter(|(id, collection)| {
                collection.dropped
                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
                    && self
                        .replicas
                        .values()
                        .all(|r| r.collection_frontiers_empty(*id))
            })
            .map(|(id, _collection)| id)
            .collect();

        for id in to_remove {
            self.remove_collection(id);
        }
    }
802
    /// Returns a JSON dump of the instance state, for debugging.
    ///
    /// The exhaustive destructuring forces this method to be revisited whenever
    /// fields change; fields bound to `_` are intentionally not dumped.
    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            build_info: _,
            storage_collections: _,
            peek_stash_persist_location: _,
            initialized,
            read_only,
            workload_class,
            replicas,
            collections,
            log_sources: _,
            peeks,
            subscribes,
            copy_tos,
            history: _,
            command_rx: _,
            response_tx: _,
            introspection_tx: _,
            metrics: _,
            dyncfg: _,
            now: _,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            read_hold_tx: _,
            replica_tx: _,
            replica_rx: _,
        } = self;

        // Stringify map keys and debug-format values for JSON compatibility.
        let replicas: BTreeMap<_, _> = replicas
            .iter()
            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
            .collect::<Result<_, anyhow::Error>>()?;
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();
        let peeks: BTreeMap<_, _> = peeks
            .iter()
            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
            .collect();
        let subscribes: BTreeMap<_, _> = subscribes
            .iter()
            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
            .collect();
        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");

        Ok(serde_json::json!({
            "initialized": initialized,
            "read_only": read_only,
            "workload_class": workload_class,
            "replicas": replicas,
            "collections": collections,
            "peeks": peeks,
            "subscribes": subscribes,
            "copy_tos": copy_tos,
            "wallclock_lag_last_recorded": wallclock_lag_last_recorded,
        }))
    }
872
873 pub(super) fn collection_write_frontier(
875 &self,
876 id: GlobalId,
877 ) -> Result<Antichain<T>, CollectionMissing> {
878 Ok(self.collection(id)?.write_frontier())
879 }
880}
881
882impl<T> Instance<T>
883where
884 T: ComputeControllerTimestamp,
885{
    /// Creates a new compute instance.
    ///
    /// Pre-installs collection state for the given arranged log collections;
    /// the instance starts with no replicas, uninitialized, and with the
    /// current time as its last wallclock-lag recording point.
    pub(super) fn new(
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        command_rx: mpsc::UnboundedReceiver<Command<T>>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        read_only: bool,
    ) -> Self {
        // Install collection state for the arranged log collections and record
        // the log-variant-to-ID mapping.
        let mut collections = BTreeMap::new();
        let mut log_sources = BTreeMap::new();
        for (log, id, shared) in arranged_logs {
            let collection = CollectionState::new_log_collection(
                id,
                shared,
                Arc::clone(&read_hold_tx),
                introspection_tx.clone(),
            );
            collections.insert(id, collection);
            log_sources.insert(log, id);
        }

        let history = ComputeCommandHistory::new(metrics.for_history());

        // Instrumented channel over which replica tasks deliver responses.
        let send_count = metrics.response_send_count.clone();
        let recv_count = metrics.response_recv_count.clone();
        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            storage_collections: storage,
            peek_stash_persist_location,
            initialized: false,
            read_only,
            workload_class: None,
            replicas: Default::default(),
            collections,
            log_sources,
            peeks: Default::default(),
            subscribes: Default::default(),
            copy_tos: Default::default(),
            history,
            command_rx,
            response_tx,
            introspection_tx,
            metrics,
            dyncfg,
            now,
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            read_hold_tx,
            replica_tx,
            replica_rx,
        }
    }
949
    /// Runs the instance task.
    ///
    /// Performs the protocol handshake (`Hello`, `CreateInstance`), then serves
    /// controller commands and replica responses until the command channel
    /// closes, which signals that the controller dropped this instance.
    pub(super) async fn run(mut self) {
        self.send(ComputeCommand::Hello {
            nonce: Uuid::default(),
        });

        let instance_config = InstanceConfig {
            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
            logging: Default::default(),
            expiration_offset: Default::default(),
        };

        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));

        loop {
            tokio::select! {
                command = self.command_rx.recv() => match command {
                    Some(cmd) => cmd(&mut self),
                    // Command channel closed: shut down.
                    None => break,
                },
                response = self.replica_rx.recv() => match response {
                    Some(response) => self.handle_response(response),
                    // `self.replica_tx` keeps the channel open.
                    None => unreachable!("self owns a sender side of the channel"),
                }
            }
        }
    }
980
981 #[mz_ore::instrument(level = "debug")]
983 pub fn update_configuration(&mut self, config_params: ComputeParameters) {
984 if let Some(workload_class) = &config_params.workload_class {
985 self.workload_class = workload_class.clone();
986 }
987
988 let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
989 self.send(command);
990 }
991
992 #[mz_ore::instrument(level = "debug")]
997 pub fn initialization_complete(&mut self) {
998 if !self.initialized {
1000 self.send(ComputeCommand::InitializationComplete);
1001 self.initialized = true;
1002 }
1003 }
1004
1005 #[mz_ore::instrument(level = "debug")]
1009 pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
1010 let collection = self.collection_mut(collection_id)?;
1011
1012 if !collection.read_only {
1014 return Ok(());
1015 }
1016
1017 let as_of = collection.read_frontier();
1019
1020 if as_of.is_empty() {
1023 return Ok(());
1024 }
1025
1026 collection.read_only = false;
1027 self.send(ComputeCommand::AllowWrites(collection_id));
1028
1029 Ok(())
1030 }
1031
1032 #[mz_ore::instrument(level = "debug")]
1042 pub fn shutdown(&mut self) {
1043 let (_tx, rx) = mpsc::unbounded_channel();
1045 self.command_rx = rx;
1046
1047 let stray_replicas: Vec<_> = self.replicas.keys().collect();
1048 soft_assert_or_log!(
1049 stray_replicas.is_empty(),
1050 "dropped instance still has provisioned replicas: {stray_replicas:?}",
1051 );
1052 }
1053
1054 #[mz_ore::instrument(level = "debug")]
1056 fn send(&mut self, cmd: ComputeCommand<T>) {
1057 self.history.push(cmd.clone());
1059
1060 let target_replica = self.target_replica(&cmd);
1061
1062 if let Some(rid) = target_replica {
1063 if let Some(replica) = self.replicas.get_mut(&rid) {
1064 let _ = replica.client.send(cmd);
1065 }
1066 } else {
1067 for replica in self.replicas.values_mut() {
1068 let _ = replica.client.send(cmd.clone());
1069 }
1070 }
1071 }
1072
    /// Returns the single replica a command must be sent to, if any; `None`
    /// means the command is broadcast to all replicas.
    fn target_replica(&self, cmd: &ComputeCommand<T>) -> Option<ReplicaId> {
        match &cmd {
            // Collection-specific commands follow the collection's target.
            ComputeCommand::Schedule(id)
            | ComputeCommand::AllowWrites(id)
            | ComputeCommand::AllowCompaction { id, .. } => {
                self.expect_collection(*id).target_replica
            }
            ComputeCommand::CreateDataflow(desc) => {
                // All exports of a dataflow must agree on their target replica.
                let mut target_replica = None;
                for id in desc.export_ids() {
                    if let Some(replica) = self.expect_collection(id).target_replica {
                        if target_replica.is_some() {
                            assert_eq!(target_replica, Some(replica));
                        }
                        target_replica = Some(replica);
                    }
                }
                target_replica
            }
            // Peek replica targeting is tracked per peek (see `PendingPeek`);
            // protocol and configuration commands go to every replica.
            ComputeCommand::Peek(_)
            | ComputeCommand::Hello { .. }
            | ComputeCommand::CreateInstance(_)
            | ComputeCommand::InitializationComplete
            | ComputeCommand::UpdateConfiguration(_)
            | ComputeCommand::CancelPeek { .. } => None,
        }
    }
1108
    /// Adds a replica to this instance and replays the command history to it.
    ///
    /// `epoch` defaults to 1 for new replicas; rehydration passes an
    /// incremented epoch (see `rehydrate_replica`). Fails if a replica with
    /// the same ID already exists.
    #[mz_ore::instrument(level = "debug")]
    pub fn add_replica(
        &mut self,
        id: ReplicaId,
        mut config: ReplicaConfig,
        epoch: Option<u64>,
    ) -> Result<(), ReplicaExists> {
        if self.replica_exists(id) {
            return Err(ReplicaExists(id));
        }

        config.logging.index_logs = self.log_sources.clone();

        let epoch = epoch.unwrap_or(1);
        let metrics = self.metrics.for_replica(id);
        let client = ReplicaClient::spawn(
            id,
            self.build_info,
            config.clone(),
            epoch,
            metrics.clone(),
            Arc::clone(&self.dyncfg),
            self.replica_tx.clone(),
        );

        // Compact the history before replaying it to the new replica.
        self.history.reduce();

        self.history.update_source_uppers(&self.storage_collections);

        // Replay the history, skipping commands targeted at other replicas.
        for command in self.history.iter() {
            if let Some(target_replica) = self.target_replica(command)
                && target_replica != id
            {
                continue;
            }

            if client.send(command.clone()).is_err() {
                // Stop replaying; a failed replica is picked up again by
                // `rehydrate_failed_replicas`.
                tracing::warn!("Replica {:?} connection terminated during hydration", id);
                break;
            }
        }

        self.add_replica_state(id, client, config, epoch);

        Ok(())
    }
1163
    /// Removes a replica, failing subscribes and peeks that targeted it.
    #[mz_ore::instrument(level = "debug")]
    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;

        // Subscribes targeting this replica can make no more progress; report
        // them as failed.
        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
        for subscribe_id in to_drop {
            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
            let response = ComputeControllerResponse::SubscribeResponse(
                subscribe_id,
                SubscribeBatch {
                    lower: subscribe.frontier.clone(),
                    upper: subscribe.frontier,
                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
                },
            );
            self.deliver_response(response);
        }

        // Likewise, fail peeks targeting this replica. Notifications are
        // delivered before the peeks are finished.
        let mut peek_responses = Vec::new();
        let mut to_drop = Vec::new();
        for (uuid, peek) in self.peeks_targeting(id) {
            peek_responses.push(ComputeControllerResponse::PeekNotification(
                uuid,
                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
                peek.otel_ctx.clone(),
            ));
            to_drop.push(uuid);
        }
        for response in peek_responses {
            self.deliver_response(response);
        }
        for uuid in to_drop {
            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
            self.finish_peek(uuid, response);
        }

        // Re-evaluate capabilities now that this replica's frontiers are gone.
        self.forward_implied_capabilities();

        Ok(())
    }
1214
1215 fn rehydrate_replica(&mut self, id: ReplicaId) {
1221 let config = self.replicas[&id].config.clone();
1222 let epoch = self.replicas[&id].epoch + 1;
1223
1224 self.remove_replica(id).expect("replica must exist");
1225 let result = self.add_replica(id, config, Some(epoch));
1226
1227 match result {
1228 Ok(()) => (),
1229 Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1230 }
1231 }
1232
1233 fn rehydrate_failed_replicas(&mut self) {
1235 let replicas = self.replicas.iter();
1236 let failed_replicas: Vec<_> = replicas
1237 .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1238 .collect();
1239
1240 for replica_id in failed_replicas {
1241 self.rehydrate_replica(replica_id);
1242 }
1243 }
1244
1245 #[mz_ore::instrument(level = "debug")]
1250 pub fn create_dataflow(
1251 &mut self,
1252 dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
1253 import_read_holds: Vec<ReadHold<T>>,
1254 mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
1255 target_replica: Option<ReplicaId>,
1256 ) -> Result<(), DataflowCreationError> {
1257 use DataflowCreationError::*;
1258
1259 if let Some(replica_id) = target_replica {
1263 if !self.replica_exists(replica_id) {
1264 return Err(ReplicaMissing(replica_id));
1265 }
1266 }
1267
1268 let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
1270 if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
1271 return Err(EmptyAsOfForSubscribe);
1272 }
1273 if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
1274 return Err(EmptyAsOfForCopyTo);
1275 }
1276
1277 let mut storage_dependencies = BTreeMap::new();
1279 let mut compute_dependencies = BTreeMap::new();
1280
1281 let mut replica_input_read_holds = Vec::new();
1286
1287 let mut import_read_holds: BTreeMap<_, _> =
1288 import_read_holds.into_iter().map(|r| (r.id(), r)).collect();
1289
1290 for &id in dataflow.source_imports.keys() {
1291 let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1292 replica_input_read_holds.push(read_hold.clone());
1293
1294 read_hold
1295 .try_downgrade(as_of.clone())
1296 .map_err(|_| ReadHoldInsufficient(id))?;
1297 storage_dependencies.insert(id, read_hold);
1298 }
1299
1300 for &id in dataflow.index_imports.keys() {
1301 let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1302 read_hold
1303 .try_downgrade(as_of.clone())
1304 .map_err(|_| ReadHoldInsufficient(id))?;
1305 compute_dependencies.insert(id, read_hold);
1306 }
1307
1308 if as_of.is_empty() {
1311 replica_input_read_holds = Default::default();
1312 }
1313
1314 for export_id in dataflow.export_ids() {
1316 let shared = shared_collection_state
1317 .remove(&export_id)
1318 .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
1319 let write_only = dataflow.sink_exports.contains_key(&export_id);
1320 let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);
1321
1322 self.add_collection(
1323 export_id,
1324 as_of.clone(),
1325 shared,
1326 storage_dependencies.clone(),
1327 compute_dependencies.clone(),
1328 replica_input_read_holds.clone(),
1329 write_only,
1330 storage_sink,
1331 dataflow.initial_storage_as_of.clone(),
1332 dataflow.refresh_schedule.clone(),
1333 target_replica,
1334 );
1335
1336 if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
1339 self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
1340 }
1341 }
1342
1343 for subscribe_id in dataflow.subscribe_ids() {
1345 self.subscribes
1346 .insert(subscribe_id, ActiveSubscribe::default());
1347 }
1348
1349 for copy_to_id in dataflow.copy_to_ids() {
1351 self.copy_tos.insert(copy_to_id);
1352 }
1353
1354 let mut source_imports = BTreeMap::new();
1357 for (id, import) in dataflow.source_imports {
1358 let frontiers = self
1359 .storage_collections
1360 .collection_frontiers(id)
1361 .expect("collection exists");
1362
1363 let collection_metadata = self
1364 .storage_collections
1365 .collection_metadata(id)
1366 .expect("we have a read hold on this collection");
1367
1368 let desc = SourceInstanceDesc {
1369 storage_metadata: collection_metadata.clone(),
1370 arguments: import.desc.arguments,
1371 typ: import.desc.typ.clone(),
1372 };
1373 source_imports.insert(
1374 id,
1375 mz_compute_types::dataflows::SourceImport {
1376 desc,
1377 monotonic: import.monotonic,
1378 with_snapshot: import.with_snapshot,
1379 upper: frontiers.write_frontier,
1380 },
1381 );
1382 }
1383
1384 let mut sink_exports = BTreeMap::new();
1385 for (id, se) in dataflow.sink_exports {
1386 let connection = match se.connection {
1387 ComputeSinkConnection::MaterializedView(conn) => {
1388 let metadata = self
1389 .storage_collections
1390 .collection_metadata(id)
1391 .map_err(|_| CollectionMissing(id))?
1392 .clone();
1393 let conn = MaterializedViewSinkConnection {
1394 value_desc: conn.value_desc,
1395 storage_metadata: metadata,
1396 };
1397 ComputeSinkConnection::MaterializedView(conn)
1398 }
1399 ComputeSinkConnection::ContinualTask(conn) => {
1400 let metadata = self
1401 .storage_collections
1402 .collection_metadata(id)
1403 .map_err(|_| DataflowCreationError::CollectionMissing(id))?
1404 .clone();
1405 let conn = ContinualTaskConnection {
1406 input_id: conn.input_id,
1407 storage_metadata: metadata,
1408 };
1409 ComputeSinkConnection::ContinualTask(conn)
1410 }
1411 ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
1412 ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1413 ComputeSinkConnection::CopyToS3Oneshot(conn)
1414 }
1415 };
1416 let desc = ComputeSinkDesc {
1417 from: se.from,
1418 from_desc: se.from_desc,
1419 connection,
1420 with_snapshot: se.with_snapshot,
1421 up_to: se.up_to,
1422 non_null_assertions: se.non_null_assertions,
1423 refresh_schedule: se.refresh_schedule,
1424 };
1425 sink_exports.insert(id, desc);
1426 }
1427
1428 let objects_to_build = dataflow
1430 .objects_to_build
1431 .into_iter()
1432 .map(|object| BuildDesc {
1433 id: object.id,
1434 plan: RenderPlan::try_from(object.plan).expect("valid plan"),
1435 })
1436 .collect();
1437
1438 let augmented_dataflow = DataflowDescription {
1439 source_imports,
1440 sink_exports,
1441 objects_to_build,
1442 index_imports: dataflow.index_imports,
1444 index_exports: dataflow.index_exports,
1445 as_of: dataflow.as_of.clone(),
1446 until: dataflow.until,
1447 initial_storage_as_of: dataflow.initial_storage_as_of,
1448 refresh_schedule: dataflow.refresh_schedule,
1449 debug_name: dataflow.debug_name,
1450 time_dependence: dataflow.time_dependence,
1451 };
1452
1453 if augmented_dataflow.is_transient() {
1454 tracing::debug!(
1455 name = %augmented_dataflow.debug_name,
1456 import_ids = %augmented_dataflow.display_import_ids(),
1457 export_ids = %augmented_dataflow.display_export_ids(),
1458 as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1459 until = ?augmented_dataflow.until.elements(),
1460 "creating dataflow",
1461 );
1462 } else {
1463 tracing::info!(
1464 name = %augmented_dataflow.debug_name,
1465 import_ids = %augmented_dataflow.display_import_ids(),
1466 export_ids = %augmented_dataflow.display_export_ids(),
1467 as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1468 until = ?augmented_dataflow.until.elements(),
1469 "creating dataflow",
1470 );
1471 }
1472
1473 if as_of.is_empty() {
1476 tracing::info!(
1477 name = %augmented_dataflow.debug_name,
1478 "not sending `CreateDataflow`, because of empty `as_of`",
1479 );
1480 } else {
1481 let collections: Vec<_> = augmented_dataflow.export_ids().collect();
1482 self.send(ComputeCommand::CreateDataflow(Box::new(augmented_dataflow)));
1483
1484 for id in collections {
1485 self.maybe_schedule_collection(id);
1486 }
1487 }
1488
1489 Ok(())
1490 }
1491
    /// Send a `Schedule` command for the identified collection, if it is ready to run and has not
    /// been scheduled before.
    ///
    /// A non-transient collection is considered ready when all of its (non-self) dependencies are
    /// themselves scheduled and have advanced their write frontiers strictly beyond the
    /// collection's as-of, i.e. its inputs have data available at the as-of time.
    fn maybe_schedule_collection(&mut self, id: GlobalId) {
        let collection = self.expect_collection(id);

        // Each collection is scheduled at most once.
        if collection.scheduled {
            return;
        }

        let as_of = collection.read_frontier();

        // An empty as-of means there is nothing to compute; never schedule.
        if as_of.is_empty() {
            return;
        }

        let ready = if id.is_transient() {
            // Transient collections are scheduled immediately, without waiting for dependency
            // frontiers. NOTE(review): presumably to keep interactive workloads (e.g. one-off
            // dataflows) responsive — confirm against the scheduling design.
            true
        } else {
            // Dependencies may include the collection itself (self-referential dataflows);
            // self-edges must not block scheduling.
            let not_self_dep = |x: &GlobalId| *x != id;

            let mut deps_scheduled = true;

            // Gather compute dependency write frontiers, noting whether each dependency has been
            // scheduled yet. (Frontiers are collected eagerly so the `collection` borrow can be
            // interleaved with `expect_collection` lookups.)
            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
            let mut compute_frontiers = Vec::new();
            for id in compute_deps {
                let dep = &self.expect_collection(id);
                deps_scheduled &= dep.scheduled;
                compute_frontiers.push(dep.write_frontier());
            }

            // Storage dependencies are always "scheduled"; only their frontiers matter.
            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(storage_deps.collect())
                .expect("must exist");
            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);

            // Every dependency frontier must be strictly beyond the as-of.
            let mut frontiers = compute_frontiers.into_iter().chain(storage_frontiers);
            let frontiers_ready =
                frontiers.all(|frontier| PartialOrder::less_than(&as_of, &frontier));

            deps_scheduled && frontiers_ready
        };

        if ready {
            self.send(ComputeCommand::Schedule(id));
            let collection = self.expect_collection_mut(id);
            collection.scheduled = true;
        }
    }
1564
1565 fn schedule_collections(&mut self) {
1567 let ids: Vec<_> = self.collections.keys().copied().collect();
1568 for id in ids {
1569 self.maybe_schedule_collection(id);
1570 }
1571 }
1572
1573 #[mz_ore::instrument(level = "debug")]
1576 pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1577 for id in &ids {
1578 let collection = self.collection_mut(*id)?;
1579
1580 collection.dropped = true;
1582
1583 collection.implied_read_hold.release();
1586 collection.warmup_read_hold.release();
1587
1588 self.subscribes.remove(id);
1591 self.copy_tos.remove(id);
1594 }
1595
1596 Ok(())
1597 }
1598
    /// Initiate a peek against `peek_target` at the given `timestamp`.
    ///
    /// The caller supplies a `read_hold` that must be for the target collection and must permit
    /// reading at `timestamp`; the hold is retained with the pending peek until the peek is
    /// finished. The peek result is delivered through `peek_response_tx`.
    ///
    /// # Errors
    ///
    /// * `ReadHoldIdMismatch` if `read_hold` is for a different collection than `peek_target`.
    /// * `ReadHoldInsufficient` if `read_hold` cannot be downgraded to `timestamp`.
    /// * `ReplicaMissing` if `target_replica` names an unknown replica.
    #[mz_ore::instrument(level = "debug")]
    pub fn peek(
        &mut self,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        mut read_hold: ReadHold<T>,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let target_id = peek_target.id();

        // The provided hold must cover the peeked collection.
        if read_hold.id() != target_id {
            return Err(ReadHoldIdMismatch(read_hold.id()));
        }
        // Downgrade the hold to exactly the peek timestamp; failure means the collection's since
        // has advanced beyond `timestamp` and the peek cannot be served.
        read_hold
            .try_downgrade(Antichain::from_elem(timestamp.clone()))
            .map_err(|_| ReadHoldInsufficient(target_id))?;

        if let Some(target) = target_replica {
            if !self.replica_exists(target) {
                return Err(ReplicaMissing(target));
            }
        }

        let otel_ctx = OpenTelemetryContext::obtain();

        // Register the pending peek before sending the command, so a response can always be
        // matched back to it. The read hold is kept alive inside `PendingPeek`.
        self.peeks.insert(
            uuid,
            PendingPeek {
                target_replica,
                otel_ctx: otel_ctx.clone(),
                requested_at: Instant::now(),
                read_hold,
                peek_response_tx,
                limit: finishing.limit.map(usize::cast_from),
                offset: finishing.offset,
            },
        );

        let peek = Peek {
            literal_constraints,
            uuid,
            timestamp,
            finishing,
            map_filter_project,
            otel_ctx,
            target: peek_target,
            result_desc,
        };
        self.send(ComputeCommand::Peek(Box::new(peek)));

        Ok(())
    }
1666
1667 #[mz_ore::instrument(level = "debug")]
1669 pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1670 let Some(peek) = self.peeks.get_mut(&uuid) else {
1671 tracing::warn!("did not find pending peek for {uuid}");
1672 return;
1673 };
1674
1675 let duration = peek.requested_at.elapsed();
1676 self.metrics
1677 .observe_peek_response(&PeekResponse::Canceled, duration);
1678
1679 let otel_ctx = peek.otel_ctx.clone();
1681 otel_ctx.attach_as_parent();
1682
1683 self.deliver_response(ComputeControllerResponse::PeekNotification(
1684 uuid,
1685 PeekNotification::Canceled,
1686 otel_ctx,
1687 ));
1688
1689 self.finish_peek(uuid, reason);
1692 }
1693
1694 #[mz_ore::instrument(level = "debug")]
1706 pub fn set_read_policy(
1707 &mut self,
1708 policies: Vec<(GlobalId, ReadPolicy<T>)>,
1709 ) -> Result<(), ReadPolicyError> {
1710 for (id, _policy) in &policies {
1713 let collection = self.collection(*id)?;
1714 if collection.read_policy.is_none() {
1715 return Err(ReadPolicyError::WriteOnlyCollection(*id));
1716 }
1717 }
1718
1719 for (id, new_policy) in policies {
1720 let collection = self.expect_collection_mut(id);
1721 let new_since = new_policy.frontier(collection.write_frontier().borrow());
1722 let _ = collection.implied_read_hold.try_downgrade(new_since);
1723 collection.read_policy = Some(new_policy);
1724 }
1725
1726 Ok(())
1727 }
1728
    /// Advance the global write frontier of the identified collection, if `new_frontier` is
    /// beyond the current one.
    ///
    /// On advancement this also downgrades the implied read hold according to the collection's
    /// read policy and emits a `FrontierUpper` response.
    #[mz_ore::instrument(level = "debug")]
    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
        let collection = self.expect_collection_mut(id);

        // Update the shared write frontier, but only if this is a strict advancement.
        let advanced = collection.shared.lock_write_frontier(|f| {
            let advanced = PartialOrder::less_than(f, &new_frontier);
            if advanced {
                f.clone_from(&new_frontier);
            }
            advanced
        });

        if !advanced {
            return;
        }

        let new_since = match &collection.read_policy {
            Some(read_policy) => {
                // Let the read policy derive the new since from the new upper.
                read_policy.frontier(new_frontier.borrow())
            }
            None => {
                // No read policy: the collection is write-only (see `set_read_policy`). Keep the
                // since one step behind the upper so the most recent data stays readable by the
                // collection's own sink.
                Antichain::from_iter(
                    new_frontier
                        .iter()
                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                )
            }
        };
        // Downgrade failure is fine: the hold simply stays where it is.
        let _ = collection.implied_read_hold.try_downgrade(new_since);

        self.deliver_response(ComputeControllerResponse::FrontierUpper {
            id,
            upper: new_frontier,
        });
    }
1783
    /// Apply a batch of read-hold changes to the identified collection's read capabilities.
    ///
    /// If the changes move the collection's since frontier, dependency read holds are downgraded
    /// to the new since and an `AllowCompaction` command is sent to replicas.
    pub(super) fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
        let Some(collection) = self.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "read hold change for absent collection (id={id}, changes={update:?})"
            );
            return;
        };

        // Sanity-check the update against the current capabilities before applying it:
        // capabilities must never go negative and must never re-introduce times that are already
        // beyond the read frontier.
        let new_since = collection.shared.lock_read_capabilities(|caps| {
            let read_frontier = caps.frontier();
            for (time, diff) in update.iter() {
                let count = caps.count_for(time) + diff;
                assert!(
                    count >= 0,
                    "invalid read capabilities update: negative capability \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
                assert!(
                    count == 0 || read_frontier.less_equal(time),
                    "invalid read capabilities update: frontier regression \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
            }

            // Apply the changes; `update_iter` yields the net frontier changes.
            let changes = caps.update_iter(update.drain());

            // Return the new since only if the frontier actually moved.
            let changed = changes.count() > 0;
            changed.then(|| caps.frontier().to_owned())
        });

        let Some(new_since) = new_since else {
            return; };

        // Propagate the new since to all dependency read holds. These downgrades cannot fail
        // because dependency sinces are always <= this collection's since.
        for read_hold in collection.compute_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }
        for read_hold in collection.storage_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }

        // Let replicas compact the collection up to the new since.
        self.send(ComputeCommand::AllowCompaction {
            id,
            frontier: new_since,
        });
    }
1841
1842 fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1851 let Some(peek) = self.peeks.remove(&uuid) else {
1852 return;
1853 };
1854
1855 let _ = peek.peek_response_tx.send(response);
1857
1858 self.send(ComputeCommand::CancelPeek { uuid });
1861
1862 drop(peek.read_hold);
1863 }
1864
1865 fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
1868 if self
1870 .replicas
1871 .get(&replica_id)
1872 .filter(|replica| replica.epoch == epoch)
1873 .is_none()
1874 {
1875 return;
1876 }
1877
1878 match response {
1881 ComputeResponse::Frontiers(id, frontiers) => {
1882 self.handle_frontiers_response(id, frontiers, replica_id);
1883 }
1884 ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
1885 self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
1886 }
1887 ComputeResponse::CopyToResponse(id, response) => {
1888 self.handle_copy_to_response(id, response, replica_id);
1889 }
1890 ComputeResponse::SubscribeResponse(id, response) => {
1891 self.handle_subscribe_response(id, response, replica_id);
1892 }
1893 ComputeResponse::Status(response) => {
1894 self.handle_status_response(response, replica_id);
1895 }
1896 }
1897 }
1898
    /// Apply a `Frontiers` response reported by a replica.
    ///
    /// Updates the per-replica frontier tracking for the collection and, if a new write frontier
    /// was reported, possibly advances the collection's global write frontier.
    fn handle_frontiers_response(
        &mut self,
        id: GlobalId,
        frontiers: FrontiersResponse<T>,
        replica_id: ReplicaId,
    ) {
        // Responses for unknown state indicate a bug somewhere; log loudly and ignore.
        if !self.collections.contains_key(&id) {
            soft_panic_or_log!(
                "frontiers update for an unknown collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica \
                 (replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };

        // Each frontier kind is optional; apply only what the replica reported.
        if let Some(new_frontier) = frontiers.input_frontier {
            replica_collection.update_input_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.output_frontier {
            replica_collection.update_output_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.write_frontier {
            replica_collection.update_write_frontier(new_frontier.clone());
            // A per-replica write frontier advancement may advance the global one too.
            self.maybe_update_global_write_frontier(id, new_frontier);
        }
    }
1940
    /// Apply a `PeekResponse` reported by a replica.
    ///
    /// Delivers a peek notification and finishes the peek, unless the response comes from a
    /// replica the peek is not targeted at.
    #[mz_ore::instrument(level = "debug")]
    fn handle_peek_response(
        &mut self,
        uuid: Uuid,
        response: PeekResponse,
        otel_ctx: OpenTelemetryContext,
        replica_id: ReplicaId,
    ) {
        otel_ctx.attach_as_parent();

        // The peek may have already been finished (e.g. canceled, or answered by another
        // replica); in that case there is nothing to do.
        let Some(peek) = self.peeks.get(&uuid) else {
            return;
        };

        // A peek pinned to a specific replica only accepts that replica's response. An unpinned
        // peek accepts a response from any replica.
        let target_replica = peek.target_replica.unwrap_or(replica_id);
        if target_replica != replica_id {
            return;
        }

        let duration = peek.requested_at.elapsed();
        self.metrics.observe_peek_response(&response, duration);

        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
        self.deliver_response(ComputeControllerResponse::PeekNotification(
            uuid,
            notification,
            otel_ctx,
        ));

        self.finish_peek(uuid, response)
    }
1977
    /// Apply a `CopyToResponse` reported by a replica.
    ///
    /// Marks the copy-to as complete on the reporting replica and forwards the first response
    /// received for the sink to the client.
    fn handle_copy_to_response(
        &mut self,
        sink_id: GlobalId,
        response: CopyToResponse,
        replica_id: ReplicaId,
    ) {
        // Responses for unknown state indicate a bug somewhere; log loudly and ignore.
        if !self.collections.contains_key(&sink_id) {
            soft_panic_or_log!(
                "received response for an unknown copy-to \
                 (sink_id={sink_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
            soft_panic_or_log!(
                "copy-to response for an unknown replica collection \
                 (sink_id={sink_id}, replica_id={replica_id})"
            );
            return;
        };

        // A copy-to is done after producing its single response, so all per-replica frontiers
        // advance to the empty frontier.
        replica_collection.update_write_frontier(Antichain::new());
        replica_collection.update_input_frontier(Antichain::new());
        replica_collection.update_output_frontier(Antichain::new());

        // Only the first response per sink is forwarded; `remove` returns `false` if another
        // replica already responded (or the copy-to was dropped).
        if !self.copy_tos.remove(&sink_id) {
            return;
        }

        let result = match response {
            CopyToResponse::RowCount(count) => Ok(count),
            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
            CopyToResponse::Dropped => {
                // We still track this copy-to, so a `Dropped` response is unexpected.
                tracing::error!(
                    %sink_id, %replica_id,
                    "received `Dropped` response for a tracked copy to",
                );
                return;
            }
        };

        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
    }
2034
    /// Apply a `SubscribeResponse` reported by a replica.
    ///
    /// Updates per-replica and global frontiers, and forwards subscribe batches to the client,
    /// deduplicating updates already delivered by other replicas.
    fn handle_subscribe_response(
        &mut self,
        subscribe_id: GlobalId,
        response: SubscribeResponse<T>,
        replica_id: ReplicaId,
    ) {
        // Responses for unknown state indicate a bug somewhere; log loudly and ignore.
        if !self.collections.contains_key(&subscribe_id) {
            soft_panic_or_log!(
                "received response for an unknown subscribe \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica (replica_id={replica_id})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica collection \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
            );
            return;
        };

        // A batch advances the replica's frontier to the batch's upper; a `DroppedAt` means the
        // replica is done with the subscribe entirely (empty frontier).
        let write_frontier = match &response {
            SubscribeResponse::Batch(batch) => batch.upper.clone(),
            SubscribeResponse::DroppedAt(_) => Antichain::new(),
        };

        replica_collection.update_write_frontier(write_frontier.clone());
        replica_collection.update_input_frontier(write_frontier.clone());
        replica_collection.update_output_frontier(write_frontier.clone());

        // If we don't track this subscribe anymore, only the frontier bookkeeping above applies.
        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
            return;
        };

        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);

        match response {
            SubscribeResponse::Batch(batch) => {
                let upper = batch.upper;
                let mut updates = batch.updates;

                // Only forward the batch if it advances the subscribe's delivered frontier;
                // otherwise another replica has already delivered this data.
                if PartialOrder::less_than(&subscribe.frontier, &upper) {
                    // The forwarded batch covers [previous frontier, upper).
                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());

                    if upper.is_empty() {
                        // The subscribe is complete; stop tracking it.
                        self.subscribes.remove(&subscribe_id);
                    } else {
                        self.subscribes.insert(subscribe_id, subscribe);
                    }

                    if let Ok(updates) = updates.as_mut() {
                        // Drop updates at times before `lower`: those were already delivered as
                        // part of a previous batch (possibly from another replica).
                        updates.retain_mut(|updates| {
                            let offset = updates.times().partition_point(|t| {
                                !lower.less_equal(t)
                            });
                            let (_, past_lower) = std::mem::take(updates).split_at(offset);
                            *updates = past_lower;
                            updates.len() > 0
                        });
                    }
                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeBatch {
                            lower,
                            upper,
                            updates,
                        },
                    ));
                }
            }
            SubscribeResponse::DroppedAt(frontier) => {
                // We still track this subscribe, so a `DroppedAt` response is unexpected.
                tracing::error!(
                    %subscribe_id,
                    %replica_id,
                    frontier = ?frontier.elements(),
                    "received `DroppedAt` response for a tracked subscribe",
                );
                self.subscribes.remove(&subscribe_id);
            }
        }
    }
2144
2145 fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
2146 match response {
2147 StatusResponse::Placeholder => {}
2148 }
2149 }
2150
2151 fn dependency_write_frontiers<'b>(
2153 &'b self,
2154 collection: &'b CollectionState<T>,
2155 ) -> impl Iterator<Item = Antichain<T>> + 'b {
2156 let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2157 let collection = self.collections.get(&dep_id);
2158 collection.map(|c| c.write_frontier())
2159 });
2160 let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2161 let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2162 frontiers.map(|f| f.write_frontier)
2163 });
2164
2165 compute_frontiers.chain(storage_frontiers)
2166 }
2167
2168 fn transitive_storage_dependency_write_frontiers<'b>(
2170 &'b self,
2171 collection: &'b CollectionState<T>,
2172 ) -> impl Iterator<Item = Antichain<T>> + 'b {
2173 let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2174 let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2175 let mut done = BTreeSet::new();
2176
2177 while let Some(id) = todo.pop() {
2178 if done.contains(&id) {
2179 continue;
2180 }
2181 if let Some(dep) = self.collections.get(&id) {
2182 storage_ids.extend(dep.storage_dependency_ids());
2183 todo.extend(dep.compute_dependency_ids())
2184 }
2185 done.insert(id);
2186 }
2187
2188 let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2189 let frontiers = self.storage_collections.collection_frontiers(id).ok();
2190 frontiers.map(|f| f.write_frontier)
2191 });
2192
2193 storage_frontiers
2194 }
2195
    /// Downgrade each collection's warmup read hold based on the write frontiers of its direct
    /// dependencies, keeping at most one step of dependency history readable.
    fn downgrade_warmup_capabilities(&mut self) {
        // Compute the new capabilities in a first pass; applying them requires `&mut self`,
        // which would conflict with iterating `self.collections`.
        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            // A write-only collection (no read policy) whose write frontier is empty has
            // finished producing output; release its warmup hold entirely.
            if collection.read_policy.is_none()
                && collection.shared.lock_write_frontier(|f| f.is_empty())
            {
                new_capabilities.insert(*id, Antichain::new());
                continue;
            }

            // Hold each dependency one step behind its write frontier, so the dependency data
            // the collection needs stays readable.
            let mut new_capability = Antichain::new();
            for frontier in self.dependency_write_frontiers(collection) {
                for time in frontier {
                    new_capability.insert(time.step_back().unwrap_or(time));
                }
            }

            new_capabilities.insert(*id, new_capability);
        }

        // Second pass: apply the computed capabilities. Failed downgrades leave holds unchanged.
        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
        }
    }
2236
    /// Downgrade implied read holds on clusters that have no replicas, based on the write
    /// frontiers of transitive storage dependencies.
    ///
    /// Without replicas no compute collection advances its own write frontier, so the usual
    /// policy-driven downgrade in `maybe_update_global_write_frontier` never fires; this method
    /// keeps sinces moving anyway. Gated behind a dyncfg flag.
    fn forward_implied_capabilities(&mut self) {
        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
            return;
        }
        // Only applies to paused clusters: with replicas present, frontier updates drive the
        // downgrades instead.
        if !self.replicas.is_empty() {
            return;
        }

        // First pass: compute the new capabilities (applying them needs `&mut self`).
        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            // Write-only collections (no read policy) are skipped.
            let Some(read_policy) = &collection.read_policy else {
                continue;
            };

            // Merge the write frontiers of all transitive storage dependencies.
            let mut dep_frontier = Antichain::new();
            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
                dep_frontier.extend(frontier);
            }

            // Only record strict advancements of the implied hold.
            let new_capability = read_policy.frontier(dep_frontier.borrow());
            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
                new_capabilities.insert(*id, new_capability);
            }
        }

        // Second pass: apply the computed capabilities.
        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.implied_read_hold.try_downgrade(new_capability);
        }
    }
2298
    /// Acquire a new read hold on the identified collection, at its current since frontier.
    ///
    /// The capability increment happens under the capabilities lock, so the returned hold's
    /// since is consistent with the recorded capabilities.
    ///
    /// # Errors
    ///
    /// Returns `CollectionMissing` if the collection is unknown.
    pub(super) fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
        let collection = self.collection(id)?;
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            // Register one capability for each element of the since frontier.
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });
        // The hold reports its eventual release through `read_hold_tx`.
        let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
        Ok(hold)
    }
2318
    /// Perform periodic maintenance work.
    ///
    /// Called on a regular cadence by the owner of this instance. The individual steps run in a
    /// deliberate order (e.g. capabilities are downgraded before collections are scheduled and
    /// cleaned up); don't reorder without considering the interactions.
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        self.rehydrate_failed_replicas();
        self.downgrade_warmup_capabilities();
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
2335}
2336
/// State the controller maintains for a single compute collection.
#[derive(Debug)]
struct CollectionState<T: ComputeControllerTimestamp> {
    /// The replica this collection is pinned to, if any.
    target_replica: Option<ReplicaId>,
    /// Whether this is an introspection log collection (see `new_log_collection`).
    log_collection: bool,
    /// Whether the collection was dropped (`drop_collections`) and awaits cleanup.
    dropped: bool,
    /// Whether a `Schedule` command was already sent for this collection.
    scheduled: bool,

    read_only: bool,

    /// Collection state shared with other parts of the controller.
    shared: SharedCollectionState<T>,

    /// The read hold implied by the collection's read policy (or, for write-only collections,
    /// tracking one step behind the write frontier).
    implied_read_hold: ReadHold<T>,
    /// A read hold on dependency data kept for warmup purposes; maintained by
    /// `downgrade_warmup_capabilities`.
    warmup_read_hold: ReadHold<T>,
    /// The collection's read policy; `None` marks a write-only collection.
    read_policy: Option<ReadPolicy<T>>,

    /// Read holds on this collection's direct storage dependencies.
    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
    /// Read holds on this collection's direct compute dependencies.
    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,

    /// Introspection reporting state for this collection.
    introspection: CollectionIntrospection<T>,

    /// Accumulated wallclock-lag histogram updates, keyed by (period, lag, labels).
    /// `None` for transient collections, which are not reported.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}
2416
impl<T: ComputeControllerTimestamp> CollectionState<T> {
    /// Create state for a new collection starting at the given as-of.
    ///
    /// The caller must ensure `shared` was initialized with the same as-of: both its read and
    /// write frontier are asserted to equal `as_of`.
    fn new(
        collection_id: GlobalId,
        as_of: Antichain<T>,
        shared: SharedCollectionState<T>,
        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection: CollectionIntrospection<T>,
    ) -> Self {
        // Initially, both since and upper sit at the as-of.
        let since = as_of.clone();
        let upper = as_of;

        // Sanity check: the shared state must agree with the as-of.
        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
        assert!(shared.lock_write_frontier(|f| f == &upper));

        let implied_read_hold =
            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);

        // Register the warmup hold's capabilities in the shared read capabilities.
        // NOTE(review): the implied hold's capability appears to be the one `shared` was
        // initialized with, so only the warmup hold is added here — confirm against
        // `SharedCollectionState::new`.
        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
        shared.lock_read_capabilities(|c| {
            c.update_iter(updates);
        });

        // Transient collections don't report wallclock-lag histograms.
        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
            true => None,
            false => Some(Default::default()),
        };

        Self {
            target_replica: None,
            log_collection: false,
            dropped: false,
            scheduled: false,
            read_only: true,
            shared,
            implied_read_hold,
            warmup_read_hold,
            read_policy: Some(ReadPolicy::ValidFrom(since)),
            storage_dependencies,
            compute_dependencies,
            introspection,
            wallclock_lag_histogram_stash,
        }
    }

    /// Create state for a new introspection log collection.
    ///
    /// Log collections have no dependencies, start at the minimum time, and are considered
    /// scheduled from the beginning.
    fn new_log_collection(
        id: GlobalId,
        shared: SharedCollectionState<T>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    ) -> Self {
        let since = Antichain::from_elem(T::minimum());
        let introspection = CollectionIntrospection::new(
            id,
            introspection_tx,
            since.clone(),
            false,
            None,
            None,
            Vec::new(),
        );
        let mut state = Self::new(
            id,
            since,
            shared,
            Default::default(),
            Default::default(),
            read_hold_tx,
            introspection,
        );
        state.log_collection = true;
        // Log collections are created by replicas themselves; no `Schedule` command is needed.
        state.scheduled = true;
        state
    }

    /// Return the collection's current read frontier (i.e. its since).
    fn read_frontier(&self) -> Antichain<T> {
        self.shared
            .lock_read_capabilities(|c| c.frontier().to_owned())
    }

    /// Return the collection's current write frontier (i.e. its upper).
    fn write_frontier(&self) -> Antichain<T> {
        self.shared.lock_write_frontier(|f| f.clone())
    }

    /// Iterate over the IDs of the collection's direct storage dependencies.
    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.storage_dependencies.keys().copied()
    }

    /// Iterate over the IDs of the collection's direct compute dependencies.
    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.compute_dependencies.keys().copied()
    }
}
2525
/// Collection state shared between controller components.
///
/// Cloning is cheap: clones share the same underlying `Arc`ed locks, so all clones observe the
/// same frontiers.
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState<T> {
    /// The outstanding read capabilities on the collection; their frontier is the since.
    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
    /// The collection's global write frontier (upper).
    write_frontier: Arc<Mutex<Antichain<T>>>,
}
2554
2555impl<T: Timestamp> SharedCollectionState<T> {
2556 pub fn new(as_of: Antichain<T>) -> Self {
2557 let since = as_of.clone();
2559 let upper = as_of;
2561
2562 let mut read_capabilities = MutableAntichain::new();
2566 read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2567
2568 Self {
2569 read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2570 write_frontier: Arc::new(Mutex::new(upper)),
2571 }
2572 }
2573
2574 pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2575 where
2576 F: FnOnce(&mut MutableAntichain<T>) -> R,
2577 {
2578 let mut caps = self.read_capabilities.lock().expect("poisoned");
2579 f(&mut *caps)
2580 }
2581
2582 pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2583 where
2584 F: FnOnce(&mut Antichain<T>) -> R,
2585 {
2586 let mut frontier = self.write_frontier.lock().expect("poisoned");
2587 f(&mut *frontier)
2588 }
2589}
2590
/// Manages the introspection data reported for a single compute collection.
///
/// Emits insertions while the collection lives and matching retractions on drop.
#[derive(Debug)]
struct CollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the collection being reported on.
    collection_id: GlobalId,
    /// Channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Frontier introspection state; `None` for storage sinks, which are not reported in
    /// `Frontiers`.
    frontiers: Option<FrontiersIntrospectionState<T>>,
    /// Refresh-schedule introspection state; `Some` only for collections with a refresh
    /// schedule and an initial as-of (i.e. REFRESH materialized views).
    refresh: Option<RefreshIntrospectionState<T>>,
    /// IDs of the collection's dependencies, reported in `ComputeDependencies`.
    dependency_ids: Vec<GlobalId>,
}
2611
2612impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
2613 fn new(
2614 collection_id: GlobalId,
2615 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2616 as_of: Antichain<T>,
2617 storage_sink: bool,
2618 initial_as_of: Option<Antichain<T>>,
2619 refresh_schedule: Option<RefreshSchedule>,
2620 dependency_ids: Vec<GlobalId>,
2621 ) -> Self {
2622 let refresh =
2623 match (refresh_schedule, initial_as_of) {
2624 (Some(refresh_schedule), Some(initial_as_of)) => Some(
2625 RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2626 ),
2627 (refresh_schedule, _) => {
2628 soft_assert_or_log!(
2631 refresh_schedule.is_none(),
2632 "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2633 );
2634 None
2635 }
2636 };
2637 let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2638
2639 let self_ = Self {
2640 collection_id,
2641 introspection_tx,
2642 frontiers,
2643 refresh,
2644 dependency_ids,
2645 };
2646
2647 self_.report_initial_state();
2648 self_
2649 }
2650
2651 fn report_initial_state(&self) {
2653 if let Some(frontiers) = &self.frontiers {
2654 let row = frontiers.row_for_collection(self.collection_id);
2655 let updates = vec![(row, Diff::ONE)];
2656 self.send(IntrospectionType::Frontiers, updates);
2657 }
2658
2659 if let Some(refresh) = &self.refresh {
2660 let row = refresh.row_for_collection(self.collection_id);
2661 let updates = vec![(row, Diff::ONE)];
2662 self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2663 }
2664
2665 if !self.dependency_ids.is_empty() {
2666 let updates = self.dependency_rows(Diff::ONE);
2667 self.send(IntrospectionType::ComputeDependencies, updates);
2668 }
2669 }
2670
2671 fn dependency_rows(&self, diff: Diff) -> Vec<(Row, Diff)> {
2673 self.dependency_ids
2674 .iter()
2675 .map(|dependency_id| {
2676 let row = Row::pack_slice(&[
2677 Datum::String(&self.collection_id.to_string()),
2678 Datum::String(&dependency_id.to_string()),
2679 ]);
2680 (row, diff)
2681 })
2682 .collect()
2683 }
2684
2685 fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2688 self.update_frontier_introspection(read_frontier, write_frontier);
2689 self.update_refresh_introspection(write_frontier);
2690 }
2691
2692 fn update_frontier_introspection(
2693 &mut self,
2694 read_frontier: &Antichain<T>,
2695 write_frontier: &Antichain<T>,
2696 ) {
2697 let Some(frontiers) = &mut self.frontiers else {
2698 return;
2699 };
2700
2701 if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2702 {
2703 return; };
2705
2706 let retraction = frontiers.row_for_collection(self.collection_id);
2707 frontiers.update(read_frontier, write_frontier);
2708 let insertion = frontiers.row_for_collection(self.collection_id);
2709 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2710 self.send(IntrospectionType::Frontiers, updates);
2711 }
2712
2713 fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
2714 let Some(refresh) = &mut self.refresh else {
2715 return;
2716 };
2717
2718 let retraction = refresh.row_for_collection(self.collection_id);
2719 refresh.frontier_update(write_frontier);
2720 let insertion = refresh.row_for_collection(self.collection_id);
2721
2722 if retraction == insertion {
2723 return; }
2725
2726 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2727 self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2728 }
2729
    /// Send the given updates for the given introspection relation.
    ///
    /// Send errors are deliberately ignored: a closed receiver presumably
    /// means the controller is shutting down — NOTE(review): confirm against
    /// the receiver's lifecycle.
    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
        let _ = self.introspection_tx.send((introspection_type, updates));
    }
2735}
2736
impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
    /// Retract all introspection rows previously reported for this
    /// collection, so the introspection relations don't leak rows for
    /// dropped collections.
    fn drop(&mut self) {
        // Retract the `Frontiers` row, if frontier reporting was enabled.
        if let Some(frontiers) = &self.frontiers {
            let row = frontiers.row_for_collection(self.collection_id);
            let updates = vec![(row, Diff::MINUS_ONE)];
            self.send(IntrospectionType::Frontiers, updates);
        }

        // Retract the materialized-view refresh row, if one was reported.
        if let Some(refresh) = &self.refresh {
            let retraction = refresh.row_for_collection(self.collection_id);
            let updates = vec![(retraction, Diff::MINUS_ONE)];
            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
        }

        // Retract one dependency row per tracked dependency.
        if !self.dependency_ids.is_empty() {
            let updates = self.dependency_rows(Diff::MINUS_ONE);
            self.send(IntrospectionType::ComputeDependencies, updates);
        }
    }
}
2760
/// Frontier state reported to the `Frontiers` introspection relation for a
/// single collection.
#[derive(Debug)]
struct FrontiersIntrospectionState<T> {
    // The collection's last observed read frontier.
    read_frontier: Antichain<T>,
    // The collection's last observed write frontier.
    write_frontier: Antichain<T>,
}
2766
2767impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
2768 fn new(as_of: Antichain<T>) -> Self {
2769 Self {
2770 read_frontier: as_of.clone(),
2771 write_frontier: as_of,
2772 }
2773 }
2774
2775 fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2777 let read_frontier = self
2778 .read_frontier
2779 .as_option()
2780 .map_or(Datum::Null, |ts| ts.clone().into());
2781 let write_frontier = self
2782 .write_frontier
2783 .as_option()
2784 .map_or(Datum::Null, |ts| ts.clone().into());
2785 Row::pack_slice(&[
2786 Datum::String(&collection_id.to_string()),
2787 read_frontier,
2788 write_frontier,
2789 ])
2790 }
2791
2792 fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2794 if read_frontier != &self.read_frontier {
2795 self.read_frontier.clone_from(read_frontier);
2796 }
2797 if write_frontier != &self.write_frontier {
2798 self.write_frontier.clone_from(write_frontier);
2799 }
2800 }
2801}
2802
2803#[derive(Debug)]
2806struct RefreshIntrospectionState<T> {
2807 refresh_schedule: RefreshSchedule,
2809 initial_as_of: Antichain<T>,
2810 next_refresh: Datum<'static>, last_completed_refresh: Datum<'static>, }
2814
2815impl<T> RefreshIntrospectionState<T> {
2816 fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2818 Row::pack_slice(&[
2819 Datum::String(&collection_id.to_string()),
2820 self.last_completed_refresh,
2821 self.next_refresh,
2822 ])
2823 }
2824}
2825
2826impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
2827 fn new(
2830 refresh_schedule: RefreshSchedule,
2831 initial_as_of: Antichain<T>,
2832 upper: &Antichain<T>,
2833 ) -> Self {
2834 let mut self_ = Self {
2835 refresh_schedule: refresh_schedule.clone(),
2836 initial_as_of: initial_as_of.clone(),
2837 next_refresh: Datum::Null,
2838 last_completed_refresh: Datum::Null,
2839 };
2840 self_.frontier_update(upper);
2841 self_
2842 }
2843
2844 fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
2847 if write_frontier.is_empty() {
2848 self.last_completed_refresh =
2849 if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2850 last_refresh.into()
2851 } else {
2852 T::maximum().into()
2855 };
2856 self.next_refresh = Datum::Null;
2857 } else {
2858 if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2859 self.last_completed_refresh = Datum::Null;
2861 let initial_as_of = self.initial_as_of.as_option().expect(
2862 "initial_as_of can't be [], because then there would be no refreshes at all",
2863 );
2864 let first_refresh = initial_as_of
2865 .round_up(&self.refresh_schedule)
2866 .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2867 soft_assert_or_log!(
2868 first_refresh == *initial_as_of,
2869 "initial_as_of should be set to the first refresh"
2870 );
2871 self.next_refresh = first_refresh.into();
2872 } else {
2873 let write_frontier = write_frontier.as_option().expect("checked above");
2875 self.last_completed_refresh = write_frontier
2876 .round_down_minus_1(&self.refresh_schedule)
2877 .map_or_else(
2878 || {
2879 soft_panic_or_log!(
2880 "rounding down should have returned the first refresh or later"
2881 );
2882 Datum::Null
2883 },
2884 |last_completed_refresh| last_completed_refresh.into(),
2885 );
2886 self.next_refresh = write_frontier.clone().into();
2887 }
2888 }
2889 }
2890}
2891
/// Tracks a peek that was issued to a replica but has not yet received a
/// response.
#[derive(Debug)]
struct PendingPeek<T: Timestamp> {
    /// For replica-targeted peeks, the replica that must answer; `None`
    /// presumably means any replica may respond — TODO confirm against the
    /// peek-response handling.
    target_replica: Option<ReplicaId>,
    /// OpenTelemetry context captured at request time, for tracing the
    /// eventual response.
    otel_ctx: OpenTelemetryContext,
    /// When the peek was requested; presumably used for latency metrics —
    /// TODO confirm.
    requested_at: Instant,
    /// Read hold keeping the peeked collection readable while the peek is in
    /// flight.
    read_hold: ReadHold<T>,
    /// Channel on which the peek response is delivered to the requester.
    peek_response_tx: oneshot::Sender<PeekResponse>,
    /// Optional maximum number of result rows.
    limit: Option<usize>,
    /// Number of result rows to skip.
    offset: usize,
}
2914
/// State tracked for an active subscribe.
#[derive(Debug, Clone)]
struct ActiveSubscribe<T> {
    // The subscribe's progress frontier; starts at `T::minimum()` (see the
    // `Default` impl). Presumably the frontier of already-reported updates —
    // TODO confirm against the subscribe response handling.
    frontier: Antichain<T>,
}
2920
2921impl<T: ComputeControllerTimestamp> Default for ActiveSubscribe<T> {
2922 fn default() -> Self {
2923 Self {
2924 frontier: Antichain::from_elem(T::minimum()),
2925 }
2926 }
2927}
2928
/// Controller-side state maintained for a single replica.
#[derive(Debug)]
struct ReplicaState<T: ComputeControllerTimestamp> {
    // The replica's ID.
    id: ReplicaId,
    // Client used to communicate with the replica.
    client: ReplicaClient<T>,
    // Configuration the replica was created with.
    config: ReplicaConfig,
    // Metrics tracked for this replica.
    metrics: ReplicaMetrics,
    // Sender for introspection updates produced on behalf of this replica.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    // Per-collection state, keyed by collection ID.
    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
    // NOTE(review): exact epoch semantics are not visible here; it is only
    // stored and echoed in `dump`.
    epoch: u64,
}
2947
2948impl<T: ComputeControllerTimestamp> ReplicaState<T> {
2949 fn new(
2950 id: ReplicaId,
2951 client: ReplicaClient<T>,
2952 config: ReplicaConfig,
2953 metrics: ReplicaMetrics,
2954 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2955 epoch: u64,
2956 ) -> Self {
2957 Self {
2958 id,
2959 client,
2960 config,
2961 metrics,
2962 introspection_tx,
2963 epoch,
2964 collections: Default::default(),
2965 }
2966 }
2967
2968 fn add_collection(
2974 &mut self,
2975 id: GlobalId,
2976 as_of: Antichain<T>,
2977 input_read_holds: Vec<ReadHold<T>>,
2978 ) {
2979 let metrics = self.metrics.for_collection(id);
2980 let introspection = ReplicaCollectionIntrospection::new(
2981 self.id,
2982 id,
2983 self.introspection_tx.clone(),
2984 as_of.clone(),
2985 );
2986 let mut state =
2987 ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
2988
2989 if id.is_transient() {
2993 state.wallclock_lag_max = None;
2994 }
2995
2996 if let Some(previous) = self.collections.insert(id, state) {
2997 panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
2998 }
2999 }
3000
3001 fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
3003 self.collections.remove(&id)
3004 }
3005
3006 fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
3008 self.collections.get(&id).map_or(true, |c| {
3009 c.write_frontier.is_empty()
3010 && c.input_frontier.is_empty()
3011 && c.output_frontier.is_empty()
3012 })
3013 }
3014
3015 #[mz_ore::instrument(level = "debug")]
3019 pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3020 let Self {
3027 id,
3028 client: _,
3029 config: _,
3030 metrics: _,
3031 introspection_tx: _,
3032 epoch,
3033 collections,
3034 } = self;
3035
3036 let collections: BTreeMap<_, _> = collections
3037 .iter()
3038 .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
3039 .collect();
3040
3041 Ok(serde_json::json!({
3042 "id": id.to_string(),
3043 "collections": collections,
3044 "epoch": epoch,
3045 }))
3046 }
3047}
3048
/// Per-collection state tracked for a single replica.
#[derive(Debug)]
struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
    // The write frontier most recently reported by the replica.
    write_frontier: Antichain<T>,
    // The input frontier most recently reported by the replica; the input
    // read holds below are downgraded to track it.
    input_frontier: Antichain<T>,
    // The output frontier most recently reported by the replica; used to
    // judge hydration (see `hydrated`).
    output_frontier: Antichain<T>,

    // Metrics for this collection, if metric collection is enabled.
    metrics: Option<ReplicaCollectionMetrics>,
    // The as-of the collection was installed with; all three frontiers start
    // here (see `new`).
    as_of: Antichain<T>,
    // Introspection reporting for this replica/collection pair.
    introspection: ReplicaCollectionIntrospection<T>,
    // Read holds on the collection's inputs, downgraded as the input
    // frontier advances.
    input_read_holds: Vec<ReadHold<T>>,

    // Maximum observed wallclock lag; set to `None` for transient
    // collections (see `ReplicaState::add_collection`), which opts them out
    // of lag tracking.
    wallclock_lag_max: Option<WallclockLag>,
}
3084
3085impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
3086 fn new(
3087 metrics: Option<ReplicaCollectionMetrics>,
3088 as_of: Antichain<T>,
3089 introspection: ReplicaCollectionIntrospection<T>,
3090 input_read_holds: Vec<ReadHold<T>>,
3091 ) -> Self {
3092 Self {
3093 write_frontier: as_of.clone(),
3094 input_frontier: as_of.clone(),
3095 output_frontier: as_of.clone(),
3096 metrics,
3097 as_of,
3098 introspection,
3099 input_read_holds,
3100 wallclock_lag_max: Some(WallclockLag::MIN),
3101 }
3102 }
3103
3104 fn hydrated(&self) -> bool {
3106 self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3122 }
3123
3124 fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
3126 if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3127 soft_panic_or_log!(
3128 "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3129 self.write_frontier,
3130 );
3131 return;
3132 } else if new_frontier == self.write_frontier {
3133 return;
3134 }
3135
3136 self.write_frontier = new_frontier;
3137 }
3138
3139 fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
3141 if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3142 soft_panic_or_log!(
3143 "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3144 self.input_frontier,
3145 );
3146 return;
3147 } else if new_frontier == self.input_frontier {
3148 return;
3149 }
3150
3151 self.input_frontier = new_frontier;
3152
3153 for read_hold in &mut self.input_read_holds {
3155 let result = read_hold.try_downgrade(self.input_frontier.clone());
3156 soft_assert_or_log!(
3157 result.is_ok(),
3158 "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3159 self.input_frontier,
3160 );
3161 }
3162 }
3163
3164 fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
3166 if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3167 soft_panic_or_log!(
3168 "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3169 self.output_frontier,
3170 );
3171 return;
3172 } else if new_frontier == self.output_frontier {
3173 return;
3174 }
3175
3176 self.output_frontier = new_frontier;
3177 }
3178}
3179
/// Reports a replica's write frontier for one collection to the
/// `ReplicaFrontiers` introspection relation.
#[derive(Debug)]
struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
    // The replica this state belongs to.
    replica_id: ReplicaId,
    // The collection this state belongs to.
    collection_id: GlobalId,
    // The write frontier most recently reported for this pair.
    write_frontier: Antichain<T>,
    // Sender for introspection updates.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
}
3193
3194impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
3195 fn new(
3197 replica_id: ReplicaId,
3198 collection_id: GlobalId,
3199 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3200 as_of: Antichain<T>,
3201 ) -> Self {
3202 let self_ = Self {
3203 replica_id,
3204 collection_id,
3205 write_frontier: as_of,
3206 introspection_tx,
3207 };
3208
3209 self_.report_initial_state();
3210 self_
3211 }
3212
3213 fn report_initial_state(&self) {
3215 let row = self.write_frontier_row();
3216 let updates = vec![(row, Diff::ONE)];
3217 self.send(IntrospectionType::ReplicaFrontiers, updates);
3218 }
3219
3220 fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
3222 if self.write_frontier == *write_frontier {
3223 return; }
3225
3226 let retraction = self.write_frontier_row();
3227 self.write_frontier.clone_from(write_frontier);
3228 let insertion = self.write_frontier_row();
3229
3230 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3231 self.send(IntrospectionType::ReplicaFrontiers, updates);
3232 }
3233
3234 fn write_frontier_row(&self) -> Row {
3236 let write_frontier = self
3237 .write_frontier
3238 .as_option()
3239 .map_or(Datum::Null, |ts| ts.clone().into());
3240 Row::pack_slice(&[
3241 Datum::String(&self.collection_id.to_string()),
3242 Datum::String(&self.replica_id.to_string()),
3243 write_frontier,
3244 ])
3245 }
3246
3247 fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3248 let _ = self.introspection_tx.send((introspection_type, updates));
3251 }
3252}
3253
3254impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
3255 fn drop(&mut self) {
3256 let row = self.write_frontier_row();
3258 let updates = vec![(row, Diff::MINUS_ONE)];
3259 self.send(IntrospectionType::ReplicaFrontiers, updates);
3260 }
3261}