use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use mz_build_info::BuildInfo;
use mz_cluster_client::WallclockLagFn;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_compute_types::sinks::{
    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
};
use mz_compute_types::sources::SourceInstanceDesc;
use mz_controller_types::dyncfgs::{
    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
use mz_ore::channel::instrumented_unbounded_channel;
use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, soft_panic_or_log};
use mz_persist_types::PersistLocation;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
use mz_storage_types::read_holds::{self, ReadHold};
use mz_storage_types::read_policy::ReadPolicy;
use thiserror::Error;
use timely::PartialOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::{mpsc, oneshot};
use tracing::debug_span;
use uuid::Uuid;

use crate::controller::error::{
    CollectionLookupError, CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
};
use crate::controller::replica::{ReplicaClient, ReplicaConfig};
use crate::controller::{
    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
    ReplicaId, StorageCollections,
};
use crate::logging::LogVariant;
use crate::metrics::IntCounter;
use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
use crate::protocol::command::{
    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
};
use crate::protocol::history::ComputeCommandHistory;
use crate::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
    SubscribeBatch, SubscribeResponse,
};

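/// Error returned in response to a request to create a replica that already exists.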
#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);

#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);

#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}

impl From<CollectionMissing> for DataflowCreationError {
    fn from(error: CollectionMissing) -> Self {
        Self::CollectionMissing(error.0)
    }
}

#[derive(Error, Debug)]
#[error("the instance has shut down")]
pub(super) struct InstanceShutDown;

#[derive(Error, Debug)]
pub enum PeekError {
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    #[error("read hold ID does not match peeked collection: {0}")]
    ReadHoldIdMismatch(GlobalId),
    #[error("insufficient read hold provided: {0}")]
    ReadHoldInsufficient(GlobalId),
    #[error("the instance has shut down")]
    InstanceShutDown,
}

impl From<InstanceShutDown> for PeekError {
    fn from(_error: InstanceShutDown) -> Self {
        Self::InstanceShutDown
    }
}

#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}

impl From<CollectionMissing> for ReadPolicyError {
    fn from(error: CollectionMissing) -> Self {
        Self::CollectionMissing(error.0)
    }
}

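/// A command for an [`Instance`]: a closure that is executed against the instance state on the
/// instance's own task.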
type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;

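/// A client to an [`Instance`] task. Commands are sent over an unbounded channel and executed
/// asynchronously by the instance task.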
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub struct Client<T: ComputeControllerTimestamp> {
    command_tx: mpsc::UnboundedSender<Command<T>>,
    #[derivative(Debug = "ignore")]
    read_hold_tx: read_holds::ChangeTx<T>,
}

impl<T: ComputeControllerTimestamp> Client<T> {
    pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx<T> {
        Arc::clone(&self.read_hold_tx)
    }

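    /// Enqueue a command for execution on the instance task, without waiting for a result.
    ///
    /// Returns an error if the instance task has stopped.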
    pub(super) fn call<F>(&self, f: F) -> Result<(), InstanceShutDown>
    where
        F: FnOnce(&mut Instance<T>) + Send + 'static,
    {
        let otel_ctx = OpenTelemetryContext::obtain();
        self.command_tx
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call").entered();
                otel_ctx.attach_as_parent();

                f(instance)
            }))
            .map_err(|_send_error| InstanceShutDown)
    }

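    /// Enqueue a command on the instance task and await its result.
    ///
    /// Returns an error if the instance task stops before the result is delivered.
    ///
    /// A usage sketch (not a doctest, since `Instance` is private to this module):
    ///
    /// ```ignore
    /// let frontier = client.call_sync(move |i| i.collection_write_frontier(id)).await;
    /// ```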
    pub(super) async fn call_sync<F, R>(&self, f: F) -> Result<R, InstanceShutDown>
    where
        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let otel_ctx = OpenTelemetryContext::obtain();
        self.command_tx
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call_sync").entered();
                otel_ctx.attach_as_parent();
                let result = f(instance);
                let _ = tx.send(result);
            }))
            .map_err(|_send_error| InstanceShutDown)?;

        rx.await.map_err(|_| InstanceShutDown)
    }
}

impl<T> Client<T>
where
    T: ComputeControllerTimestamp,
{
    pub(super) fn spawn(
        id: ComputeInstanceId,
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        read_only: bool,
    ) -> Self {
        let (command_tx, command_rx) = mpsc::unbounded_channel();

        let read_hold_tx: read_holds::ChangeTx<_> = {
            let command_tx = command_tx.clone();
            Arc::new(move |id, change: ChangeBatch<_>| {
                let cmd: Command<_> = {
                    let change = change.clone();
                    Box::new(move |i| i.apply_read_hold_change(id, change.clone()))
                };
                command_tx.send(cmd).map_err(|_| SendError((id, change)))
            })
        };

        mz_ore::task::spawn(
            || format!("compute-instance-{id}"),
            Instance::new(
                build_info,
                storage,
                peek_stash_persist_location,
                arranged_logs,
                metrics,
                now,
                wallclock_lag,
                dyncfg,
                command_rx,
                response_tx,
                Arc::clone(&read_hold_tx),
                introspection_tx,
                read_only,
            )
            .run(),
        );

        Self {
            command_tx,
            read_hold_tx,
        }
    }

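    /// Acquire read holds and current write frontiers for the given collections, atomically with
    /// respect to other instance commands.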
    pub async fn acquire_read_holds_and_collection_write_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, ReadHold<T>, Antichain<T>)>, CollectionLookupError> {
        self.call_sync(move |i| {
            let mut result = Vec::new();
            for id in ids.into_iter() {
                result.push((
                    id,
                    i.acquire_read_hold(id)?,
                    i.collection_write_frontier(id)?,
                ));
            }
            Ok(result)
        })
        .await?
    }

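    /// Issue a peek against the given target, delivering the result through
    /// `peek_response_tx`.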
    pub async fn peek(
        &self,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        target_read_hold: ReadHold<T>,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        self.call_sync(move |i| {
            i.peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing,
                map_filter_project,
                target_read_hold,
                target_replica,
                peek_response_tx,
            )
        })
        .await?
    }
}

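/// A response from a replica: the replica ID, the replica epoch, and the compute response itself.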
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);

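/// The state we keep for a compute instance.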
pub(super) struct Instance<T: ComputeControllerTimestamp> {
    /// Build info about this process.
    build_info: &'static BuildInfo,
    /// A handle providing access to storage collections.
    storage_collections: StorageCollections<T>,
    /// Whether instance initialization has been completed.
    initialized: bool,
    /// Whether the instance is in read-only mode.
    read_only: bool,
    /// The workload class of this instance.
    workload_class: Option<String>,
    /// The replicas of this compute instance.
    replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
    /// Currently installed compute collections.
    collections: BTreeMap<GlobalId, CollectionState<T>>,
    /// IDs of log sources maintained by this compute instance.
    log_sources: BTreeMap<LogVariant, GlobalId>,
    /// Currently outstanding peeks.
    peeks: BTreeMap<Uuid, PendingPeek<T>>,
    /// Currently in-progress subscribes.
    subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
    /// Currently in-progress `COPY TO`s.
    copy_tos: BTreeSet<GlobalId>,
    /// The command history, used to replay commands to new or restarted replicas.
    history: ComputeCommandHistory<UIntGauge, T>,
    /// Receiver for commands to be executed.
    command_rx: mpsc::UnboundedReceiver<Command<T>>,
    /// Sender for responses to be delivered to the controller clients.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Sender for introspection updates to be recorded.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Metrics tracked for this instance.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// The persist location where large peek results can be stashed.
    peek_stash_persist_location: PersistLocation,

    /// A function that returns the current time.
    now: NowFn,
    /// A function that computes the lag between the given time and wallclock time.
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Sender for updates to collection read holds.
    read_hold_tx: read_holds::ChangeTx<T>,
    /// Sender for responses from replicas.
    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// Receiver for responses from replicas.
    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
}

impl<T: ComputeControllerTimestamp> Instance<T> {
    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    fn collection_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut CollectionState<T>, CollectionMissing> {
        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
    }

    fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
        self.collections.get(&id).expect("collection must exist")
    }

    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
        self.collections.iter().map(|(id, coll)| (*id, coll))
    }

    fn add_collection(
        &mut self,
        id: GlobalId,
        as_of: Antichain<T>,
        shared: SharedCollectionState<T>,
        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        replica_input_read_holds: Vec<ReadHold<T>>,
        write_only: bool,
        storage_sink: bool,
        initial_as_of: Option<Antichain<T>>,
        refresh_schedule: Option<RefreshSchedule>,
    ) {
        let introspection = CollectionIntrospection::new(
            id,
            self.introspection_tx.clone(),
            as_of.clone(),
            storage_sink,
            initial_as_of,
            refresh_schedule,
        );
        let mut state = CollectionState::new(
            id,
            as_of.clone(),
            shared,
            storage_dependencies,
            compute_dependencies,
            Arc::clone(&self.read_hold_tx),
            introspection,
        );
        if write_only {
            state.read_policy = None;
        }
        if let Some(previous) = self.collections.insert(id, state) {
            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
        }

        for replica in self.replicas.values_mut() {
            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
        }

        self.report_dependency_updates(id, Diff::ONE);
    }

    fn remove_collection(&mut self, id: GlobalId) {
        self.report_dependency_updates(id, Diff::MINUS_ONE);

        for replica in self.replicas.values_mut() {
            replica.remove_collection(id);
        }

        self.collections.remove(&id);
    }

    fn add_replica_state(
        &mut self,
        id: ReplicaId,
        client: ReplicaClient<T>,
        config: ReplicaConfig,
        epoch: u64,
    ) {
        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();

        let metrics = self.metrics.for_replica(id);
        let mut replica = ReplicaState::new(
            id,
            client,
            config,
            metrics,
            self.introspection_tx.clone(),
            epoch,
        );

        for (collection_id, collection) in &self.collections {
            if collection.log_collection && !log_ids.contains(collection_id) {
                continue;
            }

            let as_of = if collection.log_collection {
                Antichain::from_elem(T::minimum())
            } else {
                collection.read_frontier().to_owned()
            };

            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
            replica.add_collection(*collection_id, as_of, input_read_holds);
        }

        self.replicas.insert(id, replica);
    }

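    /// Enqueue the given response for delivery to the controller clients.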
    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
        let _ = self.response_tx.send(response);
    }

    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
        let _ = self.introspection_tx.send((type_, updates));
    }

    fn replica_exists(&self, id: ReplicaId) -> bool {
        self.replicas.contains_key(&id)
    }

    fn peeks_targeting(
        &self,
        replica_id: ReplicaId,
    ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
        self.peeks.iter().filter_map(move |(uuid, peek)| {
            if peek.target_replica == Some(replica_id) {
                Some((*uuid, peek))
            } else {
                None
            }
        })
    }

    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
        self.subscribes.iter().filter_map(move |(id, subscribe)| {
            let targeting = subscribe.target_replica == Some(replica_id);
            targeting.then_some(*id)
        })
    }

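    /// Update collection introspection with the current frontiers, for both the instance-global
    /// collection state and the per-replica collection state.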
    fn update_frontier_introspection(&mut self) {
        for collection in self.collections.values_mut() {
            collection
                .introspection
                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
        }

        for replica in self.replicas.values_mut() {
            for collection in replica.collections.values_mut() {
                collection
                    .introspection
                    .observe_frontier(&collection.write_frontier);
            }
        }
    }

    fn refresh_state_metrics(&self) {
        let unscheduled_collections_count =
            self.collections.values().filter(|c| !c.scheduled).count();
        let connected_replica_count = self
            .replicas
            .values()
            .filter(|r| r.client.is_connected())
            .count();

        self.metrics
            .replica_count
            .set(u64::cast_from(self.replicas.len()));
        self.metrics
            .collection_count
            .set(u64::cast_from(self.collections.len()));
        self.metrics
            .collection_unscheduled_count
            .set(u64::cast_from(unscheduled_collections_count));
        self.metrics
            .peek_count
            .set(u64::cast_from(self.peeks.len()));
        self.metrics
            .subscribe_count
            .set(u64::cast_from(self.subscribes.len()));
        self.metrics
            .copy_to_count
            .set(u64::cast_from(self.copy_tos.len()));
        self.metrics
            .connected_replica_count
            .set(u64::cast_from(connected_replica_count));
    }

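    /// Refresh the wallclock lag measurements: bucket per-collection lags into the histogram
    /// stash and track the per-replica maximum lag since the last recording.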
    fn refresh_wallclock_lag(&mut self) {
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        let mut unreadable_collections = BTreeSet::new();
        for (id, collection) in &mut self.collections {
            let read_frontier = match self.storage_collections.collection_frontiers(*id) {
                Ok(f) => f.read_capabilities,
                Err(_) => collection.read_frontier(),
            };
            let write_frontier = collection.write_frontier();
            let collection_unreadable = PartialOrder::less_equal(&write_frontier, &read_frontier);
            if collection_unreadable {
                unreadable_collections.insert(id);
            }

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                let bucket = if collection_unreadable {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                let lag = if unreadable_collections.contains(&id) {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                };

                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                }
            }
        }

        self.maybe_record_wallclock_lag();
    }

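    /// Record wallclock lag introspection, if the recording interval has elapsed.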
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }

    fn report_dependency_updates(&self, id: GlobalId, diff: Diff) {
        let collection = self.expect_collection(id);
        let dependencies = collection.dependency_ids();

        let updates = dependencies
            .map(|dependency_id| {
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&dependency_id.to_string()),
                ]);
                (row, diff)
            })
            .collect();

        self.deliver_introspection_updates(IntrospectionType::ComputeDependencies, updates);
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, CollectionLookupError> {
        if self.replicas.is_empty() {
            return Ok(true);
        }

        for replica_state in self.replicas.values() {
            let collection_state = replica_state
                .collections
                .get(&collection_id)
                .ok_or(CollectionLookupError::CollectionMissing(collection_id))?;

            if collection_state.hydrated() {
                return Ok(true);
            }
        }

        Ok(false)
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, HydrationCheckBadTarget> {
        if self.replicas.is_empty() {
            return Ok(true);
        }
        let mut all_hydrated = true;
        let target_replicas: BTreeSet<ReplicaId> = self
            .replicas
            .keys()
            .filter_map(|id| match target_replica_ids {
                None => Some(id.clone()),
                Some(ref ids) if ids.contains(id) => Some(id.clone()),
                Some(_) => None,
            })
            .collect();
        if let Some(targets) = target_replica_ids {
            if target_replicas.is_empty() {
                return Err(HydrationCheckBadTarget(targets));
            }
        }

        for (id, _collection) in self.collections_iter() {
            if id.is_transient() || exclude_collections.contains(&id) {
                continue;
            }

            let mut collection_hydrated = false;
            for replica_state in self.replicas.values() {
                if !target_replicas.contains(&replica_state.id) {
                    continue;
                }
                let collection_state = replica_state
                    .collections
                    .get(&id)
                    .expect("missing collection state");

                if collection_state.hydrated() {
                    collection_hydrated = true;
                    break;
                }
            }

            if !collection_hydrated {
                tracing::info!("collection {id} is not hydrated on any replica");
                all_hydrated = false;
            }
        }

        Ok(all_hydrated)
    }

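    /// Clean up collection state that is no longer needed: a collection is removed once it has
    /// been dropped, all read capabilities are released, and all replica frontiers are empty.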
    fn cleanup_collections(&mut self) {
        let to_remove: Vec<_> = self
            .collections_iter()
            .filter(|(id, collection)| {
                collection.dropped
                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
                    && self
                        .replicas
                        .values()
                        .all(|r| r.collection_frontiers_empty(*id))
            })
            .map(|(id, _collection)| id)
            .collect();

        for id in to_remove {
            self.remove_collection(id);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            build_info: _,
            storage_collections: _,
            peek_stash_persist_location: _,
            initialized,
            read_only,
            workload_class,
            replicas,
            collections,
            log_sources: _,
            peeks,
            subscribes,
            copy_tos,
            history: _,
            command_rx: _,
            response_tx: _,
            introspection_tx: _,
            metrics: _,
            dyncfg: _,
            now: _,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            read_hold_tx: _,
            replica_tx: _,
            replica_rx: _,
        } = self;

        let replicas: BTreeMap<_, _> = replicas
            .iter()
            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
            .collect::<Result<_, anyhow::Error>>()?;
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();
        let peeks: BTreeMap<_, _> = peeks
            .iter()
            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
            .collect();
        let subscribes: BTreeMap<_, _> = subscribes
            .iter()
            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
            .collect();
        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");

        Ok(serde_json::json!({
            "initialized": initialized,
            "read_only": read_only,
            "workload_class": workload_class,
            "replicas": replicas,
            "collections": collections,
            "peeks": peeks,
            "subscribes": subscribes,
            "copy_tos": copy_tos,
            "wallclock_lag_last_recorded": wallclock_lag_last_recorded,
        }))
    }

    fn collection_write_frontier(&self, id: GlobalId) -> Result<Antichain<T>, CollectionMissing> {
        Ok(self.collection(id)?.write_frontier())
    }
}

impl<T> Instance<T>
where
    T: ComputeControllerTimestamp,
{
    fn new(
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        command_rx: mpsc::UnboundedReceiver<Command<T>>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        read_only: bool,
    ) -> Self {
        let mut collections = BTreeMap::new();
        let mut log_sources = BTreeMap::new();
        for (log, id, shared) in arranged_logs {
            let collection = CollectionState::new_log_collection(
                id,
                shared,
                Arc::clone(&read_hold_tx),
                introspection_tx.clone(),
            );
            collections.insert(id, collection);
            log_sources.insert(log, id);
        }

        let history = ComputeCommandHistory::new(metrics.for_history());

        let send_count = metrics.response_send_count.clone();
        let recv_count = metrics.response_recv_count.clone();
        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            storage_collections: storage,
            peek_stash_persist_location,
            initialized: false,
            read_only,
            workload_class: None,
            replicas: Default::default(),
            collections,
            log_sources,
            peeks: Default::default(),
            subscribes: Default::default(),
            copy_tos: Default::default(),
            history,
            command_rx,
            response_tx,
            introspection_tx,
            metrics,
            dyncfg,
            now,
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            read_hold_tx,
            replica_tx,
            replica_rx,
        }
    }

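    /// Run this instance until the command channel closes, processing controller commands and
    /// replica responses as they arrive.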
    async fn run(mut self) {
        self.send(ComputeCommand::Hello {
            nonce: Uuid::default(),
        });

        let instance_config = InstanceConfig {
            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
            logging: Default::default(),
            expiration_offset: Default::default(),
        };

        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));

        loop {
            tokio::select! {
                command = self.command_rx.recv() => match command {
                    Some(cmd) => cmd(&mut self),
                    None => break,
                },
                response = self.replica_rx.recv() => match response {
                    Some(response) => self.handle_response(response),
                    None => unreachable!("self owns a sender side of the channel"),
                }
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        if let Some(workload_class) = &config_params.workload_class {
            self.workload_class = workload_class.clone();
        }

        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
        self.send(command);
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn initialization_complete(&mut self) {
        if !self.initialized {
            self.send(ComputeCommand::InitializationComplete);
            self.initialized = true;
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
        let collection = self.collection_mut(collection_id)?;

        if !collection.read_only {
            return Ok(());
        }

        let as_of = collection.read_frontier();

        if as_of.is_empty() {
            return Ok(());
        }

        collection.read_only = false;
        self.send(ComputeCommand::AllowWrites(collection_id));

        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn shutdown(&mut self) {
        let (_tx, rx) = mpsc::unbounded_channel();
        let mut command_rx = std::mem::replace(&mut self.command_rx, rx);

        while let Ok(cmd) = command_rx.try_recv() {
            cmd(self);
        }

        self.cleanup_collections();

        let stray_replicas: Vec<_> = self.replicas.keys().collect();
        soft_assert_or_log!(
            stray_replicas.is_empty(),
            "dropped instance still has provisioned replicas: {stray_replicas:?}",
        );

        let collections = self.collections.iter();
        let stray_collections: Vec<_> = collections
            .filter(|(_, c)| !c.log_collection)
            .map(|(id, _)| id)
            .collect();
        soft_assert_or_log!(
            stray_collections.is_empty(),
            "dropped instance still has installed collections: {stray_collections:?}",
        );
    }

    #[mz_ore::instrument(level = "debug")]
    fn send(&mut self, cmd: ComputeCommand<T>) {
        self.history.push(cmd.clone());

        for replica in self.replicas.values_mut() {
            let _ = replica.client.send(cmd.clone());
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn add_replica(
        &mut self,
        id: ReplicaId,
        mut config: ReplicaConfig,
        epoch: Option<u64>,
    ) -> Result<(), ReplicaExists> {
        if self.replica_exists(id) {
            return Err(ReplicaExists(id));
        }

        config.logging.index_logs = self.log_sources.clone();

        let epoch = epoch.unwrap_or(1);
        let metrics = self.metrics.for_replica(id);
        let client = ReplicaClient::spawn(
            id,
            self.build_info,
            config.clone(),
            epoch,
            metrics.clone(),
            Arc::clone(&self.dyncfg),
            self.replica_tx.clone(),
        );

        self.history.reduce();

        self.history.update_source_uppers(&self.storage_collections);

        for command in self.history.iter() {
            if client.send(command.clone()).is_err() {
                tracing::warn!("replica {id} connection terminated during hydration");
                break;
            }
        }

        self.add_replica_state(id, client, config, epoch);

        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;

        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
        for subscribe_id in to_drop {
            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
            let response = ComputeControllerResponse::SubscribeResponse(
                subscribe_id,
                SubscribeBatch {
                    lower: subscribe.frontier.clone(),
                    upper: subscribe.frontier,
                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
                },
            );
            self.deliver_response(response);
        }

        let mut peek_responses = Vec::new();
        let mut to_drop = Vec::new();
        for (uuid, peek) in self.peeks_targeting(id) {
            peek_responses.push(ComputeControllerResponse::PeekNotification(
                uuid,
                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
                peek.otel_ctx.clone(),
            ));
            to_drop.push(uuid);
        }
        for response in peek_responses {
            self.deliver_response(response);
        }
        for uuid in to_drop {
            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
            self.finish_peek(uuid, response);
        }

        self.forward_implied_capabilities();

        Ok(())
    }

    fn rehydrate_replica(&mut self, id: ReplicaId) {
        let config = self.replicas[&id].config.clone();
        let epoch = self.replicas[&id].epoch + 1;

        self.remove_replica(id).expect("replica must exist");
        let result = self.add_replica(id, config, Some(epoch));

        match result {
            Ok(()) => (),
            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
        }
    }

    fn rehydrate_failed_replicas(&mut self) {
        let replicas = self.replicas.iter();
        let failed_replicas: Vec<_> = replicas
            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
            .collect();

        for replica_id in failed_replicas {
            self.rehydrate_replica(replica_id);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn create_dataflow(
        &mut self,
        dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        import_read_holds: Vec<ReadHold<T>>,
        subscribe_target_replica: Option<ReplicaId>,
        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        if let Some(replica_id) = subscribe_target_replica {
            if !self.replica_exists(replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        let mut storage_dependencies = BTreeMap::new();
        let mut compute_dependencies = BTreeMap::new();

        let mut replica_input_read_holds = Vec::new();

        let mut import_read_holds: BTreeMap<_, _> =
            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();

        for &id in dataflow.source_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            replica_input_read_holds.push(read_hold.clone());

            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            storage_dependencies.insert(id, read_hold);
        }

        for &id in dataflow.index_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            compute_dependencies.insert(id, read_hold);
        }

        if as_of.is_empty() {
            replica_input_read_holds = Default::default();
        }

        for export_id in dataflow.export_ids() {
            let shared = shared_collection_state
                .remove(&export_id)
                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
            let write_only = dataflow.sink_exports.contains_key(&export_id);
            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);

            self.add_collection(
                export_id,
                as_of.clone(),
                shared,
                storage_dependencies.clone(),
                compute_dependencies.clone(),
                replica_input_read_holds.clone(),
                write_only,
                storage_sink,
                dataflow.initial_storage_as_of.clone(),
                dataflow.refresh_schedule.clone(),
            );

            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
            }
        }

        for subscribe_id in dataflow.subscribe_ids() {
            self.subscribes
                .insert(subscribe_id, ActiveSubscribe::new(subscribe_target_replica));
        }

        for copy_to_id in dataflow.copy_to_ids() {
            self.copy_tos.insert(copy_to_id);
        }

        let mut source_imports = BTreeMap::new();
        for (id, import) in dataflow.source_imports {
            let frontiers = self
                .storage_collections
                .collection_frontiers(id)
                .expect("collection exists");

            let collection_metadata = self
                .storage_collections
                .collection_metadata(id)
                .expect("we have a read hold on this collection");

            let desc = SourceInstanceDesc {
                storage_metadata: collection_metadata.clone(),
                arguments: import.desc.arguments,
                typ: import.desc.typ.clone(),
            };
            source_imports.insert(
                id,
                mz_compute_types::dataflows::SourceImport {
                    desc,
                    monotonic: import.monotonic,
                    with_snapshot: import.with_snapshot,
                    upper: frontiers.write_frontier,
                },
            );
        }

        let mut sink_exports = BTreeMap::new();
        for (id, se) in dataflow.sink_exports {
            let connection = match se.connection {
                ComputeSinkConnection::MaterializedView(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| CollectionMissing(id))?
                        .clone();
                    let conn = MaterializedViewSinkConnection {
                        value_desc: conn.value_desc,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::MaterializedView(conn)
                }
                ComputeSinkConnection::ContinualTask(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
                        .clone();
                    let conn = ContinualTaskConnection {
                        input_id: conn.input_id,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::ContinualTask(conn)
                }
                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                    ComputeSinkConnection::CopyToS3Oneshot(conn)
                }
            };
            let desc = ComputeSinkDesc {
                from: se.from,
                from_desc: se.from_desc,
                connection,
                with_snapshot: se.with_snapshot,
                up_to: se.up_to,
                non_null_assertions: se.non_null_assertions,
                refresh_schedule: se.refresh_schedule,
            };
            sink_exports.insert(id, desc);
        }

        let objects_to_build = dataflow
            .objects_to_build
            .into_iter()
            .map(|object| BuildDesc {
                id: object.id,
                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
            })
            .collect();

        let augmented_dataflow = DataflowDescription {
            source_imports,
            sink_exports,
            objects_to_build,
            index_imports: dataflow.index_imports,
            index_exports: dataflow.index_exports,
            as_of: dataflow.as_of.clone(),
            until: dataflow.until,
            initial_storage_as_of: dataflow.initial_storage_as_of,
            refresh_schedule: dataflow.refresh_schedule,
            debug_name: dataflow.debug_name,
            time_dependence: dataflow.time_dependence,
        };

        if augmented_dataflow.is_transient() {
            tracing::debug!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        } else {
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        }

        if as_of.is_empty() {
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                "not sending `CreateDataflow`, because of empty `as_of`",
            );
        } else {
            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
            let dataflow = Box::new(augmented_dataflow);
            self.send(ComputeCommand::CreateDataflow(dataflow));

            for id in collections {
                self.maybe_schedule_collection(id);
            }
        }

        Ok(())
    }

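    /// Schedule the identified collection if it is not scheduled yet and all of its inputs have
    /// advanced beyond its as-of.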
    fn maybe_schedule_collection(&mut self, id: GlobalId) {
        let collection = self.expect_collection(id);

        if collection.scheduled {
            return;
        }

        let as_of = collection.read_frontier();

        if as_of.is_empty() {
            return;
        }

        let ready = if id.is_transient() {
            true
        } else {
            let not_self_dep = |x: &GlobalId| *x != id;

            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
            let compute_frontiers = compute_deps.map(|id| {
                let dep = self.expect_collection(id);
                dep.write_frontier()
            });

            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(storage_deps.collect())
                .expect("must exist");
            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);

            let ready = compute_frontiers
                .chain(storage_frontiers)
                .all(|frontier| PartialOrder::less_than(&as_of, &frontier));

            ready
        };

        if ready {
            self.send(ComputeCommand::Schedule(id));
            let collection = self.expect_collection_mut(id);
            collection.scheduled = true;
        }
    }

    fn schedule_collections(&mut self) {
        let ids: Vec<_> = self.collections.keys().copied().collect();
        for id in ids {
            self.maybe_schedule_collection(id);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
        for id in &ids {
            let collection = self.collection_mut(*id)?;

            collection.dropped = true;

            collection.implied_read_hold.release();
            collection.warmup_read_hold.release();

            self.subscribes.remove(id);
            self.copy_tos.remove(id);
        }

        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn peek(
        &mut self,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        mut read_hold: ReadHold<T>,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let target_id = peek_target.id();

        if read_hold.id() != target_id {
            return Err(ReadHoldIdMismatch(read_hold.id()));
        }
        read_hold
            .try_downgrade(Antichain::from_elem(timestamp.clone()))
            .map_err(|_| ReadHoldInsufficient(target_id))?;

        if let Some(target) = target_replica {
            if !self.replica_exists(target) {
                return Err(ReplicaMissing(target));
            }
        }

        let otel_ctx = OpenTelemetryContext::obtain();

        self.peeks.insert(
            uuid,
            PendingPeek {
                target_replica,
                otel_ctx: otel_ctx.clone(),
                requested_at: Instant::now(),
                read_hold,
                peek_response_tx,
                limit: finishing.limit.map(usize::cast_from),
                offset: finishing.offset,
            },
        );

        let peek = Peek {
            literal_constraints,
            uuid,
            timestamp,
            finishing,
            map_filter_project,
            otel_ctx,
            target: peek_target,
            result_desc,
        };
        self.send(ComputeCommand::Peek(Box::new(peek)));

        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
        let Some(peek) = self.peeks.get_mut(&uuid) else {
            tracing::warn!("did not find pending peek for {uuid}");
            return;
        };

        let duration = peek.requested_at.elapsed();
        self.metrics
            .observe_peek_response(&PeekResponse::Canceled, duration);

        let otel_ctx = peek.otel_ctx.clone();
        otel_ctx.attach_as_parent();

        self.deliver_response(ComputeControllerResponse::PeekNotification(
            uuid,
            PeekNotification::Canceled,
            otel_ctx,
        ));

        self.finish_peek(uuid, reason);
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn set_read_policy(
        &mut self,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) -> Result<(), ReadPolicyError> {
        for (id, _policy) in &policies {
            let collection = self.collection(*id)?;
            if collection.read_policy.is_none() {
                return Err(ReadPolicyError::WriteOnlyCollection(*id));
            }
        }

        for (id, new_policy) in policies {
            let collection = self.expect_collection_mut(id);
            let new_since = new_policy.frontier(collection.write_frontier().borrow());
            let _ = collection.implied_read_hold.try_downgrade(new_since);
            collection.read_policy = Some(new_policy);
        }

        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
        let collection = self.expect_collection_mut(id);

        let advanced = collection.shared.lock_write_frontier(|f| {
            let advanced = PartialOrder::less_than(f, &new_frontier);
            if advanced {
                f.clone_from(&new_frontier);
            }
            advanced
        });

        if !advanced {
            return;
        }

        let new_since = match &collection.read_policy {
            Some(read_policy) => read_policy.frontier(new_frontier.borrow()),
            None => Antichain::from_iter(
                new_frontier
                    .iter()
                    .map(|t| t.step_back().unwrap_or_else(T::minimum)),
            ),
        };
        let _ = collection.implied_read_hold.try_downgrade(new_since);

        self.deliver_response(ComputeControllerResponse::FrontierUpper {
            id,
            upper: new_frontier,
        });
    }

    fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
        let Some(collection) = self.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "read hold change for absent collection (id={id}, changes={update:?})"
            );
            return;
        };

        let new_since = collection.shared.lock_read_capabilities(|caps| {
            let read_frontier = caps.frontier();
            for (time, diff) in update.iter() {
                let count = caps.count_for(time) + diff;
                assert!(
                    count >= 0,
                    "invalid read capabilities update: negative capability \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
                assert!(
                    count == 0 || read_frontier.less_equal(time),
                    "invalid read capabilities update: frontier regression \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
            }

            let changes = caps.update_iter(update.drain());

            let changed = changes.count() > 0;
            changed.then(|| caps.frontier().to_owned())
        });

        let Some(new_since) = new_since else {
            return;
        };

        for read_hold in collection.compute_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }
        for read_hold in collection.storage_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }

        self.send(ComputeCommand::AllowCompaction {
            id,
            frontier: new_since,
        });
    }

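    /// Finish a pending peek: deliver the response, instruct replicas to stop working on the
    /// peek, and release the peek's read hold.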
    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
        let Some(peek) = self.peeks.remove(&uuid) else {
            return;
        };

        let _ = peek.peek_response_tx.send(response);

        self.send(ComputeCommand::CancelPeek { uuid });

        drop(peek.read_hold);
    }

    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
        if self
            .replicas
            .get(&replica_id)
            .filter(|replica| replica.epoch == epoch)
            .is_none()
        {
            return;
        }

        match response {
            ComputeResponse::Frontiers(id, frontiers) => {
                self.handle_frontiers_response(id, frontiers, replica_id);
            }
            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
            }
            ComputeResponse::CopyToResponse(id, response) => {
                self.handle_copy_to_response(id, response, replica_id);
            }
            ComputeResponse::SubscribeResponse(id, response) => {
                self.handle_subscribe_response(id, response, replica_id);
            }
            ComputeResponse::Status(response) => {
                self.handle_status_response(response, replica_id);
            }
        }
    }

    fn handle_frontiers_response(
        &mut self,
        id: GlobalId,
        frontiers: FrontiersResponse<T>,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&id) {
            soft_panic_or_log!(
                "frontiers update for an unknown collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica \
                 (replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };

        if let Some(new_frontier) = frontiers.input_frontier {
            replica_collection.update_input_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.output_frontier {
            replica_collection.update_output_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.write_frontier {
            replica_collection.update_write_frontier(new_frontier.clone());
            self.maybe_update_global_write_frontier(id, new_frontier);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn handle_peek_response(
        &mut self,
        uuid: Uuid,
        response: PeekResponse,
        otel_ctx: OpenTelemetryContext,
        replica_id: ReplicaId,
    ) {
        otel_ctx.attach_as_parent();

        let Some(peek) = self.peeks.get(&uuid) else {
            return;
        };

        let target_replica = peek.target_replica.unwrap_or(replica_id);
        if target_replica != replica_id {
            return;
        }

        let duration = peek.requested_at.elapsed();
        self.metrics.observe_peek_response(&response, duration);

        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
        self.deliver_response(ComputeControllerResponse::PeekNotification(
            uuid,
            notification,
            otel_ctx,
        ));

        self.finish_peek(uuid, response)
    }

    fn handle_copy_to_response(
        &mut self,
        sink_id: GlobalId,
        response: CopyToResponse,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&sink_id) {
            soft_panic_or_log!(
                "received response for an unknown copy-to \
                 (sink_id={sink_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
            soft_panic_or_log!(
                "copy-to response for an unknown replica collection \
                 (sink_id={sink_id}, replica_id={replica_id})"
            );
            return;
        };

        replica_collection.update_write_frontier(Antichain::new());
        replica_collection.update_input_frontier(Antichain::new());
        replica_collection.update_output_frontier(Antichain::new());

        if !self.copy_tos.remove(&sink_id) {
            return;
        }

        let result = match response {
            CopyToResponse::RowCount(count) => Ok(count),
            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
            CopyToResponse::Dropped => {
                tracing::error!(
                    %sink_id, %replica_id,
                    "received `Dropped` response for a tracked copy to",
                );
                return;
            }
        };

        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
    }

    fn handle_subscribe_response(
        &mut self,
        subscribe_id: GlobalId,
        response: SubscribeResponse<T>,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&subscribe_id) {
            soft_panic_or_log!(
                "received response for an unknown subscribe \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica (replica_id={replica_id})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica collection \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
            );
            return;
        };

        let write_frontier = match &response {
            SubscribeResponse::Batch(batch) => batch.upper.clone(),
            SubscribeResponse::DroppedAt(_) => Antichain::new(),
        };

        replica_collection.update_write_frontier(write_frontier.clone());
        replica_collection.update_input_frontier(write_frontier.clone());
        replica_collection.update_output_frontier(write_frontier.clone());

        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
            return;
        };
        let replica_targeted = subscribe.target_replica.unwrap_or(replica_id) == replica_id;
        if !replica_targeted {
            return;
        }

        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);

        match response {
            SubscribeResponse::Batch(batch) => {
                let upper = batch.upper;
                let mut updates = batch.updates;

                if PartialOrder::less_than(&subscribe.frontier, &upper) {
                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());

                    if upper.is_empty() {
                        self.subscribes.remove(&subscribe_id);
                    } else {
                        self.subscribes.insert(subscribe_id, subscribe);
                    }

                    if let Ok(updates) = updates.as_mut() {
                        updates.retain(|(time, _data, _diff)| lower.less_equal(time));
                    }
                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeBatch {
                            lower,
                            upper,
                            updates,
                        },
                    ));
                }
            }
            SubscribeResponse::DroppedAt(frontier) => {
                tracing::error!(
                    %subscribe_id,
                    %replica_id,
                    frontier = ?frontier.elements(),
                    "received `DroppedAt` response for a tracked subscribe",
                );
                self.subscribes.remove(&subscribe_id);
            }
        }
    }

    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
        match response {
            StatusResponse::Placeholder => {}
        }
    }

    fn dependency_write_frontiers<'b>(
        &'b self,
        collection: &'b CollectionState<T>,
    ) -> impl Iterator<Item = Antichain<T>> + 'b {
        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
            let collection = self.collections.get(&dep_id);
            collection.map(|c| c.write_frontier())
        });
        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
            frontiers.map(|f| f.write_frontier)
        });

        compute_frontiers.chain(storage_frontiers)
    }

    fn transitive_storage_dependency_write_frontiers<'b>(
        &'b self,
        collection: &'b CollectionState<T>,
    ) -> impl Iterator<Item = Antichain<T>> + 'b {
        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
        let mut done = BTreeSet::new();

        while let Some(id) = todo.pop() {
            if done.contains(&id) {
                continue;
            }
            if let Some(dep) = self.collections.get(&id) {
                storage_ids.extend(dep.storage_dependency_ids());
                todo.extend(dep.compute_dependency_ids());
            }
            done.insert(id);
        }

        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
            let frontiers = self.storage_collections.collection_frontiers(id).ok();
            frontiers.map(|f| f.write_frontier)
        });

        storage_frontiers
    }

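    /// Downgrade warmup read holds to (just before) the write frontiers of their dependencies,
    /// allowing dependency compaction to proceed while keeping collections warmable.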
    fn downgrade_warmup_capabilities(&mut self) {
        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            // Dropped collections that have advanced to the empty frontier no longer need
            // warmup holds.
            if collection.read_policy.is_none()
                && collection.shared.lock_write_frontier(|f| f.is_empty())
            {
                new_capabilities.insert(*id, Antichain::new());
                continue;
            }

            // Hold the warmup capability back to just before the dependency write frontiers.
            let mut new_capability = Antichain::new();
            for frontier in self.dependency_write_frontiers(collection) {
                for time in frontier {
                    new_capability.insert(time.step_back().unwrap_or(time));
                }
            }

            new_capabilities.insert(*id, new_capability);
        }

        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
        }
    }

    /// Forwards the implied capabilities of collections on replica-less instances.
    ///
    /// When an instance has no replicas, nothing advances collection write frontiers, so the
    /// implied read holds of its collections would stay in place and prevent the sinces of
    /// dependencies from advancing. To avoid unbounded since pinning, this method downgrades
    /// the implied read holds based on the write frontiers of transitive storage dependencies
    /// and the collections' read policies.
    ///
    /// Gated behind the [`ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE`] dyncfg.
    fn forward_implied_capabilities(&mut self) {
        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
            return;
        }
        if !self.replicas.is_empty() {
            return;
        }

        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            // Dropped collections (no read policy) are not forwarded.
            let Some(read_policy) = &collection.read_policy else {
                continue;
            };

            let mut dep_frontier = Antichain::new();
            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
                dep_frontier.extend(frontier);
            }

            let new_capability = read_policy.frontier(dep_frontier.borrow());
            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
                new_capabilities.insert(*id, new_capability);
            }
        }

        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.implied_read_hold.try_downgrade(new_capability);
        }
    }

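    /// Acquires a [`ReadHold`] on the identified compute collection at its current since.
    ///
    /// The hold is accounted for in the shared read capabilities, so the collection's since
    /// cannot advance past the hold's frontier until the hold is downgraded or dropped.
    ///
    /// A minimal usage sketch (hypothetical caller; `instance`, `id`, and `later_frontier` are
    /// assumed to be in scope):
    ///
    /// ```ignore
    /// let mut hold = instance.acquire_read_hold(id)?;
    /// // The collection stays readable at all times >= `hold.since()` ...
    /// let as_of = hold.since().clone();
    /// // ... until the hold is downgraded or dropped.
    /// hold.try_downgrade(later_frontier)?;
    /// ```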
    fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
        let collection = self.collection(id)?;
        // Increment the read capabilities at the current since, so the new hold is accounted
        // for in the shared collection state.
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });
        let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
        Ok(hold)
    }

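    /// Performs one round of periodic maintenance work: rehydrating failed replicas,
    /// downgrading warmup capabilities, forwarding implied capabilities on replica-less
    /// instances, scheduling and cleaning up collections, and refreshing frontier
    /// introspection, state metrics, and wallclock lag measurements.
    ///
    /// A sketch of a caller driving maintenance on a timer (hypothetical; the surrounding
    /// controller's actual scheduling may differ):
    ///
    /// ```ignore
    /// let mut ticker = tokio::time::interval(Duration::from_secs(1));
    /// loop {
    ///     ticker.tick().await;
    ///     instance.maintain();
    /// }
    /// ```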
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        self.rehydrate_failed_replicas();
        self.downgrade_warmup_capabilities();
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
}

/// State maintained about an individual compute collection.
#[derive(Debug)]
struct CollectionState<T: ComputeControllerTimestamp> {
    /// Whether this collection is a log collection.
    log_collection: bool,
    /// Whether this collection has been dropped by the controller client.
    dropped: bool,
    /// Whether a `Schedule` command has been sent for this collection.
    scheduled: bool,

    /// Whether this collection is in read-only mode.
    read_only: bool,

    /// Collection state shared with the compute controller frontend.
    shared: SharedCollectionState<T>,

    /// A read hold maintaining the implicit capability of the collection.
    ///
    /// This capability is kept to ensure the collection remains readable according to its read
    /// policy.
    implied_read_hold: ReadHold<T>,
    /// A read hold held to enable dataflow warmup.
    ///
    /// Continually downgraded to just before the write frontiers of the collection's inputs,
    /// so replacement dataflows can immediately start hydrating.
    warmup_read_hold: ReadHold<T>,
    /// The policy that drives how the collection's since is downgraded.
    ///
    /// `None` for collections that have been dropped.
    read_policy: Option<ReadPolicy<T>>,

    /// Read holds on this collection's storage inputs.
    storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
    /// Read holds on this collection's compute inputs.
    compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,

    /// Introspection state associated with this collection.
    introspection: CollectionIntrospection<T>,

    /// A stash of wallclock lag measurements, accumulated between reports to the wallclock lag
    /// histogram introspection.
    ///
    /// `None` for collections (e.g. transient ones) that do not report wallclock lag.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
}

impl<T: ComputeControllerTimestamp> CollectionState<T> {
    /// Creates state for a new collection.
    fn new(
        collection_id: GlobalId,
        as_of: Antichain<T>,
        shared: SharedCollectionState<T>,
        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection: CollectionIntrospection<T>,
    ) -> Self {
        // A new collection starts out at its as-of: both since and upper begin there.
        let since = as_of.clone();
        let upper = as_of;

        // The shared state must have been initialized to match.
        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
        assert!(shared.lock_write_frontier(|f| f == &upper));

        // The implied read hold is already accounted for in the initial shared read
        // capabilities; the warmup hold is accounted for below.
        let implied_read_hold =
            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);

        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
        shared.lock_read_capabilities(|c| {
            c.update_iter(updates);
        });

        // Transient collections are not reflected in wallclock lag introspection, so they
        // don't need a stash.
        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
            true => None,
            false => Some(Default::default()),
        };

        Self {
            log_collection: false,
            dropped: false,
            scheduled: false,
            read_only: true,
            shared,
            implied_read_hold,
            warmup_read_hold,
            read_policy: Some(ReadPolicy::ValidFrom(since)),
            storage_dependencies,
            compute_dependencies,
            introspection,
            wallclock_lag_histogram_stash,
        }
    }

    /// Creates state for a new log collection.
    fn new_log_collection(
        id: GlobalId,
        shared: SharedCollectionState<T>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    ) -> Self {
        let since = Antichain::from_elem(T::minimum());
        let introspection =
            CollectionIntrospection::new(id, introspection_tx, since.clone(), false, None, None);
        let mut state = Self::new(
            id,
            since,
            shared,
            Default::default(),
            Default::default(),
            read_hold_tx,
            introspection,
        );
        state.log_collection = true;
        // Log collections are created and scheduled implicitly as part of replica
        // initialization.
        state.scheduled = true;
        state
    }

    /// Reports the current read frontier.
    fn read_frontier(&self) -> Antichain<T> {
        self.shared
            .lock_read_capabilities(|c| c.frontier().to_owned())
    }

    /// Reports the current write frontier.
    fn write_frontier(&self) -> Antichain<T> {
        self.shared.lock_write_frontier(|f| f.clone())
    }

    /// Returns the IDs of this collection's storage inputs.
    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.storage_dependencies.keys().copied()
    }

    /// Returns the IDs of this collection's compute inputs.
    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.compute_dependencies.keys().copied()
    }

    /// Returns the IDs of all of this collection's inputs.
    fn dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.compute_dependency_ids()
            .chain(self.storage_dependency_ids())
    }
}

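/// Collection state shared between the compute controller and its clients.
///
/// Both fields live behind an `Arc<Mutex<_>>` and are accessed exclusively through the `lock_*`
/// methods, which run a caller-provided closure while the lock is held. This keeps lock scopes
/// small and ensures locks are always released.
///
/// A sketch of the access pattern (illustrative, with `u64` timestamps):
///
/// ```ignore
/// let shared = SharedCollectionState::new(Antichain::from_elem(0u64));
///
/// // Read the current frontiers, each under its own short-lived lock.
/// let since = shared.lock_read_capabilities(|caps| caps.frontier().to_owned());
/// let upper = shared.lock_write_frontier(|f| f.clone());
/// assert_eq!(since, upper);
/// ```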
#[derive(Clone, Debug)]
pub(super) struct SharedCollectionState<T> {
    /// Accumulation of read capabilities for the collection.
    ///
    /// Contains the capabilities held by all [`ReadHold`]s given out for the collection,
    /// including the implied and warmup capabilities.
    read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
    /// The write frontier of this collection.
    write_frontier: Arc<Mutex<Antichain<T>>>,
}

impl<T: Timestamp> SharedCollectionState<T> {
    pub fn new(as_of: Antichain<T>) -> Self {
        // A new collection starts out at its as-of.
        let since = as_of.clone();
        let upper = as_of;

        // Initialize the read capabilities at the since, with a single unit of capability
        // accounting for the collection's implied read hold.
        let mut read_capabilities = MutableAntichain::new();
        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));

        Self {
            read_capabilities: Arc::new(Mutex::new(read_capabilities)),
            write_frontier: Arc::new(Mutex::new(upper)),
        }
    }

    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut MutableAntichain<T>) -> R,
    {
        let mut caps = self.read_capabilities.lock().expect("poisoned");
        f(&mut *caps)
    }

    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Antichain<T>) -> R,
    {
        let mut frontier = self.write_frontier.lock().expect("poisoned");
        f(&mut *frontier)
    }
}

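/// Manages the introspection data reported for a compute collection: the rows the collection
/// contributes to the `Frontiers` and `ComputeMaterializedViewRefreshes` introspection
/// relations.
///
/// Updates are sent over the introspection channel as differential updates: a change is
/// reported as a retraction of the old row followed by an insertion of the new one, and all
/// reported rows are retracted on `Drop`.
///
/// A sketch of the receiving end of the channel (hypothetical consumer;
/// `apply_introspection_updates` is not part of this crate):
///
/// ```ignore
/// while let Some((introspection_type, updates)) = introspection_rx.recv().await {
///     // `updates` contains `(Row, Diff)` pairs; retractions carry negative diffs.
///     apply_introspection_updates(introspection_type, updates);
/// }
/// ```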
#[derive(Debug)]
struct CollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Introspection state for `IntrospectionType::Frontiers`.
    ///
    /// `None` for collections that are not reflected in frontier introspection (storage sinks).
    frontiers: Option<FrontiersIntrospectionState<T>>,
    /// Introspection state for `IntrospectionType::ComputeMaterializedViewRefreshes`.
    ///
    /// `None` for collections that do not have a refresh schedule.
    refresh: Option<RefreshIntrospectionState<T>>,
}

impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
    fn new(
        collection_id: GlobalId,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        as_of: Antichain<T>,
        storage_sink: bool,
        initial_as_of: Option<Antichain<T>>,
        refresh_schedule: Option<RefreshSchedule>,
    ) -> Self {
        let refresh =
            match (refresh_schedule, initial_as_of) {
                (Some(refresh_schedule), Some(initial_as_of)) => Some(
                    RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
                ),
                (refresh_schedule, _) => {
                    // A `refresh_schedule` should only ever be provided together with an
                    // `initial_as_of`.
                    soft_assert_or_log!(
                        refresh_schedule.is_none(),
                        "`refresh_schedule` without an `initial_as_of`: {collection_id}"
                    );
                    None
                }
            };
        let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));

        let self_ = Self {
            collection_id,
            introspection_tx,
            frontiers,
            refresh,
        };

        self_.report_initial_state();
        self_
    }

    /// Reports the initial introspection state.
    fn report_initial_state(&self) {
        if let Some(frontiers) = &self.frontiers {
            let row = frontiers.row_for_collection(self.collection_id);
            let updates = vec![(row, Diff::ONE)];
            self.send(IntrospectionType::Frontiers, updates);
        }

        if let Some(refresh) = &self.refresh {
            let row = refresh.row_for_collection(self.collection_id);
            let updates = vec![(row, Diff::ONE)];
            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
        }
    }

    /// Observes the given current collection frontiers and updates the introspection state as
    /// necessary.
    fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
        self.update_frontier_introspection(read_frontier, write_frontier);
        self.update_refresh_introspection(write_frontier);
    }

    fn update_frontier_introspection(
        &mut self,
        read_frontier: &Antichain<T>,
        write_frontier: &Antichain<T>,
    ) {
        let Some(frontiers) = &mut self.frontiers else {
            return;
        };

        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
        {
            return; // nothing to do
        }

        let retraction = frontiers.row_for_collection(self.collection_id);
        frontiers.update(read_frontier, write_frontier);
        let insertion = frontiers.row_for_collection(self.collection_id);
        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
        self.send(IntrospectionType::Frontiers, updates);
    }

    fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
        let Some(refresh) = &mut self.refresh else {
            return;
        };

        let retraction = refresh.row_for_collection(self.collection_id);
        refresh.frontier_update(write_frontier);
        let insertion = refresh.row_for_collection(self.collection_id);

        if retraction == insertion {
            return; // nothing changed
        }

        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
        self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
    }

    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
        // Failure to send means the `ComputeController` has been dropped, in which case we
        // don't care about introspection updates anymore.
        let _ = self.introspection_tx.send((introspection_type, updates));
    }
}

impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
    fn drop(&mut self) {
        // Retract any rows still reported for this collection.
        if let Some(frontiers) = &self.frontiers {
            let row = frontiers.row_for_collection(self.collection_id);
            let updates = vec![(row, Diff::MINUS_ONE)];
            self.send(IntrospectionType::Frontiers, updates);
        }

        if let Some(refresh) = &self.refresh {
            let retraction = refresh.row_for_collection(self.collection_id);
            let updates = vec![(retraction, Diff::MINUS_ONE)];
            self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
        }
    }
}

/// Frontier introspection state for a collection.
#[derive(Debug)]
struct FrontiersIntrospectionState<T> {
    read_frontier: Antichain<T>,
    write_frontier: Antichain<T>,
}

impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
    fn new(as_of: Antichain<T>) -> Self {
        Self {
            read_frontier: as_of.clone(),
            write_frontier: as_of,
        }
    }

    /// Returns a `Row` reflecting the current frontier introspection state of the collection.
    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
        let read_frontier = self
            .read_frontier
            .as_option()
            .map_or(Datum::Null, |ts| ts.clone().into());
        let write_frontier = self
            .write_frontier
            .as_option()
            .map_or(Datum::Null, |ts| ts.clone().into());
        Row::pack_slice(&[
            Datum::String(&collection_id.to_string()),
            read_frontier,
            write_frontier,
        ])
    }

    /// Updates the introspection state with the given new frontiers.
    fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
        if read_frontier != &self.read_frontier {
            self.read_frontier.clone_from(read_frontier);
        }
        if write_frontier != &self.write_frontier {
            self.write_frontier.clone_from(write_frontier);
        }
    }
}

/// Introspection state tracking the refresh progress of a materialized view with a refresh
/// schedule.
#[derive(Debug)]
struct RefreshIntrospectionState<T> {
    // Immutable properties of the materialized view:
    refresh_schedule: RefreshSchedule,
    initial_as_of: Antichain<T>,
    // Refresh state, updated as the write frontier advances:
    next_refresh: Datum<'static>,
    last_completed_refresh: Datum<'static>,
}

impl<T> RefreshIntrospectionState<T> {
    /// Returns a `Row` reflecting the current refresh introspection state of the collection.
    fn row_for_collection(&self, collection_id: GlobalId) -> Row {
        Row::pack_slice(&[
            Datum::String(&collection_id.to_string()),
            self.last_completed_refresh,
            self.next_refresh,
        ])
    }
}

impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
    /// Constructs a new refresh introspection state, calculated from the given write frontier
    /// and the immutable properties of the materialized view.
    fn new(
        refresh_schedule: RefreshSchedule,
        initial_as_of: Antichain<T>,
        upper: &Antichain<T>,
    ) -> Self {
        let mut self_ = Self {
            refresh_schedule: refresh_schedule.clone(),
            initial_as_of: initial_as_of.clone(),
            next_refresh: Datum::Null,
            last_completed_refresh: Datum::Null,
        };
        self_.frontier_update(upper);
        self_
    }

    /// Updates the refresh state to reflect the given write frontier.
    fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
        if write_frontier.is_empty() {
            // The collection has advanced to the empty frontier: all refreshes are complete.
            self.last_completed_refresh =
                if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
                    last_refresh.into()
                } else {
                    // A schedule without a last refresh means refreshes continue indefinitely,
                    // so the final refresh happens at the maximum timestamp.
                    T::maximum().into()
                };
            self.next_refresh = Datum::Null;
        } else if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
            // We are before the first refresh.
            self.last_completed_refresh = Datum::Null;
            let initial_as_of = self.initial_as_of.as_option().expect(
                "initial_as_of can't be [], because then there would be no refreshes at all",
            );
            let first_refresh = initial_as_of
                .round_up(&self.refresh_schedule)
                .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
            soft_assert_or_log!(
                first_refresh == *initial_as_of,
                "initial_as_of should be set to the first refresh"
            );
            self.next_refresh = first_refresh.into();
        } else {
            // Between refreshes, the write frontier sits at the next scheduled refresh, so the
            // previous refresh is the last completed one.
            let write_frontier = write_frontier.as_option().expect("checked above");
            self.last_completed_refresh = write_frontier
                .round_down_minus_1(&self.refresh_schedule)
                .map_or_else(
                    || {
                        soft_panic_or_log!(
                            "rounding down should have returned the first refresh or later"
                        );
                        Datum::Null
                    },
                    |last_completed_refresh| last_completed_refresh.into(),
                );
            self.next_refresh = write_frontier.clone().into();
        }
    }
}

/// Tracks a peek request that has been issued but not yet answered.
#[derive(Debug)]
struct PendingPeek<T: Timestamp> {
    /// For replica-targeted peeks, the replica the peek is targeted at.
    target_replica: Option<ReplicaId>,
    /// The OpenTelemetry context for this peek.
    otel_ctx: OpenTelemetryContext,
    /// The time at which the peek was requested.
    ///
    /// Used to track peek durations.
    requested_at: Instant,
    /// The read hold installed to serve this peek.
    read_hold: ReadHold<T>,
    /// The channel to send the peek response through.
    peek_response_tx: oneshot::Sender<PeekResponse>,
    /// An optional limit on the peek's result size.
    limit: Option<usize>,
    /// The offset into the peek's result.
    offset: usize,
}

/// Tracks an active subscribe.
#[derive(Debug, Clone)]
struct ActiveSubscribe<T> {
    /// The current upper frontier of this subscribe.
    frontier: Antichain<T>,
    /// For replica-targeted subscribes, the replica the subscribe is targeted at.
    target_replica: Option<ReplicaId>,
}

impl<T: ComputeControllerTimestamp> ActiveSubscribe<T> {
    fn new(target_replica: Option<ReplicaId>) -> Self {
        Self {
            frontier: Antichain::from_elem(T::minimum()),
            target_replica,
        }
    }
}

/// State maintained about an individual replica.
#[derive(Debug)]
struct ReplicaState<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    id: ReplicaId,
    /// The client through which the replica can be reached.
    client: ReplicaClient<T>,
    /// The replica's configuration.
    config: ReplicaConfig,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    /// Per-replica collection state.
    collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
    /// The epoch of the replica.
    epoch: u64,
}

impl<T: ComputeControllerTimestamp> ReplicaState<T> {
    fn new(
        id: ReplicaId,
        client: ReplicaClient<T>,
        config: ReplicaConfig,
        metrics: ReplicaMetrics,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        epoch: u64,
    ) -> Self {
        Self {
            id,
            client,
            config,
            metrics,
            introspection_tx,
            epoch,
            collections: Default::default(),
        }
    }

    /// Adds a collection to the replica state.
    ///
    /// # Panics
    ///
    /// Panics if a collection with the same ID exists already.
    fn add_collection(
        &mut self,
        id: GlobalId,
        as_of: Antichain<T>,
        input_read_holds: Vec<ReadHold<T>>,
    ) {
        let metrics = self.metrics.for_collection(id);
        let introspection = ReplicaCollectionIntrospection::new(
            self.id,
            id,
            self.introspection_tx.clone(),
            as_of.clone(),
        );
        let mut state =
            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);

        // Transient collections are excluded from wallclock lag introspection, so don't track
        // lag for them.
        if id.is_transient() {
            state.wallclock_lag_max = None;
        }

        if let Some(previous) = self.collections.insert(id, state) {
            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
        }
    }

    /// Removes the identified collection from the replica state.
    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
        self.collections.remove(&id)
    }

    /// Returns whether all tracked frontiers of the identified collection are empty, which is
    /// also the case when the collection is not tracked at all.
    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
        self.collections.get(&id).map_or(true, |c| {
            c.write_frontier.is_empty()
                && c.input_frontier.is_empty()
                && c.output_frontier.is_empty()
        })
    }

    /// Returns the state of this replica as a JSON value, for debugging.
    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` so we don't forget to consider new fields when this method is
        // updated.
        let Self {
            id,
            client: _,
            config: _,
            metrics: _,
            introspection_tx: _,
            epoch,
            collections,
        } = self;

        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();

        Ok(serde_json::json!({
            "id": id.to_string(),
            "collections": collections,
            "epoch": epoch,
        }))
    }
}

/// Per-replica state of a compute collection.
#[derive(Debug)]
struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
    /// The replica's reported write frontier for this collection.
    write_frontier: Antichain<T>,
    /// The replica's reported input frontier for this collection.
    input_frontier: Antichain<T>,
    /// The replica's reported output frontier for this collection.
    output_frontier: Antichain<T>,

    /// Metrics tracked for this collection.
    ///
    /// `None` if the collection is not tracked in metrics.
    metrics: Option<ReplicaCollectionMetrics>,
    /// The as-of frontier with which the collection was installed on the replica.
    as_of: Antichain<T>,
    /// Tracks introspection state for this collection.
    introspection: ReplicaCollectionIntrospection<T>,
    /// Read holds on the inputs of this collection, downgraded as the collection's input
    /// frontier advances.
    input_read_holds: Vec<ReadHold<T>>,

    /// The maximum wallclock lag observed since the last introspection report, or `None` if
    /// wallclock lag is not tracked for this collection.
    wallclock_lag_max: Option<WallclockLag>,
}

impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
    fn new(
        metrics: Option<ReplicaCollectionMetrics>,
        as_of: Antichain<T>,
        introspection: ReplicaCollectionIntrospection<T>,
        input_read_holds: Vec<ReadHold<T>>,
    ) -> Self {
        Self {
            write_frontier: as_of.clone(),
            input_frontier: as_of.clone(),
            output_frontier: as_of.clone(),
            metrics,
            as_of,
            introspection,
            input_read_holds,
            wallclock_lag_max: Some(WallclockLag::MIN),
        }
    }

    /// Returns whether this collection is hydrated, i.e., whether the replica has produced
    /// output for it up to and beyond its as-of.
    ///
    /// A collection with an empty as-of is considered hydrated from the start; otherwise it is
    /// hydrated once its output frontier has advanced past the as-of (e.g., with an as-of of
    /// {5}, an output frontier of {5} means not yet hydrated, while {6} means hydrated).
    fn hydrated(&self) -> bool {
        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
    }

    /// Updates the replica write frontier of this collection.
    fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
            soft_panic_or_log!(
                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
                self.write_frontier,
            );
            return;
        } else if new_frontier == self.write_frontier {
            return;
        }

        self.write_frontier = new_frontier;
    }

    /// Updates the replica input frontier of this collection and downgrades the input read
    /// holds accordingly.
    fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
            soft_panic_or_log!(
                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
                self.input_frontier,
            );
            return;
        } else if new_frontier == self.input_frontier {
            return;
        }

        self.input_frontier = new_frontier;

        // The replica has caught up its inputs to the new frontier, so the input read holds
        // can be released up to that point.
        for read_hold in &mut self.input_read_holds {
            let result = read_hold.try_downgrade(self.input_frontier.clone());
            soft_assert_or_log!(
                result.is_ok(),
                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
                self.input_frontier,
            );
        }
    }

    /// Updates the replica output frontier of this collection.
    fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
            soft_panic_or_log!(
                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
                self.output_frontier,
            );
            return;
        } else if new_frontier == self.output_frontier {
            return;
        }

        self.output_frontier = new_frontier;
    }
}

/// Maintains the introspection state reported for a given replica and collection, and ensures
/// that reported data is retracted when the collection is dropped.
#[derive(Debug)]
struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// The ID of the compute collection.
    collection_id: GlobalId,
    /// The collection's reported replica write frontier.
    write_frontier: Antichain<T>,
    /// A channel through which introspection updates are delivered.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
}

impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
    /// Creates a new `ReplicaCollectionIntrospection` and initializes the introspection state.
    fn new(
        replica_id: ReplicaId,
        collection_id: GlobalId,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        as_of: Antichain<T>,
    ) -> Self {
        let self_ = Self {
            replica_id,
            collection_id,
            write_frontier: as_of,
            introspection_tx,
        };

        self_.report_initial_state();
        self_
    }

    /// Reports the initial introspection state.
    fn report_initial_state(&self) {
        let row = self.write_frontier_row();
        let updates = vec![(row, Diff::ONE)];
        self.send(IntrospectionType::ReplicaFrontiers, updates);
    }

    /// Observes the given write frontier and updates the introspection state as necessary.
    fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
        if self.write_frontier == *write_frontier {
            return; // nothing to do
        }

        let retraction = self.write_frontier_row();
        self.write_frontier.clone_from(write_frontier);
        let insertion = self.write_frontier_row();

        let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
        self.send(IntrospectionType::ReplicaFrontiers, updates);
    }

    /// Returns a `Row` reflecting the current replica write frontier.
    fn write_frontier_row(&self) -> Row {
        let write_frontier = self
            .write_frontier
            .as_option()
            .map_or(Datum::Null, |ts| ts.clone().into());
        Row::pack_slice(&[
            Datum::String(&self.collection_id.to_string()),
            Datum::String(&self.replica_id.to_string()),
            write_frontier,
        ])
    }

    fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
        // Failure to send means the `ComputeController` has been dropped, in which case we
        // don't care about introspection updates anymore.
        let _ = self.introspection_tx.send((introspection_type, updates));
    }
}

impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
    fn drop(&mut self) {
        // Retract the write frontier row still reported for this collection.
        let row = self.write_frontier_row();
        let updates = vec![(row, Diff::MINUS_ONE)];
        self.send(IntrospectionType::ReplicaFrontiers, updates);
    }
}