1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
21use mz_compute_types::plan::render_plan::RenderPlan;
22use mz_compute_types::sinks::{
23 ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
24};
25use mz_compute_types::sources::SourceInstanceDesc;
26use mz_controller_types::dyncfgs::{
27 ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
28};
29use mz_dyncfg::ConfigSet;
30use mz_expr::RowSetFinishing;
31use mz_ore::cast::CastFrom;
32use mz_ore::channel::instrumented_unbounded_channel;
33use mz_ore::now::NowFn;
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{soft_assert_or_log, soft_panic_or_log};
36use mz_persist_types::PersistLocation;
37use mz_repr::adt::timestamp::CheckedTimestamp;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
40use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
41use mz_storage_types::read_holds::{self, ReadHold};
42use mz_storage_types::read_policy::ReadPolicy;
43use thiserror::Error;
44use timely::PartialOrder;
45use timely::progress::frontier::MutableAntichain;
46use timely::progress::{Antichain, ChangeBatch, Timestamp};
47use tokio::sync::{mpsc, oneshot};
48use uuid::Uuid;
49
50use crate::controller::error::{
51 CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
52};
53use crate::controller::instance_client::PeekError;
54use crate::controller::replica::{ReplicaClient, ReplicaConfig};
55use crate::controller::{
56 ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
57 ReplicaId, StorageCollections,
58};
59use crate::logging::LogVariant;
60use crate::metrics::IntCounter;
61use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
62use crate::protocol::command::{
63 ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
64};
65use crate::protocol::history::ComputeCommandHistory;
66use crate::protocol::response::{
67 ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
68 SubscribeBatch, SubscribeResponse,
69};
70
/// Error returned when a replica is added under an ID that is already in use.
#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);
74
/// Error returned when an operation references a replica ID that does not exist.
#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);
78
/// Errors that can occur during dataflow creation (see `create_dataflow`).
#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    /// A referenced collection is not known to this instance.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The requested target replica is not provisioned on this instance.
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    /// The dataflow description carries no `as_of` frontier.
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    /// A subscribe dataflow was given an empty (i.e. closed) `as_of`.
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    /// A `COPY TO` dataflow was given an empty (i.e. closed) `as_of`.
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    /// The caller did not provide a read hold for one of the dataflow imports.
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    /// A provided read hold could not be downgraded to the dataflow `as_of`.
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}
96
97impl From<CollectionMissing> for DataflowCreationError {
98 fn from(error: CollectionMissing) -> Self {
99 Self::CollectionMissing(error.0)
100 }
101}
102
/// Errors that can occur when installing a read policy on a collection.
#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    /// The referenced collection is not known to this instance.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    /// The collection is write-only, so a read policy makes no sense for it.
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}
110
111impl From<CollectionMissing> for ReadPolicyError {
112 fn from(error: CollectionMissing) -> Self {
113 Self::CollectionMissing(error.0)
114 }
115}
116
/// A command sent to an [`Instance`] task: a closure that is applied to the
/// instance state by the task's event loop.
pub(super) type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;
119
/// A response received from a replica, tagged with the replica's ID and a
/// `u64` — presumably the replica's epoch (it is the same value passed to
/// `ReplicaClient::spawn`); confirm against the response handler.
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);
123
/// The state of a single compute instance, driven by a dedicated task.
///
/// Communicates with the rest of the controller via channels (`command_rx`,
/// `response_tx`, `introspection_tx`) and with its replicas via
/// `replica_tx`/`replica_rx`.
pub(super) struct Instance<T: ComputeControllerTimestamp> {
    /// Build information, passed to spawned replica clients.
    build_info: &'static BuildInfo,
    /// Handle for looking up frontiers and metadata of storage collections.
    storage_collections: StorageCollections<T>,
    /// Whether `InitializationComplete` has been sent to the replicas.
    initialized: bool,
    /// Whether this instance is read-only; suppresses wallclock lag recording
    /// and gates `allow_writes`.
    read_only: bool,
    /// Workload class, attached as a label to wallclock lag histogram data.
    workload_class: Option<String>,
    /// State of provisioned replicas, by replica ID.
    replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
    /// State of installed compute collections, by collection ID.
    collections: BTreeMap<GlobalId, CollectionState<T>>,
    /// Mapping from log variants to the IDs of their log collections.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Peeks that have been issued but not yet finished, by UUID.
    peeks: BTreeMap<Uuid, PendingPeek<T>>,
    /// Active subscribes, by sink collection ID.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
    /// IDs of in-progress `COPY TO` sinks.
    copy_tos: BTreeSet<GlobalId>,
    /// History of commands sent to this instance; replayed to new replicas.
    history: ComputeCommandHistory<UIntGauge, T>,
    /// Receiver for commands from the controller.
    command_rx: mpsc::UnboundedReceiver<Command<T>>,
    /// Sender for responses back to the controller.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Sender for introspection updates.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Metrics for this instance.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// Persist location of the peek stash; forwarded to replicas in the
    /// `CreateInstance` command.
    peek_stash_persist_location: PersistLocation,

    /// A function returning the current wallclock time.
    now: NowFn,
    /// A function computing the wallclock lag of a given timestamp.
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for read-hold changes, shared with created collection state.
    read_hold_tx: read_holds::ChangeTx<T>,
    /// Sender side of the replica response channel; cloned into replica clients.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// Receiver side of the replica response channel.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
}
217
218impl<T: ComputeControllerTimestamp> Instance<T> {
219 fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
221 self.collections.get(&id).ok_or(CollectionMissing(id))
222 }
223
224 fn collection_mut(
226 &mut self,
227 id: GlobalId,
228 ) -> Result<&mut CollectionState<T>, CollectionMissing> {
229 self.collections.get_mut(&id).ok_or(CollectionMissing(id))
230 }
231
232 fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
238 self.collections.get(&id).expect("collection must exist")
239 }
240
241 fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
247 self.collections
248 .get_mut(&id)
249 .expect("collection must exist")
250 }
251
252 fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
253 self.collections.iter().map(|(id, coll)| (*id, coll))
254 }
255
256 fn add_collection(
262 &mut self,
263 id: GlobalId,
264 as_of: Antichain<T>,
265 shared: SharedCollectionState<T>,
266 storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
267 compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
268 replica_input_read_holds: Vec<ReadHold<T>>,
269 write_only: bool,
270 storage_sink: bool,
271 initial_as_of: Option<Antichain<T>>,
272 refresh_schedule: Option<RefreshSchedule>,
273 target_replica: Option<ReplicaId>,
274 ) {
275 let dependency_ids: Vec<GlobalId> = compute_dependencies
277 .keys()
278 .chain(storage_dependencies.keys())
279 .copied()
280 .collect();
281 let introspection = CollectionIntrospection::new(
282 id,
283 self.introspection_tx.clone(),
284 as_of.clone(),
285 storage_sink,
286 initial_as_of,
287 refresh_schedule,
288 dependency_ids,
289 );
290 let mut state = CollectionState::new(
291 id,
292 as_of.clone(),
293 shared,
294 storage_dependencies,
295 compute_dependencies,
296 Arc::clone(&self.read_hold_tx),
297 introspection,
298 );
299 state.target_replica = target_replica;
300 if write_only {
302 state.read_policy = None;
303 }
304
305 if let Some(previous) = self.collections.insert(id, state) {
306 panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
307 }
308
309 for replica in self.replicas.values_mut() {
311 if target_replica.is_some_and(|id| id != replica.id) {
312 continue;
313 }
314 replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
315 }
316 }
317
    /// Remove the identified collection from the instance state and from all
    /// replica states.
    fn remove_collection(&mut self, id: GlobalId) {
        // Remove the per-replica state first, then the instance-level state.
        for replica in self.replicas.values_mut() {
            replica.remove_collection(id);
        }

        self.collections.remove(&id);
    }
327
328 fn add_replica_state(
329 &mut self,
330 id: ReplicaId,
331 client: ReplicaClient<T>,
332 config: ReplicaConfig,
333 epoch: u64,
334 ) {
335 let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();
336
337 let metrics = self.metrics.for_replica(id);
338 let mut replica = ReplicaState::new(
339 id,
340 client,
341 config,
342 metrics,
343 self.introspection_tx.clone(),
344 epoch,
345 );
346
347 for (collection_id, collection) in &self.collections {
349 if (collection.log_collection && !log_ids.contains(collection_id))
352 || collection.target_replica.is_some_and(|rid| rid != id)
353 {
354 continue;
355 }
356
357 let as_of = if collection.log_collection {
358 Antichain::from_elem(T::minimum())
363 } else {
364 collection.read_frontier().to_owned()
365 };
366
367 let input_read_holds = collection.storage_dependencies.values().cloned().collect();
368 replica.add_collection(*collection_id, as_of, input_read_holds);
369 }
370
371 self.replicas.insert(id, replica);
372 }
373
    /// Send a response to the controller.
    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
        // A send error means the receiver is gone; there is nothing useful to
        // do about it here, so the error is deliberately ignored.
        let _ = self.response_tx.send(response);
    }
380
    /// Send introspection updates of the given type to the introspection sink.
    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
        // A send error means the receiver is gone; deliberately ignored.
        let _ = self.introspection_tx.send((type_, updates));
    }
387
    /// Return whether a replica with the given ID exists on this instance.
    fn replica_exists(&self, id: ReplicaId) -> bool {
        self.replicas.contains_key(&id)
    }
392
393 fn peeks_targeting(
395 &self,
396 replica_id: ReplicaId,
397 ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
398 self.peeks.iter().filter_map(move |(uuid, peek)| {
399 if peek.target_replica == Some(replica_id) {
400 Some((*uuid, peek))
401 } else {
402 None
403 }
404 })
405 }
406
407 fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
409 self.subscribes.keys().copied().filter(move |id| {
410 let collection = self.expect_collection(*id);
411 collection.target_replica == Some(replica_id)
412 })
413 }
414
    /// Report current frontiers to introspection: per-collection read and
    /// write frontiers, and per-replica write frontiers.
    fn update_frontier_introspection(&mut self) {
        for collection in self.collections.values_mut() {
            collection
                .introspection
                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
        }

        for replica in self.replicas.values_mut() {
            for collection in replica.collections.values_mut() {
                collection
                    .introspection
                    .observe_frontier(&collection.write_frontier);
            }
        }
    }
438
439 fn refresh_state_metrics(&self) {
448 let unscheduled_collections_count =
449 self.collections.values().filter(|c| !c.scheduled).count();
450 let connected_replica_count = self
451 .replicas
452 .values()
453 .filter(|r| r.client.is_connected())
454 .count();
455
456 self.metrics
457 .replica_count
458 .set(u64::cast_from(self.replicas.len()));
459 self.metrics
460 .collection_count
461 .set(u64::cast_from(self.collections.len()));
462 self.metrics
463 .collection_unscheduled_count
464 .set(u64::cast_from(unscheduled_collections_count));
465 self.metrics
466 .peek_count
467 .set(u64::cast_from(self.peeks.len()));
468 self.metrics
469 .subscribe_count
470 .set(u64::cast_from(self.subscribes.len()));
471 self.metrics
472 .copy_to_count
473 .set(u64::cast_from(self.copy_tos.len()));
474 self.metrics
475 .connected_replica_count
476 .set(u64::cast_from(connected_replica_count));
477 }
478
    /// Sample wallclock lag for all collections and fold the samples into the
    /// per-collection histogram stashes and per-replica maximum trackers, then
    /// possibly record the accumulated data to introspection.
    fn refresh_wallclock_lag(&mut self) {
        // Lag of a frontier: lag of its single element, or zero for the empty
        // frontier (a closed collection cannot lag).
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        // IDs of storage collections whose read capabilities are strictly less
        // than their write frontier, i.e. that have readable data.
        let readable_storage_collections: BTreeSet<_> = self
            .collections
            .keys()
            .filter_map(|id| {
                let frontiers = self.storage_collections.collection_frontiers(*id).ok()?;
                PartialOrder::less_than(&frontiers.read_capabilities, &frontiers.write_frontier)
                    .then_some(*id)
            })
            .collect();

        for (id, collection) in &mut self.collections {
            let write_frontier = collection.write_frontier();
            // For storage-backed collections, readability comes from the
            // storage frontiers; otherwise from the compute frontiers.
            let readable = if self.storage_collections.check_exists(*id).is_ok() {
                readable_storage_collections.contains(id)
            } else {
                PartialOrder::less_than(&collection.read_frontier(), &write_frontier)
            };

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                // Bucket lag by rounding up to the next power of two (in
                // seconds); unreadable collections get an undefined bucket.
                let bucket = if readable {
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                } else {
                    WallclockLag::Undefined
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                let readable = readable_storage_collections.contains(id) || collection.hydrated();

                let lag = if readable {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                } else {
                    WallclockLag::Undefined
                };

                // Track the maximum observed lag since the last recording.
                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    // Undefined lag is reported as `u64::MAX` to the metric.
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                };
            }
        }

        self.maybe_record_wallclock_lag();
    }
577
    /// Record accumulated wallclock lag introspection data, if at least one
    /// recording interval has passed since the last recording.
    ///
    /// No-op for read-only instances.
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        // Truncate a datetime down to a multiple of `interval`; `None` if the
        // interval cannot be represented or truncation fails.
        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        // Fall back to the default interval if the configured one is unusable.
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        // Only record once per truncated interval boundary.
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        // Per-replica maximum lag history. Each tracker is reset to
        // `WallclockLag::MIN` as it is read out.
        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        // Per-collection lag histograms. Each stash is drained as it is read.
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }
662
663 #[mz_ore::instrument(level = "debug")]
669 pub fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, CollectionMissing> {
670 if self.replicas.is_empty() {
671 return Ok(true);
672 }
673 for replica_state in self.replicas.values() {
674 let collection_state = replica_state
675 .collections
676 .get(&collection_id)
677 .ok_or(CollectionMissing(collection_id))?;
678
679 if collection_state.hydrated() {
680 return Ok(true);
681 }
682 }
683
684 Ok(false)
685 }
686
    /// Report whether all (non-transient, non-excluded) collections are
    /// hydrated on at least one of the given replicas.
    ///
    /// With `target_replica_ids == None`, all replicas are considered. An
    /// instance without replicas reports everything as hydrated. Returns
    /// `HydrationCheckBadTarget` if none of the requested replicas exist.
    #[mz_ore::instrument(level = "debug")]
    pub fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, HydrationCheckBadTarget> {
        if self.replicas.is_empty() {
            return Ok(true);
        }
        let mut all_hydrated = true;
        // Restrict to existing replicas among the requested targets.
        let target_replicas: BTreeSet<ReplicaId> = self
            .replicas
            .keys()
            .filter_map(|id| match target_replica_ids {
                None => Some(id.clone()),
                Some(ref ids) if ids.contains(id) => Some(id.clone()),
                Some(_) => None,
            })
            .collect();
        if let Some(targets) = target_replica_ids {
            // Targets were requested but none of them exist.
            if target_replicas.is_empty() {
                return Err(HydrationCheckBadTarget(targets));
            }
        }

        for (id, _collection) in self.collections_iter() {
            if id.is_transient() || exclude_collections.contains(&id) {
                continue;
            }

            // Hydrated if any target replica has hydrated this collection.
            let mut collection_hydrated = false;
            for replica_state in self.replicas.values() {
                if !target_replicas.contains(&replica_state.id) {
                    continue;
                }
                let collection_state = replica_state
                    .collections
                    .get(&id)
                    .expect("missing collection state");

                if collection_state.hydrated() {
                    collection_hydrated = true;
                    break;
                }
            }

            if !collection_hydrated {
                tracing::info!("collection {id} is not hydrated on any replica");
                all_hydrated = false;
            }
        }

        Ok(all_hydrated)
    }
748
749 fn cleanup_collections(&mut self) {
765 let to_remove: Vec<_> = self
766 .collections_iter()
767 .filter(|(id, collection)| {
768 collection.dropped
769 && collection.shared.lock_read_capabilities(|c| c.is_empty())
770 && self
771 .replicas
772 .values()
773 .all(|r| r.collection_frontiers_empty(*id))
774 })
775 .map(|(id, _collection)| id)
776 .collect();
777
778 for id in to_remove {
779 self.remove_collection(id);
780 }
781 }
782
    /// Render the instance state as JSON, for debugging/introspection.
    ///
    /// Channels, metrics, callbacks, and similar non-data fields are omitted.
    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure exhaustively so adding a field forces a decision about
        // whether it should appear in the dump.
        let Self {
            build_info: _,
            storage_collections: _,
            peek_stash_persist_location: _,
            initialized,
            read_only,
            workload_class,
            replicas,
            collections,
            log_sources: _,
            peeks,
            subscribes,
            copy_tos,
            history: _,
            command_rx: _,
            response_tx: _,
            introspection_tx: _,
            metrics: _,
            dyncfg: _,
            now: _,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            read_hold_tx: _,
            replica_tx: _,
            replica_rx: _,
        } = self;

        // Map keys are stringified so the result is valid JSON.
        let replicas: BTreeMap<_, _> = replicas
            .iter()
            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
            .collect::<Result<_, anyhow::Error>>()?;
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();
        let peeks: BTreeMap<_, _> = peeks
            .iter()
            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
            .collect();
        let subscribes: BTreeMap<_, _> = subscribes
            .iter()
            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
            .collect();
        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");

        Ok(serde_json::json!({
            "initialized": initialized,
            "read_only": read_only,
            "workload_class": workload_class,
            "replicas": replicas,
            "collections": collections,
            "peeks": peeks,
            "subscribes": subscribes,
            "copy_tos": copy_tos,
            "wallclock_lag_last_recorded": wallclock_lag_last_recorded,
        }))
    }
852
853 pub(super) fn collection_write_frontier(
855 &self,
856 id: GlobalId,
857 ) -> Result<Antichain<T>, CollectionMissing> {
858 Ok(self.collection(id)?.write_frontier())
859 }
860}
861
862impl<T> Instance<T>
863where
864 T: ComputeControllerTimestamp,
865{
    /// Construct a new `Instance`.
    ///
    /// Log collections described by `arranged_logs` are installed immediately;
    /// everything else starts empty. No replicas exist yet.
    pub(super) fn new(
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        command_rx: mpsc::UnboundedReceiver<Command<T>>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        read_only: bool,
    ) -> Self {
        // Pre-install the log collections.
        let mut collections = BTreeMap::new();
        let mut log_sources = BTreeMap::new();
        for (log, id, shared) in arranged_logs {
            let collection = CollectionState::new_log_collection(
                id,
                shared,
                Arc::clone(&read_hold_tx),
                introspection_tx.clone(),
            );
            collections.insert(id, collection);
            log_sources.insert(log, id);
        }

        let history = ComputeCommandHistory::new(metrics.for_history());

        let send_count = metrics.response_send_count.clone();
        let recv_count = metrics.response_recv_count.clone();
        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            storage_collections: storage,
            peek_stash_persist_location,
            initialized: false,
            read_only,
            workload_class: None,
            replicas: Default::default(),
            collections,
            log_sources,
            peeks: Default::default(),
            subscribes: Default::default(),
            copy_tos: Default::default(),
            history,
            command_rx,
            response_tx,
            introspection_tx,
            metrics,
            dyncfg,
            now,
            wallclock_lag,
            // Treat construction time as the last recording time, so the
            // first recording happens a full interval later.
            wallclock_lag_last_recorded: now_dt,
            read_hold_tx,
            replica_tx,
            replica_rx,
        }
    }
929
    /// Run this instance's event loop: process controller commands and replica
    /// responses until the command channel closes.
    pub(super) async fn run(mut self) {
        self.send(ComputeCommand::Hello {
            nonce: Uuid::default(),
        });

        let instance_config = InstanceConfig {
            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
            // NOTE(review): logging and expiration_offset are defaulted here —
            // presumably filled in elsewhere (e.g. per-replica config); confirm.
            logging: Default::default(),
            expiration_offset: Default::default(),
        };

        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));

        loop {
            tokio::select! {
                command = self.command_rx.recv() => match command {
                    // Commands are closures applied to the instance state.
                    Some(cmd) => cmd(&mut self),
                    // Channel closed: the controller dropped us; shut down.
                    None => break,
                },
                response = self.replica_rx.recv() => match response {
                    Some(response) => self.handle_response(response),
                    // `self.replica_tx` keeps the channel alive, so `recv`
                    // can never yield `None`.
                    None => unreachable!("self owns a sender side of the channel"),
                }
            }
        }
    }
960
961 #[mz_ore::instrument(level = "debug")]
963 pub fn update_configuration(&mut self, config_params: ComputeParameters) {
964 if let Some(workload_class) = &config_params.workload_class {
965 self.workload_class = workload_class.clone();
966 }
967
968 let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
969 self.send(command);
970 }
971
972 #[mz_ore::instrument(level = "debug")]
977 pub fn initialization_complete(&mut self) {
978 if !self.initialized {
980 self.send(ComputeCommand::InitializationComplete);
981 self.initialized = true;
982 }
983 }
984
    /// Allow the identified collection to start producing writes, by flipping
    /// its read-only flag and notifying the replicas.
    ///
    /// No-ops if the collection is already writable, or if its read frontier
    /// is empty (the collection is already closed).
    #[mz_ore::instrument(level = "debug")]
    pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
        let collection = self.collection_mut(collection_id)?;

        // Already writable: nothing to do.
        if !collection.read_only {
            return Ok(());
        }

        let as_of = collection.read_frontier();

        // An empty read frontier means the collection is already closed;
        // sending `AllowWrites` would be pointless.
        if as_of.is_empty() {
            return Ok(());
        }

        collection.read_only = false;
        self.send(ComputeCommand::AllowWrites(collection_id));

        Ok(())
    }
1011
    /// Shut this instance down.
    ///
    /// Swaps out the command receiver; its replacement's sender is dropped
    /// immediately, so `run` observes a closed channel and exits its loop.
    /// Logs if any replicas are still provisioned at this point.
    #[mz_ore::instrument(level = "debug")]
    pub fn shutdown(&mut self) {
        let (_tx, rx) = mpsc::unbounded_channel();
        self.command_rx = rx;

        let stray_replicas: Vec<_> = self.replicas.keys().collect();
        soft_assert_or_log!(
            stray_replicas.is_empty(),
            "dropped instance still has provisioned replicas: {stray_replicas:?}",
        );
    }
1033
1034 #[mz_ore::instrument(level = "debug")]
1036 fn send(&mut self, cmd: ComputeCommand<T>) {
1037 self.history.push(cmd.clone());
1039
1040 let target_replica = self.target_replica(&cmd);
1041
1042 if let Some(rid) = target_replica {
1043 if let Some(replica) = self.replicas.get_mut(&rid) {
1044 let _ = replica.client.send(cmd);
1045 }
1046 } else {
1047 for replica in self.replicas.values_mut() {
1048 let _ = replica.client.send(cmd.clone());
1049 }
1050 }
1051 }
1052
    /// Determine which single replica, if any, the given command should be
    /// routed to. `None` means the command is broadcast to all replicas.
    fn target_replica(&self, cmd: &ComputeCommand<T>) -> Option<ReplicaId> {
        match &cmd {
            // Collection-scoped commands follow the collection's target.
            ComputeCommand::Schedule(id)
            | ComputeCommand::AllowWrites(id)
            | ComputeCommand::AllowCompaction { id, .. } => {
                self.expect_collection(*id).target_replica
            }
            ComputeCommand::CreateDataflow(desc) => {
                // All exports that specify a target replica must agree on it.
                let mut target_replica = None;
                for id in desc.export_ids() {
                    if let Some(replica) = self.expect_collection(id).target_replica {
                        if target_replica.is_some() {
                            assert_eq!(target_replica, Some(replica));
                        }
                        target_replica = Some(replica);
                    }
                }
                target_replica
            }
            // Everything else is broadcast.
            ComputeCommand::Peek(_)
            | ComputeCommand::Hello { .. }
            | ComputeCommand::CreateInstance(_)
            | ComputeCommand::InitializationComplete
            | ComputeCommand::UpdateConfiguration(_)
            | ComputeCommand::CancelPeek { .. } => None,
        }
    }
1088
    /// Provision a new replica for this instance and hydrate it by replaying
    /// the (reduced) command history.
    ///
    /// `epoch` defaults to 1 when not given. Returns `ReplicaExists` if the ID
    /// is already in use.
    #[mz_ore::instrument(level = "debug")]
    pub fn add_replica(
        &mut self,
        id: ReplicaId,
        mut config: ReplicaConfig,
        epoch: Option<u64>,
    ) -> Result<(), ReplicaExists> {
        if self.replica_exists(id) {
            return Err(ReplicaExists(id));
        }

        // Install this instance's log collection IDs into the replica config.
        config.logging.index_logs = self.log_sources.clone();

        let epoch = epoch.unwrap_or(1);
        let metrics = self.metrics.for_replica(id);
        let client = ReplicaClient::spawn(
            id,
            self.build_info,
            config.clone(),
            epoch,
            metrics.clone(),
            Arc::clone(&self.dyncfg),
            self.replica_tx.clone(),
        );

        // Compact the history before replay, to avoid sending obsolete commands.
        self.history.reduce();

        self.history.update_source_uppers(&self.storage_collections);

        // Replay the command history to hydrate the new replica.
        for command in self.history.iter() {
            // Skip commands targeted at a different replica.
            if let Some(target_replica) = self.target_replica(command)
                && target_replica != id
            {
                continue;
            }

            if client.send(command.clone()).is_err() {
                // A failed send means the replica connection is gone; the
                // rehydration machinery will deal with it later.
                tracing::warn!("Replica {:?} connection terminated during hydration", id);
                break;
            }
        }

        self.add_replica_state(id, client, config, epoch);

        Ok(())
    }
1143
    /// Remove a replica from this instance, failing any peeks and subscribes
    /// that targeted it.
    ///
    /// Returns `ReplicaMissing` if no replica with the given ID exists.
    #[mz_ore::instrument(level = "debug")]
    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;

        // Fail all subscribes that targeted the removed replica.
        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
        for subscribe_id in to_drop {
            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
            let response = ComputeControllerResponse::SubscribeResponse(
                subscribe_id,
                SubscribeBatch {
                    lower: subscribe.frontier.clone(),
                    upper: subscribe.frontier,
                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
                },
            );
            self.deliver_response(response);
        }

        // Fail all peeks that targeted the removed replica. Notifications are
        // collected first because `peeks_targeting` borrows `self` immutably.
        let mut peek_responses = Vec::new();
        let mut to_drop = Vec::new();
        for (uuid, peek) in self.peeks_targeting(id) {
            peek_responses.push(ComputeControllerResponse::PeekNotification(
                uuid,
                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
                peek.otel_ctx.clone(),
            ));
            to_drop.push(uuid);
        }
        for response in peek_responses {
            self.deliver_response(response);
        }
        for uuid in to_drop {
            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
            self.finish_peek(uuid, response);
        }

        self.forward_implied_capabilities();

        Ok(())
    }
1194
1195 fn rehydrate_replica(&mut self, id: ReplicaId) {
1201 let config = self.replicas[&id].config.clone();
1202 let epoch = self.replicas[&id].epoch + 1;
1203
1204 self.remove_replica(id).expect("replica must exist");
1205 let result = self.add_replica(id, config, Some(epoch));
1206
1207 match result {
1208 Ok(()) => (),
1209 Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1210 }
1211 }
1212
1213 fn rehydrate_failed_replicas(&mut self) {
1215 let replicas = self.replicas.iter();
1216 let failed_replicas: Vec<_> = replicas
1217 .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1218 .collect();
1219
1220 for replica_id in failed_replicas {
1221 self.rehydrate_replica(replica_id);
1222 }
1223 }
1224
1225 #[mz_ore::instrument(level = "debug")]
1230 pub fn create_dataflow(
1231 &mut self,
1232 dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
1233 import_read_holds: Vec<ReadHold<T>>,
1234 mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
1235 target_replica: Option<ReplicaId>,
1236 ) -> Result<(), DataflowCreationError> {
1237 use DataflowCreationError::*;
1238
1239 if let Some(replica_id) = target_replica {
1243 if !self.replica_exists(replica_id) {
1244 return Err(ReplicaMissing(replica_id));
1245 }
1246 }
1247
1248 let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
1250 if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
1251 return Err(EmptyAsOfForSubscribe);
1252 }
1253 if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
1254 return Err(EmptyAsOfForCopyTo);
1255 }
1256
1257 let mut storage_dependencies = BTreeMap::new();
1259 let mut compute_dependencies = BTreeMap::new();
1260
1261 let mut replica_input_read_holds = Vec::new();
1266
1267 let mut import_read_holds: BTreeMap<_, _> =
1268 import_read_holds.into_iter().map(|r| (r.id(), r)).collect();
1269
1270 for &id in dataflow.source_imports.keys() {
1271 let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1272 replica_input_read_holds.push(read_hold.clone());
1273
1274 read_hold
1275 .try_downgrade(as_of.clone())
1276 .map_err(|_| ReadHoldInsufficient(id))?;
1277 storage_dependencies.insert(id, read_hold);
1278 }
1279
1280 for &id in dataflow.index_imports.keys() {
1281 let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1282 read_hold
1283 .try_downgrade(as_of.clone())
1284 .map_err(|_| ReadHoldInsufficient(id))?;
1285 compute_dependencies.insert(id, read_hold);
1286 }
1287
1288 if as_of.is_empty() {
1291 replica_input_read_holds = Default::default();
1292 }
1293
1294 for export_id in dataflow.export_ids() {
1296 let shared = shared_collection_state
1297 .remove(&export_id)
1298 .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
1299 let write_only = dataflow.sink_exports.contains_key(&export_id);
1300 let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);
1301
1302 self.add_collection(
1303 export_id,
1304 as_of.clone(),
1305 shared,
1306 storage_dependencies.clone(),
1307 compute_dependencies.clone(),
1308 replica_input_read_holds.clone(),
1309 write_only,
1310 storage_sink,
1311 dataflow.initial_storage_as_of.clone(),
1312 dataflow.refresh_schedule.clone(),
1313 target_replica,
1314 );
1315
1316 if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
1319 self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
1320 }
1321 }
1322
1323 for subscribe_id in dataflow.subscribe_ids() {
1325 self.subscribes
1326 .insert(subscribe_id, ActiveSubscribe::default());
1327 }
1328
1329 for copy_to_id in dataflow.copy_to_ids() {
1331 self.copy_tos.insert(copy_to_id);
1332 }
1333
1334 let mut source_imports = BTreeMap::new();
1337 for (id, import) in dataflow.source_imports {
1338 let frontiers = self
1339 .storage_collections
1340 .collection_frontiers(id)
1341 .expect("collection exists");
1342
1343 let collection_metadata = self
1344 .storage_collections
1345 .collection_metadata(id)
1346 .expect("we have a read hold on this collection");
1347
1348 let desc = SourceInstanceDesc {
1349 storage_metadata: collection_metadata.clone(),
1350 arguments: import.desc.arguments,
1351 typ: import.desc.typ.clone(),
1352 };
1353 source_imports.insert(
1354 id,
1355 mz_compute_types::dataflows::SourceImport {
1356 desc,
1357 monotonic: import.monotonic,
1358 with_snapshot: import.with_snapshot,
1359 upper: frontiers.write_frontier,
1360 },
1361 );
1362 }
1363
1364 let mut sink_exports = BTreeMap::new();
1365 for (id, se) in dataflow.sink_exports {
1366 let connection = match se.connection {
1367 ComputeSinkConnection::MaterializedView(conn) => {
1368 let metadata = self
1369 .storage_collections
1370 .collection_metadata(id)
1371 .map_err(|_| CollectionMissing(id))?
1372 .clone();
1373 let conn = MaterializedViewSinkConnection {
1374 value_desc: conn.value_desc,
1375 storage_metadata: metadata,
1376 };
1377 ComputeSinkConnection::MaterializedView(conn)
1378 }
1379 ComputeSinkConnection::ContinualTask(conn) => {
1380 let metadata = self
1381 .storage_collections
1382 .collection_metadata(id)
1383 .map_err(|_| DataflowCreationError::CollectionMissing(id))?
1384 .clone();
1385 let conn = ContinualTaskConnection {
1386 input_id: conn.input_id,
1387 storage_metadata: metadata,
1388 };
1389 ComputeSinkConnection::ContinualTask(conn)
1390 }
1391 ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
1392 ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1393 ComputeSinkConnection::CopyToS3Oneshot(conn)
1394 }
1395 };
1396 let desc = ComputeSinkDesc {
1397 from: se.from,
1398 from_desc: se.from_desc,
1399 connection,
1400 with_snapshot: se.with_snapshot,
1401 up_to: se.up_to,
1402 non_null_assertions: se.non_null_assertions,
1403 refresh_schedule: se.refresh_schedule,
1404 };
1405 sink_exports.insert(id, desc);
1406 }
1407
1408 let objects_to_build = dataflow
1410 .objects_to_build
1411 .into_iter()
1412 .map(|object| BuildDesc {
1413 id: object.id,
1414 plan: RenderPlan::try_from(object.plan).expect("valid plan"),
1415 })
1416 .collect();
1417
1418 let augmented_dataflow = DataflowDescription {
1419 source_imports,
1420 sink_exports,
1421 objects_to_build,
1422 index_imports: dataflow.index_imports,
1424 index_exports: dataflow.index_exports,
1425 as_of: dataflow.as_of.clone(),
1426 until: dataflow.until,
1427 initial_storage_as_of: dataflow.initial_storage_as_of,
1428 refresh_schedule: dataflow.refresh_schedule,
1429 debug_name: dataflow.debug_name,
1430 time_dependence: dataflow.time_dependence,
1431 };
1432
1433 if augmented_dataflow.is_transient() {
1434 tracing::debug!(
1435 name = %augmented_dataflow.debug_name,
1436 import_ids = %augmented_dataflow.display_import_ids(),
1437 export_ids = %augmented_dataflow.display_export_ids(),
1438 as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1439 until = ?augmented_dataflow.until.elements(),
1440 "creating dataflow",
1441 );
1442 } else {
1443 tracing::info!(
1444 name = %augmented_dataflow.debug_name,
1445 import_ids = %augmented_dataflow.display_import_ids(),
1446 export_ids = %augmented_dataflow.display_export_ids(),
1447 as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1448 until = ?augmented_dataflow.until.elements(),
1449 "creating dataflow",
1450 );
1451 }
1452
1453 if as_of.is_empty() {
1456 tracing::info!(
1457 name = %augmented_dataflow.debug_name,
1458 "not sending `CreateDataflow`, because of empty `as_of`",
1459 );
1460 } else {
1461 let collections: Vec<_> = augmented_dataflow.export_ids().collect();
1462 self.send(ComputeCommand::CreateDataflow(Box::new(augmented_dataflow)));
1463
1464 for id in collections {
1465 self.maybe_schedule_collection(id);
1466 }
1467 }
1468
1469 Ok(())
1470 }
1471
    /// Send a `Schedule` command for the identified collection, if it is ready.
    ///
    /// A collection is scheduled at most once; if it was already scheduled this is a no-op.
    fn maybe_schedule_collection(&mut self, id: GlobalId) {
        let collection = self.expect_collection(id);

        // Don't schedule a collection twice.
        if collection.scheduled {
            return;
        }

        let as_of = collection.read_frontier();

        // An empty as-of means the collection performs no computation, so there is
        // nothing to schedule.
        if as_of.is_empty() {
            return;
        }

        let ready = if id.is_transient() {
            // Transient collections are scheduled immediately.
            true
        } else {
            // Skip self-dependencies, which could otherwise never become ready.
            let not_self_dep = |x: &GlobalId| *x != id;

            let mut deps_scheduled = true;

            // Collect write frontiers of compute dependencies, while also checking that
            // each dependency has itself been scheduled.
            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
            let mut compute_frontiers = Vec::new();
            for id in compute_deps {
                let dep = &self.expect_collection(id);
                deps_scheduled &= dep.scheduled;
                compute_frontiers.push(dep.write_frontier());
            }

            // Collect write frontiers of storage dependencies.
            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(storage_deps.collect())
                .expect("must exist");
            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);

            // All dependency write frontiers must be strictly greater than the as-of,
            // i.e. all input data up to the as-of must be available.
            let mut frontiers = compute_frontiers.into_iter().chain(storage_frontiers);
            let frontiers_ready =
                frontiers.all(|frontier| PartialOrder::less_than(&as_of, &frontier));

            deps_scheduled && frontiers_ready
        };

        if ready {
            self.send(ComputeCommand::Schedule(id));
            let collection = self.expect_collection_mut(id);
            collection.scheduled = true;
        }
    }
1544
1545 fn schedule_collections(&mut self) {
1547 let ids: Vec<_> = self.collections.keys().copied().collect();
1548 for id in ids {
1549 self.maybe_schedule_collection(id);
1550 }
1551 }
1552
1553 #[mz_ore::instrument(level = "debug")]
1556 pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1557 for id in &ids {
1558 let collection = self.collection_mut(*id)?;
1559
1560 collection.dropped = true;
1562
1563 collection.implied_read_hold.release();
1566 collection.warmup_read_hold.release();
1567
1568 self.subscribes.remove(id);
1571 self.copy_tos.remove(id);
1574 }
1575
1576 Ok(())
1577 }
1578
    /// Initiate a peek of `peek_target` at the given `timestamp`.
    ///
    /// The caller must provide a `read_hold` on the peeked collection. It must be for the
    /// target ID and downgradable to `timestamp`; otherwise an error is returned. On
    /// success, the peek is tracked in `self.peeks` and a `Peek` command is sent to
    /// replicas; the result is eventually delivered via `peek_response_tx`.
    #[mz_ore::instrument(level = "debug")]
    pub fn peek(
        &mut self,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        mut read_hold: ReadHold<T>,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let target_id = peek_target.id();

        // The provided read hold must be on the collection being peeked.
        if read_hold.id() != target_id {
            return Err(ReadHoldIdMismatch(read_hold.id()));
        }

        // Downgrading the hold to the peek time proves the time is readable.
        read_hold
            .try_downgrade(Antichain::from_elem(timestamp.clone()))
            .map_err(|_| ReadHoldInsufficient(target_id))?;

        if let Some(target) = target_replica {
            if !self.replica_exists(target) {
                return Err(ReplicaMissing(target));
            }
        }

        let otel_ctx = OpenTelemetryContext::obtain();

        // Track the peek so responses can be matched back to it.
        self.peeks.insert(
            uuid,
            PendingPeek {
                target_replica,
                otel_ctx: otel_ctx.clone(),
                requested_at: Instant::now(),
                read_hold,
                peek_response_tx,
                limit: finishing.limit.map(usize::cast_from),
                offset: finishing.offset,
            },
        );

        let peek = Peek {
            literal_constraints,
            uuid,
            timestamp,
            finishing,
            map_filter_project,
            otel_ctx,
            target: peek_target,
            result_desc,
        };
        self.send(ComputeCommand::Peek(Box::new(peek)));

        Ok(())
    }
1646
    /// Cancel the identified pending peek, delivering `reason` to the waiting client.
    #[mz_ore::instrument(level = "debug")]
    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
        let Some(peek) = self.peeks.get_mut(&uuid) else {
            tracing::warn!("did not find pending peek for {uuid}");
            return;
        };

        // Record the peek as canceled in the metrics, regardless of the concrete `reason`
        // that is delivered to the client below.
        let duration = peek.requested_at.elapsed();
        self.metrics
            .observe_peek_response(&PeekResponse::Canceled, duration);

        let otel_ctx = peek.otel_ctx.clone();
        otel_ctx.attach_as_parent();

        // Inform the controller's client about the cancellation.
        self.deliver_response(ComputeControllerResponse::PeekNotification(
            uuid,
            PeekNotification::Canceled,
            otel_ctx,
        ));

        // Deliver the response and clean up the peek's state.
        self.finish_peek(uuid, reason);
    }
1673
    /// Assign the given read policies to the identified collections.
    ///
    /// Validates all targets up front, so either all policies are applied or none: an
    /// error is returned if any collection is missing or write-only (has no read policy).
    #[mz_ore::instrument(level = "debug")]
    pub fn set_read_policy(
        &mut self,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) -> Result<(), ReadPolicyError> {
        // First pass: validate, without applying anything yet.
        for (id, _policy) in &policies {
            let collection = self.collection(*id)?;
            if collection.read_policy.is_none() {
                return Err(ReadPolicyError::WriteOnlyCollection(*id));
            }
        }

        // Second pass: install each policy and immediately apply it based on the current
        // write frontier.
        for (id, new_policy) in policies {
            let collection = self.expect_collection_mut(id);
            let new_since = new_policy.frontier(collection.write_frontier().borrow());
            let _ = collection.implied_read_hold.try_downgrade(new_since);
            collection.read_policy = Some(new_policy);
        }

        Ok(())
    }
1708
    /// Advance the tracked global write frontier of the identified collection, if the
    /// given frontier is in fact an advancement, and apply downstream effects.
    #[mz_ore::instrument(level = "debug")]
    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
        let collection = self.expect_collection_mut(id);

        // Only ever advance the write frontier; ignore regressions and repeats.
        let advanced = collection.shared.lock_write_frontier(|f| {
            let advanced = PartialOrder::less_than(f, &new_frontier);
            if advanced {
                f.clone_from(&new_frontier);
            }
            advanced
        });

        if !advanced {
            return;
        }

        // Downgrade the implied read hold: per the read policy if one exists, otherwise
        // (write-only collections) to one step behind the new write frontier.
        let new_since = match &collection.read_policy {
            Some(read_policy) => {
                read_policy.frontier(new_frontier.borrow())
            }
            None => {
                Antichain::from_iter(
                    new_frontier
                        .iter()
                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                )
            }
        };
        let _ = collection.implied_read_hold.try_downgrade(new_since);

        // Report the advancement to the controller's client.
        self.deliver_response(ComputeControllerResponse::FrontierUpper {
            id,
            upper: new_frontier,
        });
    }
1763
    /// Apply a batch of read hold changes to the identified collection's read
    /// capabilities, propagating any resulting since change to its dependencies and to
    /// replicas (via `AllowCompaction`).
    pub(super) fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
        let Some(collection) = self.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "read hold change for absent collection (id={id}, changes={update:?})"
            );
            return;
        };

        let new_since = collection.shared.lock_read_capabilities(|caps| {
            // Sanity-check the update before applying it: no capability may become
            // negative, and the overall read frontier may not regress.
            let read_frontier = caps.frontier();
            for (time, diff) in update.iter() {
                let count = caps.count_for(time) + diff;
                assert!(
                    count >= 0,
                    "invalid read capabilities update: negative capability \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
                assert!(
                    count == 0 || read_frontier.less_equal(time),
                    "invalid read capabilities update: frontier regression \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
            }

            let changes = caps.update_iter(update.drain());

            // Return the new since only if the read frontier actually changed.
            let changed = changes.count() > 0;
            changed.then(|| caps.frontier().to_owned())
        });

        let Some(new_since) = new_since else {
            return;
        };

        // Propagate the new since to the dependency read holds. These downgrades are
        // expected to always succeed, since dependency holds never sit beyond our since.
        for read_hold in collection.compute_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }
        for read_hold in collection.storage_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }

        // Allow replicas to compact the collection up to the new since.
        self.send(ComputeCommand::AllowCompaction {
            id,
            frontier: new_since,
        });
    }
1821
1822 fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1831 let Some(peek) = self.peeks.remove(&uuid) else {
1832 return;
1833 };
1834
1835 let _ = peek.peek_response_tx.send(response);
1837
1838 self.send(ComputeCommand::CancelPeek { uuid });
1841
1842 drop(peek.read_hold);
1843 }
1844
1845 fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
1848 if self
1850 .replicas
1851 .get(&replica_id)
1852 .filter(|replica| replica.epoch == epoch)
1853 .is_none()
1854 {
1855 return;
1856 }
1857
1858 match response {
1861 ComputeResponse::Frontiers(id, frontiers) => {
1862 self.handle_frontiers_response(id, frontiers, replica_id);
1863 }
1864 ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
1865 self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
1866 }
1867 ComputeResponse::CopyToResponse(id, response) => {
1868 self.handle_copy_to_response(id, response, replica_id);
1869 }
1870 ComputeResponse::SubscribeResponse(id, response) => {
1871 self.handle_subscribe_response(id, response, replica_id);
1872 }
1873 ComputeResponse::Status(response) => {
1874 self.handle_status_response(response, replica_id);
1875 }
1876 }
1877 }
1878
    /// Handle a frontiers update reported by the given replica for the given collection.
    fn handle_frontiers_response(
        &mut self,
        id: GlobalId,
        frontiers: FrontiersResponse<T>,
        replica_id: ReplicaId,
    ) {
        // Updates for unknown collections/replicas indicate a bug somewhere; log and bail.
        if !self.collections.contains_key(&id) {
            soft_panic_or_log!(
                "frontiers update for an unknown collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica \
                 (replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };

        // Apply whichever per-replica frontiers were reported; an advancing replica write
        // frontier may also advance the global write frontier.
        if let Some(new_frontier) = frontiers.input_frontier {
            replica_collection.update_input_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.output_frontier {
            replica_collection.update_output_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.write_frontier {
            replica_collection.update_write_frontier(new_frontier.clone());
            self.maybe_update_global_write_frontier(id, new_frontier);
        }
    }
1920
    /// Handle a peek response received from the given replica.
    #[mz_ore::instrument(level = "debug")]
    fn handle_peek_response(
        &mut self,
        uuid: Uuid,
        response: PeekResponse,
        otel_ctx: OpenTelemetryContext,
        replica_id: ReplicaId,
    ) {
        otel_ctx.attach_as_parent();

        // The peek may already have been finished or canceled; ignore stale responses.
        let Some(peek) = self.peeks.get(&uuid) else {
            return;
        };

        // If the peek targets a specific replica, ignore responses from all others.
        let target_replica = peek.target_replica.unwrap_or(replica_id);
        if target_replica != replica_id {
            return;
        }

        let duration = peek.requested_at.elapsed();
        self.metrics.observe_peek_response(&response, duration);

        // Notify the controller's client, then deliver the response and clean up.
        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
        self.deliver_response(ComputeControllerResponse::PeekNotification(
            uuid,
            notification,
            otel_ctx,
        ));

        self.finish_peek(uuid, response)
    }
1957
    /// Handle a copy-to response received from the given replica.
    fn handle_copy_to_response(
        &mut self,
        sink_id: GlobalId,
        response: CopyToResponse,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&sink_id) {
            soft_panic_or_log!(
                "received response for an unknown copy-to \
                 (sink_id={sink_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
            soft_panic_or_log!(
                "copy-to response for an unknown replica collection \
                 (sink_id={sink_id}, replica_id={replica_id})"
            );
            return;
        };

        // A copy-to is done once it has produced a response, so advance all per-replica
        // frontiers to the empty frontier.
        replica_collection.update_write_frontier(Antichain::new());
        replica_collection.update_input_frontier(Antichain::new());
        replica_collection.update_output_frontier(Antichain::new());

        // Only the first replica response is forwarded; `remove` returning `false` means
        // another replica already reported this copy-to (or it was dropped).
        if !self.copy_tos.remove(&sink_id) {
            return;
        }

        let result = match response {
            CopyToResponse::RowCount(count) => Ok(count),
            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
            CopyToResponse::Dropped => {
                // A `Dropped` response for a copy-to we still track is unexpected.
                tracing::error!(
                    %sink_id, %replica_id,
                    "received `Dropped` response for a tracked copy to",
                );
                return;
            }
        };

        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
    }
2014
    /// Handle a subscribe response received from the given replica.
    fn handle_subscribe_response(
        &mut self,
        subscribe_id: GlobalId,
        response: SubscribeResponse<T>,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&subscribe_id) {
            soft_panic_or_log!(
                "received response for an unknown subscribe \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica (replica_id={replica_id})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica collection \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
            );
            return;
        };

        // The per-replica frontiers advance to the batch upper, or to the empty frontier
        // if the subscribe was dropped on the replica.
        let write_frontier = match &response {
            SubscribeResponse::Batch(batch) => batch.upper.clone(),
            SubscribeResponse::DroppedAt(_) => Antichain::new(),
        };

        replica_collection.update_write_frontier(write_frontier.clone());
        replica_collection.update_input_frontier(write_frontier.clone());
        replica_collection.update_output_frontier(write_frontier.clone());

        // If the subscribe is not tracked (e.g. it already finished), ignore the response.
        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
            return;
        };

        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);

        match response {
            SubscribeResponse::Batch(batch) => {
                let upper = batch.upper;
                let mut updates = batch.updates;

                // Only forward the batch if it advances the tracked subscribe frontier,
                // so data from overlapping replica batches isn't delivered twice.
                if PartialOrder::less_than(&subscribe.frontier, &upper) {
                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());

                    if upper.is_empty() {
                        // The subscribe is complete and will produce no more data.
                        self.subscribes.remove(&subscribe_id);
                    } else {
                        self.subscribes.insert(subscribe_id, subscribe);
                    }

                    // Retain only updates at times not yet reported for this subscribe.
                    if let Ok(updates) = updates.as_mut() {
                        updates.retain(|(time, _data, _diff)| lower.less_equal(time));
                    }
                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeBatch {
                            lower,
                            upper,
                            updates,
                        },
                    ));
                }
            }
            SubscribeResponse::DroppedAt(frontier) => {
                // A `DroppedAt` response for a subscribe we still track is unexpected.
                tracing::error!(
                    %subscribe_id,
                    %replica_id,
                    frontier = ?frontier.elements(),
                    "received `DroppedAt` response for a tracked subscribe",
                );
                self.subscribes.remove(&subscribe_id);
            }
        }
    }
2115
    /// Handle a status response received from a replica.
    ///
    /// The match is intentionally exhaustive, so adding a new status variant forces a
    /// compile error here until it is handled.
    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
        match response {
            StatusResponse::Placeholder => {}
        }
    }
2121
2122 fn dependency_write_frontiers<'b>(
2124 &'b self,
2125 collection: &'b CollectionState<T>,
2126 ) -> impl Iterator<Item = Antichain<T>> + 'b {
2127 let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2128 let collection = self.collections.get(&dep_id);
2129 collection.map(|c| c.write_frontier())
2130 });
2131 let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2132 let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2133 frontiers.map(|f| f.write_frontier)
2134 });
2135
2136 compute_frontiers.chain(storage_frontiers)
2137 }
2138
2139 fn transitive_storage_dependency_write_frontiers<'b>(
2141 &'b self,
2142 collection: &'b CollectionState<T>,
2143 ) -> impl Iterator<Item = Antichain<T>> + 'b {
2144 let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
2145 let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
2146 let mut done = BTreeSet::new();
2147
2148 while let Some(id) = todo.pop() {
2149 if done.contains(&id) {
2150 continue;
2151 }
2152 if let Some(dep) = self.collections.get(&id) {
2153 storage_ids.extend(dep.storage_dependency_ids());
2154 todo.extend(dep.compute_dependency_ids())
2155 }
2156 done.insert(id);
2157 }
2158
2159 let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
2160 let frontiers = self.storage_collections.collection_frontiers(id).ok();
2161 frontiers.map(|f| f.write_frontier)
2162 });
2163
2164 storage_frontiers
2165 }
2166
    /// Downgrade the warmup read holds of all collections to just before the write
    /// frontiers of their dependencies.
    ///
    /// This enables compaction of dependency data while keeping the data a collection
    /// would read on (re)hydration available — NOTE(review): the hydration purpose is
    /// inferred from the `warmup` naming; confirm against the field's definition.
    fn downgrade_warmup_capabilities(&mut self) {
        // Compute all new capabilities first, then apply them, to avoid holding a borrow
        // of `self.collections` while mutating collection state.
        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            // A write-only collection (no read policy) that has finished writing (empty
            // write frontier) no longer needs a warmup hold; release it entirely.
            if collection.read_policy.is_none()
                && collection.shared.lock_write_frontier(|f| f.is_empty())
            {
                new_capabilities.insert(*id, Antichain::new());
                continue;
            }

            // Hold back to one step before each dependency's write frontier.
            let mut new_capability = Antichain::new();
            for frontier in self.dependency_write_frontiers(collection) {
                for time in frontier {
                    new_capability.insert(time.step_back().unwrap_or(time));
                }
            }

            new_capabilities.insert(*id, new_capability);
        }

        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
        }
    }
2207
    /// Advance the implied read holds of collections on a cluster without replicas, based
    /// on the write frontiers of their transitive storage dependencies.
    ///
    /// With no replicas, no compute write frontiers advance, so this is what keeps read
    /// holds (and thus upstream compaction) moving on paused clusters. Gated behind the
    /// `ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE` dyncfg.
    fn forward_implied_capabilities(&mut self) {
        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
            return;
        }
        // Only applies to clusters without replicas.
        if !self.replicas.is_empty() {
            return;
        }

        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            // Write-only collections (no read policy) are not affected.
            let Some(read_policy) = &collection.read_policy else {
                continue;
            };

            // Use the combined write frontiers of all transitive storage inputs as a
            // stand-in for the collection's own write frontier.
            let mut dep_frontier = Antichain::new();
            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
                dep_frontier.extend(frontier);
            }

            let new_capability = read_policy.frontier(dep_frontier.borrow());
            // Only ever advance the implied read hold, never regress it.
            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
                new_capabilities.insert(*id, new_capability);
            }
        }

        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.implied_read_hold.try_downgrade(new_capability);
        }
    }
2269
    /// Acquire a new read hold on the identified collection, at its current read frontier.
    pub(super) fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
        let collection = self.collection(id)?;
        // Register the hold's capability while holding the capabilities lock, so the read
        // frontier cannot advance between reading it and installing the capability.
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });
        let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
        Ok(hold)
    }
2289
    /// Perform periodic maintenance work.
    ///
    /// NOTE(review): the call order appears deliberate (e.g. capability downgrades before
    /// scheduling, cleanup before introspection refresh) — confirm before reordering.
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        self.rehydrate_failed_replicas();
        self.downgrade_warmup_capabilities();
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
2306}
2307
/// State maintained by the controller for a single compute collection.
#[derive(Debug)]
struct CollectionState<T: ComputeControllerTimestamp> {
    /// The replica this collection is targeted at, if any.
    target_replica: Option<ReplicaId>,
    /// Whether this is an introspection (log) collection.
    log_collection: bool,
    /// Whether the collection has been dropped by the client.
    dropped: bool,
    /// Whether a `Schedule` command has been sent for this collection.
    scheduled: bool,

    /// Whether the collection is in read-only mode.
    /// NOTE(review): initialized to `true` in `new`; where it flips is outside this
    /// chunk — confirm.
    read_only: bool,

    /// Frontier/capability state shared with other controller tasks.
    shared: SharedCollectionState<T>,

    /// The read hold maintained according to the collection's `read_policy`.
    implied_read_hold: ReadHold<T>,
    /// A read hold kept just behind the dependencies' write frontiers (see
    /// `downgrade_warmup_capabilities`).
    warmup_read_hold: ReadHold<T>,
    /// The policy that determines the `implied_read_hold`; `None` for write-only
    /// collections (see `set_read_policy`).
    read_policy: Option<ReadPolicy<T>>,

    /// Read holds on this collection's storage dependencies.
    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
    /// Read holds on this collection's compute dependencies.
    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,

    /// Introspection state reported for this collection (retracted on drop).
    introspection: CollectionIntrospection<T>,

    /// Stash of accumulated wallclock lag histogram updates; `None` for transient
    /// collections, which are not tracked.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}
2387
impl<T: ComputeControllerTimestamp> CollectionState<T> {
    /// Create state for a new collection with the given as-of and dependency holds.
    ///
    /// `shared` must already be initialized to the same as-of, with the capability of the
    /// implied read hold registered (asserted below).
    fn new(
        collection_id: GlobalId,
        as_of: Antichain<T>,
        shared: SharedCollectionState<T>,
        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection: CollectionIntrospection<T>,
    ) -> Self {
        // A new collection starts reading and writing at its as-of.
        let since = as_of.clone();
        let upper = as_of;

        // Verify that `shared` reflects the expected initial state.
        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
        assert!(shared.lock_write_frontier(|f| f == &upper));

        let implied_read_hold =
            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);

        // Register the warmup hold's capability in the shared state. (The implied hold's
        // capability is the one already present, per the assertion above.)
        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
        shared.lock_read_capabilities(|c| {
            c.update_iter(updates);
        });

        // Wallclock lag histograms are not tracked for transient collections.
        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
            true => None,
            false => Some(Default::default()),
        };

        Self {
            target_replica: None,
            log_collection: false,
            dropped: false,
            scheduled: false,
            read_only: true,
            shared,
            implied_read_hold,
            warmup_read_hold,
            read_policy: Some(ReadPolicy::ValidFrom(since)),
            storage_dependencies,
            compute_dependencies,
            introspection,
            wallclock_lag_histogram_stash,
        }
    }

    /// Create state for a new log (introspection) collection.
    ///
    /// Log collections have no dependencies, a minimum since, and count as scheduled
    /// from the start.
    fn new_log_collection(
        id: GlobalId,
        shared: SharedCollectionState<T>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    ) -> Self {
        let since = Antichain::from_elem(T::minimum());
        let introspection = CollectionIntrospection::new(
            id,
            introspection_tx,
            since.clone(),
            false,
            None,
            None,
            Vec::new(),
        );
        let mut state = Self::new(
            id,
            since,
            shared,
            Default::default(),
            Default::default(),
            read_hold_tx,
            introspection,
        );
        state.log_collection = true;
        // Log collections are marked scheduled immediately; no `Schedule` command needed.
        state.scheduled = true;
        state
    }

    /// The collection's current read frontier (the frontier of its read capabilities).
    fn read_frontier(&self) -> Antichain<T> {
        self.shared
            .lock_read_capabilities(|c| c.frontier().to_owned())
    }

    /// The collection's current write frontier.
    fn write_frontier(&self) -> Antichain<T> {
        self.shared.lock_write_frontier(|f| f.clone())
    }

    /// The IDs of this collection's storage dependencies.
    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.storage_dependencies.keys().copied()
    }

    /// The IDs of this collection's compute dependencies.
    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.compute_dependencies.keys().copied()
    }
}
2496
/// Collection state shared between the controller and other tasks, behind mutexes.
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState<T> {
    /// Accumulation of the collection's read capabilities; its frontier is the
    /// collection's since.
    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
    /// The collection's write frontier.
    write_frontier: Arc<Mutex<Antichain<T>>>,
}
2525
2526impl<T: Timestamp> SharedCollectionState<T> {
2527 pub fn new(as_of: Antichain<T>) -> Self {
2528 let since = as_of.clone();
2530 let upper = as_of;
2532
2533 let mut read_capabilities = MutableAntichain::new();
2537 read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2538
2539 Self {
2540 read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2541 write_frontier: Arc::new(Mutex::new(upper)),
2542 }
2543 }
2544
2545 pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2546 where
2547 F: FnOnce(&mut MutableAntichain<T>) -> R,
2548 {
2549 let mut caps = self.read_capabilities.lock().expect("poisoned");
2550 f(&mut *caps)
2551 }
2552
2553 pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2554 where
2555 F: FnOnce(&mut Antichain<T>) -> R,
2556 {
2557 let mut frontier = self.write_frontier.lock().expect("poisoned");
2558 f(&mut *frontier)
2559 }
2560}
2561
/// Manages the introspection data reported for a compute collection; everything reported
/// is retracted when this value is dropped.
#[derive(Debug)]
struct CollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the collection this introspection is about.
    collection_id: GlobalId,
    /// Channel over which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// `Frontiers` introspection state; `None` for storage sinks, which don't report it.
    frontiers: Option<FrontiersIntrospectionState<T>>,
    /// Refresh introspection state; `Some` only for collections with a refresh schedule.
    refresh: Option<RefreshIntrospectionState<T>>,
    /// The IDs of the collection's dependencies, reported to `ComputeDependencies`.
    dependency_ids: Vec<GlobalId>,
}
2582
impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
    /// Create introspection state for a new collection and report its initial state.
    ///
    /// `refresh_schedule` and `initial_as_of` must be provided together; a schedule
    /// without an initial as-of is logged as a soft error and the schedule is ignored.
    fn new(
        collection_id: GlobalId,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        as_of: Antichain<T>,
        storage_sink: bool,
        initial_as_of: Option<Antichain<T>>,
        refresh_schedule: Option<RefreshSchedule>,
        dependency_ids: Vec<GlobalId>,
    ) -> Self {
        let refresh =
            match (refresh_schedule, initial_as_of) {
                (Some(refresh_schedule), Some(initial_as_of)) => Some(
                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
                ),
                (refresh_schedule, _) => {
                    soft_assert_or_log!(
                        refresh_schedule.is_none(),
                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
                    );
                    None
                }
            };
        // Storage sinks don't report `Frontiers` introspection.
        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));

        let self_ = Self {
            collection_id,
            introspection_tx,
            frontiers,
            refresh,
            dependency_ids,
        };

        self_.report_initial_state();
        self_
    }

    /// Report the initial introspection state of the collection (all insertions).
    fn report_initial_state(&self) {
        if let Some(frontiers) = &self.frontiers {
            let row = frontiers.row_for_collection(self.collection_id);
            let updates = vec![(row, Diff::ONE)];
            self.send(IntrospectionType::Frontiers, updates);
        }

        if let Some(refresh) = &self.refresh {
            let row = refresh.row_for_collection(self.collection_id);
            let updates = vec![(row, Diff::ONE)];
            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
        }

        if !self.dependency_ids.is_empty() {
            let updates = self.dependency_rows(Diff::ONE);
            self.send(IntrospectionType::ComputeDependencies, updates);
        }
    }

    /// Produce one `(collection_id, dependency_id)` row per dependency, with the given
    /// diff.
    fn dependency_rows(&self, diff: Diff) -> Vec<(Row, Diff)> {
        self.dependency_ids
            .iter()
            .map(|dependency_id| {
                let row = Row::pack_slice(&[
                    Datum::String(&self.collection_id.to_string()),
                    Datum::String(&dependency_id.to_string()),
                ]);
                (row, diff)
            })
            .collect()
    }

    /// Observe the given frontiers and update introspection state as necessary.
    fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
        self.update_frontier_introspection(read_frontier, write_frontier);
        self.update_refresh_introspection(write_frontier);
    }

    /// Update the `Frontiers` introspection by retracting the previous row and inserting
    /// a new one, if either frontier changed.
    fn update_frontier_introspection(
        &mut self,
        read_frontier: &Antichain<T>,
        write_frontier: &Antichain<T>,
    ) {
        let Some(frontiers) = &mut self.frontiers else {
            return;
        };

        // Nothing to do if both frontiers are unchanged.
        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
        {
            return;
        };

        let retraction = frontiers.row_for_collection(self.collection_id);
        frontiers.update(read_frontier, write_frontier);
        let insertion = frontiers.row_for_collection(self.collection_id);
        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
        self.send(IntrospectionType::Frontiers, updates);
    }

    /// Update the `ComputeMaterializedViewRefreshes` introspection in response to a write
    /// frontier change, if the rendered row actually changed.
    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
        let Some(refresh) = &mut self.refresh else {
            return;
        };

        let retraction = refresh.row_for_collection(self.collection_id);
        refresh.frontier_update(write_frontier);
        let insertion = refresh.row_for_collection(self.collection_id);

        // Avoid emitting a no-op retract/insert pair.
        if retraction == insertion {
            return;
        }

        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
    }

    /// Send introspection updates; a failed send (e.g. disconnected receiver) is ignored.
    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
        let _ = self.introspection_tx.send((introspection_type, updates));
    }
}
2707
2708impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
2709 fn drop(&mut self) {
2710 if let Some(frontiers) = &self.frontiers {
2712 let row = frontiers.row_for_collection(self.collection_id);
2713 let updates = vec![(row, Diff::MINUS_ONE)];
2714 self.send(IntrospectionType::Frontiers, updates);
2715 }
2716
2717 if let Some(refresh) = &self.refresh {
2719 let retraction = refresh.row_for_collection(self.collection_id);
2720 let updates = vec![(retraction, Diff::MINUS_ONE)];
2721 self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2722 }
2723
2724 if !self.dependency_ids.is_empty() {
2726 let updates = self.dependency_rows(Diff::MINUS_ONE);
2727 self.send(IntrospectionType::ComputeDependencies, updates);
2728 }
2729 }
2730}
2731
/// The read and write frontiers of a collection, as last reported through the
/// `Frontiers` introspection type.
#[derive(Debug)]
struct FrontiersIntrospectionState<T> {
    /// The read frontier last reported for the collection.
    read_frontier: Antichain<T>,
    /// The write frontier last reported for the collection.
    write_frontier: Antichain<T>,
}
2737
2738impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
2739 fn new(as_of: Antichain<T>) -> Self {
2740 Self {
2741 read_frontier: as_of.clone(),
2742 write_frontier: as_of,
2743 }
2744 }
2745
2746 fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2748 let read_frontier = self
2749 .read_frontier
2750 .as_option()
2751 .map_or(Datum::Null, |ts| ts.clone().into());
2752 let write_frontier = self
2753 .write_frontier
2754 .as_option()
2755 .map_or(Datum::Null, |ts| ts.clone().into());
2756 Row::pack_slice(&[
2757 Datum::String(&collection_id.to_string()),
2758 read_frontier,
2759 write_frontier,
2760 ])
2761 }
2762
2763 fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2765 if read_frontier != &self.read_frontier {
2766 self.read_frontier.clone_from(read_frontier);
2767 }
2768 if write_frontier != &self.write_frontier {
2769 self.write_frontier.clone_from(write_frontier);
2770 }
2771 }
2772}
2773
2774#[derive(Debug)]
2777struct RefreshIntrospectionState<T> {
2778 refresh_schedule: RefreshSchedule,
2780 initial_as_of: Antichain<T>,
2781 next_refresh: Datum<'static>, last_completed_refresh: Datum<'static>, }
2785
2786impl<T> RefreshIntrospectionState<T> {
2787 fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2789 Row::pack_slice(&[
2790 Datum::String(&collection_id.to_string()),
2791 self.last_completed_refresh,
2792 self.next_refresh,
2793 ])
2794 }
2795}
2796
impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
    /// Construct a new `RefreshIntrospectionState`, with datums derived from
    /// the given `upper` via an initial `frontier_update`.
    fn new(
        refresh_schedule: RefreshSchedule,
        initial_as_of: Antichain<T>,
        upper: &Antichain<T>,
    ) -> Self {
        let mut self_ = Self {
            refresh_schedule: refresh_schedule.clone(),
            initial_as_of: initial_as_of.clone(),
            next_refresh: Datum::Null,
            last_completed_refresh: Datum::Null,
        };
        self_.frontier_update(upper);
        self_
    }

    /// Recompute the `last_completed_refresh`/`next_refresh` datums from the
    /// given write frontier of the collection.
    fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
        if write_frontier.is_empty() {
            // The write frontier is empty, so no further updates can arrive:
            // all refreshes have completed and there is no next refresh.
            self.last_completed_refresh =
                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
                    last_refresh.into()
                } else {
                    // No last refresh in the schedule; presumably a schedule
                    // that refreshes forever, in which case the frontier can
                    // only close at the maximum time — TODO(review): confirm.
                    T::maximum().into()
                };
            self.next_refresh = Datum::Null;
        } else {
            if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
                // The frontier has not advanced past the initial as-of, so no
                // refresh has completed yet; the next refresh is the first one.
                self.last_completed_refresh = Datum::Null;
                let initial_as_of = self.initial_as_of.as_option().expect(
                    "initial_as_of can't be [], because then there would be no refreshes at all",
                );
                let first_refresh = initial_as_of
                    .round_up(&self.refresh_schedule)
                    .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
                soft_assert_or_log!(
                    first_refresh == *initial_as_of,
                    "initial_as_of should be set to the first refresh"
                );
                self.next_refresh = first_refresh.into();
            } else {
                // The frontier is past the initial as-of and non-empty, so
                // both datums can be derived from its (single) element.
                let write_frontier = write_frontier.as_option().expect("checked above");
                self.last_completed_refresh = write_frontier
                    .round_down_minus_1(&self.refresh_schedule)
                    .map_or_else(
                        || {
                            soft_panic_or_log!(
                                "rounding down should have returned the first refresh or later"
                            );
                            Datum::Null
                        },
                        |last_completed_refresh| last_completed_refresh.into(),
                    );
                self.next_refresh = write_frontier.clone().into();
            }
        }
    }
}
2862
/// State tracked for an in-flight peek.
#[derive(Debug)]
struct PendingPeek<T: Timestamp> {
    /// The replica targeted by the peek, if any.
    target_replica: Option<ReplicaId>,
    /// The OpenTelemetry context propagated along with the peek.
    otel_ctx: OpenTelemetryContext,
    /// The time at which the peek was requested; presumably used to report
    /// peek durations — TODO(review): confirm against usage.
    requested_at: Instant,
    /// Read hold that keeps the peeked time readable while the peek is
    /// outstanding.
    read_hold: ReadHold<T>,
    /// Channel through which the peek response is delivered.
    peek_response_tx: oneshot::Sender<PeekResponse>,
    /// Optional limit on the number of result rows.
    limit: Option<usize>,
    /// Offset into the result rows.
    offset: usize,
}
2885
/// State tracked for an active subscribe.
#[derive(Debug, Clone)]
struct ActiveSubscribe<T> {
    // Frontier tracked for the subscribe; initialized to the minimum
    // timestamp (see `Default`). NOTE(review): presumably the frontier of
    // updates reported so far — confirm against the response handler.
    frontier: Antichain<T>,
}
2891
2892impl<T: ComputeControllerTimestamp> Default for ActiveSubscribe<T> {
2893 fn default() -> Self {
2894 Self {
2895 frontier: Antichain::from_elem(T::minimum()),
2896 }
2897 }
2898}
2899
/// State maintained for a single replica of a compute instance.
#[derive(Debug)]
struct ReplicaState<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    id: ReplicaId,
    /// Client for sending commands to the replica.
    client: ReplicaClient<T>,
    /// The replica's configuration.
    config: ReplicaConfig,
    /// Metrics tracked for this replica.
    metrics: ReplicaMetrics,
    /// Sender for introspection updates produced on behalf of this replica.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Per-collection state tracked for this replica.
    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
    // NOTE(review): epoch semantics (e.g. replica restart counting) are not
    // visible from this chunk — confirm before documenting further.
    epoch: u64,
}
2918
2919impl<T: ComputeControllerTimestamp> ReplicaState<T> {
2920 fn new(
2921 id: ReplicaId,
2922 client: ReplicaClient<T>,
2923 config: ReplicaConfig,
2924 metrics: ReplicaMetrics,
2925 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2926 epoch: u64,
2927 ) -> Self {
2928 Self {
2929 id,
2930 client,
2931 config,
2932 metrics,
2933 introspection_tx,
2934 epoch,
2935 collections: Default::default(),
2936 }
2937 }
2938
2939 fn add_collection(
2945 &mut self,
2946 id: GlobalId,
2947 as_of: Antichain<T>,
2948 input_read_holds: Vec<ReadHold<T>>,
2949 ) {
2950 let metrics = self.metrics.for_collection(id);
2951 let introspection = ReplicaCollectionIntrospection::new(
2952 self.id,
2953 id,
2954 self.introspection_tx.clone(),
2955 as_of.clone(),
2956 );
2957 let mut state =
2958 ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
2959
2960 if id.is_transient() {
2964 state.wallclock_lag_max = None;
2965 }
2966
2967 if let Some(previous) = self.collections.insert(id, state) {
2968 panic!("attempt to add a collection with existing ID {id} (previous={previous:?}");
2969 }
2970 }
2971
2972 fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
2974 self.collections.remove(&id)
2975 }
2976
2977 fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
2979 self.collections.get(&id).map_or(true, |c| {
2980 c.write_frontier.is_empty()
2981 && c.input_frontier.is_empty()
2982 && c.output_frontier.is_empty()
2983 })
2984 }
2985
2986 #[mz_ore::instrument(level = "debug")]
2990 pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
2991 let Self {
2998 id,
2999 client: _,
3000 config: _,
3001 metrics: _,
3002 introspection_tx: _,
3003 epoch,
3004 collections,
3005 } = self;
3006
3007 let collections: BTreeMap<_, _> = collections
3008 .iter()
3009 .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
3010 .collect();
3011
3012 Ok(serde_json::json!({
3013 "id": id.to_string(),
3014 "collections": collections,
3015 "epoch": epoch,
3016 }))
3017 }
3018}
3019
/// Per-collection state tracked for a single replica.
#[derive(Debug)]
struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
    /// The replica's reported write frontier for this collection.
    write_frontier: Antichain<T>,
    /// The replica's reported input frontier for this collection.
    input_frontier: Antichain<T>,
    /// The replica's reported output frontier for this collection; used to
    /// determine hydration (see `hydrated`).
    output_frontier: Antichain<T>,

    /// Metrics tracked for this collection, if any.
    // NOTE(review): under what conditions `for_collection` yields `None` is
    // not visible from this chunk — confirm before documenting.
    metrics: Option<ReplicaCollectionMetrics>,
    /// The as-of frontier with which this collection was installed; also the
    /// initial value of the three frontiers above.
    as_of: Antichain<T>,
    /// Introspection state maintained for this collection on this replica.
    introspection: ReplicaCollectionIntrospection<T>,
    /// Read holds on the collection's inputs, downgraded as the input
    /// frontier advances (see `update_input_frontier`).
    input_read_holds: Vec<ReadHold<T>>,

    /// Maximum wallclock lag tracking; `None` disables tracking (set for
    /// transient collections in `ReplicaState::add_collection`).
    wallclock_lag_max: Option<WallclockLag>,
}
3055
3056impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
3057 fn new(
3058 metrics: Option<ReplicaCollectionMetrics>,
3059 as_of: Antichain<T>,
3060 introspection: ReplicaCollectionIntrospection<T>,
3061 input_read_holds: Vec<ReadHold<T>>,
3062 ) -> Self {
3063 Self {
3064 write_frontier: as_of.clone(),
3065 input_frontier: as_of.clone(),
3066 output_frontier: as_of.clone(),
3067 metrics,
3068 as_of,
3069 introspection,
3070 input_read_holds,
3071 wallclock_lag_max: Some(WallclockLag::MIN),
3072 }
3073 }
3074
3075 fn hydrated(&self) -> bool {
3077 self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
3093 }
3094
3095 fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
3097 if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
3098 soft_panic_or_log!(
3099 "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
3100 self.write_frontier,
3101 );
3102 return;
3103 } else if new_frontier == self.write_frontier {
3104 return;
3105 }
3106
3107 self.write_frontier = new_frontier;
3108 }
3109
3110 fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
3112 if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
3113 soft_panic_or_log!(
3114 "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
3115 self.input_frontier,
3116 );
3117 return;
3118 } else if new_frontier == self.input_frontier {
3119 return;
3120 }
3121
3122 self.input_frontier = new_frontier;
3123
3124 for read_hold in &mut self.input_read_holds {
3126 let result = read_hold.try_downgrade(self.input_frontier.clone());
3127 soft_assert_or_log!(
3128 result.is_ok(),
3129 "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
3130 self.input_frontier,
3131 );
3132 }
3133 }
3134
3135 fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
3137 if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3138 soft_panic_or_log!(
3139 "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3140 self.output_frontier,
3141 );
3142 return;
3143 } else if new_frontier == self.output_frontier {
3144 return;
3145 }
3146
3147 self.output_frontier = new_frontier;
3148 }
3149}
3150
/// Introspection state for a (collection, replica) pair; reports the write
/// frontier through the `ReplicaFrontiers` introspection type and retracts it
/// on drop.
#[derive(Debug)]
struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// The ID of the collection.
    collection_id: GlobalId,
    /// The write frontier last reported for this collection/replica pair.
    write_frontier: Antichain<T>,
    /// Sender for introspection updates.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
}
3164
3165impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
3166 fn new(
3168 replica_id: ReplicaId,
3169 collection_id: GlobalId,
3170 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3171 as_of: Antichain<T>,
3172 ) -> Self {
3173 let self_ = Self {
3174 replica_id,
3175 collection_id,
3176 write_frontier: as_of,
3177 introspection_tx,
3178 };
3179
3180 self_.report_initial_state();
3181 self_
3182 }
3183
3184 fn report_initial_state(&self) {
3186 let row = self.write_frontier_row();
3187 let updates = vec![(row, Diff::ONE)];
3188 self.send(IntrospectionType::ReplicaFrontiers, updates);
3189 }
3190
3191 fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
3193 if self.write_frontier == *write_frontier {
3194 return; }
3196
3197 let retraction = self.write_frontier_row();
3198 self.write_frontier.clone_from(write_frontier);
3199 let insertion = self.write_frontier_row();
3200
3201 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3202 self.send(IntrospectionType::ReplicaFrontiers, updates);
3203 }
3204
3205 fn write_frontier_row(&self) -> Row {
3207 let write_frontier = self
3208 .write_frontier
3209 .as_option()
3210 .map_or(Datum::Null, |ts| ts.clone().into());
3211 Row::pack_slice(&[
3212 Datum::String(&self.collection_id.to_string()),
3213 Datum::String(&self.replica_id.to_string()),
3214 write_frontier,
3215 ])
3216 }
3217
3218 fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3219 let _ = self.introspection_tx.send((introspection_type, updates));
3222 }
3223}
3224
3225impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
3226 fn drop(&mut self) {
3227 let row = self.write_frontier_row();
3229 let updates = vec![(row, Diff::MINUS_ONE)];
3230 self.send(IntrospectionType::ReplicaFrontiers, updates);
3231 }
3232}