use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use mz_build_info::BuildInfo;
use mz_cluster_client::WallclockLagFn;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_compute_types::sinks::{
    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
};
use mz_compute_types::sources::SourceInstanceDesc;
use mz_controller_types::dyncfgs::{
    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION,
    WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
use mz_ore::channel::instrumented_unbounded_channel;
use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, soft_panic_or_log};
use mz_persist_types::PersistLocation;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
use mz_storage_types::read_holds::{self, ReadHold};
use mz_storage_types::read_policy::ReadPolicy;
use serde::Serialize;
use thiserror::Error;
use timely::PartialOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use crate::controller::error::{
    CollectionLookupError, CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
};
use crate::controller::replica::{ReplicaClient, ReplicaConfig};
use crate::controller::{
    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
    ReplicaId, StorageCollections,
};
use crate::logging::LogVariant;
use crate::metrics::IntCounter;
use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
use crate::protocol::command::{
    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
};
use crate::protocol::history::ComputeCommandHistory;
use crate::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, OperatorHydrationStatus, PeekResponse,
    StatusResponse, SubscribeBatch, SubscribeResponse,
};
use crate::service::{ComputeClient, ComputeGrpcClient};

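/// Error returned in response to a request to create a replica that already exists.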
76#[derive(Error, Debug)]
77#[error("replica exists already: {0}")]
78pub(super) struct ReplicaExists(pub ReplicaId);
79
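/// Error returned in response to a reference to a replica that does not exist.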
80#[derive(Error, Debug)]
81#[error("replica does not exist: {0}")]
82pub(super) struct ReplicaMissing(pub ReplicaId);
83
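/// Errors arising during dataflow creation.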
84#[derive(Error, Debug)]
85pub(super) enum DataflowCreationError {
86 #[error("collection does not exist: {0}")]
87 CollectionMissing(GlobalId),
88 #[error("replica does not exist: {0}")]
89 ReplicaMissing(ReplicaId),
90 #[error("dataflow definition lacks an as_of value")]
91 MissingAsOf,
92 #[error("subscribe dataflow has an empty as_of")]
93 EmptyAsOfForSubscribe,
94 #[error("copy to dataflow has an empty as_of")]
95 EmptyAsOfForCopyTo,
96 #[error("no read hold provided for dataflow import: {0}")]
97 ReadHoldMissing(GlobalId),
98 #[error("insufficient read hold provided for dataflow import: {0}")]
99 ReadHoldInsufficient(GlobalId),
100}
101
102impl From<CollectionMissing> for DataflowCreationError {
103 fn from(error: CollectionMissing) -> Self {
104 Self::CollectionMissing(error.0)
105 }
106}
107
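/// Errors arising during peek processing.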
108#[derive(Error, Debug)]
109pub(super) enum PeekError {
110 #[error("replica does not exist: {0}")]
111 ReplicaMissing(ReplicaId),
112 #[error("read hold ID does not match peeked collection: {0}")]
113 ReadHoldIdMismatch(GlobalId),
114 #[error("insufficient read hold provided: {0}")]
115 ReadHoldInsufficient(GlobalId),
116}
117
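/// Errors arising during read policy updates.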
118#[derive(Error, Debug)]
119pub(super) enum ReadPolicyError {
120 #[error("collection does not exist: {0}")]
121 CollectionMissing(GlobalId),
122 #[error("collection is write-only: {0}")]
123 WriteOnlyCollection(GlobalId),
124}
125
126impl From<CollectionMissing> for ReadPolicyError {
127 fn from(error: CollectionMissing) -> Self {
128 Self::CollectionMissing(error.0)
129 }
130}
131
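/// A command sent to an [`Instance`] task: a boxed closure the task applies to
/// its [`Instance`] state.
///
/// As a hedged illustration (not code from this module), a caller holding a
/// [`Client`] could construct and send a command like this:
///
/// ```ignore
/// // Box a closure that runs against the instance state inside the task.
/// let cmd: Command<T> = Box::new(|instance| instance.maintain());
/// client.send(cmd).expect("instance task is alive");
/// ```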
pub type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;

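/// A client to an [`Instance`] task, allowing callers to send commands and to
/// request changes to collection read holds.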
135#[derive(Clone, derivative::Derivative)]
137#[derivative(Debug)]
138pub(super) struct Client<T: ComputeControllerTimestamp> {
139 command_tx: mpsc::UnboundedSender<Command<T>>,
141 #[derivative(Debug = "ignore")]
143 read_hold_tx: read_holds::ChangeTx<T>,
144}
145
146impl<T: ComputeControllerTimestamp> Client<T> {
147 pub fn send(&self, command: Command<T>) -> Result<(), SendError<Command<T>>> {
148 self.command_tx.send(command)
149 }
150
151 pub fn read_hold_tx(&self) -> read_holds::ChangeTx<T> {
152 Arc::clone(&self.read_hold_tx)
153 }
154}
155
156impl<T> Client<T>
157where
158 T: ComputeControllerTimestamp,
159 ComputeGrpcClient: ComputeClient<T>,
160{
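    /// Spawns a new [`Instance`] task and returns a [`Client`] connected to it.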
    pub fn spawn(
        id: ComputeInstanceId,
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    ) -> Self {
        let (command_tx, command_rx) = mpsc::unbounded_channel();

        let read_hold_tx: read_holds::ChangeTx<_> = {
            let command_tx = command_tx.clone();
            Arc::new(move |id, change: ChangeBatch<_>| {
                let cmd: Command<_> = {
                    let change = change.clone();
                    Box::new(move |i| i.apply_read_hold_change(id, change.clone()))
                };
                command_tx.send(cmd).map_err(|_| SendError((id, change)))
            })
        };

        mz_ore::task::spawn(
            || format!("compute-instance-{id}"),
            Instance::new(
                build_info,
                storage,
                peek_stash_persist_location,
                arranged_logs,
                metrics,
                now,
                wallclock_lag,
                dyncfg,
                command_rx,
                response_tx,
                Arc::clone(&read_hold_tx),
                introspection_tx,
            )
            .run(),
        );

        Self {
            command_tx,
            read_hold_tx,
        }
    }
}

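/// A response from a replica task, tagged with the ID and epoch of the sending
/// replica.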
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);

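/// The state we keep for each compute instance.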
pub(super) struct Instance<T: ComputeControllerTimestamp> {
    /// Build info for this process.
    build_info: &'static BuildInfo,
    /// A handle to the storage collections managed by the storage controller.
    storage_collections: StorageCollections<T>,
    /// Whether the instance has completed initialization.
    initialized: bool,
    /// Whether the instance is in read-only mode.
    read_only: bool,
    /// The workload class of this instance, if known.
    workload_class: Option<String>,
    /// The replicas of this compute instance.
    replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
    /// The compute collections installed on this instance.
    collections: BTreeMap<GlobalId, CollectionState<T>>,
    /// IDs of the log sources available on this instance.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Currently outstanding peeks.
    peeks: BTreeMap<Uuid, PendingPeek<T>>,
    /// Currently active subscribes.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
    /// IDs of in-progress `COPY TO` sinks.
    copy_tos: BTreeSet<GlobalId>,
    /// The command history, replayed to newly added or restarted replicas.
    history: ComputeCommandHistory<UIntGauge, T>,
    /// Receiver for commands sent by the instance's [`Client`]s.
    command_rx: mpsc::UnboundedReceiver<Command<T>>,
    /// Sender for responses to the controller frontend.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Sender for introspection updates.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Metrics tracked for this instance.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// The persist location where peek responses can be stashed.
    peek_stash_persist_location: PersistLocation,

    /// A function returning the current wallclock time.
    now: NowFn,
    /// A function computing the wallclock lag of a given timestamp.
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for read hold changes.
    read_hold_tx: read_holds::ChangeTx<T>,
    /// Sender connected to `replica_rx`, cloned for each spawned replica task.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// Receiver for responses from replicas.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
}

impl<T: ComputeControllerTimestamp> Instance<T> {
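    /// Acquires a handle to the state of the collection identified by `id`.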
    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    fn collection_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut CollectionState<T>, CollectionMissing> {
        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
    }

    fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
        self.collections.get(&id).expect("collection must exist")
    }

    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
        self.collections.iter().map(|(id, coll)| (*id, coll))
    }

    fn add_collection(
        &mut self,
        id: GlobalId,
        as_of: Antichain<T>,
        shared: SharedCollectionState<T>,
        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        replica_input_read_holds: Vec<ReadHold<T>>,
        write_only: bool,
        storage_sink: bool,
        initial_as_of: Option<Antichain<T>>,
        refresh_schedule: Option<RefreshSchedule>,
    ) {
        let introspection = CollectionIntrospection::new(
            id,
            self.introspection_tx.clone(),
            as_of.clone(),
            storage_sink,
            initial_as_of,
            refresh_schedule,
        );
        let mut state = CollectionState::new(
            id,
            as_of.clone(),
            shared,
            storage_dependencies,
            compute_dependencies,
            Arc::clone(&self.read_hold_tx),
            introspection,
        );
        if write_only {
            state.read_policy = None;
        }

        if let Some(previous) = self.collections.insert(id, state) {
            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
        }

        for replica in self.replicas.values_mut() {
            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
        }

        self.report_dependency_updates(id, Diff::ONE);
    }

    fn remove_collection(&mut self, id: GlobalId) {
        self.report_dependency_updates(id, Diff::MINUS_ONE);

        for replica in self.replicas.values_mut() {
            replica.remove_collection(id);
        }

        self.collections.remove(&id);
    }

    fn add_replica_state(
        &mut self,
        id: ReplicaId,
        client: ReplicaClient<T>,
        config: ReplicaConfig,
        epoch: u64,
    ) {
        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();

        let metrics = self.metrics.for_replica(id);
        let mut replica = ReplicaState::new(
            id,
            client,
            config,
            metrics,
            self.introspection_tx.clone(),
            epoch,
        );

        for (collection_id, collection) in &self.collections {
            if collection.log_collection && !log_ids.contains(collection_id) {
                continue;
            }

            let as_of = if collection.log_collection {
                Antichain::from_elem(T::minimum())
            } else {
                collection.read_frontier().to_owned()
            };

            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
            replica.add_collection(*collection_id, as_of, input_read_holds);
        }

        self.replicas.insert(id, replica);
    }

    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
        let _ = self.response_tx.send(response);
    }

    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
        let _ = self.introspection_tx.send((type_, updates));
    }

    fn replica_exists(&self, id: ReplicaId) -> bool {
        self.replicas.contains_key(&id)
    }

    fn peeks_targeting(
        &self,
        replica_id: ReplicaId,
    ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
        self.peeks.iter().filter_map(move |(uuid, peek)| {
            if peek.target_replica == Some(replica_id) {
                Some((*uuid, peek))
            } else {
                None
            }
        })
    }

    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
        self.subscribes.iter().filter_map(move |(id, subscribe)| {
            let targeting = subscribe.target_replica == Some(replica_id);
            targeting.then_some(*id)
        })
    }

    fn update_frontier_introspection(&mut self) {
        for collection in self.collections.values_mut() {
            collection
                .introspection
                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
        }

        for replica in self.replicas.values_mut() {
            for collection in replica.collections.values_mut() {
                collection
                    .introspection
                    .observe_frontier(&collection.write_frontier);
            }
        }
    }

    fn refresh_state_metrics(&self) {
        let unscheduled_collections_count =
            self.collections.values().filter(|c| !c.scheduled).count();
        let connected_replica_count = self
            .replicas
            .values()
            .filter(|r| r.client.is_connected())
            .count();

        self.metrics
            .replica_count
            .set(u64::cast_from(self.replicas.len()));
        self.metrics
            .collection_count
            .set(u64::cast_from(self.collections.len()));
        self.metrics
            .collection_unscheduled_count
            .set(u64::cast_from(unscheduled_collections_count));
        self.metrics
            .peek_count
            .set(u64::cast_from(self.peeks.len()));
        self.metrics
            .subscribe_count
            .set(u64::cast_from(self.subscribes.len()));
        self.metrics
            .copy_to_count
            .set(u64::cast_from(self.copy_tos.len()));
        self.metrics
            .connected_replica_count
            .set(u64::cast_from(connected_replica_count));
    }

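    /// Refreshes the wallclock lag measurements for all collections: updates the
    /// per-replica lag maxima and histograms, treating collections whose write
    /// frontier does not exceed their read frontier as having undefined lag.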
    fn refresh_wallclock_lag(&mut self) {
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        let mut unreadable_collections = BTreeSet::new();
        for (id, collection) in &mut self.collections {
            let read_frontier = match self.storage_collections.collection_frontiers(*id) {
                Ok(f) => f.read_capabilities,
                Err(_) => collection.read_frontier(),
            };
            let write_frontier = collection.write_frontier();
            let collection_unreadable = PartialOrder::less_equal(&write_frontier, &read_frontier);
            if collection_unreadable {
                unreadable_collections.insert(id);
            }

            if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(&self.dyncfg) {
                continue;
            }

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                let bucket = if collection_unreadable {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                let lag = if unreadable_collections.contains(&id) {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                };

                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                }
            }
        }

        self.maybe_record_wallclock_lag();
    }

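    /// Records the tracked wallclock lag maxima and histogram stashes to
    /// introspection, provided the recording interval has elapsed. Does nothing
    /// while the instance is in read-only mode.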
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }

    fn report_dependency_updates(&self, id: GlobalId, diff: Diff) {
        let collection = self.expect_collection(id);
        let dependencies = collection.dependency_ids();

        let updates = dependencies
            .map(|dependency_id| {
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&dependency_id.to_string()),
                ]);
                (row, diff)
            })
            .collect();

        self.deliver_introspection_updates(IntrospectionType::ComputeDependencies, updates);
    }

    fn update_operator_hydration_status(
        &mut self,
        replica_id: ReplicaId,
        status: OperatorHydrationStatus,
    ) {
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            tracing::error!(
                %replica_id, ?status,
                "status update for an unknown replica"
            );
            return;
        };
        let Some(collection) = replica.collections.get_mut(&status.collection_id) else {
            tracing::error!(
                %replica_id, ?status,
                "status update for an unknown collection"
            );
            return;
        };

        collection.introspection.operator_hydrated(
            status.lir_id,
            status.worker_id,
            status.hydrated,
        );
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, CollectionLookupError> {
        if self.replicas.is_empty() {
            return Ok(true);
        }

        for replica_state in self.replicas.values() {
            let collection_state = replica_state
                .collections
                .get(&collection_id)
                .ok_or(CollectionLookupError::CollectionMissing(collection_id))?;

            if collection_state.hydrated() {
                return Ok(true);
            }
        }

        Ok(false)
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, HydrationCheckBadTarget> {
        if self.replicas.is_empty() {
            return Ok(true);
        }
        let mut all_hydrated = true;
        let target_replicas: BTreeSet<ReplicaId> = self
            .replicas
            .keys()
            .filter_map(|id| match target_replica_ids {
                None => Some(id.clone()),
                Some(ref ids) if ids.contains(id) => Some(id.clone()),
                Some(_) => None,
            })
            .collect();
        if let Some(targets) = target_replica_ids {
            if target_replicas.is_empty() {
                return Err(HydrationCheckBadTarget(targets));
            }
        }

        for (id, _collection) in self.collections_iter() {
            if id.is_transient() || exclude_collections.contains(&id) {
                continue;
            }

            let mut collection_hydrated = false;
            for replica_state in self.replicas.values() {
                if !target_replicas.contains(&replica_state.id) {
                    continue;
                }
                let collection_state = replica_state
                    .collections
                    .get(&id)
                    .expect("missing collection state");

                if collection_state.hydrated() {
                    collection_hydrated = true;
                    break;
                }
            }

            if !collection_hydrated {
                tracing::info!("collection {id} is not hydrated on any replica");
                all_hydrated = false;
            }
        }

        Ok(all_hydrated)
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn collections_hydrated(&self, exclude_collections: &BTreeSet<GlobalId>) -> bool {
        self.collections_hydrated_on_replicas(None, exclude_collections)
            .expect("Cannot error if target_replica_ids is None")
    }

    fn cleanup_collections(&mut self) {
        let to_remove: Vec<_> = self
            .collections_iter()
            .filter(|(id, collection)| {
                collection.dropped
                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
                    && self
                        .replicas
                        .values()
                        .all(|r| r.collection_frontiers_empty(*id))
            })
            .map(|(id, _collection)| id)
            .collect();

        for id in to_remove {
            self.remove_collection(id);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            build_info: _,
            storage_collections: _,
            peek_stash_persist_location: _,
            initialized,
            read_only,
            workload_class,
            replicas,
            collections,
            log_sources: _,
            peeks,
            subscribes,
            copy_tos,
            history: _,
            command_rx: _,
            response_tx: _,
            introspection_tx: _,
            metrics: _,
            dyncfg: _,
            now: _,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            read_hold_tx: _,
            replica_tx: _,
            replica_rx: _,
        } = self;

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let replicas: BTreeMap<_, _> = replicas
            .iter()
            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
            .collect::<Result<_, anyhow::Error>>()?;
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();
        let peeks: BTreeMap<_, _> = peeks
            .iter()
            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
            .collect();
        let subscribes: BTreeMap<_, _> = subscribes
            .iter()
            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
            .collect();
        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");

        let map = serde_json::Map::from_iter([
            field("initialized", initialized)?,
            field("read_only", read_only)?,
            field("workload_class", workload_class)?,
            field("replicas", replicas)?,
            field("collections", collections)?,
            field("peeks", peeks)?,
            field("subscribes", subscribes)?,
            field("copy_tos", copy_tos)?,
            field("wallclock_lag_last_recorded", wallclock_lag_last_recorded)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

impl<T> Instance<T>
where
    T: ComputeControllerTimestamp,
    ComputeGrpcClient: ComputeClient<T>,
{
    fn new(
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        command_rx: mpsc::UnboundedReceiver<Command<T>>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    ) -> Self {
        let mut collections = BTreeMap::new();
        let mut log_sources = BTreeMap::new();
        for (log, id, shared) in arranged_logs {
            let collection = CollectionState::new_log_collection(
                id,
                shared,
                Arc::clone(&read_hold_tx),
                introspection_tx.clone(),
            );
            collections.insert(id, collection);
            log_sources.insert(log, id);
        }

        let history = ComputeCommandHistory::new(metrics.for_history());

        let send_count = metrics.response_send_count.clone();
        let recv_count = metrics.response_recv_count.clone();
        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            storage_collections: storage,
            peek_stash_persist_location,
            initialized: false,
            read_only: true,
            workload_class: None,
            replicas: Default::default(),
            collections,
            log_sources,
            peeks: Default::default(),
            subscribes: Default::default(),
            copy_tos: Default::default(),
            history,
            command_rx,
            response_tx,
            introspection_tx,
            metrics,
            dyncfg,
            now,
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            read_hold_tx,
            replica_tx,
            replica_rx,
        }
    }

    async fn run(mut self) {
        self.send(ComputeCommand::Hello {
            nonce: Uuid::default(),
        });

        let instance_config = InstanceConfig {
            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
            logging: Default::default(),
            expiration_offset: Default::default(),
        };

        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));

        loop {
            tokio::select! {
                command = self.command_rx.recv() => match command {
                    Some(cmd) => cmd(&mut self),
                    None => break,
                },
                response = self.replica_rx.recv() => match response {
                    Some(response) => self.handle_response(response),
                    None => unreachable!("self owns a sender side of the channel"),
                }
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        if let Some(workload_class) = &config_params.workload_class {
            self.workload_class = workload_class.clone();
        }

        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
        self.send(command);
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn initialization_complete(&mut self) {
        if !self.initialized {
            self.send(ComputeCommand::InitializationComplete);
            self.initialized = true;
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn allow_writes(&mut self) {
        if self.read_only {
            self.read_only = false;
            self.send(ComputeCommand::AllowWrites);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn shutdown(&mut self) {
        let (_tx, rx) = mpsc::unbounded_channel();
        let mut command_rx = std::mem::replace(&mut self.command_rx, rx);

        while let Ok(cmd) = command_rx.try_recv() {
            cmd(self);
        }

        self.cleanup_collections();

        let stray_replicas: Vec<_> = self.replicas.keys().collect();
        soft_assert_or_log!(
            stray_replicas.is_empty(),
            "dropped instance still has provisioned replicas: {stray_replicas:?}",
        );

        let collections = self.collections.iter();
        let stray_collections: Vec<_> = collections
            .filter(|(_, c)| !c.log_collection)
            .map(|(id, _)| id)
            .collect();
        soft_assert_or_log!(
            stray_collections.is_empty(),
            "dropped instance still has installed collections: {stray_collections:?}",
        );
    }

    #[mz_ore::instrument(level = "debug")]
    fn send(&mut self, cmd: ComputeCommand<T>) {
        self.history.push(cmd.clone());

        for replica in self.replicas.values_mut() {
            let _ = replica.client.send(cmd.clone());
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn add_replica(
        &mut self,
        id: ReplicaId,
        mut config: ReplicaConfig,
        epoch: Option<u64>,
    ) -> Result<(), ReplicaExists> {
        if self.replica_exists(id) {
            return Err(ReplicaExists(id));
        }

        config.logging.index_logs = self.log_sources.clone();

        let epoch = epoch.unwrap_or(1);
        let metrics = self.metrics.for_replica(id);
        let client = ReplicaClient::spawn(
            id,
            self.build_info,
            config.clone(),
            epoch,
            metrics.clone(),
            Arc::clone(&self.dyncfg),
            self.replica_tx.clone(),
        );

        self.history.reduce();

        self.history.update_source_uppers(&self.storage_collections);

        for command in self.history.iter() {
            if client.send(command.clone()).is_err() {
                tracing::warn!("Replica {:?} connection terminated during hydration", id);
                break;
            }
        }

        self.add_replica_state(id, client, config, epoch);

        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;

        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
        for subscribe_id in to_drop {
            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
            let response = ComputeControllerResponse::SubscribeResponse(
                subscribe_id,
                SubscribeBatch {
                    lower: subscribe.frontier.clone(),
                    upper: subscribe.frontier,
                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
                },
            );
            self.deliver_response(response);
        }

        let mut peek_responses = Vec::new();
        let mut to_drop = Vec::new();
        for (uuid, peek) in self.peeks_targeting(id) {
            peek_responses.push(ComputeControllerResponse::PeekNotification(
                uuid,
                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
                peek.otel_ctx.clone(),
            ));
            to_drop.push(uuid);
        }
        for response in peek_responses {
            self.deliver_response(response);
        }
        for uuid in to_drop {
            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
            self.finish_peek(uuid, response);
        }

        self.forward_implied_capabilities();

        Ok(())
    }

    fn rehydrate_replica(&mut self, id: ReplicaId) {
        let config = self.replicas[&id].config.clone();
        let epoch = self.replicas[&id].epoch + 1;

        self.remove_replica(id).expect("replica must exist");
        let result = self.add_replica(id, config, Some(epoch));

        match result {
            Ok(()) => (),
            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
        }
    }

    fn rehydrate_failed_replicas(&mut self) {
        let replicas = self.replicas.iter();
        let failed_replicas: Vec<_> = replicas
            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
            .collect();

        for replica_id in failed_replicas {
            self.rehydrate_replica(replica_id);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn create_dataflow(
        &mut self,
        dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        import_read_holds: Vec<ReadHold<T>>,
        subscribe_target_replica: Option<ReplicaId>,
        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        if let Some(replica_id) = subscribe_target_replica {
            if !self.replica_exists(replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        let mut storage_dependencies = BTreeMap::new();
        let mut compute_dependencies = BTreeMap::new();

        let mut replica_input_read_holds = Vec::new();

        let mut import_read_holds: BTreeMap<_, _> =
            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();

        for &id in dataflow.source_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            replica_input_read_holds.push(read_hold.clone());

            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            storage_dependencies.insert(id, read_hold);
        }

        for &id in dataflow.index_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            compute_dependencies.insert(id, read_hold);
        }

        if as_of.is_empty() {
            replica_input_read_holds = Default::default();
        }

        for export_id in dataflow.export_ids() {
            let shared = shared_collection_state
                .remove(&export_id)
                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
            let write_only = dataflow.sink_exports.contains_key(&export_id);
            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);

            self.add_collection(
                export_id,
                as_of.clone(),
                shared,
                storage_dependencies.clone(),
                compute_dependencies.clone(),
                replica_input_read_holds.clone(),
                write_only,
                storage_sink,
                dataflow.initial_storage_as_of.clone(),
                dataflow.refresh_schedule.clone(),
            );

            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
            }
        }

        for subscribe_id in dataflow.subscribe_ids() {
            self.subscribes
                .insert(subscribe_id, ActiveSubscribe::new(subscribe_target_replica));
        }

        for copy_to_id in dataflow.copy_to_ids() {
            self.copy_tos.insert(copy_to_id);
        }

        let mut source_imports = BTreeMap::new();
        for (id, (si, monotonic, _upper)) in dataflow.source_imports {
            let frontiers = self
                .storage_collections
                .collection_frontiers(id)
                .expect("collection exists");

            let collection_metadata = self
                .storage_collections
                .collection_metadata(id)
                .expect("we have a read hold on this collection");

            let desc = SourceInstanceDesc {
                storage_metadata: collection_metadata.clone(),
                arguments: si.arguments,
                typ: si.typ.clone(),
            };
            source_imports.insert(id, (desc, monotonic, frontiers.write_frontier));
        }

        let mut sink_exports = BTreeMap::new();
        for (id, se) in dataflow.sink_exports {
            let connection = match se.connection {
                ComputeSinkConnection::MaterializedView(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| CollectionMissing(id))?
                        .clone();
                    let conn = MaterializedViewSinkConnection {
                        value_desc: conn.value_desc,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::MaterializedView(conn)
                }
                ComputeSinkConnection::ContinualTask(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
                        .clone();
                    let conn = ContinualTaskConnection {
                        input_id: conn.input_id,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::ContinualTask(conn)
                }
                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                    ComputeSinkConnection::CopyToS3Oneshot(conn)
                }
            };
            let desc = ComputeSinkDesc {
                from: se.from,
                from_desc: se.from_desc,
                connection,
                with_snapshot: se.with_snapshot,
                up_to: se.up_to,
                non_null_assertions: se.non_null_assertions,
                refresh_schedule: se.refresh_schedule,
            };
            sink_exports.insert(id, desc);
        }

        let objects_to_build = dataflow
            .objects_to_build
            .into_iter()
            .map(|object| BuildDesc {
                id: object.id,
                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
            })
            .collect();

        let augmented_dataflow = DataflowDescription {
            source_imports,
            sink_exports,
            objects_to_build,
            index_imports: dataflow.index_imports,
            index_exports: dataflow.index_exports,
            as_of: dataflow.as_of.clone(),
            until: dataflow.until,
            initial_storage_as_of: dataflow.initial_storage_as_of,
            refresh_schedule: dataflow.refresh_schedule,
            debug_name: dataflow.debug_name,
            time_dependence: dataflow.time_dependence,
        };

        if augmented_dataflow.is_transient() {
            tracing::debug!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        } else {
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        }

        if as_of.is_empty() {
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                "not sending `CreateDataflow`, because of empty `as_of`",
            );
        } else {
            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
            let dataflow = Box::new(augmented_dataflow);
            self.send(ComputeCommand::CreateDataflow(dataflow));

            for id in collections {
                self.maybe_schedule_collection(id);
            }
        }

        Ok(())
    }

    fn maybe_schedule_collection(&mut self, id: GlobalId) {
        let collection = self.expect_collection(id);

        if collection.scheduled {
            return;
        }

        let as_of = collection.read_frontier();

        if as_of.is_empty() {
            return;
        }

        let ready = if id.is_transient() {
            true
        } else {
            let not_self_dep = |x: &GlobalId| *x != id;

            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
            let compute_frontiers = compute_deps.map(|id| {
                let dep = self.expect_collection(id);
                dep.write_frontier()
            });

            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(storage_deps.collect())
                .expect("must exist");
            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);

            let ready = compute_frontiers
                .chain(storage_frontiers)
                .all(|frontier| PartialOrder::less_than(&as_of, &frontier));

            ready
        };

        if ready {
            self.send(ComputeCommand::Schedule(id));
            let collection = self.expect_collection_mut(id);
            collection.scheduled = true;
        }
    }

    fn schedule_collections(&mut self) {
        let ids: Vec<_> = self.collections.keys().copied().collect();
        for id in ids {
            self.maybe_schedule_collection(id);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
        for id in &ids {
            let collection = self.collection_mut(*id)?;

            collection.dropped = true;

            collection.implied_read_hold.release();
            collection.warmup_read_hold.release();

            self.subscribes.remove(id);
            self.copy_tos.remove(id);
        }

        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn peek(
        &mut self,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        mut read_hold: ReadHold<T>,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let target_id = peek_target.id();
        if read_hold.id() != target_id {
            return Err(ReadHoldIdMismatch(read_hold.id()));
        }
        read_hold
            .try_downgrade(Antichain::from_elem(timestamp.clone()))
            .map_err(|_| ReadHoldInsufficient(target_id))?;

        if let Some(target) = target_replica {
            if !self.replica_exists(target) {
                return Err(ReplicaMissing(target));
            }
        }

        let otel_ctx = OpenTelemetryContext::obtain();

        self.peeks.insert(
            uuid,
            PendingPeek {
                target_replica,
                otel_ctx: otel_ctx.clone(),
                requested_at: Instant::now(),
                read_hold,
                peek_response_tx,
                limit: finishing.limit.map(usize::cast_from),
                offset: finishing.offset,
            },
        );

        let peek = Peek {
            literal_constraints,
            uuid,
            timestamp,
            finishing,
            map_filter_project,
            otel_ctx,
            target: peek_target,
            result_desc,
        };
        self.send(ComputeCommand::Peek(Box::new(peek)));

        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
        let Some(peek) = self.peeks.get_mut(&uuid) else {
            tracing::warn!("did not find pending peek for {uuid}");
            return;
        };

        let duration = peek.requested_at.elapsed();
        self.metrics
            .observe_peek_response(&PeekResponse::Canceled, duration);

        let otel_ctx = peek.otel_ctx.clone();
        otel_ctx.attach_as_parent();

        self.deliver_response(ComputeControllerResponse::PeekNotification(
            uuid,
            PeekNotification::Canceled,
            otel_ctx,
        ));

        self.finish_peek(uuid, reason);
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn set_read_policy(
        &mut self,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) -> Result<(), ReadPolicyError> {
        for (id, _policy) in &policies {
            let collection = self.collection(*id)?;
            if collection.read_policy.is_none() {
                return Err(ReadPolicyError::WriteOnlyCollection(*id));
            }
        }

        for (id, new_policy) in policies {
            let collection = self.expect_collection_mut(id);
            let new_since = new_policy.frontier(collection.write_frontier().borrow());
            let _ = collection.implied_read_hold.try_downgrade(new_since);
            collection.read_policy = Some(new_policy);
        }

        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
        let collection = self.expect_collection_mut(id);

        let advanced = collection.shared.lock_write_frontier(|f| {
            let advanced = PartialOrder::less_than(f, &new_frontier);
            if advanced {
                f.clone_from(&new_frontier);
            }
            advanced
        });

        if !advanced {
            return;
        }

        let new_since = match &collection.read_policy {
            Some(read_policy) => read_policy.frontier(new_frontier.borrow()),
            None => Antichain::from_iter(
                new_frontier
                    .iter()
                    .map(|t| t.step_back().unwrap_or_else(T::minimum)),
            ),
        };
        let _ = collection.implied_read_hold.try_downgrade(new_since);

        self.deliver_response(ComputeControllerResponse::FrontierUpper {
            id,
            upper: new_frontier,
        });
    }

    fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
        let Some(collection) = self.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "read hold change for absent collection (id={id}, changes={update:?})"
            );
            return;
        };

        let new_since = collection.shared.lock_read_capabilities(|caps| {
            let read_frontier = caps.frontier();
            for (time, diff) in update.iter() {
                let count = caps.count_for(time) + diff;
                assert!(
                    count >= 0,
                    "invalid read capabilities update: negative capability \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
                assert!(
                    count == 0 || read_frontier.less_equal(time),
                    "invalid read capabilities update: frontier regression \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
            }

            let changes = caps.update_iter(update.drain());

            let changed = changes.count() > 0;
            changed.then(|| caps.frontier().to_owned())
        });

        let Some(new_since) = new_since else {
            return;
        };

        for read_hold in collection.compute_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }
        for read_hold in collection.storage_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }

        self.send(ComputeCommand::AllowCompaction {
            id,
            frontier: new_since,
        });
    }

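    /// Finishes a pending peek: removes it from the pending set, forwards the
    /// response to the client, tells replicas to cancel the peek, and releases
    /// the peek's read hold.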
    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
        let Some(peek) = self.peeks.remove(&uuid) else {
            return;
        };

        let _ = peek.peek_response_tx.send(response);

        self.send(ComputeCommand::CancelPeek { uuid });

        drop(peek.read_hold);
    }

    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
        if self
            .replicas
            .get(&replica_id)
            .filter(|replica| replica.epoch == epoch)
            .is_none()
        {
            return;
        }

        match response {
            ComputeResponse::Frontiers(id, frontiers) => {
                self.handle_frontiers_response(id, frontiers, replica_id);
            }
            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
            }
            ComputeResponse::CopyToResponse(id, response) => {
                self.handle_copy_to_response(id, response, replica_id);
            }
            ComputeResponse::SubscribeResponse(id, response) => {
                self.handle_subscribe_response(id, response, replica_id);
            }
            ComputeResponse::Status(response) => {
                self.handle_status_response(response, replica_id);
            }
        }
    }

    fn handle_frontiers_response(
        &mut self,
        id: GlobalId,
        frontiers: FrontiersResponse<T>,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&id) {
            soft_panic_or_log!(
                "frontiers update for an unknown collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica \
                 (replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };

        if let Some(new_frontier) = frontiers.input_frontier {
            replica_collection.update_input_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.output_frontier {
            replica_collection.update_output_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.write_frontier {
            replica_collection.update_write_frontier(new_frontier.clone());
            self.maybe_update_global_write_frontier(id, new_frontier);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn handle_peek_response(
        &mut self,
        uuid: Uuid,
        response: PeekResponse,
        otel_ctx: OpenTelemetryContext,
        replica_id: ReplicaId,
    ) {
        otel_ctx.attach_as_parent();

        let Some(peek) = self.peeks.get(&uuid) else {
            return;
        };

        let target_replica = peek.target_replica.unwrap_or(replica_id);
        if target_replica != replica_id {
            return;
        }

        let duration = peek.requested_at.elapsed();
        self.metrics.observe_peek_response(&response, duration);

        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
        self.deliver_response(ComputeControllerResponse::PeekNotification(
            uuid,
            notification,
            otel_ctx,
        ));

        self.finish_peek(uuid, response)
    }

    fn handle_copy_to_response(
        &mut self,
        sink_id: GlobalId,
        response: CopyToResponse,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&sink_id) {
            soft_panic_or_log!(
                "received response for an unknown copy-to \
                 (sink_id={sink_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
            soft_panic_or_log!(
                "copy-to response for an unknown replica collection \
                 (sink_id={sink_id}, replica_id={replica_id})"
            );
            return;
        };

        replica_collection.update_write_frontier(Antichain::new());
        replica_collection.update_input_frontier(Antichain::new());
        replica_collection.update_output_frontier(Antichain::new());

        if !self.copy_tos.remove(&sink_id) {
            return;
        }

        let result = match response {
            CopyToResponse::RowCount(count) => Ok(count),
            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
            CopyToResponse::Dropped => {
                tracing::error!(
                    %sink_id, %replica_id,
                    "received `Dropped` response for a tracked copy to",
                );
                return;
            }
        };

        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
    }

    fn handle_subscribe_response(
        &mut self,
        subscribe_id: GlobalId,
        response: SubscribeResponse<T>,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&subscribe_id) {
            soft_panic_or_log!(
                "received response for an unknown subscribe \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica (replica_id={replica_id})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica collection \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
            );
            return;
        };

        let write_frontier = match &response {
            SubscribeResponse::Batch(batch) => batch.upper.clone(),
            SubscribeResponse::DroppedAt(_) => Antichain::new(),
        };

        replica_collection.update_write_frontier(write_frontier.clone());
        replica_collection.update_input_frontier(write_frontier.clone());
        replica_collection.update_output_frontier(write_frontier.clone());

        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
            return;
        };
        let replica_targeted = subscribe.target_replica.unwrap_or(replica_id) == replica_id;
        if !replica_targeted {
            return;
        }

        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);

        match response {
            SubscribeResponse::Batch(batch) => {
                let upper = batch.upper;
                let mut updates = batch.updates;

                if PartialOrder::less_than(&subscribe.frontier, &upper) {
                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());

                    if upper.is_empty() {
                        self.subscribes.remove(&subscribe_id);
                    } else {
                        self.subscribes.insert(subscribe_id, subscribe);
                    }

                    if let Ok(updates) = updates.as_mut() {
                        updates.retain(|(time, _data, _diff)| lower.less_equal(time));
                    }
                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeBatch {
                            lower,
                            upper,
                            updates,
                        },
                    ));
                }
            }
            SubscribeResponse::DroppedAt(frontier) => {
                tracing::error!(
                    %subscribe_id,
                    %replica_id,
                    frontier = ?frontier.elements(),
                    "received `DroppedAt` response for a tracked subscribe",
                );
                self.subscribes.remove(&subscribe_id);
            }
        }
    }

    fn handle_status_response(&mut self, response: StatusResponse, replica_id: ReplicaId) {
        match response {
            StatusResponse::OperatorHydration(status) => {
                self.update_operator_hydration_status(replica_id, status)
            }
        }
    }

    fn dependency_write_frontiers<'b>(
        &'b self,
        collection: &'b CollectionState<T>,
    ) -> impl Iterator<Item = Antichain<T>> + 'b {
        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
            let collection = self.collections.get(&dep_id);
            collection.map(|c| c.write_frontier())
        });
        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
            frontiers.map(|f| f.write_frontier)
        });

        compute_frontiers.chain(storage_frontiers)
    }

    fn transitive_storage_dependency_write_frontiers<'b>(
        &'b self,
        collection: &'b CollectionState<T>,
    ) -> impl Iterator<Item = Antichain<T>> + 'b {
        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
        let mut done = BTreeSet::new();

        while let Some(id) = todo.pop() {
            if done.contains(&id) {
                continue;
            }
            if let Some(dep) = self.collections.get(&id) {
                storage_ids.extend(dep.storage_dependency_ids());
                todo.extend(dep.compute_dependency_ids())
            }
            done.insert(id);
        }

        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
            let frontiers = self.storage_collections.collection_frontiers(id).ok();
            frontiers.map(|f| f.write_frontier)
        });

        storage_frontiers
    }

    fn downgrade_warmup_capabilities(&mut self) {
        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            if collection.read_policy.is_none()
                && collection.shared.lock_write_frontier(|f| f.is_empty())
            {
                new_capabilities.insert(*id, Antichain::new());
                continue;
            }

            let mut new_capability = Antichain::new();
            for frontier in self.dependency_write_frontiers(collection) {
                for time in frontier {
                    new_capability.insert(time.step_back().unwrap_or(time));
                }
            }

            new_capabilities.insert(*id, new_capability);
        }

        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
        }
    }

    fn forward_implied_capabilities(&mut self) {
        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
            return;
        }
        if !self.replicas.is_empty() {
            return;
        }

        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            let Some(read_policy) = &collection.read_policy else {
                continue;
            };

            let mut dep_frontier = Antichain::new();
            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
                dep_frontier.extend(frontier);
            }

            let new_capability = read_policy.frontier(dep_frontier.borrow());
            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
                new_capabilities.insert(*id, new_capability);
            }
        }

        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.implied_read_hold.try_downgrade(new_capability);
        }
    }

    /// Performs periodic maintenance work.
    ///
    /// This bundles the recurring per-instance tasks: rehydrating failed
    /// replicas, downgrading and forwarding capabilities, scheduling and
    /// cleaning up collections, and refreshing introspection and metrics.
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        self.rehydrate_failed_replicas();
        self.downgrade_warmup_capabilities();
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
}

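// The following test module is an illustrative sketch added for exposition;
// it is not part of the original controller logic. It replays, on plain `u64`
// timestamps, the frontier arithmetic that `downgrade_warmup_capabilities`
// performs: the warmup capability is held one tick behind the dependencies'
// write frontiers. The `step_back` helper is a hypothetical stand-in for
// `ComputeControllerTimestamp::step_back`.
#[cfg(test)]
mod warmup_capability_sketch {
    use timely::progress::Antichain;

    /// Hypothetical stand-in for `step_back` on `u64` timestamps.
    fn step_back(time: u64) -> u64 {
        time.saturating_sub(1)
    }

    #[test]
    fn capability_sits_behind_least_advanced_dependency() {
        let dependency_frontiers = [Antichain::from_elem(5_u64), Antichain::from_elem(3)];

        let mut new_capability = Antichain::new();
        for frontier in &dependency_frontiers {
            for time in frontier.elements() {
                new_capability.insert(step_back(*time));
            }
        }

        // `Antichain::insert` keeps only minimal elements, so the combined
        // capability ends up just behind the least advanced dependency.
        assert_eq!(new_capability, Antichain::from_elem(2));
    }
}
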
/// State the controller maintains for each compute collection.
#[derive(Debug)]
struct CollectionState<T: ComputeControllerTimestamp> {
    /// Whether this is a log collection.
    log_collection: bool,
    /// Whether the collection has been dropped.
    dropped: bool,
    /// Whether the collection has been scheduled, i.e. replicas have been
    /// instructed to start computing it.
    scheduled: bool,

    /// Collection state that is shared beyond the instance task.
    shared: SharedCollectionState<T>,

    /// The implied read hold, maintained according to the read policy.
    implied_read_hold: ReadHold<T>,
    /// A read hold used for dataflow warmup, kept just behind the write
    /// frontiers of the collection's dependencies.
    warmup_read_hold: ReadHold<T>,
    /// The read policy driving downgrades of the implied read hold, or `None`
    /// if reads on the collection have been released.
    read_policy: Option<ReadPolicy<T>>,

    /// Read holds on storage dependencies, keyed by dependency ID.
    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
    /// Read holds on compute dependencies, keyed by dependency ID.
    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,

    /// Introspection state for the collection.
    introspection: CollectionIntrospection<T>,

    /// Accumulated wallclock lag histogram updates, awaiting the next
    /// recording; `None` for transient collections, which are exempt from
    /// histogram collection.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}

impl<T: ComputeControllerTimestamp> CollectionState<T> {
    /// Creates state for a new collection, initialized with the given as-of.
    fn new(
        collection_id: GlobalId,
        as_of: Antichain<T>,
        shared: SharedCollectionState<T>,
        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection: CollectionIntrospection<T>,
    ) -> Self {
        // A new collection starts with both its read and its write frontier
        // at the as-of.
        let since = as_of.clone();
        let upper = as_of;

        // The shared state must have been initialized consistently.
        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
        assert!(shared.lock_write_frontier(|f| f == &upper));

        let implied_read_hold =
            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);

        // The initial capability count covers the implied read hold; register
        // an additional count for the warmup read hold.
        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
        shared.lock_read_capabilities(|c| {
            c.update_iter(updates);
        });

        // Transient collections are exempt from wallclock lag histogram
        // collection.
        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
            true => None,
            false => Some(Default::default()),
        };

        Self {
            log_collection: false,
            dropped: false,
            scheduled: false,
            shared,
            implied_read_hold,
            warmup_read_hold,
            read_policy: Some(ReadPolicy::ValidFrom(since)),
            storage_dependencies,
            compute_dependencies,
            introspection,
            wallclock_lag_histogram_stash,
        }
    }

    /// Creates state for a new log collection.
    ///
    /// Log collections are valid from the minimum timestamp, have no
    /// dependencies, and count as scheduled from the start.
    fn new_log_collection(
        id: GlobalId,
        shared: SharedCollectionState<T>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    ) -> Self {
        let since = Antichain::from_elem(T::minimum());
        let introspection =
            CollectionIntrospection::new(id, introspection_tx, since.clone(), false, None, None);
        let mut state = Self::new(
            id,
            since,
            shared,
            Default::default(),
            Default::default(),
            read_hold_tx,
            introspection,
        );
        state.log_collection = true;
        state.scheduled = true;
        state
    }

    /// Returns the current read frontier of the collection.
    fn read_frontier(&self) -> Antichain<T> {
        self.shared
            .lock_read_capabilities(|c| c.frontier().to_owned())
    }

    /// Returns the current write frontier of the collection.
    fn write_frontier(&self) -> Antichain<T> {
        self.shared.lock_write_frontier(|f| f.clone())
    }

    /// Returns the IDs of the collection's storage dependencies.
    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.storage_dependencies.keys().copied()
    }

    /// Returns the IDs of the collection's compute dependencies.
    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.compute_dependencies.keys().copied()
    }

    /// Returns the IDs of all dependencies of the collection.
    fn dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.compute_dependency_ids()
            .chain(self.storage_dependency_ids())
    }
}

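// Illustrative sketch (not part of the original source): the read-capability
// accounting in `CollectionState::new` above relies on `MutableAntichain`
// tracking a signed count per time. Each read hold contributes a +1 at its
// `since`; the frontier only moves once every hold is released.
#[cfg(test)]
mod read_capability_accounting_sketch {
    use timely::progress::Antichain;
    use timely::progress::frontier::MutableAntichain;

    #[test]
    fn frontier_tracks_outstanding_holds() {
        let mut caps = MutableAntichain::new();
        // The count installed at creation time, standing in for the implied
        // read hold...
        caps.update_iter([(5_u64, 1)]);
        // ...and the additional count registered for the warmup read hold.
        caps.update_iter([(5, 1)]);
        assert_eq!(caps.frontier().to_owned(), Antichain::from_elem(5));

        // Releasing one hold leaves the frontier in place; releasing both
        // empties it.
        caps.update_iter([(5, -1)]);
        assert_eq!(caps.frontier().to_owned(), Antichain::from_elem(5));
        caps.update_iter([(5, -1)]);
        assert!(caps.frontier().is_empty());
    }
}
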
/// Collection state shared between the instance and other tasks, wrapped in
/// `Arc<Mutex<_>>`s so frontiers can be inspected without going through the
/// instance.
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState<T> {
    /// Accumulation of read capabilities for the collection.
    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
    /// The write frontier of the collection.
    write_frontier: Arc<Mutex<Antichain<T>>>,
}

impl<T: Timestamp> SharedCollectionState<T> {
    pub fn new(as_of: Antichain<T>) -> Self {
        let since = as_of.clone();
        let upper = as_of;

        // Initialize the read capabilities to the `since`, registering a
        // single capability count for it.
        let mut read_capabilities = MutableAntichain::new();
        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));

        Self {
            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
            write_frontier: Arc::new(Mutex::new(upper)),
        }
    }

    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut MutableAntichain<T>) -> R,
    {
        let mut caps = self.read_capabilities.lock().expect("poisoned");
        f(&mut *caps)
    }

    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Antichain<T>) -> R,
    {
        let mut frontier = self.write_frontier.lock().expect("poisoned");
        f(&mut *frontier)
    }
}

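// Illustrative usage sketch (added; not in the original source) of the
// closure-based locking API above: access to the shared frontiers is scoped
// to a closure, so no `MutexGuard` ever escapes to the caller.
#[cfg(test)]
mod shared_collection_state_sketch {
    use timely::progress::Antichain;

    use super::SharedCollectionState;

    #[test]
    fn scoped_frontier_access() {
        let shared = SharedCollectionState::new(Antichain::from_elem(0_u64));

        // Read the write frontier under the lock...
        let upper = shared.lock_write_frontier(|f| f.clone());
        assert_eq!(upper, Antichain::from_elem(0));

        // ...then advance it; the guard is confined to the closure.
        shared.lock_write_frontier(|f| *f = Antichain::from_elem(5));
        assert!(shared.lock_write_frontier(|f| f.less_equal(&5)));
    }
}
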
/// Maintains the introspection state for a given collection and sends
/// introspection updates whenever it changes.
#[derive(Debug)]
struct CollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the introspected collection.
    collection_id: GlobalId,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Frontier introspection state; `None` for storage sinks, which don't
    /// report `Frontiers` updates here.
    frontiers: Option<FrontiersIntrospectionState<T>>,
    /// Refresh introspection state; present only for materialized views with
    /// a refresh schedule.
    refresh: Option<RefreshIntrospectionState<T>>,
}

impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
    fn new(
        collection_id: GlobalId,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        as_of: Antichain<T>,
        storage_sink: bool,
        initial_as_of: Option<Antichain<T>>,
        refresh_schedule: Option<RefreshSchedule>,
    ) -> Self {
        let refresh = match (refresh_schedule, initial_as_of) {
            (Some(refresh_schedule), Some(initial_as_of)) => Some(
                RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
            ),
            (refresh_schedule, _) => {
                // A `refresh_schedule` without an `initial_as_of` is invalid.
                soft_assert_or_log!(
                    refresh_schedule.is_none(),
                    "`refresh_schedule` without an `initial_as_of`: {collection_id}"
                );
                None
            }
        };
        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));

        let self_ = Self {
            collection_id,
            introspection_tx,
            frontiers,
            refresh,
        };

        self_.report_initial_state();
        self_
    }

    /// Reports the initial introspection state.
    fn report_initial_state(&self) {
        if let Some(frontiers) = &self.frontiers {
            let row = frontiers.row_for_collection(self.collection_id);
            let updates = vec![(row, Diff::ONE)];
            self.send(IntrospectionType::Frontiers, updates);
        }

        if let Some(refresh) = &self.refresh {
            let row = refresh.row_for_collection(self.collection_id);
            let updates = vec![(row, Diff::ONE)];
            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
        }
    }

    /// Observes the given frontiers and updates the introspection state as
    /// necessary.
    fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
        self.update_frontier_introspection(read_frontier, write_frontier);
        self.update_refresh_introspection(write_frontier);
    }

    fn update_frontier_introspection(
        &mut self,
        read_frontier: &Antichain<T>,
        write_frontier: &Antichain<T>,
    ) {
        let Some(frontiers) = &mut self.frontiers else {
            return;
        };

        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
        {
            return; // nothing to do
        }

        let retraction = frontiers.row_for_collection(self.collection_id);
        frontiers.update(read_frontier, write_frontier);
        let insertion = frontiers.row_for_collection(self.collection_id);
        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
        self.send(IntrospectionType::Frontiers, updates);
    }

    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
        let Some(refresh) = &mut self.refresh else {
            return;
        };

        let retraction = refresh.row_for_collection(self.collection_id);
        refresh.frontier_update(write_frontier);
        let insertion = refresh.row_for_collection(self.collection_id);

        if retraction == insertion {
            return; // nothing to do
        }

        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
    }

    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
        // A send failure means the receiver was dropped, in which case nobody
        // is interested in introspection updates anymore.
        let _ = self.introspection_tx.send((introspection_type, updates));
    }
}

impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
    fn drop(&mut self) {
        // Retract the current frontier introspection row, if any.
        if let Some(frontiers) = &self.frontiers {
            let row = frontiers.row_for_collection(self.collection_id);
            let updates = vec![(row, Diff::MINUS_ONE)];
            self.send(IntrospectionType::Frontiers, updates);
        }

        // Retract the current refresh introspection row, if any.
        if let Some(refresh) = &self.refresh {
            let retraction = refresh.row_for_collection(self.collection_id);
            let updates = vec![(retraction, Diff::MINUS_ONE)];
            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
        }
    }
}

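// Illustrative sketch (not part of the original source) of the
// retraction/insertion protocol used by `CollectionIntrospection`: every state
// change is communicated as a `(row, -1)` for the stale row plus a `(row, +1)`
// for its replacement, and `Drop` retracts whatever is still installed.
// Consolidating such a changelog recovers the current introspection contents.
// `Row` and `Diff` are stood in for by a tuple and an `i64` here.
#[cfg(test)]
mod introspection_changelog_sketch {
    use std::collections::BTreeMap;

    #[test]
    fn consolidation_recovers_current_state() {
        let stale = ("u1", 5_u64);
        let fresh = ("u1", 7_u64);
        let changelog = vec![(stale, 1_i64), (stale, -1), (fresh, 1)];

        let mut state = BTreeMap::new();
        for (row, diff) in changelog {
            *state.entry(row).or_insert(0) += diff;
        }
        state.retain(|_, diff| *diff != 0);

        assert_eq!(state.into_iter().collect::<Vec<_>>(), vec![(fresh, 1)]);
    }
}
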
/// Introspection state for the `Frontiers` relation: the read and write
/// frontiers last reported for a collection.
#[derive(Debug)]
struct FrontiersIntrospectionState<T> {
    read_frontier: Antichain<T>,
    write_frontier: Antichain<T>,
}

impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
    fn new(as_of: Antichain<T>) -> Self {
        Self {
            read_frontier: as_of.clone(),
            write_frontier: as_of,
        }
    }

    /// Returns a row reflecting the current introspection state of the given
    /// collection.
    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
        let read_frontier = self
            .read_frontier
            .as_option()
            .map_or(Datum::Null, |ts| ts.clone().into());
        let write_frontier = self
            .write_frontier
            .as_option()
            .map_or(Datum::Null, |ts| ts.clone().into());
        Row::pack_slice(&[
            Datum::String(&collection_id.to_string()),
            read_frontier,
            write_frontier,
        ])
    }

    /// Updates the stored frontiers with the given values.
    fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
        if read_frontier != &self.read_frontier {
            self.read_frontier.clone_from(read_frontier);
        }
        if write_frontier != &self.write_frontier {
            self.write_frontier.clone_from(write_frontier);
        }
    }
}

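// Illustrative sketch (added for exposition) of the row layout produced by
// `row_for_collection` above for `IntrospectionType::Frontiers`:
// `(collection_id, read_frontier, write_frontier)`, with `Datum::Null`
// standing in for an empty frontier. The concrete datum values are made up.
#[cfg(test)]
mod frontiers_row_sketch {
    use mz_repr::{Datum, Row};

    #[test]
    fn null_encodes_an_empty_frontier() {
        let row = Row::pack_slice(&[
            Datum::String("u1"),
            Datum::UInt64(5), // read frontier
            Datum::Null,      // an empty write frontier
        ]);

        let datums: Vec<_> = row.iter().collect();
        assert_eq!(datums[0], Datum::String("u1"));
        assert_eq!(datums[2], Datum::Null);
    }
}
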
/// Information maintained to produce refresh introspection updates for a
/// materialized view with a refresh schedule.
#[derive(Debug)]
struct RefreshIntrospectionState<T> {
    // Immutable inputs.
    refresh_schedule: RefreshSchedule,
    initial_as_of: Antichain<T>,
    // Computed introspection state.
    next_refresh: Datum<'static>,
    last_completed_refresh: Datum<'static>,
}

impl<T> RefreshIntrospectionState<T> {
    /// Returns a row reflecting the current refresh introspection state of
    /// the given collection.
    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
        Row::pack_slice(&[
            Datum::String(&collection_id.to_string()),
            self.last_completed_refresh,
            self.next_refresh,
        ])
    }
}

impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
    /// Constructs a new `RefreshIntrospectionState` and initializes it with
    /// an initial `frontier_update`.
    fn new(
        refresh_schedule: RefreshSchedule,
        initial_as_of: Antichain<T>,
        upper: &Antichain<T>,
    ) -> Self {
        let mut self_ = Self {
            refresh_schedule: refresh_schedule.clone(),
            initial_as_of: initial_as_of.clone(),
            next_refresh: Datum::Null,
            last_completed_refresh: Datum::Null,
        };
        self_.frontier_update(upper);
        self_
    }

    /// Updates the introspection state to reflect the given write frontier.
    fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
        if write_frontier.is_empty() {
            // The collection is sealed: all scheduled refreshes have
            // completed.
            self.last_completed_refresh =
                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
                    last_refresh.into()
                } else {
                    // Schedules without a last refresh extend indefinitely;
                    // report the maximum timestamp.
                    T::maximum().into()
                };
            self.next_refresh = Datum::Null;
        } else if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
            // No refresh has completed yet.
            self.last_completed_refresh = Datum::Null;
            let initial_as_of = self.initial_as_of.as_option().expect(
                "initial_as_of can't be [], because then there would be no refreshes at all",
            );
            let first_refresh = initial_as_of
                .round_up(&self.refresh_schedule)
                .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
            soft_assert_or_log!(
                first_refresh == *initial_as_of,
                "initial_as_of should be set to the first refresh"
            );
            self.next_refresh = first_refresh.into();
        } else {
            // The write frontier sits at the next scheduled refresh; the last
            // completed refresh is the one before it.
            let write_frontier = write_frontier.as_option().expect("checked above");
            self.last_completed_refresh = write_frontier
                .round_down_minus_1(&self.refresh_schedule)
                .map_or_else(
                    || {
                        soft_panic_or_log!(
                            "rounding down should have returned the first refresh or later"
                        );
                        Datum::Null
                    },
                    |last_completed_refresh| last_completed_refresh.into(),
                );
            self.next_refresh = write_frontier.clone().into();
        }
    }
}

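// Toy sketch (hypothetical; the real rounding lives in
// `mz_repr::refresh_schedule`) of the arithmetic behind `frontier_update` for
// a "refresh every `interval`" schedule over `u64` times: `round_up` models
// finding the first refresh at or after a time, and `round_down_minus_1`
// models finding the last refresh strictly before a time.
#[cfg(test)]
mod refresh_rounding_sketch {
    /// Hypothetical model of `round_up` for a fixed interval.
    fn round_up(time: u64, interval: u64) -> u64 {
        time.div_ceil(interval) * interval
    }

    /// Hypothetical model of `round_down_minus_1` for a fixed interval.
    fn round_down_minus_1(time: u64, interval: u64) -> u64 {
        (time - 1) / interval * interval
    }

    #[test]
    fn refreshes_around_a_frontier() {
        let interval = 10;
        // Before the first refresh, the next refresh is found by rounding up.
        assert_eq!(round_up(13, interval), 20);
        // A write frontier parked at the next refresh (20) implies that the
        // previous refresh (10) is the last completed one.
        assert_eq!(round_down_minus_1(20, interval), 10);
    }
}
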
/// State tracked for a peek that is awaiting its response.
#[derive(Debug)]
struct PendingPeek<T: Timestamp> {
    /// For replica-targeted peeks, the replica the peek was sent to.
    target_replica: Option<ReplicaId>,
    /// The OpenTelemetry context of the peek request.
    otel_ctx: OpenTelemetryContext,
    /// The time at which the peek was requested.
    requested_at: Instant,
    /// The read hold installed to serve the peek.
    read_hold: ReadHold<T>,
    /// The channel on which the peek response is delivered.
    peek_response_tx: oneshot::Sender<PeekResponse>,
    /// An optional limit on the number of rows to return.
    limit: Option<usize>,
    /// The number of leading rows to skip.
    offset: usize,
}

/// State tracked for an active subscribe.
#[derive(Debug, Clone)]
struct ActiveSubscribe<T> {
    /// The current upper frontier of this subscribe.
    frontier: Antichain<T>,
    /// For replica-targeted subscribes, the replica to listen to.
    target_replica: Option<ReplicaId>,
}

impl<T: ComputeControllerTimestamp> ActiveSubscribe<T> {
    fn new(target_replica: Option<ReplicaId>) -> Self {
        Self {
            frontier: Antichain::from_elem(T::minimum()),
            target_replica,
        }
    }
}

/// State maintained about an individual replica.
#[derive(Debug)]
struct ReplicaState<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    id: ReplicaId,
    /// Client for sending commands to the replica task.
    client: ReplicaClient<T>,
    /// The replica's configuration.
    config: ReplicaConfig,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Per-collection state for collections installed on this replica.
    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
    /// The epoch of the replica.
    epoch: u64,
}

impl<T: ComputeControllerTimestamp> ReplicaState<T> {
    fn new(
        id: ReplicaId,
        client: ReplicaClient<T>,
        config: ReplicaConfig,
        metrics: ReplicaMetrics,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        epoch: u64,
    ) -> Self {
        Self {
            id,
            client,
            config,
            metrics,
            introspection_tx,
            epoch,
            collections: Default::default(),
        }
    }

    /// Adds a collection to the replica state.
    ///
    /// # Panics
    ///
    /// Panics if a collection with the same ID exists already.
    fn add_collection(
        &mut self,
        id: GlobalId,
        as_of: Antichain<T>,
        input_read_holds: Vec<ReadHold<T>>,
    ) {
        let metrics = self.metrics.for_collection(id);
        let introspection = ReplicaCollectionIntrospection::new(
            self.id,
            id,
            self.introspection_tx.clone(),
            as_of.clone(),
        );
        let mut state =
            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);

        // Transient collections are exempt from wallclock lag tracking.
        if id.is_transient() {
            state.wallclock_lag_max = None;
        }

        if let Some(previous) = self.collections.insert(id, state) {
            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
        }
    }

    /// Removes a collection from the replica state, returning it if present.
    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
        self.collections.remove(&id)
    }

    /// Returns whether all frontiers tracked for the given collection are
    /// empty. Collections that are not tracked count as empty too.
    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
        self.collections.get(&id).map_or(true, |c| {
            c.write_frontier.is_empty()
                && c.input_frontier.is_empty()
                && c.output_frontier.is_empty()
        })
    }

    /// Returns the state of this replica as a JSON object, for debugging.
    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` so adding a new field forces an update here.
        let Self {
            id,
            client: _,
            config: _,
            metrics: _,
            introspection_tx: _,
            epoch,
            collections,
        } = self;

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();

        let map = serde_json::Map::from_iter([
            field("id", id.to_string())?,
            field("collections", collections)?,
            field("epoch", epoch)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

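// Illustrative sketch (not part of the original source) of the JSON shape
// `ReplicaState::dump` produces: a flat object assembled via the local
// `field` helper, with values serialized by `serde_json`. The concrete values
// here are made up.
#[cfg(test)]
mod replica_dump_sketch {
    #[test]
    fn dump_is_a_flat_json_object() {
        let map = serde_json::Map::from_iter([
            ("id".to_string(), serde_json::json!("r1")),
            ("collections".to_string(), serde_json::json!({})),
            ("epoch".to_string(), serde_json::json!(3)),
        ]);
        let value = serde_json::Value::Object(map);

        assert_eq!(value["id"], "r1");
        assert_eq!(value["epoch"], 3);
    }
}
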
#[derive(Debug)]
struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
    /// The replica's reported write frontier for this collection.
    write_frontier: Antichain<T>,
    /// The replica's reported input frontier for this collection.
    input_frontier: Antichain<T>,
    /// The replica's reported output frontier for this collection.
    output_frontier: Antichain<T>,

    /// Metrics tracked for this collection, if enabled.
    metrics: Option<ReplicaCollectionMetrics>,
    /// The as-of frontier with which this collection was installed on the
    /// replica.
    as_of: Antichain<T>,
    /// Introspection state for this collection on this replica.
    introspection: ReplicaCollectionIntrospection<T>,
    /// Read holds on storage inputs to this collection, downgraded as the
    /// replica's input frontier advances.
    input_read_holds: Vec<ReadHold<T>>,

    /// The maximum wallclock lag observed since the last introspection
    /// recording; `None` for collections (e.g. transient ones) exempt from
    /// wallclock lag tracking.
    wallclock_lag_max: Option<WallclockLag>,
}

impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
    fn new(
        metrics: Option<ReplicaCollectionMetrics>,
        as_of: Antichain<T>,
        introspection: ReplicaCollectionIntrospection<T>,
        input_read_holds: Vec<ReadHold<T>>,
    ) -> Self {
        Self {
            write_frontier: as_of.clone(),
            input_frontier: as_of.clone(),
            output_frontier: as_of.clone(),
            metrics,
            as_of,
            introspection,
            input_read_holds,
            wallclock_lag_max: Some(WallclockLag::MIN),
        }
    }

    /// Returns whether this collection is hydrated, i.e. whether its output
    /// frontier has advanced strictly beyond its as-of. An empty as-of counts
    /// as hydrated immediately.
    fn hydrated(&self) -> bool {
        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
    }

    /// Updates the tracked write frontier, logging and ignoring frontier
    /// regressions.
    fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
            soft_panic_or_log!(
                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
                self.write_frontier,
            );
            return;
        } else if new_frontier == self.write_frontier {
            return;
        }

        self.write_frontier = new_frontier;
    }

    /// Updates the tracked input frontier and downgrades the input read holds
    /// accordingly.
    fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
            soft_panic_or_log!(
                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
                self.input_frontier,
            );
            return;
        } else if new_frontier == self.input_frontier {
            return;
        }

        self.input_frontier = new_frontier;

        // Relax the read holds on inputs the replica no longer needs.
        for read_hold in &mut self.input_read_holds {
            let result = read_hold.try_downgrade(self.input_frontier.clone());
            soft_assert_or_log!(
                result.is_ok(),
                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
                self.input_frontier,
            );
        }
    }

    /// Updates the tracked output frontier, logging and ignoring frontier
    /// regressions.
    fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
            soft_panic_or_log!(
                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
                self.output_frontier,
            );
            return;
        } else if new_frontier == self.output_frontier {
            return;
        }

        self.output_frontier = new_frontier;
    }
}

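// Illustrative sketch (added for exposition) of the `hydrated` predicate
// above, replayed on plain `u64` timestamps: the output frontier must pass
// strictly beyond the as-of, while an empty as-of counts as hydrated
// immediately.
#[cfg(test)]
mod hydration_sketch {
    use timely::PartialOrder;
    use timely::progress::Antichain;

    fn hydrated(as_of: &Antichain<u64>, output_frontier: &Antichain<u64>) -> bool {
        as_of.is_empty() || PartialOrder::less_than(as_of, output_frontier)
    }

    #[test]
    fn output_frontier_must_pass_the_as_of() {
        let as_of = Antichain::from_elem(5_u64);
        assert!(!hydrated(&as_of, &Antichain::from_elem(5)));
        assert!(hydrated(&as_of, &Antichain::from_elem(6)));
        assert!(hydrated(&Antichain::new(), &Antichain::from_elem(0)));
    }
}
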
/// Maintains the introspection state of a collection on a replica and sends
/// introspection updates whenever it changes.
#[derive(Debug)]
struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// The ID of the introspected collection.
    collection_id: GlobalId,
    /// Operator-level hydration state: a map from `(lir_id, worker_id)` to
    /// whether that operator instance is hydrated.
    operators: BTreeMap<(LirId, usize), bool>,
    /// The write frontier as reported by this replica.
    write_frontier: Antichain<T>,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
}

impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
    fn new(
        replica_id: ReplicaId,
        collection_id: GlobalId,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        as_of: Antichain<T>,
    ) -> Self {
        let self_ = Self {
            replica_id,
            collection_id,
            operators: Default::default(),
            write_frontier: as_of,
            introspection_tx,
        };

        self_.report_initial_state();
        self_
    }

    /// Reports the initial introspection state.
    fn report_initial_state(&self) {
        let row = self.write_frontier_row();
        let updates = vec![(row, Diff::ONE)];
        self.send(IntrospectionType::ReplicaFrontiers, updates);
    }

    /// Updates the hydration status of the given operator instance.
    fn operator_hydrated(&mut self, lir_id: LirId, worker_id: usize, hydrated: bool) {
        let retraction = self.operator_hydration_row(lir_id, worker_id);
        self.operators.insert((lir_id, worker_id), hydrated);
        let insertion = self.operator_hydration_row(lir_id, worker_id);

        if retraction == insertion {
            return; // nothing to do
        }

        let updates = retraction
            .map(|r| (r, Diff::MINUS_ONE))
            .into_iter()
            .chain(insertion.map(|r| (r, Diff::ONE)))
            .collect();
        self.send(IntrospectionType::ComputeOperatorHydrationStatus, updates);
    }

    /// Returns a row for the hydration status of the given operator instance,
    /// or `None` if no status is tracked for it.
    fn operator_hydration_row(&self, lir_id: LirId, worker_id: usize) -> Option<Row> {
        self.operators.get(&(lir_id, worker_id)).map(|hydrated| {
            Row::pack_slice(&[
                Datum::String(&self.collection_id.to_string()),
                Datum::UInt64(lir_id.into()),
                Datum::String(&self.replica_id.to_string()),
                Datum::UInt64(u64::cast_from(worker_id)),
                Datum::from(*hydrated),
            ])
        })
    }

    /// Observes the given write frontier and updates the introspection state
    /// as necessary.
    fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
        if self.write_frontier == *write_frontier {
            return; // nothing to do
        }

        let retraction = self.write_frontier_row();
        self.write_frontier.clone_from(write_frontier);
        let insertion = self.write_frontier_row();

        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
        self.send(IntrospectionType::ReplicaFrontiers, updates);
    }

    /// Returns a row reflecting the current write frontier of this replica
    /// collection, for `IntrospectionType::ReplicaFrontiers`.
    fn write_frontier_row(&self) -> Row {
        let write_frontier = self
            .write_frontier
            .as_option()
            .map_or(Datum::Null, |ts| ts.clone().into());
        Row::pack_slice(&[
            Datum::String(&self.collection_id.to_string()),
            Datum::String(&self.replica_id.to_string()),
            write_frontier,
        ])
    }

    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
        // A send failure means the receiver was dropped, in which case nobody
        // is interested in introspection updates anymore.
        let _ = self.introspection_tx.send((introspection_type, updates));
    }
}

impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
    fn drop(&mut self) {
        // Retract any outstanding operator hydration rows.
        let operators: Vec<_> = self.operators.keys().collect();
        let updates: Vec<_> = operators
            .into_iter()
            .flat_map(|(lir_id, worker_id)| self.operator_hydration_row(*lir_id, *worker_id))
            .map(|r| (r, Diff::MINUS_ONE))
            .collect();
        if !updates.is_empty() {
            self.send(IntrospectionType::ComputeOperatorHydrationStatus, updates);
        }

        // Retract the current write frontier row.
        let row = self.write_frontier_row();
        let updates = vec![(row, Diff::MINUS_ONE)];
        self.send(IntrospectionType::ReplicaFrontiers, updates);
    }
}