use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use mz_build_info::BuildInfo;
use mz_cluster_client::WallclockLagFn;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_compute_types::sinks::{
    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
};
use mz_compute_types::sources::SourceInstanceDesc;
use mz_controller_types::dyncfgs::{
    ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
use mz_ore::channel::instrumented_unbounded_channel;
use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, soft_panic_or_log};
use mz_persist_types::PersistLocation;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row};
use mz_storage_client::controller::{IntrospectionType, WallclockLag, WallclockLagHistogramPeriod};
use mz_storage_types::read_holds::{self, ReadHold};
use mz_storage_types::read_policy::ReadPolicy;
use serde::Serialize;
use thiserror::Error;
use timely::PartialOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::{mpsc, oneshot};
use tracing::debug_span;
use uuid::Uuid;

use crate::controller::error::{
    CollectionLookupError, CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
};
use crate::controller::replica::{ReplicaClient, ReplicaConfig};
use crate::controller::{
    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
    ReplicaId, StorageCollections,
};
use crate::logging::LogVariant;
use crate::metrics::{InstanceMetrics, IntCounter, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
use crate::protocol::command::{
    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
};
use crate::protocol::history::ComputeCommandHistory;
use crate::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StatusResponse,
    SubscribeBatch, SubscribeResponse,
};

#[derive(Error, Debug)]
#[error("replica exists already: {0}")]
pub(super) struct ReplicaExists(pub ReplicaId);

#[derive(Error, Debug)]
#[error("replica does not exist: {0}")]
pub(super) struct ReplicaMissing(pub ReplicaId);

#[derive(Error, Debug)]
pub(super) enum DataflowCreationError {
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    #[error("dataflow definition lacks an as_of value")]
    MissingAsOf,
    #[error("subscribe dataflow has an empty as_of")]
    EmptyAsOfForSubscribe,
    #[error("copy to dataflow has an empty as_of")]
    EmptyAsOfForCopyTo,
    #[error("no read hold provided for dataflow import: {0}")]
    ReadHoldMissing(GlobalId),
    #[error("insufficient read hold provided for dataflow import: {0}")]
    ReadHoldInsufficient(GlobalId),
}

impl From<CollectionMissing> for DataflowCreationError {
    fn from(error: CollectionMissing) -> Self {
        Self::CollectionMissing(error.0)
    }
}

#[derive(Error, Debug)]
#[error("the instance has shut down")]
pub(super) struct InstanceShutDown;

#[derive(Error, Debug)]
pub enum PeekError {
    #[error("replica does not exist: {0}")]
    ReplicaMissing(ReplicaId),
    #[error("read hold ID does not match peeked collection: {0}")]
    ReadHoldIdMismatch(GlobalId),
    #[error("insufficient read hold provided: {0}")]
    ReadHoldInsufficient(GlobalId),
    #[error("the instance has shut down")]
    InstanceShutDown,
}

impl From<InstanceShutDown> for PeekError {
    fn from(_error: InstanceShutDown) -> Self {
        Self::InstanceShutDown
    }
}

#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
    #[error("collection is write-only: {0}")]
    WriteOnlyCollection(GlobalId),
}

impl From<CollectionMissing> for ReadPolicyError {
    fn from(error: CollectionMissing) -> Self {
        Self::CollectionMissing(error.0)
    }
}

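/// A command for an [`Instance`] task: a closure that the task applies to its state.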
type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;

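/// A client for an [`Instance`] task.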
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub struct Client<T: ComputeControllerTimestamp> {
    command_tx: mpsc::UnboundedSender<Command<T>>,
    #[derivative(Debug = "ignore")]
    read_hold_tx: read_holds::ChangeTx<T>,
}

impl<T: ComputeControllerTimestamp> Client<T> {
    pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx<T> {
        Arc::clone(&self.read_hold_tx)
    }

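    /// Executes the given function on the instance task, without waiting for it to complete.
    ///
    /// Returns an error if the instance has shut down and cannot accept commands anymore.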
    pub(super) fn call<F>(&self, f: F) -> Result<(), InstanceShutDown>
    where
        F: FnOnce(&mut Instance<T>) + Send + 'static,
    {
        let otel_ctx = OpenTelemetryContext::obtain();
        self.command_tx
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call").entered();
                otel_ctx.attach_as_parent();

                f(instance)
            }))
            .map_err(|_send_error| InstanceShutDown)
    }

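    /// Executes the given function on the instance task and awaits its result.
    ///
    /// Returns an error if the instance has shut down before the result could be delivered.
    /// A hypothetical use, with `id` standing in for some collection's `GlobalId`:
    ///
    /// ```ignore
    /// let upper = client.call_sync(move |i| i.collection_write_frontier(id)).await?;
    /// ```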
    pub(super) async fn call_sync<F, R>(&self, f: F) -> Result<R, InstanceShutDown>
    where
        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let otel_ctx = OpenTelemetryContext::obtain();
        self.command_tx
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call_sync").entered();
                otel_ctx.attach_as_parent();
                let result = f(instance);
                let _ = tx.send(result);
            }))
            .map_err(|_send_error| InstanceShutDown)?;

        rx.await.map_err(|_| InstanceShutDown)
    }
}

impl<T> Client<T>
where
    T: ComputeControllerTimestamp,
{
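    /// Spawns a new [`Instance`] task and returns a [`Client`] connected to it.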
    pub(super) fn spawn(
        id: ComputeInstanceId,
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        read_only: bool,
    ) -> Self {
        let (command_tx, command_rx) = mpsc::unbounded_channel();

        let read_hold_tx: read_holds::ChangeTx<_> = {
            let command_tx = command_tx.clone();
            Arc::new(move |id, change: ChangeBatch<_>| {
                let cmd: Command<_> = {
                    let change = change.clone();
                    Box::new(move |i| i.apply_read_hold_change(id, change.clone()))
                };
                command_tx.send(cmd).map_err(|_| SendError((id, change)))
            })
        };

        mz_ore::task::spawn(
            || format!("compute-instance-{id}"),
            Instance::new(
                build_info,
                storage,
                peek_stash_persist_location,
                arranged_logs,
                metrics,
                now,
                wallclock_lag,
                dyncfg,
                command_rx,
                response_tx,
                Arc::clone(&read_hold_tx),
                introspection_tx,
                read_only,
            )
            .run(),
        );

        Self {
            command_tx,
            read_hold_tx,
        }
    }

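    /// Acquires read holds and reports the current write frontiers for the given collections.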
    pub async fn acquire_read_holds_and_collection_write_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, ReadHold<T>, Antichain<T>)>, CollectionLookupError> {
        self.call_sync(move |i| {
            let mut result = Vec::new();
            for id in ids.into_iter() {
                result.push((
                    id,
                    i.acquire_read_hold(id)?,
                    i.collection_write_frontier(id)?,
                ));
            }
            Ok(result)
        })
        .await?
    }

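    /// Initiates a peek against the given peek target, delivering its result through
    /// `peek_response_tx`.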
    pub async fn peek(
        &self,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        target_read_hold: ReadHold<T>,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        self.call_sync(move |i| {
            i.peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing,
                map_filter_project,
                target_read_hold,
                target_replica,
                peek_response_tx,
            )
        })
        .await?
    }
}

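/// A response received from a replica: the replica's ID, its epoch, and the compute response
/// itself.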
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);
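
/// The state of a compute instance, maintained by an instance task.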
317
318pub(super) struct Instance<T: ComputeControllerTimestamp> {
320 build_info: &'static BuildInfo,
322 storage_collections: StorageCollections<T>,
324 initialized: bool,
326 read_only: bool,
331 workload_class: Option<String>,
335 replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
337 collections: BTreeMap<GlobalId, CollectionState<T>>,
345 log_sources: BTreeMap<LogVariant, GlobalId>,
347 peeks: BTreeMap<Uuid, PendingPeek<T>>,
356 subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
370 copy_tos: BTreeSet<GlobalId>,
378 history: ComputeCommandHistory<UIntGauge, T>,
380 command_rx: mpsc::UnboundedReceiver<Command<T>>,
382 response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
384 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
386 metrics: InstanceMetrics,
388 dyncfg: Arc<ConfigSet>,
390
391 peek_stash_persist_location: PersistLocation,
393
394 now: NowFn,
396 wallclock_lag: WallclockLagFn<T>,
398 wallclock_lag_last_recorded: DateTime<Utc>,
400
401 read_hold_tx: read_holds::ChangeTx<T>,
406 replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
408 replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
410}
411
impl<T: ComputeControllerTimestamp> Instance<T> {
    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    fn collection_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut CollectionState<T>, CollectionMissing> {
        self.collections.get_mut(&id).ok_or(CollectionMissing(id))
    }

    fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
        self.collections.get(&id).expect("collection must exist")
    }

    fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

    fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
        self.collections.iter().map(|(id, coll)| (*id, coll))
    }

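    /// Adds a collection with the given ID and metadata to the instance state, and installs it
    /// on all existing replicas.
    ///
    /// # Panics
    ///
    /// Panics if a collection with the same ID already exists.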
    fn add_collection(
        &mut self,
        id: GlobalId,
        as_of: Antichain<T>,
        shared: SharedCollectionState<T>,
        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        replica_input_read_holds: Vec<ReadHold<T>>,
        write_only: bool,
        storage_sink: bool,
        initial_as_of: Option<Antichain<T>>,
        refresh_schedule: Option<RefreshSchedule>,
    ) {
        let introspection = CollectionIntrospection::new(
            id,
            self.introspection_tx.clone(),
            as_of.clone(),
            storage_sink,
            initial_as_of,
            refresh_schedule,
        );
        let mut state = CollectionState::new(
            id,
            as_of.clone(),
            shared,
            storage_dependencies,
            compute_dependencies,
            Arc::clone(&self.read_hold_tx),
            introspection,
        );
        if write_only {
            state.read_policy = None;
        }

        if let Some(previous) = self.collections.insert(id, state) {
            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
        }

        for replica in self.replicas.values_mut() {
            replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
        }

        self.report_dependency_updates(id, Diff::ONE);
    }

    fn remove_collection(&mut self, id: GlobalId) {
        self.report_dependency_updates(id, Diff::MINUS_ONE);

        for replica in self.replicas.values_mut() {
            replica.remove_collection(id);
        }

        self.collections.remove(&id);
    }

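    /// Registers state for a newly created replica and installs the instance's current
    /// collections on it.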
    fn add_replica_state(
        &mut self,
        id: ReplicaId,
        client: ReplicaClient<T>,
        config: ReplicaConfig,
        epoch: u64,
    ) {
        let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();

        let metrics = self.metrics.for_replica(id);
        let mut replica = ReplicaState::new(
            id,
            client,
            config,
            metrics,
            self.introspection_tx.clone(),
            epoch,
        );

        for (collection_id, collection) in &self.collections {
            if collection.log_collection && !log_ids.contains(collection_id) {
                continue;
            }

            let as_of = if collection.log_collection {
                Antichain::from_elem(T::minimum())
            } else {
                collection.read_frontier().to_owned()
            };

            let input_read_holds = collection.storage_dependencies.values().cloned().collect();
            replica.add_collection(*collection_id, as_of, input_read_holds);
        }

        self.replicas.insert(id, replica);
    }

    fn deliver_response(&self, response: ComputeControllerResponse<T>) {
        let _ = self.response_tx.send(response);
    }

    fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
        let _ = self.introspection_tx.send((type_, updates));
    }

    fn replica_exists(&self, id: ReplicaId) -> bool {
        self.replicas.contains_key(&id)
    }

    fn peeks_targeting(
        &self,
        replica_id: ReplicaId,
    ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
        self.peeks.iter().filter_map(move |(uuid, peek)| {
            if peek.target_replica == Some(replica_id) {
                Some((*uuid, peek))
            } else {
                None
            }
        })
    }

    fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
        self.subscribes.iter().filter_map(move |(id, subscribe)| {
            let targeting = subscribe.target_replica == Some(replica_id);
            targeting.then_some(*id)
        })
    }

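    /// Pushes the current read and write frontiers of all collections, global and per-replica,
    /// into the frontier introspection.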
    fn update_frontier_introspection(&mut self) {
        for collection in self.collections.values_mut() {
            collection
                .introspection
                .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
        }

        for replica in self.replicas.values_mut() {
            for collection in replica.collections.values_mut() {
                collection
                    .introspection
                    .observe_frontier(&collection.write_frontier);
            }
        }
    }

    fn refresh_state_metrics(&self) {
        let unscheduled_collections_count =
            self.collections.values().filter(|c| !c.scheduled).count();
        let connected_replica_count = self
            .replicas
            .values()
            .filter(|r| r.client.is_connected())
            .count();

        self.metrics
            .replica_count
            .set(u64::cast_from(self.replicas.len()));
        self.metrics
            .collection_count
            .set(u64::cast_from(self.collections.len()));
        self.metrics
            .collection_unscheduled_count
            .set(u64::cast_from(unscheduled_collections_count));
        self.metrics
            .peek_count
            .set(u64::cast_from(self.peeks.len()));
        self.metrics
            .subscribe_count
            .set(u64::cast_from(self.subscribes.len()));
        self.metrics
            .copy_to_count
            .set(u64::cast_from(self.copy_tos.len()));
        self.metrics
            .connected_replica_count
            .set(u64::cast_from(connected_replica_count));
    }

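    /// Refreshes the wallclock lag introspection state:
    ///
    ///  * For each collection, stashes the current lag in the histogram stash, bucketed by the
    ///    next power of two of its size in seconds, or as `Undefined` if the collection is
    ///    unreadable.
    ///  * For each replica collection, updates the maximum observed lag and the lag metrics.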
    fn refresh_wallclock_lag(&mut self) {
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        let now_ms = (self.now)();
        let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
        let histogram_labels = match &self.workload_class {
            Some(wc) => [("workload_class", wc.clone())].into(),
            None => BTreeMap::new(),
        };

        let mut unreadable_collections = BTreeSet::new();
        for (id, collection) in &mut self.collections {
            let read_frontier = match self.storage_collections.collection_frontiers(*id) {
                Ok(f) => f.read_capabilities,
                Err(_) => collection.read_frontier(),
            };
            let write_frontier = collection.write_frontier();
            let collection_unreadable = PartialOrder::less_equal(&write_frontier, &read_frontier);
            if collection_unreadable {
                unreadable_collections.insert(id);
            }

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                let bucket = if collection_unreadable {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&write_frontier);
                    let lag = lag.as_secs().next_power_of_two();
                    WallclockLag::Seconds(lag)
                };

                let key = (histogram_period, bucket, histogram_labels.clone());
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        for replica in self.replicas.values_mut() {
            for (id, collection) in &mut replica.collections {
                let lag = if unreadable_collections.contains(&id) {
                    WallclockLag::Undefined
                } else {
                    let lag = frontier_lag(&collection.write_frontier);
                    WallclockLag::Seconds(lag.as_secs())
                };

                if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
                    *wallclock_lag_max = (*wallclock_lag_max).max(lag);
                }

                if let Some(metrics) = &mut collection.metrics {
                    let secs = lag.unwrap_seconds_or(u64::MAX);
                    metrics.wallclock_lag.observe(secs);
                };
            }
        }

        self.maybe_record_wallclock_lag();
    }

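    /// Records the stashed wallclock lag measurements to the introspection relations, provided
    /// the recording interval has elapsed and the instance is not in read-only mode.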
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        for (replica_id, replica) in &mut self.replicas {
            for (collection_id, collection) in &mut replica.collections {
                let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
                    continue;
                };

                let max_lag = std::mem::replace(wallclock_lag_max, WallclockLag::MIN);
                let row = Row::pack_slice(&[
                    Datum::String(&collection_id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    max_lag.into_interval_datum(),
                    Datum::TimestampTz(now_ts),
                ]);
                history_updates.push((row, Diff::ONE));
            }
        }
        if !history_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }

        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for (collection_id, collection) in &mut self.collections {
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&collection_id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }
        if !histogram_updates.is_empty() {
            self.deliver_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }

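    /// Reports updates (inserts or retractions, depending on `diff`) for the dependencies of
    /// the identified collection to the `ComputeDependencies` introspection relation.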
    fn report_dependency_updates(&self, id: GlobalId, diff: Diff) {
        let collection = self.expect_collection(id);
        let dependencies = collection.dependency_ids();

        let updates = dependencies
            .map(|dependency_id| {
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&dependency_id.to_string()),
                ]);
                (row, diff)
            })
            .collect();

        self.deliver_introspection_updates(IntrospectionType::ComputeDependencies, updates);
    }

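    /// Returns whether the identified collection is hydrated on at least one replica.
    ///
    /// On an instance without replicas, every collection is considered hydrated.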
    #[mz_ore::instrument(level = "debug")]
    pub fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, CollectionLookupError> {
        if self.replicas.is_empty() {
            return Ok(true);
        }

        for replica_state in self.replicas.values() {
            let collection_state = replica_state
                .collections
                .get(&collection_id)
                .ok_or(CollectionLookupError::CollectionMissing(collection_id))?;

            if collection_state.hydrated() {
                return Ok(true);
            }
        }

        Ok(false)
    }

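    /// Returns whether all non-transient, non-excluded collections are hydrated on at least one
    /// of the targeted replicas, or on any replica if no targets are given.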
    #[mz_ore::instrument(level = "debug")]
    pub fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, HydrationCheckBadTarget> {
        if self.replicas.is_empty() {
            return Ok(true);
        }
        let mut all_hydrated = true;
        let target_replicas: BTreeSet<ReplicaId> = self
            .replicas
            .keys()
            .filter_map(|id| match target_replica_ids {
                None => Some(id.clone()),
                Some(ref ids) if ids.contains(id) => Some(id.clone()),
                Some(_) => None,
            })
            .collect();
        if let Some(targets) = target_replica_ids {
            if target_replicas.is_empty() {
                return Err(HydrationCheckBadTarget(targets));
            }
        }

        for (id, _collection) in self.collections_iter() {
            if id.is_transient() || exclude_collections.contains(&id) {
                continue;
            }

            let mut collection_hydrated = false;
            for replica_state in self.replicas.values() {
                if !target_replicas.contains(&replica_state.id) {
                    continue;
                }
                let collection_state = replica_state
                    .collections
                    .get(&id)
                    .expect("missing collection state");

                if collection_state.hydrated() {
                    collection_hydrated = true;
                    break;
                }
            }

            if !collection_hydrated {
                tracing::info!("collection {id} is not hydrated on any replica");
                all_hydrated = false;
            }
        }

        Ok(all_hydrated)
    }

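    /// Returns whether all non-transient, non-excluded collections are hydrated on at least one
    /// replica.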
    #[mz_ore::instrument(level = "debug")]
    pub fn collections_hydrated(&self, exclude_collections: &BTreeSet<GlobalId>) -> bool {
        self.collections_hydrated_on_replicas(None, exclude_collections)
            .expect("Cannot error if target_replica_ids is None")
    }

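    /// Removes collections that have been dropped and whose read capabilities and per-replica
    /// frontiers are all empty, i.e. that cannot produce any further output.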
    fn cleanup_collections(&mut self) {
        let to_remove: Vec<_> = self
            .collections_iter()
            .filter(|(id, collection)| {
                collection.dropped
                    && collection.shared.lock_read_capabilities(|c| c.is_empty())
                    && self
                        .replicas
                        .values()
                        .all(|r| r.collection_frontiers_empty(*id))
            })
            .map(|(id, _collection)| id)
            .collect();

        for id in to_remove {
            self.remove_collection(id);
        }
    }

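    /// Renders the instance state as a JSON value, for debugging and introspection.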
    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            build_info: _,
            storage_collections: _,
            peek_stash_persist_location: _,
            initialized,
            read_only,
            workload_class,
            replicas,
            collections,
            log_sources: _,
            peeks,
            subscribes,
            copy_tos,
            history: _,
            command_rx: _,
            response_tx: _,
            introspection_tx: _,
            metrics: _,
            dyncfg: _,
            now: _,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            read_hold_tx: _,
            replica_tx: _,
            replica_rx: _,
        } = self;

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let replicas: BTreeMap<_, _> = replicas
            .iter()
            .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
            .collect::<Result<_, anyhow::Error>>()?;
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();
        let peeks: BTreeMap<_, _> = peeks
            .iter()
            .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
            .collect();
        let subscribes: BTreeMap<_, _> = subscribes
            .iter()
            .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
            .collect();
        let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
        let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");

        let map = serde_json::Map::from_iter([
            field("initialized", initialized)?,
            field("read_only", read_only)?,
            field("workload_class", workload_class)?,
            field("replicas", replicas)?,
            field("collections", collections)?,
            field("peeks", peeks)?,
            field("subscribes", subscribes)?,
            field("copy_tos", copy_tos)?,
            field("wallclock_lag_last_recorded", wallclock_lag_last_recorded)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }

    fn collection_write_frontier(&self, id: GlobalId) -> Result<Antichain<T>, CollectionMissing> {
        Ok(self.collection(id)?.write_frontier())
    }
}

impl<T> Instance<T>
where
    T: ComputeControllerTimestamp,
{
    fn new(
        build_info: &'static BuildInfo,
        storage: StorageCollections<T>,
        peek_stash_persist_location: PersistLocation,
        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
        metrics: InstanceMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        dyncfg: Arc<ConfigSet>,
        command_rx: mpsc::UnboundedReceiver<Command<T>>,
        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
        read_only: bool,
    ) -> Self {
        let mut collections = BTreeMap::new();
        let mut log_sources = BTreeMap::new();
        for (log, id, shared) in arranged_logs {
            let collection = CollectionState::new_log_collection(
                id,
                shared,
                Arc::clone(&read_hold_tx),
                introspection_tx.clone(),
            );
            collections.insert(id, collection);
            log_sources.insert(log, id);
        }

        let history = ComputeCommandHistory::new(metrics.for_history());

        let send_count = metrics.response_send_count.clone();
        let recv_count = metrics.response_recv_count.clone();
        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            storage_collections: storage,
            peek_stash_persist_location,
            initialized: false,
            read_only,
            workload_class: None,
            replicas: Default::default(),
            collections,
            log_sources,
            peeks: Default::default(),
            subscribes: Default::default(),
            copy_tos: Default::default(),
            history,
            command_rx,
            response_tx,
            introspection_tx,
            metrics,
            dyncfg,
            now,
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            read_hold_tx,
            replica_tx,
            replica_rx,
        }
    }

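    /// Runs the instance task: applies commands received from the controller and handles
    /// responses received from replicas, until the command channel disconnects.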
    async fn run(mut self) {
        self.send(ComputeCommand::Hello {
            nonce: Uuid::default(),
        });

        let instance_config = InstanceConfig {
            peek_stash_persist_location: self.peek_stash_persist_location.clone(),
            logging: Default::default(),
            expiration_offset: Default::default(),
        };

        self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));

        loop {
            tokio::select! {
                command = self.command_rx.recv() => match command {
                    Some(cmd) => cmd(&mut self),
                    None => break,
                },
                response = self.replica_rx.recv() => match response {
                    Some(response) => self.handle_response(response),
                    None => unreachable!("self owns a sender side of the channel"),
                }
            }
        }
    }

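    /// Updates the instance configuration and forwards it to all replicas.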
    #[mz_ore::instrument(level = "debug")]
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        if let Some(workload_class) = &config_params.workload_class {
            self.workload_class = workload_class.clone();
        }

        let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
        self.send(command);
    }

    #[mz_ore::instrument(level = "debug")]
    pub fn initialization_complete(&mut self) {
        if !self.initialized {
            self.send(ComputeCommand::InitializationComplete);
            self.initialized = true;
        }
    }

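    /// Marks the given collection as writable and instructs replicas to allow writes to it,
    /// unless it is already writable or its read frontier is empty.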
    #[mz_ore::instrument(level = "debug")]
    pub fn allow_writes(&mut self, collection_id: GlobalId) -> Result<(), CollectionMissing> {
        let collection = self.collection_mut(collection_id)?;

        if !collection.read_only {
            return Ok(());
        }

        let as_of = collection.read_frontier();

        if as_of.is_empty() {
            return Ok(());
        }

        collection.read_only = false;
        self.send(ComputeCommand::AllowWrites(collection_id));

        Ok(())
    }

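    /// Shuts this instance down: applies all still pending commands, then verifies that no
    /// replicas or (non-log) collections remain installed.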
    #[mz_ore::instrument(level = "debug")]
    pub fn shutdown(&mut self) {
        let (_tx, rx) = mpsc::unbounded_channel();
        let mut command_rx = std::mem::replace(&mut self.command_rx, rx);

        while let Ok(cmd) = command_rx.try_recv() {
            cmd(self);
        }

        self.cleanup_collections();

        let stray_replicas: Vec<_> = self.replicas.keys().collect();
        soft_assert_or_log!(
            stray_replicas.is_empty(),
            "dropped instance still has provisioned replicas: {stray_replicas:?}",
        );

        let collections = self.collections.iter();
        let stray_collections: Vec<_> = collections
            .filter(|(_, c)| !c.log_collection)
            .map(|(id, _)| id)
            .collect();
        soft_assert_or_log!(
            stray_collections.is_empty(),
            "dropped instance still has installed collections: {stray_collections:?}",
        );
    }

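    /// Broadcasts the given command to all replicas and records it in the command history.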
    #[mz_ore::instrument(level = "debug")]
    fn send(&mut self, cmd: ComputeCommand<T>) {
        self.history.push(cmd.clone());

        for replica in self.replicas.values_mut() {
            let _ = replica.client.send(cmd.clone());
        }
    }

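    /// Adds a new replica to this instance and replays the command history to hydrate it.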
    #[mz_ore::instrument(level = "debug")]
    pub fn add_replica(
        &mut self,
        id: ReplicaId,
        mut config: ReplicaConfig,
        epoch: Option<u64>,
    ) -> Result<(), ReplicaExists> {
        if self.replica_exists(id) {
            return Err(ReplicaExists(id));
        }

        config.logging.index_logs = self.log_sources.clone();

        let epoch = epoch.unwrap_or(1);
        let metrics = self.metrics.for_replica(id);
        let client = ReplicaClient::spawn(
            id,
            self.build_info,
            config.clone(),
            epoch,
            metrics.clone(),
            Arc::clone(&self.dyncfg),
            self.replica_tx.clone(),
        );

        self.history.reduce();

        self.history.update_source_uppers(&self.storage_collections);

        for command in self.history.iter() {
            if client.send(command.clone()).is_err() {
                tracing::warn!("Replica {:?} connection terminated during hydration", id);
                break;
            }
        }

        self.add_replica_state(id, client, config, epoch);

        Ok(())
    }

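    /// Removes an existing replica from this instance, failing any peeks and subscribes that
    /// were targeting it.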
    #[mz_ore::instrument(level = "debug")]
    pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
        self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;

        let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
        for subscribe_id in to_drop {
            let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
            let response = ComputeControllerResponse::SubscribeResponse(
                subscribe_id,
                SubscribeBatch {
                    lower: subscribe.frontier.clone(),
                    upper: subscribe.frontier,
                    updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
                },
            );
            self.deliver_response(response);
        }

        let mut peek_responses = Vec::new();
        let mut to_drop = Vec::new();
        for (uuid, peek) in self.peeks_targeting(id) {
            peek_responses.push(ComputeControllerResponse::PeekNotification(
                uuid,
                PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
                peek.otel_ctx.clone(),
            ));
            to_drop.push(uuid);
        }
        for response in peek_responses {
            self.deliver_response(response);
        }
        for uuid in to_drop {
            let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
            self.finish_peek(uuid, response);
        }

        self.forward_implied_capabilities();

        Ok(())
    }

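    /// Rehydrates the identified replica by removing it and adding it back with an incremented
    /// epoch.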
    fn rehydrate_replica(&mut self, id: ReplicaId) {
        let config = self.replicas[&id].config.clone();
        let epoch = self.replicas[&id].epoch + 1;

        self.remove_replica(id).expect("replica must exist");
        let result = self.add_replica(id, config, Some(epoch));

        match result {
            Ok(()) => (),
            Err(ReplicaExists(_)) => unreachable!("replica was removed"),
        }
    }

    fn rehydrate_failed_replicas(&mut self) {
        let replicas = self.replicas.iter();
        let failed_replicas: Vec<_> = replicas
            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
            .collect();

        for replica_id in failed_replicas {
            self.rehydrate_replica(replica_id);
        }
    }

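    /// Creates the described dataflow, registering its exports as compute collections and
    /// installing it on all replicas, unless its `as_of` is empty.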
    #[mz_ore::instrument(level = "debug")]
    pub fn create_dataflow(
        &mut self,
        dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        import_read_holds: Vec<ReadHold<T>>,
        subscribe_target_replica: Option<ReplicaId>,
        mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        if let Some(replica_id) = subscribe_target_replica {
            if !self.replica_exists(replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        let mut storage_dependencies = BTreeMap::new();
        let mut compute_dependencies = BTreeMap::new();

        let mut replica_input_read_holds = Vec::new();

        let mut import_read_holds: BTreeMap<_, _> =
            import_read_holds.into_iter().map(|r| (r.id(), r)).collect();

        for &id in dataflow.source_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            replica_input_read_holds.push(read_hold.clone());

            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            storage_dependencies.insert(id, read_hold);
        }

        for &id in dataflow.index_imports.keys() {
            let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
            read_hold
                .try_downgrade(as_of.clone())
                .map_err(|_| ReadHoldInsufficient(id))?;
            compute_dependencies.insert(id, read_hold);
        }

        if as_of.is_empty() {
            replica_input_read_holds = Default::default();
        }

        for export_id in dataflow.export_ids() {
            let shared = shared_collection_state
                .remove(&export_id)
                .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
            let write_only = dataflow.sink_exports.contains_key(&export_id);
            let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);

            self.add_collection(
                export_id,
                as_of.clone(),
                shared,
                storage_dependencies.clone(),
                compute_dependencies.clone(),
                replica_input_read_holds.clone(),
                write_only,
                storage_sink,
                dataflow.initial_storage_as_of.clone(),
                dataflow.refresh_schedule.clone(),
            );

            if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
                self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
            }
        }

        for subscribe_id in dataflow.subscribe_ids() {
            self.subscribes
                .insert(subscribe_id, ActiveSubscribe::new(subscribe_target_replica));
        }

        for copy_to_id in dataflow.copy_to_ids() {
            self.copy_tos.insert(copy_to_id);
        }

        let mut source_imports = BTreeMap::new();
        for (id, (si, monotonic, _upper)) in dataflow.source_imports {
            let frontiers = self
                .storage_collections
                .collection_frontiers(id)
                .expect("collection exists");

            let collection_metadata = self
                .storage_collections
                .collection_metadata(id)
                .expect("we have a read hold on this collection");

            let desc = SourceInstanceDesc {
                storage_metadata: collection_metadata.clone(),
                arguments: si.arguments,
                typ: si.typ.clone(),
            };
            source_imports.insert(id, (desc, monotonic, frontiers.write_frontier));
        }

        let mut sink_exports = BTreeMap::new();
        for (id, se) in dataflow.sink_exports {
            let connection = match se.connection {
                ComputeSinkConnection::MaterializedView(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| CollectionMissing(id))?
                        .clone();
                    let conn = MaterializedViewSinkConnection {
                        value_desc: conn.value_desc,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::MaterializedView(conn)
                }
                ComputeSinkConnection::ContinualTask(conn) => {
                    let metadata = self
                        .storage_collections
                        .collection_metadata(id)
                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
                        .clone();
                    let conn = ContinualTaskConnection {
                        input_id: conn.input_id,
                        storage_metadata: metadata,
                    };
                    ComputeSinkConnection::ContinualTask(conn)
                }
                ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
                ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                    ComputeSinkConnection::CopyToS3Oneshot(conn)
                }
            };
            let desc = ComputeSinkDesc {
                from: se.from,
                from_desc: se.from_desc,
                connection,
                with_snapshot: se.with_snapshot,
                up_to: se.up_to,
                non_null_assertions: se.non_null_assertions,
                refresh_schedule: se.refresh_schedule,
            };
            sink_exports.insert(id, desc);
        }

        let objects_to_build = dataflow
            .objects_to_build
            .into_iter()
            .map(|object| BuildDesc {
                id: object.id,
                plan: RenderPlan::try_from(object.plan).expect("valid plan"),
            })
            .collect();

        let augmented_dataflow = DataflowDescription {
            source_imports,
            sink_exports,
            objects_to_build,
            index_imports: dataflow.index_imports,
            index_exports: dataflow.index_exports,
            as_of: dataflow.as_of.clone(),
            until: dataflow.until,
            initial_storage_as_of: dataflow.initial_storage_as_of,
            refresh_schedule: dataflow.refresh_schedule,
            debug_name: dataflow.debug_name,
            time_dependence: dataflow.time_dependence,
        };

        if augmented_dataflow.is_transient() {
            tracing::debug!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        } else {
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                import_ids = %augmented_dataflow.display_import_ids(),
                export_ids = %augmented_dataflow.display_export_ids(),
                as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
                until = ?augmented_dataflow.until.elements(),
                "creating dataflow",
            );
        }

        if as_of.is_empty() {
            tracing::info!(
                name = %augmented_dataflow.debug_name,
                "not sending `CreateDataflow`, because of empty `as_of`",
            );
        } else {
            let collections: Vec<_> = augmented_dataflow.export_ids().collect();
            let dataflow = Box::new(augmented_dataflow);
            self.send(ComputeCommand::CreateDataflow(dataflow));

            for id in collections {
                self.maybe_schedule_collection(id);
            }
        }

        Ok(())
    }

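    /// Schedules the identified collection on all replicas, once it is ready: transient
    /// collections are scheduled immediately, other collections once all their dependencies'
    /// write frontiers have advanced past the collection's as-of.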
    fn maybe_schedule_collection(&mut self, id: GlobalId) {
        let collection = self.expect_collection(id);

        if collection.scheduled {
            return;
        }

        let as_of = collection.read_frontier();

        if as_of.is_empty() {
            return;
        }

        let ready = if id.is_transient() {
            true
        } else {
            let not_self_dep = |x: &GlobalId| *x != id;

            let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
            let compute_frontiers = compute_deps.map(|id| {
                let dep = &self.expect_collection(id);
                dep.write_frontier()
            });

            let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(storage_deps.collect())
                .expect("must exist");
            let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);

            let ready = compute_frontiers
                .chain(storage_frontiers)
                .all(|frontier| PartialOrder::less_than(&as_of, &frontier));

            ready
        };

        if ready {
            self.send(ComputeCommand::Schedule(id));
            let collection = self.expect_collection_mut(id);
            collection.scheduled = true;
        }
    }

    fn schedule_collections(&mut self) {
        let ids: Vec<_> = self.collections.keys().copied().collect();
        for id in ids {
            self.maybe_schedule_collection(id);
        }
    }

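    /// Drops the identified compute collections, releasing their read holds and cancelling any
    /// active subscribes and copy-tos on them.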
    #[mz_ore::instrument(level = "debug")]
    pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
        for id in &ids {
            let collection = self.collection_mut(*id)?;

            collection.dropped = true;

            collection.implied_read_hold.release();
            collection.warmup_read_hold.release();

            self.subscribes.remove(id);
            self.copy_tos.remove(id);
        }

        Ok(())
    }

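    /// Initiates a peek on the given peek target, on a specific replica if one is given.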
    #[mz_ore::instrument(level = "debug")]
    pub fn peek(
        &mut self,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        mut read_hold: ReadHold<T>,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let target_id = peek_target.id();

        if read_hold.id() != target_id {
            return Err(ReadHoldIdMismatch(read_hold.id()));
        }
        read_hold
            .try_downgrade(Antichain::from_elem(timestamp.clone()))
            .map_err(|_| ReadHoldInsufficient(target_id))?;

        if let Some(target) = target_replica {
            if !self.replica_exists(target) {
                return Err(ReplicaMissing(target));
            }
        }

        let otel_ctx = OpenTelemetryContext::obtain();

        self.peeks.insert(
            uuid,
            PendingPeek {
                target_replica,
                otel_ctx: otel_ctx.clone(),
                requested_at: Instant::now(),
                read_hold,
                peek_response_tx,
                limit: finishing.limit.map(usize::cast_from),
                offset: finishing.offset,
            },
        );

        let peek = Peek {
            literal_constraints,
            uuid,
            timestamp,
            finishing,
            map_filter_project,
            otel_ctx,
            target: peek_target,
            result_desc,
        };
        self.send(ComputeCommand::Peek(Box::new(peek)));

        Ok(())
    }

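    /// Cancels an in-progress peek, delivering the given response to the peek's waiter.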
    #[mz_ore::instrument(level = "debug")]
    pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
        let Some(peek) = self.peeks.get_mut(&uuid) else {
            tracing::warn!("did not find pending peek for {uuid}");
            return;
        };

        let duration = peek.requested_at.elapsed();
        self.metrics
            .observe_peek_response(&PeekResponse::Canceled, duration);

        let otel_ctx = peek.otel_ctx.clone();
        otel_ctx.attach_as_parent();

        self.deliver_response(ComputeControllerResponse::PeekNotification(
            uuid,
            PeekNotification::Canceled,
            otel_ctx,
        ));

        self.finish_peek(uuid, reason);
    }

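    /// Assigns read policies to the given collections, downgrading their implied read holds
    /// accordingly. Returns an error if any of the collections is unknown or write-only.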
    #[mz_ore::instrument(level = "debug")]
    pub fn set_read_policy(
        &mut self,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) -> Result<(), ReadPolicyError> {
        for (id, _policy) in &policies {
            let collection = self.collection(*id)?;
            if collection.read_policy.is_none() {
                return Err(ReadPolicyError::WriteOnlyCollection(*id));
            }
        }

        for (id, new_policy) in policies {
            let collection = self.expect_collection_mut(id);
            let new_since = new_policy.frontier(collection.write_frontier().borrow());
            let _ = collection.implied_read_hold.try_downgrade(new_since);
            collection.read_policy = Some(new_policy);
        }

        Ok(())
    }

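    /// Advances the global write frontier of the identified collection, if the given frontier
    /// is in fact an advancement, and downgrades the implied read hold according to the
    /// collection's read policy.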
    #[mz_ore::instrument(level = "debug")]
    fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
        let collection = self.expect_collection_mut(id);

        let advanced = collection.shared.lock_write_frontier(|f| {
            let advanced = PartialOrder::less_than(f, &new_frontier);
            if advanced {
                f.clone_from(&new_frontier);
            }
            advanced
        });

        if !advanced {
            return;
        }

        let new_since = match &collection.read_policy {
            Some(read_policy) => read_policy.frontier(new_frontier.borrow()),
            None => Antichain::from_iter(
                new_frontier
                    .iter()
                    .map(|t| t.step_back().unwrap_or_else(T::minimum)),
            ),
        };
        let _ = collection.implied_read_hold.try_downgrade(new_since);

        self.deliver_response(ComputeControllerResponse::FrontierUpper {
            id,
            upper: new_frontier,
        });
    }

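    /// Applies a batch of read hold changes to the identified collection's read capabilities,
    /// downgrading its dependency read holds and allowing compaction as appropriate.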
    fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
        let Some(collection) = self.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "read hold change for absent collection (id={id}, changes={update:?})"
            );
            return;
        };

        let new_since = collection.shared.lock_read_capabilities(|caps| {
            let read_frontier = caps.frontier();
            for (time, diff) in update.iter() {
                let count = caps.count_for(time) + diff;
                assert!(
                    count >= 0,
                    "invalid read capabilities update: negative capability \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
                assert!(
                    count == 0 || read_frontier.less_equal(time),
                    "invalid read capabilities update: frontier regression \
                     (id={id:?}, read_capabilities={caps:?}, update={update:?})",
                );
            }

            let changes = caps.update_iter(update.drain());

            let changed = changes.count() > 0;
            changed.then(|| caps.frontier().to_owned())
        });

        let Some(new_since) = new_since else {
            return;
        };

        for read_hold in collection.compute_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }
        for read_hold in collection.storage_dependencies.values_mut() {
            read_hold
                .try_downgrade(new_since.clone())
                .expect("frontiers don't regress");
        }

        self.send(ComputeCommand::AllowCompaction {
            id,
            frontier: new_since,
        });
    }

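    /// Finishes a pending peek: delivers the response to the waiter, cancels the peek on the
    /// replicas, and releases the peek's read hold.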
    fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
        let Some(peek) = self.peeks.remove(&uuid) else {
            return;
        };

        let _ = peek.peek_response_tx.send(response);

        self.send(ComputeCommand::CancelPeek { uuid });

        drop(peek.read_hold);
    }

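    /// Handles a response received from a replica, ignoring it if the replica is unknown or its
    /// epoch is stale.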
    fn handle_response(&mut self, (replica_id, epoch, response): ReplicaResponse<T>) {
        if self
            .replicas
            .get(&replica_id)
            .filter(|replica| replica.epoch == epoch)
            .is_none()
        {
            return;
        }

        match response {
            ComputeResponse::Frontiers(id, frontiers) => {
                self.handle_frontiers_response(id, frontiers, replica_id);
            }
            ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
                self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
            }
            ComputeResponse::CopyToResponse(id, response) => {
                self.handle_copy_to_response(id, response, replica_id);
            }
            ComputeResponse::SubscribeResponse(id, response) => {
                self.handle_subscribe_response(id, response, replica_id);
            }
            ComputeResponse::Status(response) => {
                self.handle_status_response(response, replica_id);
            }
        }
    }

    fn handle_frontiers_response(
        &mut self,
        id: GlobalId,
        frontiers: FrontiersResponse<T>,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&id) {
            soft_panic_or_log!(
                "frontiers update for an unknown collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica \
                 (replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&id) else {
            soft_panic_or_log!(
                "frontiers update for an unknown replica collection \
                 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
            );
            return;
        };

        if let Some(new_frontier) = frontiers.input_frontier {
            replica_collection.update_input_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.output_frontier {
            replica_collection.update_output_frontier(new_frontier.clone());
        }
        if let Some(new_frontier) = frontiers.write_frontier {
            replica_collection.update_write_frontier(new_frontier.clone());
            self.maybe_update_global_write_frontier(id, new_frontier);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn handle_peek_response(
        &mut self,
        uuid: Uuid,
        response: PeekResponse,
        otel_ctx: OpenTelemetryContext,
        replica_id: ReplicaId,
    ) {
        otel_ctx.attach_as_parent();

        let Some(peek) = self.peeks.get(&uuid) else {
            return;
        };

        let target_replica = peek.target_replica.unwrap_or(replica_id);
        if target_replica != replica_id {
            return;
        }

        let duration = peek.requested_at.elapsed();
        self.metrics.observe_peek_response(&response, duration);

        let notification = PeekNotification::new(&response, peek.offset, peek.limit);
        self.deliver_response(ComputeControllerResponse::PeekNotification(
            uuid,
            notification,
            otel_ctx,
        ));

        self.finish_peek(uuid, response)
    }

    fn handle_copy_to_response(
        &mut self,
        sink_id: GlobalId,
        response: CopyToResponse,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&sink_id) {
            soft_panic_or_log!(
                "received response for an unknown copy-to \
                 (sink_id={sink_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!("copy-to response for an unknown replica (replica_id={replica_id})");
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&sink_id) else {
            soft_panic_or_log!(
                "copy-to response for an unknown replica collection \
                 (sink_id={sink_id}, replica_id={replica_id})"
            );
            return;
        };

        replica_collection.update_write_frontier(Antichain::new());
        replica_collection.update_input_frontier(Antichain::new());
        replica_collection.update_output_frontier(Antichain::new());

        if !self.copy_tos.remove(&sink_id) {
            return;
        }

        let result = match response {
            CopyToResponse::RowCount(count) => Ok(count),
            CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
            CopyToResponse::Dropped => {
                tracing::error!(
                    %sink_id, %replica_id,
                    "received `Dropped` response for a tracked copy to",
                );
                return;
            }
        };

        self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
    }

    fn handle_subscribe_response(
        &mut self,
        subscribe_id: GlobalId,
        response: SubscribeResponse<T>,
        replica_id: ReplicaId,
    ) {
        if !self.collections.contains_key(&subscribe_id) {
            soft_panic_or_log!(
                "received response for an unknown subscribe \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})",
            );
            return;
        }
        let Some(replica) = self.replicas.get_mut(&replica_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica (replica_id={replica_id})"
            );
            return;
        };
        let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
            soft_panic_or_log!(
                "subscribe response for an unknown replica collection \
                 (subscribe_id={subscribe_id}, replica_id={replica_id})"
            );
            return;
        };

        let write_frontier = match &response {
            SubscribeResponse::Batch(batch) => batch.upper.clone(),
            SubscribeResponse::DroppedAt(_) => Antichain::new(),
        };

        replica_collection.update_write_frontier(write_frontier.clone());
        replica_collection.update_input_frontier(write_frontier.clone());
        replica_collection.update_output_frontier(write_frontier.clone());

        let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
            return;
        };
        let replica_targeted = subscribe.target_replica.unwrap_or(replica_id) == replica_id;
        if !replica_targeted {
            return;
        }

        self.maybe_update_global_write_frontier(subscribe_id, write_frontier);

        match response {
            SubscribeResponse::Batch(batch) => {
                let upper = batch.upper;
                let mut updates = batch.updates;

                if PartialOrder::less_than(&subscribe.frontier, &upper) {
                    let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());

                    if upper.is_empty() {
                        self.subscribes.remove(&subscribe_id);
                    } else {
                        self.subscribes.insert(subscribe_id, subscribe);
                    }

                    if let Ok(updates) = updates.as_mut() {
                        updates.retain(|(time, _data, _diff)| lower.less_equal(time));
                    }
                    self.deliver_response(ComputeControllerResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeBatch {
                            lower,
                            upper,
                            updates,
                        },
                    ));
                }
            }
            SubscribeResponse::DroppedAt(frontier) => {
                tracing::error!(
                    %subscribe_id,
                    %replica_id,
                    frontier = ?frontier.elements(),
                    "received `DroppedAt` response for a tracked subscribe",
                );
                self.subscribes.remove(&subscribe_id);
            }
        }
    }

    fn handle_status_response(&self, response: StatusResponse, _replica_id: ReplicaId) {
        match response {
            StatusResponse::Placeholder => {}
        }
    }

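    /// Returns the write frontiers of the direct compute and storage dependencies of the given
    /// collection.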
    fn dependency_write_frontiers<'b>(
        &'b self,
        collection: &'b CollectionState<T>,
    ) -> impl Iterator<Item = Antichain<T>> + 'b {
        let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
            let collection = self.collections.get(&dep_id);
            collection.map(|c| c.write_frontier())
        });
        let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
            let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
            frontiers.map(|f| f.write_frontier)
        });

        compute_frontiers.chain(storage_frontiers)
    }

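    /// Returns the write frontiers of the transitive storage dependencies of the given
    /// collection, found by walking its compute dependencies.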
    fn transitive_storage_dependency_write_frontiers<'b>(
        &'b self,
        collection: &'b CollectionState<T>,
    ) -> impl Iterator<Item = Antichain<T>> + 'b {
        let mut storage_ids: BTreeSet<_> = collection.storage_dependency_ids().collect();
        let mut todo: Vec<_> = collection.compute_dependency_ids().collect();
        let mut done = BTreeSet::new();

        while let Some(id) = todo.pop() {
            if done.contains(&id) {
                continue;
            }
            if let Some(dep) = self.collections.get(&id) {
                storage_ids.extend(dep.storage_dependency_ids());
                todo.extend(dep.compute_dependency_ids());
            }
            done.insert(id);
        }

        let storage_frontiers = storage_ids.into_iter().filter_map(|id| {
            let frontiers = self.storage_collections.collection_frontiers(id).ok();
            frontiers.map(|f| f.write_frontier)
        });

        storage_frontiers
    }

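    /// Downgrade the warmup read holds of all collections as far as possible.
    ///
    /// A warmup hold only needs to cover times that are still available in all of a
    /// collection's inputs, i.e. times not yet beyond any dependency's write frontier, so each
    /// hold is downgraded to the dependency write frontiers stepped back by one. Collections
    /// without a read policy whose write frontier is empty have finished producing output, so
    /// their warmup holds are released entirely.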
    fn downgrade_warmup_capabilities(&mut self) {
        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            if collection.read_policy.is_none()
                && collection.shared.lock_write_frontier(|f| f.is_empty())
            {
                new_capabilities.insert(*id, Antichain::new());
                continue;
            }

            let mut new_capability = Antichain::new();
            for frontier in self.dependency_write_frontiers(collection) {
                for time in frontier {
                    new_capability.insert(time.step_back().unwrap_or(time));
                }
            }

            new_capabilities.insert(*id, new_capability);
        }

        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.warmup_read_hold.try_downgrade(new_capability);
        }
    }

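    /// Forward the implied read holds of collections on paused clusters (clusters without
    /// replicas), according to their read policies applied to the write frontiers of their
    /// transitive storage dependencies.
    ///
    /// Without replicas, compute collections cannot make frontier progress on their own, so
    /// this is what allows the storage dependencies of such collections to still compact.
    /// Gated by the `ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE` dyncfg.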
    fn forward_implied_capabilities(&mut self) {
        if !ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE.get(&self.dyncfg) {
            return;
        }
        if !self.replicas.is_empty() {
            return;
        }

        let mut new_capabilities = BTreeMap::new();
        for (id, collection) in &self.collections {
            let Some(read_policy) = &collection.read_policy else {
                continue;
            };

            let mut dep_frontier = Antichain::new();
            for frontier in self.transitive_storage_dependency_write_frontiers(collection) {
                dep_frontier.extend(frontier);
            }

            let new_capability = read_policy.frontier(dep_frontier.borrow());
            if PartialOrder::less_than(collection.implied_read_hold.since(), &new_capability) {
                new_capabilities.insert(*id, new_capability);
            }
        }

        for (id, new_capability) in new_capabilities {
            let collection = self.expect_collection_mut(id);
            let _ = collection.implied_read_hold.try_downgrade(new_capability);
        }
    }

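    /// Acquire a new read hold on the identified compute collection.
    ///
    /// The hold is acquired at the collection's current read frontier: the shared read
    /// capabilities are incremented by one at each frontier element, and later changes to the
    /// hold are communicated back through `self.read_hold_tx`.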
    fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
        let collection = self.collection(id)?;
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });
        let hold = ReadHold::new(id, since, Arc::clone(&self.read_hold_tx));
        Ok(hold)
    }

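    /// Perform periodic maintenance work.
    ///
    /// This method is invoked regularly and is a good place for work that need not happen
    /// immediately in response to a command or replica response, such as capability
    /// downgrades, introspection updates, and metrics refreshes.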
    #[mz_ore::instrument(level = "debug")]
    pub fn maintain(&mut self) {
        self.rehydrate_failed_replicas();
        self.downgrade_warmup_capabilities();
        self.forward_implied_capabilities();
        self.schedule_collections();
        self.cleanup_collections();
        self.update_frontier_introspection();
        self.refresh_state_metrics();
        self.refresh_wallclock_lag();
    }
}

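/// The controller's state for a single compute collection.
///
/// Tracks the collection's lifecycle (`dropped`, `scheduled`), the read holds the controller
/// maintains on its behalf (`implied_read_hold`, `warmup_read_hold`) and on its dependencies,
/// its read policy, and the introspection state derived from its frontiers.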
2489#[derive(Debug)]
2494struct CollectionState<T: ComputeControllerTimestamp> {
2495 log_collection: bool,
2499 dropped: bool,
2505 scheduled: bool,
2508
2509 read_only: bool,
2513
2514 shared: SharedCollectionState<T>,
2516
2517 implied_read_hold: ReadHold<T>,
2524 warmup_read_hold: ReadHold<T>,
2532 read_policy: Option<ReadPolicy<T>>,
2538
2539 storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2542 compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2545
2546 introspection: CollectionIntrospection<T>,
2548
2549 wallclock_lag_histogram_stash: Option<
2556 BTreeMap<
2557 (
2558 WallclockLagHistogramPeriod,
2559 WallclockLag,
2560 BTreeMap<&'static str, String>,
2561 ),
2562 Diff,
2563 >,
2564 >,
2565}
2566
2567impl<T: ComputeControllerTimestamp> CollectionState<T> {
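    /// Create a new `CollectionState`, using the given shared state and dependency read holds.
    ///
    /// The shared state must be initialized to the given `as_of`: both its read capabilities
    /// frontier and its write frontier are asserted to equal it. A warmup read hold is
    /// registered with the shared read capabilities in addition to the implied read hold.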
    fn new(
        collection_id: GlobalId,
        as_of: Antichain<T>,
        shared: SharedCollectionState<T>,
        storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection: CollectionIntrospection<T>,
    ) -> Self {
        let since = as_of.clone();
        let upper = as_of;

        assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
        assert!(shared.lock_write_frontier(|f| f == &upper));

        let implied_read_hold =
            ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
        let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);

        let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
        shared.lock_read_capabilities(|c| {
            c.update_iter(updates);
        });

        let wallclock_lag_histogram_stash = match collection_id.is_transient() {
            true => None,
            false => Some(Default::default()),
        };

        Self {
            log_collection: false,
            dropped: false,
            scheduled: false,
            read_only: true,
            shared,
            implied_read_hold,
            warmup_read_hold,
            read_policy: Some(ReadPolicy::ValidFrom(since)),
            storage_dependencies,
            compute_dependencies,
            introspection,
            wallclock_lag_histogram_stash,
        }
    }

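    /// Create a new `CollectionState` for a log collection.
    ///
    /// Log collections have no dependencies, a since of `T::minimum()`, and are marked as
    /// scheduled immediately.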
    fn new_log_collection(
        id: GlobalId,
        shared: SharedCollectionState<T>,
        read_hold_tx: read_holds::ChangeTx<T>,
        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
    ) -> Self {
        let since = Antichain::from_elem(T::minimum());
        let introspection =
            CollectionIntrospection::new(id, introspection_tx, since.clone(), false, None, None);
        let mut state = Self::new(
            id,
            since,
            shared,
            Default::default(),
            Default::default(),
            read_hold_tx,
            introspection,
        );
        state.log_collection = true;
        state.scheduled = true;
        state
    }

    fn read_frontier(&self) -> Antichain<T> {
        self.shared
            .lock_read_capabilities(|c| c.frontier().to_owned())
    }

    fn write_frontier(&self) -> Antichain<T> {
        self.shared.lock_write_frontier(|f| f.clone())
    }

    fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.storage_dependencies.keys().copied()
    }

    fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.compute_dependencies.keys().copied()
    }

    fn dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.compute_dependency_ids()
            .chain(self.storage_dependency_ids())
    }
}

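/// Collection state shared between the controller instance and other interested parties.
///
/// Both fields live behind an `Arc<Mutex<_>>`, so capability changes and frontier updates made
/// by one party are immediately visible to the others. All access goes through the `lock_*`
/// helpers below, which run a closure while holding the respective lock.
///
/// A minimal usage sketch (hypothetical; assumes `T = u64` for illustration):
///
/// ```ignore
/// // Initialize shared state at an as-of of 0.
/// let shared = SharedCollectionState::new(Antichain::from_elem(0u64));
/// // Inspect the current read frontier while holding the capabilities lock.
/// let since = shared.lock_read_capabilities(|caps| caps.frontier().to_owned());
/// assert_eq!(since, Antichain::from_elem(0u64));
/// ```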
2674#[derive(Clone, Debug)]
2685pub(super) struct SharedCollectionState<T> {
2686 read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
2699 write_frontier: Arc<Mutex<Antichain<T>>>,
2701}
2702
2703impl<T: Timestamp> SharedCollectionState<T> {
2704 pub fn new(as_of: Antichain<T>) -> Self {
2705 let since = as_of.clone();
2707 let upper = as_of;
2709
2710 let mut read_capabilities = MutableAntichain::new();
2714 read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2715
2716 Self {
2717 read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2718 write_frontier: Arc::new(Mutex::new(upper)),
2719 }
2720 }
2721
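    /// Run the given closure with the read capabilities lock held.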
    pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut MutableAntichain<T>) -> R,
    {
        let mut caps = self.read_capabilities.lock().expect("poisoned");
        f(&mut *caps)
    }

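    /// Run the given closure with the write frontier lock held.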
    pub fn lock_write_frontier<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Antichain<T>) -> R,
    {
        let mut frontier = self.write_frontier.lock().expect("poisoned");
        f(&mut *frontier)
    }
}

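/// Introspection state maintained for a compute collection.
///
/// On creation, reports the collection's initial state to the `Frontiers` and
/// `ComputeMaterializedViewRefreshes` introspection relations (as applicable); on drop,
/// retracts whatever it last reported.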
2739#[derive(Debug)]
2744struct CollectionIntrospection<T: ComputeControllerTimestamp> {
2745 collection_id: GlobalId,
2747 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2749 frontiers: Option<FrontiersIntrospectionState<T>>,
2754 refresh: Option<RefreshIntrospectionState<T>>,
2758}
2759
2760impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
2761 fn new(
2762 collection_id: GlobalId,
2763 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2764 as_of: Antichain<T>,
2765 storage_sink: bool,
2766 initial_as_of: Option<Antichain<T>>,
2767 refresh_schedule: Option<RefreshSchedule>,
2768 ) -> Self {
2769 let refresh =
2770 match (refresh_schedule, initial_as_of) {
2771 (Some(refresh_schedule), Some(initial_as_of)) => Some(
2772 RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2773 ),
2774 (refresh_schedule, _) => {
2775 soft_assert_or_log!(
2778 refresh_schedule.is_none(),
2779 "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2780 );
2781 None
2782 }
2783 };
2784 let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2785
2786 let self_ = Self {
2787 collection_id,
2788 introspection_tx,
2789 frontiers,
2790 refresh,
2791 };
2792
2793 self_.report_initial_state();
2794 self_
2795 }
2796
2797 fn report_initial_state(&self) {
2799 if let Some(frontiers) = &self.frontiers {
2800 let row = frontiers.row_for_collection(self.collection_id);
2801 let updates = vec![(row, Diff::ONE)];
2802 self.send(IntrospectionType::Frontiers, updates);
2803 }
2804
2805 if let Some(refresh) = &self.refresh {
2806 let row = refresh.row_for_collection(self.collection_id);
2807 let updates = vec![(row, Diff::ONE)];
2808 self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2809 }
2810 }
2811
2812 fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2815 self.update_frontier_introspection(read_frontier, write_frontier);
2816 self.update_refresh_introspection(write_frontier);
2817 }
2818
2819 fn update_frontier_introspection(
2820 &mut self,
2821 read_frontier: &Antichain<T>,
2822 write_frontier: &Antichain<T>,
2823 ) {
2824 let Some(frontiers) = &mut self.frontiers else {
2825 return;
2826 };
2827
2828 if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
2829 {
2830 return; };
2832
2833 let retraction = frontiers.row_for_collection(self.collection_id);
2834 frontiers.update(read_frontier, write_frontier);
2835 let insertion = frontiers.row_for_collection(self.collection_id);
2836 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2837 self.send(IntrospectionType::Frontiers, updates);
2838 }
2839
2840 fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
2841 let Some(refresh) = &mut self.refresh else {
2842 return;
2843 };
2844
2845 let retraction = refresh.row_for_collection(self.collection_id);
2846 refresh.frontier_update(write_frontier);
2847 let insertion = refresh.row_for_collection(self.collection_id);
2848
2849 if retraction == insertion {
2850 return; }
2852
2853 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2854 self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2855 }
2856
2857 fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2858 let _ = self.introspection_tx.send((introspection_type, updates));
2861 }
2862}
2863
2864impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
2865 fn drop(&mut self) {
2866 if let Some(frontiers) = &self.frontiers {
2868 let row = frontiers.row_for_collection(self.collection_id);
2869 let updates = vec![(row, Diff::MINUS_ONE)];
2870 self.send(IntrospectionType::Frontiers, updates);
2871 }
2872
2873 if let Some(refresh) = &self.refresh {
2875 let retraction = refresh.row_for_collection(self.collection_id);
2876 let updates = vec![(retraction, Diff::MINUS_ONE)];
2877 self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2878 }
2879 }
2880}
2881
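/// The read and write frontiers of a collection, as last reported to the `Frontiers`
/// introspection relation.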
2882#[derive(Debug)]
2883struct FrontiersIntrospectionState<T> {
2884 read_frontier: Antichain<T>,
2885 write_frontier: Antichain<T>,
2886}
2887
2888impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
2889 fn new(as_of: Antichain<T>) -> Self {
2890 Self {
2891 read_frontier: as_of.clone(),
2892 write_frontier: as_of,
2893 }
2894 }
2895
2896 fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2898 let read_frontier = self
2899 .read_frontier
2900 .as_option()
2901 .map_or(Datum::Null, |ts| ts.clone().into());
2902 let write_frontier = self
2903 .write_frontier
2904 .as_option()
2905 .map_or(Datum::Null, |ts| ts.clone().into());
2906 Row::pack_slice(&[
2907 Datum::String(&collection_id.to_string()),
2908 read_frontier,
2909 write_frontier,
2910 ])
2911 }
2912
2913 fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2915 if read_frontier != &self.read_frontier {
2916 self.read_frontier.clone_from(read_frontier);
2917 }
2918 if write_frontier != &self.write_frontier {
2919 self.write_frontier.clone_from(write_frontier);
2920 }
2921 }
2922}
2923
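/// Information for the `ComputeMaterializedViewRefreshes` introspection relation: the last
/// completed refresh and the next scheduled refresh of a `REFRESH` materialized view, derived
/// from its refresh schedule and write frontier.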
2924#[derive(Debug)]
2927struct RefreshIntrospectionState<T> {
2928 refresh_schedule: RefreshSchedule,
2930 initial_as_of: Antichain<T>,
2931 next_refresh: Datum<'static>, last_completed_refresh: Datum<'static>, }
2935
2936impl<T> RefreshIntrospectionState<T> {
2937 fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2939 Row::pack_slice(&[
2940 Datum::String(&collection_id.to_string()),
2941 self.last_completed_refresh,
2942 self.next_refresh,
2943 ])
2944 }
2945}
2946
2947impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
2948 fn new(
2951 refresh_schedule: RefreshSchedule,
2952 initial_as_of: Antichain<T>,
2953 upper: &Antichain<T>,
2954 ) -> Self {
2955 let mut self_ = Self {
2956 refresh_schedule: refresh_schedule.clone(),
2957 initial_as_of: initial_as_of.clone(),
2958 next_refresh: Datum::Null,
2959 last_completed_refresh: Datum::Null,
2960 };
2961 self_.frontier_update(upper);
2962 self_
2963 }
2964
2965 fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
2968 if write_frontier.is_empty() {
2969 self.last_completed_refresh =
2970 if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2971 last_refresh.into()
2972 } else {
2973 T::maximum().into()
2976 };
2977 self.next_refresh = Datum::Null;
2978 } else {
2979 if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2980 self.last_completed_refresh = Datum::Null;
2982 let initial_as_of = self.initial_as_of.as_option().expect(
2983 "initial_as_of can't be [], because then there would be no refreshes at all",
2984 );
2985 let first_refresh = initial_as_of
2986 .round_up(&self.refresh_schedule)
2987 .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2988 soft_assert_or_log!(
2989 first_refresh == *initial_as_of,
2990 "initial_as_of should be set to the first refresh"
2991 );
2992 self.next_refresh = first_refresh.into();
2993 } else {
2994 let write_frontier = write_frontier.as_option().expect("checked above");
2996 self.last_completed_refresh = write_frontier
2997 .round_down_minus_1(&self.refresh_schedule)
2998 .map_or_else(
2999 || {
3000 soft_panic_or_log!(
3001 "rounding down should have returned the first refresh or later"
3002 );
3003 Datum::Null
3004 },
3005 |last_completed_refresh| last_completed_refresh.into(),
3006 );
3007 self.next_refresh = write_frontier.clone().into();
3008 }
3009 }
3010 }
3011}
3012
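/// State tracked for an in-flight peek until its response is delivered or the peek is
/// canceled.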
3013#[derive(Debug)]
3015struct PendingPeek<T: Timestamp> {
3016 target_replica: Option<ReplicaId>,
3020 otel_ctx: OpenTelemetryContext,
3022 requested_at: Instant,
3026 read_hold: ReadHold<T>,
3028 peek_response_tx: oneshot::Sender<PeekResponse>,
3030 limit: Option<usize>,
3032 offset: usize,
3034}
3035
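/// State tracked for an active subscribe: the frontier of updates already delivered to the
/// client, and the replica (if any) the subscribe is targeting.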
3036#[derive(Debug, Clone)]
3037struct ActiveSubscribe<T> {
3038 frontier: Antichain<T>,
3040 target_replica: Option<ReplicaId>,
3044}
3045
3046impl<T: ComputeControllerTimestamp> ActiveSubscribe<T> {
3047 fn new(target_replica: Option<ReplicaId>) -> Self {
3048 Self {
3049 frontier: Antichain::from_elem(T::minimum()),
3050 target_replica,
3051 }
3052 }
3053}
3054
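/// The controller's state for a single replica of a compute instance.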
3055#[derive(Debug)]
3057struct ReplicaState<T: ComputeControllerTimestamp> {
3058 id: ReplicaId,
3060 client: ReplicaClient<T>,
3062 config: ReplicaConfig,
3064 metrics: ReplicaMetrics,
3066 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3068 collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
3070 epoch: u64,
3072}
3073
3074impl<T: ComputeControllerTimestamp> ReplicaState<T> {
3075 fn new(
3076 id: ReplicaId,
3077 client: ReplicaClient<T>,
3078 config: ReplicaConfig,
3079 metrics: ReplicaMetrics,
3080 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3081 epoch: u64,
3082 ) -> Self {
3083 Self {
3084 id,
3085 client,
3086 config,
3087 metrics,
3088 introspection_tx,
3089 epoch,
3090 collections: Default::default(),
3091 }
3092 }
3093
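    /// Start tracking state for a collection installed on this replica.
    ///
    /// For transient collections, wallclock lag maximums are not tracked
    /// (`wallclock_lag_max` is unset).
    ///
    /// # Panics
    ///
    /// Panics if a collection with the same ID is already tracked.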
    fn add_collection(
        &mut self,
        id: GlobalId,
        as_of: Antichain<T>,
        input_read_holds: Vec<ReadHold<T>>,
    ) {
        let metrics = self.metrics.for_collection(id);
        let introspection = ReplicaCollectionIntrospection::new(
            self.id,
            id,
            self.introspection_tx.clone(),
            as_of.clone(),
        );
        let mut state =
            ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);

        if id.is_transient() {
            state.wallclock_lag_max = None;
        }

        if let Some(previous) = self.collections.insert(id, state) {
            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
        }
    }

    fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
        self.collections.remove(&id)
    }

    fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
        self.collections.get(&id).map_or(true, |c| {
            c.write_frontier.is_empty()
                && c.input_frontier.is_empty()
                && c.output_frontier.is_empty()
        })
    }

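    /// Return a JSON dump of this replica's state, for debugging purposes.
    ///
    /// Fields that are not meaningfully serializable (the client, config, metrics, and
    /// introspection sender) are omitted.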
    #[mz_ore::instrument(level = "debug")]
    pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            id,
            client: _,
            config: _,
            metrics: _,
            introspection_tx: _,
            epoch,
            collections,
        } = self;

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
            .collect();

        let map = serde_json::Map::from_iter([
            field("id", id.to_string())?,
            field("collections", collections)?,
            field("epoch", epoch)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

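/// Per-replica state for a single collection: the frontiers the replica has reported, the
/// read holds on the collection's inputs, and associated metrics and introspection state.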
3184#[derive(Debug)]
3185struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
3186 write_frontier: Antichain<T>,
3190 input_frontier: Antichain<T>,
3194 output_frontier: Antichain<T>,
3198
3199 metrics: Option<ReplicaCollectionMetrics>,
3203 as_of: Antichain<T>,
3205 introspection: ReplicaCollectionIntrospection<T>,
3207 input_read_holds: Vec<ReadHold<T>>,
3213
3214 wallclock_lag_max: Option<WallclockLag>,
3218}
3219
3220impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
3221 fn new(
3222 metrics: Option<ReplicaCollectionMetrics>,
3223 as_of: Antichain<T>,
3224 introspection: ReplicaCollectionIntrospection<T>,
3225 input_read_holds: Vec<ReadHold<T>>,
3226 ) -> Self {
3227 Self {
3228 write_frontier: as_of.clone(),
3229 input_frontier: as_of.clone(),
3230 output_frontier: as_of.clone(),
3231 metrics,
3232 as_of,
3233 introspection,
3234 input_read_holds,
3235 wallclock_lag_max: Some(WallclockLag::MIN),
3236 }
3237 }
3238
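    /// Whether this collection is hydrated on this replica, i.e. the replica has caught up
    /// with the collection's as-of. This is the case when the output frontier has advanced
    /// beyond the as-of, or when the as-of is empty (such a collection can never produce
    /// output, so it counts as hydrated immediately).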
    fn hydrated(&self) -> bool {
        self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
    }

    fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
        if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
            soft_panic_or_log!(
                "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
                self.write_frontier,
            );
            return;
        } else if new_frontier == self.write_frontier {
            return;
        }

        self.write_frontier = new_frontier;
    }

    fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
        if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
            soft_panic_or_log!(
                "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
                self.input_frontier,
            );
            return;
        } else if new_frontier == self.input_frontier {
            return;
        }

        self.input_frontier = new_frontier;

        for read_hold in &mut self.input_read_holds {
            let result = read_hold.try_downgrade(self.input_frontier.clone());
            soft_assert_or_log!(
                result.is_ok(),
                "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
                self.input_frontier,
            );
        }
    }

    fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
        if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
            soft_panic_or_log!(
                "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
                self.output_frontier,
            );
            return;
        } else if new_frontier == self.output_frontier {
            return;
        }

        self.output_frontier = new_frontier;
    }
}

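/// Introspection state maintained for a collection on a single replica.
///
/// Reports the replica's write frontier for the collection to the `ReplicaFrontiers`
/// introspection relation, and retracts the last reported row on drop.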
3315#[derive(Debug)]
3318struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
3319 replica_id: ReplicaId,
3321 collection_id: GlobalId,
3323 write_frontier: Antichain<T>,
3325 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3327}
3328
3329impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
3330 fn new(
3332 replica_id: ReplicaId,
3333 collection_id: GlobalId,
3334 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3335 as_of: Antichain<T>,
3336 ) -> Self {
3337 let self_ = Self {
3338 replica_id,
3339 collection_id,
3340 write_frontier: as_of,
3341 introspection_tx,
3342 };
3343
3344 self_.report_initial_state();
3345 self_
3346 }
3347
3348 fn report_initial_state(&self) {
3350 let row = self.write_frontier_row();
3351 let updates = vec![(row, Diff::ONE)];
3352 self.send(IntrospectionType::ReplicaFrontiers, updates);
3353 }
3354
3355 fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
3357 if self.write_frontier == *write_frontier {
3358 return; }
3360
3361 let retraction = self.write_frontier_row();
3362 self.write_frontier.clone_from(write_frontier);
3363 let insertion = self.write_frontier_row();
3364
3365 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3366 self.send(IntrospectionType::ReplicaFrontiers, updates);
3367 }
3368
3369 fn write_frontier_row(&self) -> Row {
3371 let write_frontier = self
3372 .write_frontier
3373 .as_option()
3374 .map_or(Datum::Null, |ts| ts.clone().into());
3375 Row::pack_slice(&[
3376 Datum::String(&self.collection_id.to_string()),
3377 Datum::String(&self.replica_id.to_string()),
3378 write_frontier,
3379 ])
3380 }
3381
3382 fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3383 let _ = self.introspection_tx.send((introspection_type, updates));
3386 }
3387}
3388
3389impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
3390 fn drop(&mut self) {
3391 let row = self.write_frontier_row();
3393 let updates = vec![(row, Diff::MINUS_ONE)];
3394 self.send(IntrospectionType::ReplicaFrontiers, updates);
3395 }
3396}