1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt::Debug;
14use std::num::NonZeroI64;
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, Instant};
17
18use chrono::{DateTime, DurationRound, TimeDelta, Utc};
19use mz_build_info::BuildInfo;
20use mz_cluster_client::WallclockLagFn;
21use mz_cluster_client::client::ClusterStartupEpoch;
22use mz_compute_types::ComputeInstanceId;
23use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
24use mz_compute_types::plan::LirId;
25use mz_compute_types::plan::render_plan::RenderPlan;
26use mz_compute_types::sinks::{
27 ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, MaterializedViewSinkConnection,
28};
29use mz_compute_types::sources::SourceInstanceDesc;
30use mz_controller_types::dyncfgs::{
31 ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION, WALLCLOCK_LAG_RECORDING_INTERVAL,
32};
33use mz_dyncfg::ConfigSet;
34use mz_expr::RowSetFinishing;
35use mz_ore::cast::CastFrom;
36use mz_ore::channel::instrumented_unbounded_channel;
37use mz_ore::now::NowFn;
38use mz_ore::tracing::OpenTelemetryContext;
39use mz_ore::{soft_assert_or_log, soft_panic_or_log};
40use mz_repr::adt::interval::Interval;
41use mz_repr::adt::timestamp::CheckedTimestamp;
42use mz_repr::refresh_schedule::RefreshSchedule;
43use mz_repr::{Datum, Diff, GlobalId, Row};
44use mz_storage_client::controller::{IntrospectionType, WallclockLagHistogramPeriod};
45use mz_storage_types::read_holds::{self, ReadHold};
46use mz_storage_types::read_policy::ReadPolicy;
47use serde::Serialize;
48use thiserror::Error;
49use timely::PartialOrder;
50use timely::progress::frontier::MutableAntichain;
51use timely::progress::{Antichain, ChangeBatch, Timestamp};
52use tokio::sync::mpsc::error::SendError;
53use tokio::sync::{mpsc, oneshot};
54use uuid::Uuid;
55
56use crate::controller::error::{
57 CollectionLookupError, CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget,
58};
59use crate::controller::replica::{ReplicaClient, ReplicaConfig};
60use crate::controller::{
61 ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, PeekNotification,
62 ReplicaId, StorageCollections,
63};
64use crate::logging::LogVariant;
65use crate::metrics::IntCounter;
66use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
67use crate::protocol::command::{ComputeCommand, ComputeParameters, Peek, PeekTarget};
68use crate::protocol::history::ComputeCommandHistory;
69use crate::protocol::response::{
70 ComputeResponse, CopyToResponse, FrontiersResponse, OperatorHydrationStatus, PeekResponse,
71 StatusResponse, SubscribeBatch, SubscribeResponse,
72};
73use crate::service::{ComputeClient, ComputeGrpcClient};
74
75#[derive(Error, Debug)]
76#[error("replica exists already: {0}")]
77pub(super) struct ReplicaExists(pub ReplicaId);
78
79#[derive(Error, Debug)]
80#[error("replica does not exist: {0}")]
81pub(super) struct ReplicaMissing(pub ReplicaId);
82
83#[derive(Error, Debug)]
84pub(super) enum DataflowCreationError {
85 #[error("collection does not exist: {0}")]
86 CollectionMissing(GlobalId),
87 #[error("replica does not exist: {0}")]
88 ReplicaMissing(ReplicaId),
89 #[error("dataflow definition lacks an as_of value")]
90 MissingAsOf,
91 #[error("subscribe dataflow has an empty as_of")]
92 EmptyAsOfForSubscribe,
93 #[error("copy to dataflow has an empty as_of")]
94 EmptyAsOfForCopyTo,
95 #[error("no read hold provided for dataflow import: {0}")]
96 ReadHoldMissing(GlobalId),
97 #[error("insufficient read hold provided for dataflow import: {0}")]
98 ReadHoldInsufficient(GlobalId),
99}
100
101impl From<CollectionMissing> for DataflowCreationError {
102 fn from(error: CollectionMissing) -> Self {
103 Self::CollectionMissing(error.0)
104 }
105}
106
107#[derive(Error, Debug)]
108pub(super) enum PeekError {
109 #[error("replica does not exist: {0}")]
110 ReplicaMissing(ReplicaId),
111 #[error("read hold ID does not match peeked collection: {0}")]
112 ReadHoldIdMismatch(GlobalId),
113 #[error("insufficient read hold provided: {0}")]
114 ReadHoldInsufficient(GlobalId),
115}
116
117#[derive(Error, Debug)]
118pub(super) enum ReadPolicyError {
119 #[error("collection does not exist: {0}")]
120 CollectionMissing(GlobalId),
121 #[error("collection is write-only: {0}")]
122 WriteOnlyCollection(GlobalId),
123}
124
125impl From<CollectionMissing> for ReadPolicyError {
126 fn from(error: CollectionMissing) -> Self {
127 Self::CollectionMissing(error.0)
128 }
129}
130
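/// A command sent to an instance task.
///
/// Commands are boxed closures that the receiving instance task applies to its `Instance`
/// state. A minimal sketch of constructing and sending one (the closure body is illustrative):
///
/// ```ignore
/// let cmd: Command<T> = Box::new(|instance| instance.maintain());
/// client.send(cmd).expect("instance task still running");
/// ```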
131pub type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;
133
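/// A client handle for a compute instance task.
///
/// Cloneable; holds the command channel to the instance task as well as the shared read-hold
/// change sender.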
134#[derive(Clone, derivative::Derivative)]
136#[derivative(Debug)]
137pub(super) struct Client<T: ComputeControllerTimestamp> {
138 command_tx: mpsc::UnboundedSender<Command<T>>,
140 #[derivative(Debug = "ignore")]
142 read_hold_tx: read_holds::ChangeTx<T>,
143}
144
145impl<T: ComputeControllerTimestamp> Client<T> {
146 pub fn send(&self, command: Command<T>) -> Result<(), SendError<Command<T>>> {
147 self.command_tx.send(command)
148 }
149
150 pub fn read_hold_tx(&self) -> read_holds::ChangeTx<T> {
151 Arc::clone(&self.read_hold_tx)
152 }
153}
154
155impl<T> Client<T>
156where
157 T: ComputeControllerTimestamp,
158 ComputeGrpcClient: ComputeClient<T>,
159{
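    /// Spawns a task running an [`Instance`] for the given compute instance and returns a
    /// `Client` connected to it.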
160 pub fn spawn(
161 id: ComputeInstanceId,
162 build_info: &'static BuildInfo,
163 storage: StorageCollections<T>,
164 arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
165 envd_epoch: NonZeroI64,
166 metrics: InstanceMetrics,
167 now: NowFn,
168 wallclock_lag: WallclockLagFn<T>,
169 dyncfg: Arc<ConfigSet>,
170 response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
171 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
172 ) -> Self {
173 let (command_tx, command_rx) = mpsc::unbounded_channel();
174
175 let read_hold_tx: read_holds::ChangeTx<_> = {
176 let command_tx = command_tx.clone();
177 Arc::new(move |id, change: ChangeBatch<_>| {
178 let cmd: Command<_> = {
179 let change = change.clone();
180 Box::new(move |i| i.apply_read_hold_change(id, change.clone()))
181 };
182 command_tx.send(cmd).map_err(|_| SendError((id, change)))
183 })
184 };
185
186 mz_ore::task::spawn(
187 || format!("compute-instance-{id}"),
188 Instance::new(
189 build_info,
190 storage,
191 arranged_logs,
192 envd_epoch,
193 metrics,
194 now,
195 wallclock_lag,
196 dyncfg,
197 command_rx,
198 response_tx,
199 Arc::clone(&read_hold_tx),
200 introspection_tx,
201 )
202 .run(),
203 );
204
205 Self {
206 command_tx,
207 read_hold_tx,
208 }
209 }
210}
211
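/// A response received from a replica, tagged with the replica's ID and incarnation number.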
212pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);
215
216pub(super) struct Instance<T: ComputeControllerTimestamp> {
218 build_info: &'static BuildInfo,
220 storage_collections: StorageCollections<T>,
222 initialized: bool,
224 read_only: bool,
230 workload_class: Option<String>,
234 replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
236 collections: BTreeMap<GlobalId, CollectionState<T>>,
244 log_sources: BTreeMap<LogVariant, GlobalId>,
246 peeks: BTreeMap<Uuid, PendingPeek<T>>,
255 subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
269 copy_tos: BTreeSet<GlobalId>,
277 history: ComputeCommandHistory<UIntGauge, T>,
279 command_rx: mpsc::UnboundedReceiver<Command<T>>,
281 response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
283 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
285 envd_epoch: NonZeroI64,
287 replica_epochs: BTreeMap<ReplicaId, u64>,
289 metrics: InstanceMetrics,
291 dyncfg: Arc<ConfigSet>,
293
294 now: NowFn,
296 wallclock_lag: WallclockLagFn<T>,
298 wallclock_lag_last_recorded: DateTime<Utc>,
300
301 read_hold_tx: read_holds::ChangeTx<T>,
306 replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
308 replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
310}
311
312impl<T: ComputeControllerTimestamp> Instance<T> {
313 fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing> {
315 self.collections.get(&id).ok_or(CollectionMissing(id))
316 }
317
318 fn collection_mut(
320 &mut self,
321 id: GlobalId,
322 ) -> Result<&mut CollectionState<T>, CollectionMissing> {
323 self.collections.get_mut(&id).ok_or(CollectionMissing(id))
324 }
325
326 fn expect_collection(&self, id: GlobalId) -> &CollectionState<T> {
332 self.collections.get(&id).expect("collection must exist")
333 }
334
335 fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T> {
341 self.collections
342 .get_mut(&id)
343 .expect("collection must exist")
344 }
345
346 fn collections_iter(&self) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)> {
347 self.collections.iter().map(|(id, coll)| (*id, coll))
348 }
349
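    /// Registers a new collection with the given as-of and dependency read holds, adds it to
    /// all existing replicas, and reports its dependencies to introspection.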
350 fn add_collection(
356 &mut self,
357 id: GlobalId,
358 as_of: Antichain<T>,
359 shared: SharedCollectionState<T>,
360 storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
361 compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
362 replica_input_read_holds: Vec<ReadHold<T>>,
363 write_only: bool,
364 storage_sink: bool,
365 initial_as_of: Option<Antichain<T>>,
366 refresh_schedule: Option<RefreshSchedule>,
367 ) {
368 let introspection = CollectionIntrospection::new(
370 id,
371 self.introspection_tx.clone(),
372 as_of.clone(),
373 storage_sink,
374 initial_as_of,
375 refresh_schedule,
376 );
377 let mut state = CollectionState::new(
378 id,
379 as_of.clone(),
380 shared,
381 storage_dependencies,
382 compute_dependencies,
383 Arc::clone(&self.read_hold_tx),
384 introspection,
385 );
386 if write_only {
388 state.read_policy = None;
389 }
390
391 if let Some(previous) = self.collections.insert(id, state) {
392 panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
393 }
394
395 for replica in self.replicas.values_mut() {
397 replica.add_collection(id, as_of.clone(), replica_input_read_holds.clone());
398 }
399
400 self.report_dependency_updates(id, Diff::ONE);
402 }
403
404 fn remove_collection(&mut self, id: GlobalId) {
405 self.report_dependency_updates(id, Diff::MINUS_ONE);
407
408 for replica in self.replicas.values_mut() {
410 replica.remove_collection(id);
411 }
412
413 self.collections.remove(&id);
415 }
416
417 fn add_replica_state(
418 &mut self,
419 id: ReplicaId,
420 client: ReplicaClient<T>,
421 config: ReplicaConfig,
422 epoch: ClusterStartupEpoch,
423 ) {
424 let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();
425
426 let metrics = self.metrics.for_replica(id);
427 let mut replica = ReplicaState::new(
428 id,
429 client,
430 config,
431 metrics,
432 self.introspection_tx.clone(),
433 epoch,
434 );
435
436 for (collection_id, collection) in &self.collections {
438 if collection.log_collection && !log_ids.contains(collection_id) {
440 continue;
441 }
442
443 let as_of = collection.read_frontier().to_owned();
444 let input_read_holds = collection.storage_dependencies.values().cloned().collect();
445 replica.add_collection(*collection_id, as_of, input_read_holds);
446 }
447
448 self.replicas.insert(id, replica);
449 }
450
451 fn deliver_response(&self, response: ComputeControllerResponse<T>) {
453 let _ = self.response_tx.send(response);
456 }
457
458 fn deliver_introspection_updates(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
460 let _ = self.introspection_tx.send((type_, updates));
463 }
464
465 fn replica_exists(&self, id: ReplicaId) -> bool {
467 self.replicas.contains_key(&id)
468 }
469
470 fn peeks_targeting(
472 &self,
473 replica_id: ReplicaId,
474 ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)> {
475 self.peeks.iter().filter_map(move |(uuid, peek)| {
476 if peek.target_replica == Some(replica_id) {
477 Some((*uuid, peek))
478 } else {
479 None
480 }
481 })
482 }
483
484 fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_ {
486 self.subscribes.iter().filter_map(move |(id, subscribe)| {
487 let targeting = subscribe.target_replica == Some(replica_id);
488 targeting.then_some(*id)
489 })
490 }
491
492 fn update_frontier_introspection(&mut self) {
501 for collection in self.collections.values_mut() {
502 collection
503 .introspection
504 .observe_frontiers(&collection.read_frontier(), &collection.write_frontier());
505 }
506
507 for replica in self.replicas.values_mut() {
508 for collection in replica.collections.values_mut() {
509 collection
510 .introspection
511 .observe_frontier(&collection.write_frontier);
512 }
513 }
514 }
515
516 fn refresh_state_metrics(&self) {
525 let unscheduled_collections_count =
526 self.collections.values().filter(|c| !c.scheduled).count();
527 let connected_replica_count = self
528 .replicas
529 .values()
530 .filter(|r| r.client.is_connected())
531 .count();
532
533 self.metrics
534 .replica_count
535 .set(u64::cast_from(self.replicas.len()));
536 self.metrics
537 .collection_count
538 .set(u64::cast_from(self.collections.len()));
539 self.metrics
540 .collection_unscheduled_count
541 .set(u64::cast_from(unscheduled_collections_count));
542 self.metrics
543 .peek_count
544 .set(u64::cast_from(self.peeks.len()));
545 self.metrics
546 .subscribe_count
547 .set(u64::cast_from(self.subscribes.len()));
548 self.metrics
549 .copy_to_count
550 .set(u64::cast_from(self.copy_tos.len()));
551 self.metrics
552 .connected_replica_count
553 .set(u64::cast_from(connected_replica_count));
554 }
555
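    /// Refreshes wallclock lag tracking: updates each replica collection's maximum observed
    /// lag and metrics, and (if enabled) accumulates per-collection lag histogram buckets.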
556 fn refresh_wallclock_lag(&mut self) {
561 self.maybe_record_wallclock_lag();
563
564 let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
565 Some(ts) => (self.wallclock_lag)(ts.clone()),
566 None => Duration::ZERO,
567 };
568
569 for replica in self.replicas.values_mut() {
570 for collection in replica.collections.values_mut() {
571 let lag = frontier_lag(&collection.write_frontier);
572
573 if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
574 *wallclock_lag_max = std::cmp::max(*wallclock_lag_max, lag);
575 }
576
577 if let Some(metrics) = &mut collection.metrics {
578 metrics.wallclock_lag.observe(lag);
579 };
580 }
581 }
582
583 let now_ms = (self.now)();
584 let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
585 let histogram_labels = match &self.workload_class {
586 Some(wc) => [("workload_class", wc.clone())].into(),
587 None => BTreeMap::new(),
588 };
589
590 for collection in self.collections.values_mut() {
591 if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(&self.dyncfg) {
592 continue;
593 }
594
595 if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
596 let lag = collection.shared.lock_write_frontier(|f| frontier_lag(f));
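                // Buckets are powers of two in seconds, e.g. an observed lag of 5s lands in
                // the 8s bucket.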
597 let bucket = lag.as_secs().next_power_of_two();
598
599 let key = (histogram_period, bucket, histogram_labels.clone());
600 *stash.entry(key).or_default() += Diff::ONE;
601 }
602 }
603 }
604
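    /// Flushes the accumulated wallclock lag measurements to the introspection collections,
    /// at most once per `WALLCLOCK_LAG_RECORDING_INTERVAL` and never while in read-only mode.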
605 fn maybe_record_wallclock_lag(&mut self) {
613 if self.read_only {
614 return;
615 }
616
617 let duration_trunc = |datetime: DateTime<_>, interval| {
618 let td = TimeDelta::from_std(interval).ok()?;
619 datetime.duration_trunc(td).ok()
620 };
621
622 let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(&self.dyncfg);
623 let now_dt = mz_ore::now::to_datetime((self.now)());
624 let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
625 soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
626 let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
627 duration_trunc(now_dt, *default).unwrap()
628 });
629 if now_trunc <= self.wallclock_lag_last_recorded {
630 return;
631 }
632
633 let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");
634
635 let mut history_updates = Vec::new();
636 for (replica_id, replica) in &mut self.replicas {
637 for (collection_id, collection) in &mut replica.collections {
638 let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max else {
639 continue;
640 };
641
642 let max_lag = std::mem::take(wallclock_lag_max);
643 let max_lag_us = i64::try_from(max_lag.as_micros()).expect("must fit");
644 let row = Row::pack_slice(&[
645 Datum::String(&collection_id.to_string()),
646 Datum::String(&replica_id.to_string()),
647 Datum::Interval(Interval::new(0, 0, max_lag_us)),
648 Datum::TimestampTz(now_ts),
649 ]);
650 history_updates.push((row, Diff::ONE));
651 }
652 }
653 if !history_updates.is_empty() {
654 self.deliver_introspection_updates(
655 IntrospectionType::WallclockLagHistory,
656 history_updates,
657 );
658 }
659
660 let mut histogram_updates = Vec::new();
661 let mut row_buf = Row::default();
662 for (collection_id, collection) in &mut self.collections {
663 let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
664 continue;
665 };
666
667 for ((period, lag, labels), count) in std::mem::take(stash) {
668 let mut packer = row_buf.packer();
669 packer.extend([
670 Datum::TimestampTz(period.start),
671 Datum::TimestampTz(period.end),
672 Datum::String(&collection_id.to_string()),
673 Datum::UInt64(lag),
674 ]);
675 let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
676 packer.push_dict(labels);
677
678 histogram_updates.push((row_buf.clone(), count));
679 }
680 }
681 if !histogram_updates.is_empty() {
682 self.deliver_introspection_updates(
683 IntrospectionType::WallclockLagHistogram,
684 histogram_updates,
685 );
686 }
687
688 self.wallclock_lag_last_recorded = now_trunc;
689 }
690
691 fn report_dependency_updates(&self, id: GlobalId, diff: Diff) {
697 let collection = self.expect_collection(id);
698 let dependencies = collection.dependency_ids();
699
700 let updates = dependencies
701 .map(|dependency_id| {
702 let row = Row::pack_slice(&[
703 Datum::String(&id.to_string()),
704 Datum::String(&dependency_id.to_string()),
705 ]);
706 (row, diff)
707 })
708 .collect();
709
710 self.deliver_introspection_updates(IntrospectionType::ComputeDependencies, updates);
711 }
712
713 fn update_operator_hydration_status(
715 &mut self,
716 replica_id: ReplicaId,
717 status: OperatorHydrationStatus,
718 ) {
719 let Some(replica) = self.replicas.get_mut(&replica_id) else {
720 tracing::error!(
721 %replica_id, ?status,
722 "status update for an unknown replica"
723 );
724 return;
725 };
726 let Some(collection) = replica.collections.get_mut(&status.collection_id) else {
727 tracing::error!(
728 %replica_id, ?status,
729 "status update for an unknown collection"
730 );
731 return;
732 };
733
734 collection.introspection.operator_hydrated(
735 status.lir_id,
736 status.worker_id,
737 status.hydrated,
738 );
739 }
740
741 #[mz_ore::instrument(level = "debug")]
747 pub fn collection_hydrated(
748 &self,
749 collection_id: GlobalId,
750 ) -> Result<bool, CollectionLookupError> {
751 if self.replicas.is_empty() {
752 return Ok(true);
753 }
754
755 for replica_state in self.replicas.values() {
756 let collection_state = replica_state
757 .collections
758 .get(&collection_id)
759 .ok_or(CollectionLookupError::CollectionMissing(collection_id))?;
760
761 if collection_state.hydrated() {
762 return Ok(true);
763 }
764 }
765
766 Ok(false)
767 }
768
769 #[mz_ore::instrument(level = "debug")]
775 pub fn collections_hydrated_on_replicas(
776 &self,
777 target_replica_ids: Option<Vec<ReplicaId>>,
778 exclude_collections: &BTreeSet<GlobalId>,
779 ) -> Result<bool, HydrationCheckBadTarget> {
780 if self.replicas.is_empty() {
781 return Ok(true);
782 }
783 let mut all_hydrated = true;
784 let target_replicas: BTreeSet<ReplicaId> = self
785 .replicas
786 .keys()
787 .filter_map(|id| match target_replica_ids {
788 None => Some(id.clone()),
789 Some(ref ids) if ids.contains(id) => Some(id.clone()),
790 Some(_) => None,
791 })
792 .collect();
793 if let Some(targets) = target_replica_ids {
794 if target_replicas.is_empty() {
795 return Err(HydrationCheckBadTarget(targets));
796 }
797 }
798
799 for (id, _collection) in self.collections_iter() {
800 if id.is_transient() || exclude_collections.contains(&id) {
801 continue;
802 }
803
804 let mut collection_hydrated = false;
805 for replica_state in self.replicas.values() {
806 if !target_replicas.contains(&replica_state.id) {
807 continue;
808 }
809 let collection_state = replica_state
810 .collections
811 .get(&id)
812 .expect("missing collection state");
813
814 if collection_state.hydrated() {
815 collection_hydrated = true;
816 break;
817 }
818 }
819
820 if !collection_hydrated {
821 tracing::info!("collection {id} is not hydrated on any replica");
822 all_hydrated = false;
823 }
826 }
827
828 Ok(all_hydrated)
829 }
830
831 #[mz_ore::instrument(level = "debug")]
837 pub fn collections_hydrated(&self, exclude_collections: &BTreeSet<GlobalId>) -> bool {
838 self.collections_hydrated_on_replicas(None, exclude_collections)
839 .expect("Cannot error if target_replica_ids is None")
840 }
841
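    /// Removes collections that have been dropped and whose read capabilities and replica
    /// frontiers are empty, i.e. that nothing depends on anymore.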
842 fn cleanup_collections(&mut self) {
858 let to_remove: Vec<_> = self
859 .collections_iter()
860 .filter(|(id, collection)| {
861 collection.dropped
862 && collection.shared.lock_read_capabilities(|c| c.is_empty())
863 && self
864 .replicas
865 .values()
866 .all(|r| r.collection_frontiers_empty(*id))
867 })
868 .map(|(id, _collection)| id)
869 .collect();
870
871 for id in to_remove {
872 self.remove_collection(id);
873 }
874 }
875
876 #[mz_ore::instrument(level = "debug")]
880 pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
881 let Self {
888 build_info: _,
889 storage_collections: _,
890 initialized,
891 read_only,
892 workload_class,
893 replicas,
894 collections,
895 log_sources: _,
896 peeks,
897 subscribes,
898 copy_tos,
899 history: _,
900 command_rx: _,
901 response_tx: _,
902 introspection_tx: _,
903 envd_epoch,
904 replica_epochs,
905 metrics: _,
906 dyncfg: _,
907 now: _,
908 wallclock_lag: _,
909 wallclock_lag_last_recorded,
910 read_hold_tx: _,
911 replica_tx: _,
912 replica_rx: _,
913 } = self;
914
915 fn field(
916 key: &str,
917 value: impl Serialize,
918 ) -> Result<(String, serde_json::Value), anyhow::Error> {
919 let value = serde_json::to_value(value)?;
920 Ok((key.to_string(), value))
921 }
922
923 let replicas: BTreeMap<_, _> = replicas
924 .iter()
925 .map(|(id, replica)| Ok((id.to_string(), replica.dump()?)))
926 .collect::<Result<_, anyhow::Error>>()?;
927 let collections: BTreeMap<_, _> = collections
928 .iter()
929 .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
930 .collect();
931 let peeks: BTreeMap<_, _> = peeks
932 .iter()
933 .map(|(uuid, peek)| (uuid.to_string(), format!("{peek:?}")))
934 .collect();
935 let subscribes: BTreeMap<_, _> = subscribes
936 .iter()
937 .map(|(id, subscribe)| (id.to_string(), format!("{subscribe:?}")))
938 .collect();
939 let copy_tos: Vec<_> = copy_tos.iter().map(|id| id.to_string()).collect();
940 let replica_epochs: BTreeMap<_, _> = replica_epochs
941 .iter()
942 .map(|(id, epoch)| (id.to_string(), epoch))
943 .collect();
944 let wallclock_lag_last_recorded = format!("{wallclock_lag_last_recorded:?}");
945
946 let map = serde_json::Map::from_iter([
947 field("initialized", initialized)?,
948 field("read_only", read_only)?,
949 field("workload_class", workload_class)?,
950 field("replicas", replicas)?,
951 field("collections", collections)?,
952 field("peeks", peeks)?,
953 field("subscribes", subscribes)?,
954 field("copy_tos", copy_tos)?,
955 field("envd_epoch", envd_epoch)?,
956 field("replica_epochs", replica_epochs)?,
957 field("wallclock_lag_last_recorded", wallclock_lag_last_recorded)?,
958 ]);
959 Ok(serde_json::Value::Object(map))
960 }
961}
962
963impl<T> Instance<T>
964where
965 T: ComputeControllerTimestamp,
966 ComputeGrpcClient: ComputeClient<T>,
967{
968 fn new(
969 build_info: &'static BuildInfo,
970 storage: StorageCollections<T>,
971 arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
972 envd_epoch: NonZeroI64,
973 metrics: InstanceMetrics,
974 now: NowFn,
975 wallclock_lag: WallclockLagFn<T>,
976 dyncfg: Arc<ConfigSet>,
977 command_rx: mpsc::UnboundedReceiver<Command<T>>,
978 response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
979 read_hold_tx: read_holds::ChangeTx<T>,
980 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
981 ) -> Self {
982 let mut collections = BTreeMap::new();
983 let mut log_sources = BTreeMap::new();
984 for (log, id, shared) in arranged_logs {
985 let collection = CollectionState::new_log_collection(
986 id,
987 shared,
988 Arc::clone(&read_hold_tx),
989 introspection_tx.clone(),
990 );
991 collections.insert(id, collection);
992 log_sources.insert(log, id);
993 }
994
995 let history = ComputeCommandHistory::new(metrics.for_history());
996
997 let send_count = metrics.response_send_count.clone();
998 let recv_count = metrics.response_recv_count.clone();
999 let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);
1000
1001 let now_dt = mz_ore::now::to_datetime(now());
1002
1003 Self {
1004 build_info,
1005 storage_collections: storage,
1006 initialized: false,
1007 read_only: true,
1008 workload_class: None,
1009 replicas: Default::default(),
1010 collections,
1011 log_sources,
1012 peeks: Default::default(),
1013 subscribes: Default::default(),
1014 copy_tos: Default::default(),
1015 history,
1016 command_rx,
1017 response_tx,
1018 introspection_tx,
1019 envd_epoch,
1020 replica_epochs: Default::default(),
1021 metrics,
1022 dyncfg,
1023 now,
1024 wallclock_lag,
1025 wallclock_lag_last_recorded: now_dt,
1026 read_hold_tx,
1027 replica_tx,
1028 replica_rx,
1029 }
1030 }
1031
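    /// Runs the instance task: issues the initial `CreateTimely`/`CreateInstance` commands,
    /// then processes controller commands and replica responses until the command channel
    /// closes.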
1032 async fn run(mut self) {
1033 self.send(ComputeCommand::CreateTimely {
1034 config: Default::default(),
1035 epoch: ClusterStartupEpoch::new(self.envd_epoch, 0),
1036 });
1037
1038 let dummy_instance_config = Default::default();
1040 self.send(ComputeCommand::CreateInstance(dummy_instance_config));
1041
1042 loop {
1043 tokio::select! {
1044 command = self.command_rx.recv() => match command {
1045 Some(cmd) => cmd(&mut self),
1046 None => break,
1047 },
1048 response = self.replica_rx.recv() => match response {
1049 Some(response) => self.handle_response(response),
1050 None => unreachable!("self owns a sender side of the channel"),
1051 }
1052 }
1053 }
1054 }
1055
1056 #[mz_ore::instrument(level = "debug")]
1058 pub fn update_configuration(&mut self, config_params: ComputeParameters) {
1059 if let Some(workload_class) = &config_params.workload_class {
1060 self.workload_class = workload_class.clone();
1061 }
1062
1063 let command = ComputeCommand::UpdateConfiguration(Box::new(config_params));
1064 self.send(command);
1065 }
1066
1067 #[mz_ore::instrument(level = "debug")]
1072 pub fn initialization_complete(&mut self) {
1073 if !self.initialized {
1075 self.send(ComputeCommand::InitializationComplete);
1076 self.initialized = true;
1077 }
1078 }
1079
1080 #[mz_ore::instrument(level = "debug")]
1084 pub fn allow_writes(&mut self) {
1085 if self.read_only {
1086 self.read_only = false;
1087 self.send(ComputeCommand::AllowWrites);
1088 }
1089 }
1090
1091 #[mz_ore::instrument(level = "debug")]
1102 pub fn shutdown(&mut self) {
1103 let (_tx, rx) = mpsc::unbounded_channel();
1105 let mut command_rx = std::mem::replace(&mut self.command_rx, rx);
1106
1107 while let Ok(cmd) = command_rx.try_recv() {
1113 cmd(self);
1114 }
1115
1116 self.cleanup_collections();
1118
1119 let stray_replicas: Vec<_> = self.replicas.keys().collect();
1120 soft_assert_or_log!(
1121 stray_replicas.is_empty(),
1122 "dropped instance still has provisioned replicas: {stray_replicas:?}",
1123 );
1124
1125 let collections = self.collections.iter();
1126 let stray_collections: Vec<_> = collections
1127 .filter(|(_, c)| !c.log_collection)
1128 .map(|(id, _)| id)
1129 .collect();
1130 soft_assert_or_log!(
1131 stray_collections.is_empty(),
1132 "dropped instance still has installed collections: {stray_collections:?}",
1133 );
1134 }
1135
1136 #[mz_ore::instrument(level = "debug")]
1138 fn send(&mut self, cmd: ComputeCommand<T>) {
1139 self.history.push(cmd.clone());
1141
1142 for replica in self.replicas.values_mut() {
1144 let _ = replica.client.send(cmd.clone());
1146 }
1147 }
1148
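    /// Adds a replica to this instance: spawns a replica client with a fresh epoch, replays
    /// the reduced command history to hydrate it, and registers per-collection replica state.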
1149 #[mz_ore::instrument(level = "debug")]
1151 pub fn add_replica(
1152 &mut self,
1153 id: ReplicaId,
1154 mut config: ReplicaConfig,
1155 ) -> Result<(), ReplicaExists> {
1156 if self.replica_exists(id) {
1157 return Err(ReplicaExists(id));
1158 }
1159
1160 config.logging.index_logs = self.log_sources.clone();
1161
1162 let replica_epoch = self.replica_epochs.entry(id).or_default();
1163 *replica_epoch += 1;
1164 let metrics = self.metrics.for_replica(id);
1165 let epoch = ClusterStartupEpoch::new(self.envd_epoch, *replica_epoch);
1166 let client = ReplicaClient::spawn(
1167 id,
1168 self.build_info,
1169 config.clone(),
1170 epoch,
1171 metrics.clone(),
1172 Arc::clone(&self.dyncfg),
1173 self.replica_tx.clone(),
1174 );
1175
1176 self.history.reduce();
1178
1179 self.history.update_source_uppers(&self.storage_collections);
1181
1182 for command in self.history.iter() {
1184 if client.send(command.clone()).is_err() {
1185 tracing::warn!("Replica {:?} connection terminated during hydration", id);
1188 break;
1189 }
1190 }
1191
1192 self.add_replica_state(id, client, config, epoch);
1194
1195 Ok(())
1196 }
1197
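    /// Removes a replica, failing any subscribes and peeks that specifically targeted it.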
1198 #[mz_ore::instrument(level = "debug")]
1200 pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing> {
1201 self.replicas.remove(&id).ok_or(ReplicaMissing(id))?;
1202
1203 let to_drop: Vec<_> = self.subscribes_targeting(id).collect();
1207 for subscribe_id in to_drop {
1208 let subscribe = self.subscribes.remove(&subscribe_id).unwrap();
1209 let response = ComputeControllerResponse::SubscribeResponse(
1210 subscribe_id,
1211 SubscribeBatch {
1212 lower: subscribe.frontier.clone(),
1213 upper: subscribe.frontier,
1214 updates: Err(ERROR_TARGET_REPLICA_FAILED.into()),
1215 },
1216 );
1217 self.deliver_response(response);
1218 }
1219
1220 let mut peek_responses = Vec::new();
1225 let mut to_drop = Vec::new();
1226 for (uuid, peek) in self.peeks_targeting(id) {
1227 peek_responses.push(ComputeControllerResponse::PeekNotification(
1228 uuid,
1229 PeekNotification::Error(ERROR_TARGET_REPLICA_FAILED.into()),
1230 peek.otel_ctx.clone(),
1231 ));
1232 to_drop.push(uuid);
1233 }
1234 for response in peek_responses {
1235 self.deliver_response(response);
1236 }
1237 for uuid in to_drop {
1238 let response = PeekResponse::Error(ERROR_TARGET_REPLICA_FAILED.into());
1239 self.finish_peek(uuid, response);
1240 }
1241
1242 Ok(())
1243 }
1244
1245 fn rehydrate_replica(&mut self, id: ReplicaId) {
1251 let config = self.replicas[&id].config.clone();
1252 self.remove_replica(id).expect("replica must exist");
1253 let result = self.add_replica(id, config);
1254
1255 match result {
1256 Ok(()) => (),
1257 Err(ReplicaExists(_)) => unreachable!("replica was removed"),
1258 }
1259 }
1260
1261 fn rehydrate_failed_replicas(&mut self) {
1263 let replicas = self.replicas.iter();
1264 let failed_replicas: Vec<_> = replicas
1265 .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
1266 .collect();
1267
1268 for replica_id in failed_replicas {
1269 self.rehydrate_replica(replica_id);
1270 }
1271 }
1272
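    /// Creates the described dataflow: validates the `as_of` and the provided import read
    /// holds, registers the exported collections, and sends a `CreateDataflow` command to all
    /// replicas unless the `as_of` is empty.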
1273 #[mz_ore::instrument(level = "debug")]
1282 pub fn create_dataflow(
1283 &mut self,
1284 dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
1285 import_read_holds: Vec<ReadHold<T>>,
1286 subscribe_target_replica: Option<ReplicaId>,
1287 mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
1288 ) -> Result<(), DataflowCreationError> {
1289 use DataflowCreationError::*;
1290
1291 if let Some(replica_id) = subscribe_target_replica {
1292 if !self.replica_exists(replica_id) {
1293 return Err(ReplicaMissing(replica_id));
1294 }
1295 }
1296
1297 let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
1299 if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
1300 return Err(EmptyAsOfForSubscribe);
1301 }
1302 if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
1303 return Err(EmptyAsOfForCopyTo);
1304 }
1305
1306 let mut storage_dependencies = BTreeMap::new();
1308 let mut compute_dependencies = BTreeMap::new();
1309
1310 let mut replica_input_read_holds = Vec::new();
1315
1316 let mut import_read_holds: BTreeMap<_, _> =
1317 import_read_holds.into_iter().map(|r| (r.id(), r)).collect();
1318
1319 for &id in dataflow.source_imports.keys() {
1320 let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1321 replica_input_read_holds.push(read_hold.clone());
1322
1323 read_hold
1324 .try_downgrade(as_of.clone())
1325 .map_err(|_| ReadHoldInsufficient(id))?;
1326 storage_dependencies.insert(id, read_hold);
1327 }
1328
1329 for &id in dataflow.index_imports.keys() {
1330 let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?;
1331 read_hold
1332 .try_downgrade(as_of.clone())
1333 .map_err(|_| ReadHoldInsufficient(id))?;
1334 compute_dependencies.insert(id, read_hold);
1335 }
1336
1337 if as_of.is_empty() {
1340 replica_input_read_holds = Default::default();
1341 }
1342
1343 for export_id in dataflow.export_ids() {
1345 let shared = shared_collection_state
1346 .remove(&export_id)
1347 .unwrap_or_else(|| SharedCollectionState::new(as_of.clone()));
1348 let write_only = dataflow.sink_exports.contains_key(&export_id);
1349 let storage_sink = dataflow.persist_sink_ids().any(|id| id == export_id);
1350 self.add_collection(
1351 export_id,
1352 as_of.clone(),
1353 shared,
1354 storage_dependencies.clone(),
1355 compute_dependencies.clone(),
1356 replica_input_read_holds.clone(),
1357 write_only,
1358 storage_sink,
1359 dataflow.initial_storage_as_of.clone(),
1360 dataflow.refresh_schedule.clone(),
1361 );
1362
1363 if let Ok(frontiers) = self.storage_collections.collection_frontiers(export_id) {
1366 self.maybe_update_global_write_frontier(export_id, frontiers.write_frontier);
1367 }
1368 }
1369
1370 for subscribe_id in dataflow.subscribe_ids() {
1372 self.subscribes
1373 .insert(subscribe_id, ActiveSubscribe::new(subscribe_target_replica));
1374 }
1375
1376 for copy_to_id in dataflow.copy_to_ids() {
1378 self.copy_tos.insert(copy_to_id);
1379 }
1380
1381 let mut source_imports = BTreeMap::new();
1384 for (id, (si, monotonic, _upper)) in dataflow.source_imports {
1385 let frontiers = self
1386 .storage_collections
1387 .collection_frontiers(id)
1388 .expect("collection exists");
1389
1390 let collection_metadata = self
1391 .storage_collections
1392 .collection_metadata(id)
1393 .expect("we have a read hold on this collection");
1394
1395 let desc = SourceInstanceDesc {
1396 storage_metadata: collection_metadata.clone(),
1397 arguments: si.arguments,
1398 typ: si.typ.clone(),
1399 };
1400 source_imports.insert(id, (desc, monotonic, frontiers.write_frontier));
1401 }
1402
1403 let mut sink_exports = BTreeMap::new();
1404 for (id, se) in dataflow.sink_exports {
1405 let connection = match se.connection {
1406 ComputeSinkConnection::MaterializedView(conn) => {
1407 let metadata = self
1408 .storage_collections
1409 .collection_metadata(id)
1410 .map_err(|_| CollectionMissing(id))?
1411 .clone();
1412 let conn = MaterializedViewSinkConnection {
1413 value_desc: conn.value_desc,
1414 storage_metadata: metadata,
1415 };
1416 ComputeSinkConnection::MaterializedView(conn)
1417 }
1418 ComputeSinkConnection::ContinualTask(conn) => {
1419 let metadata = self
1420 .storage_collections
1421 .collection_metadata(id)
1422 .map_err(|_| DataflowCreationError::CollectionMissing(id))?
1423 .clone();
1424 let conn = ContinualTaskConnection {
1425 input_id: conn.input_id,
1426 storage_metadata: metadata,
1427 };
1428 ComputeSinkConnection::ContinualTask(conn)
1429 }
1430 ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
1431 ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1432 ComputeSinkConnection::CopyToS3Oneshot(conn)
1433 }
1434 };
1435 let desc = ComputeSinkDesc {
1436 from: se.from,
1437 from_desc: se.from_desc,
1438 connection,
1439 with_snapshot: se.with_snapshot,
1440 up_to: se.up_to,
1441 non_null_assertions: se.non_null_assertions,
1442 refresh_schedule: se.refresh_schedule,
1443 };
1444 sink_exports.insert(id, desc);
1445 }
1446
1447 let objects_to_build = dataflow
1449 .objects_to_build
1450 .into_iter()
1451 .map(|object| BuildDesc {
1452 id: object.id,
1453 plan: RenderPlan::try_from(object.plan).expect("valid plan"),
1454 })
1455 .collect();
1456
1457 let augmented_dataflow = DataflowDescription {
1458 source_imports,
1459 sink_exports,
1460 objects_to_build,
1461 index_imports: dataflow.index_imports,
1463 index_exports: dataflow.index_exports,
1464 as_of: dataflow.as_of.clone(),
1465 until: dataflow.until,
1466 initial_storage_as_of: dataflow.initial_storage_as_of,
1467 refresh_schedule: dataflow.refresh_schedule,
1468 debug_name: dataflow.debug_name,
1469 time_dependence: dataflow.time_dependence,
1470 };
1471
1472 if augmented_dataflow.is_transient() {
1473 tracing::debug!(
1474 name = %augmented_dataflow.debug_name,
1475 import_ids = %augmented_dataflow.display_import_ids(),
1476 export_ids = %augmented_dataflow.display_export_ids(),
1477 as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1478 until = ?augmented_dataflow.until.elements(),
1479 "creating dataflow",
1480 );
1481 } else {
1482 tracing::info!(
1483 name = %augmented_dataflow.debug_name,
1484 import_ids = %augmented_dataflow.display_import_ids(),
1485 export_ids = %augmented_dataflow.display_export_ids(),
1486 as_of = ?augmented_dataflow.as_of.as_ref().unwrap().elements(),
1487 until = ?augmented_dataflow.until.elements(),
1488 "creating dataflow",
1489 );
1490 }
1491
1492 if as_of.is_empty() {
1495 tracing::info!(
1496 name = %augmented_dataflow.debug_name,
1497 "not sending `CreateDataflow`, because of empty `as_of`",
1498 );
1499 } else {
1500 let collections: Vec<_> = augmented_dataflow.export_ids().collect();
1501 let dataflow = Box::new(augmented_dataflow);
1502 self.send(ComputeCommand::CreateDataflow(dataflow));
1503
1504 for id in collections {
1505 self.maybe_schedule_collection(id);
1506 }
1507 }
1508
1509 Ok(())
1510 }
1511
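    /// Sends a `Schedule` command for the collection once the write frontiers of all its
    /// (non-self) compute and storage inputs have advanced beyond its as-of; transient
    /// collections are scheduled immediately.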
1512 fn maybe_schedule_collection(&mut self, id: GlobalId) {
1518 let collection = self.expect_collection(id);
1519
1520 if collection.scheduled {
1522 return;
1523 }
1524
1525 let as_of = collection.read_frontier();
1526
1527 if as_of.is_empty() {
1530 return;
1531 }
1532
1533 let ready = if id.is_transient() {
1534 true
1540 } else {
1541 let not_self_dep = |x: &GlobalId| *x != id;
1547
1548 let compute_deps = collection.compute_dependency_ids().filter(not_self_dep);
1553 let compute_frontiers = compute_deps.map(|id| {
1554 let dep = &self.expect_collection(id);
1555 dep.write_frontier()
1556 });
1557
1558 let storage_deps = collection.storage_dependency_ids().filter(not_self_dep);
1559 let storage_frontiers = self
1560 .storage_collections
1561 .collections_frontiers(storage_deps.collect())
1562 .expect("must exist");
1563 let storage_frontiers = storage_frontiers.into_iter().map(|f| f.write_frontier);
1564
1565 let ready = compute_frontiers
1566 .chain(storage_frontiers)
1567 .all(|frontier| PartialOrder::less_than(&as_of, &frontier));
1568
1569 ready
1570 };
1571
1572 if ready {
1573 self.send(ComputeCommand::Schedule(id));
1574 let collection = self.expect_collection_mut(id);
1575 collection.scheduled = true;
1576 }
1577 }
1578
1579 fn schedule_collections(&mut self) {
1581 let ids: Vec<_> = self.collections.keys().copied().collect();
1582 for id in ids {
1583 self.maybe_schedule_collection(id);
1584 }
1585 }
1586
1587 #[mz_ore::instrument(level = "debug")]
1590 pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing> {
1591 for id in &ids {
1592 let collection = self.collection_mut(*id)?;
1593
1594 collection.dropped = true;
1596
1597 collection.implied_read_hold.release();
1600 collection.warmup_read_hold.release();
1601
1602 self.subscribes.remove(id);
1605 self.copy_tos.remove(id);
1608 }
1609
1610 Ok(())
1611 }
1612
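    /// Issues a peek against the given target: validates the read hold and target replica,
    /// registers a `PendingPeek`, and sends a `Peek` command. The result is delivered through
    /// `peek_response_tx`.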
1613 #[mz_ore::instrument(level = "debug")]
1615 pub fn peek(
1616 &mut self,
1617 peek_target: PeekTarget,
1618 literal_constraints: Option<Vec<Row>>,
1619 uuid: Uuid,
1620 timestamp: T,
1621 finishing: RowSetFinishing,
1622 map_filter_project: mz_expr::SafeMfpPlan,
1623 mut read_hold: ReadHold<T>,
1624 target_replica: Option<ReplicaId>,
1625 peek_response_tx: oneshot::Sender<PeekResponse>,
1626 ) -> Result<(), PeekError> {
1627 use PeekError::*;
1628
1629 let target_id = peek_target.id();
1631 if read_hold.id() != target_id {
1632 return Err(ReadHoldIdMismatch(read_hold.id()));
1633 }
1634 read_hold
1635 .try_downgrade(Antichain::from_elem(timestamp.clone()))
1636 .map_err(|_| ReadHoldInsufficient(target_id))?;
1637
1638 if let Some(target) = target_replica {
1639 if !self.replica_exists(target) {
1640 return Err(ReplicaMissing(target));
1641 }
1642 }
1643
1644 let otel_ctx = OpenTelemetryContext::obtain();
1645 self.peeks.insert(
1646 uuid,
1647 PendingPeek {
1648 target_replica,
1649 otel_ctx: otel_ctx.clone(),
1651 requested_at: Instant::now(),
1652 read_hold,
1653 peek_response_tx,
1654 limit: finishing.limit.map(usize::cast_from),
1655 offset: finishing.offset,
1656 },
1657 );
1658
1659 let peek = Peek {
1660 literal_constraints,
1661 uuid,
1662 timestamp,
1663 finishing,
1664 map_filter_project,
1665 otel_ctx,
1668 target: peek_target,
1669 };
1670 self.send(ComputeCommand::Peek(Box::new(peek)));
1671
1672 Ok(())
1673 }
1674
1675 #[mz_ore::instrument(level = "debug")]
1677 pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse) {
1678 let Some(peek) = self.peeks.get_mut(&uuid) else {
1679 tracing::warn!("did not find pending peek for {uuid}");
1680 return;
1681 };
1682
1683 let duration = peek.requested_at.elapsed();
1684 self.metrics
1685 .observe_peek_response(&PeekResponse::Canceled, duration);
1686
1687 let otel_ctx = peek.otel_ctx.clone();
1689 otel_ctx.attach_as_parent();
1690
1691 self.deliver_response(ComputeControllerResponse::PeekNotification(
1692 uuid,
1693 PeekNotification::Canceled,
1694 otel_ctx,
1695 ));
1696
1697 self.finish_peek(uuid, reason);
1700 }
1701
1702 #[mz_ore::instrument(level = "debug")]
1714 pub fn set_read_policy(
1715 &mut self,
1716 policies: Vec<(GlobalId, ReadPolicy<T>)>,
1717 ) -> Result<(), ReadPolicyError> {
1718 for (id, _policy) in &policies {
1721 let collection = self.collection(*id)?;
1722 if collection.read_policy.is_none() {
1723 return Err(ReadPolicyError::WriteOnlyCollection(*id));
1724 }
1725 }
1726
1727 for (id, new_policy) in policies {
1728 let collection = self.expect_collection_mut(id);
1729 let new_since = new_policy.frontier(collection.write_frontier().borrow());
1730 let _ = collection.implied_read_hold.try_downgrade(new_since);
1731 collection.read_policy = Some(new_policy);
1732 }
1733
1734 Ok(())
1735 }
1736
1737 #[mz_ore::instrument(level = "debug")]
1745 fn maybe_update_global_write_frontier(&mut self, id: GlobalId, new_frontier: Antichain<T>) {
1746 let collection = self.expect_collection_mut(id);
1747
1748 let advanced = collection.shared.lock_write_frontier(|f| {
1749 let advanced = PartialOrder::less_than(f, &new_frontier);
1750 if advanced {
1751 f.clone_from(&new_frontier);
1752 }
1753 advanced
1754 });
1755
1756 if !advanced {
1757 return;
1758 }
1759
1760 let new_since = match &collection.read_policy {
1762 Some(read_policy) => {
1763 read_policy.frontier(new_frontier.borrow())
1766 }
1767 None => {
1768 Antichain::from_iter(
1777 new_frontier
1778 .iter()
1779 .map(|t| t.step_back().unwrap_or_else(T::minimum)),
1780 )
1781 }
1782 };
1783 let _ = collection.implied_read_hold.try_downgrade(new_since);
1784
1785 self.deliver_response(ComputeControllerResponse::FrontierUpper {
1787 id,
1788 upper: new_frontier,
1789 });
1790 }
1791
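    /// Applies a batch of read-hold changes to the collection's shared read capabilities and,
    /// if its since frontier advanced, downgrades the dependency read holds and allows
    /// compaction on replicas.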
1792 fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
1794 let Some(collection) = self.collections.get_mut(&id) else {
1795 soft_panic_or_log!(
1796 "read hold change for absent collection (id={id}, changes={update:?})"
1797 );
1798 return;
1799 };
1800
1801 let new_since = collection.shared.lock_read_capabilities(|caps| {
1802 let read_frontier = caps.frontier();
1805 for (time, diff) in update.iter() {
1806 let count = caps.count_for(time) + diff;
1807 assert!(
1808 count >= 0,
1809 "invalid read capabilities update: negative capability \
1810 (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1811 );
1812 assert!(
1813 count == 0 || read_frontier.less_equal(time),
1814 "invalid read capabilities update: frontier regression \
1815 (id={id:?}, read_capabilities={caps:?}, update={update:?})",
1816 );
1817 }
1818
1819 let changes = caps.update_iter(update.drain());
1822
1823 let changed = changes.count() > 0;
1824 changed.then(|| caps.frontier().to_owned())
1825 });
1826
1827 let Some(new_since) = new_since else { return; };
1830
1831 for read_hold in collection.compute_dependencies.values_mut() {
1833 read_hold
1834 .try_downgrade(new_since.clone())
1835 .expect("frontiers don't regress");
1836 }
1837 for read_hold in collection.storage_dependencies.values_mut() {
1838 read_hold
1839 .try_downgrade(new_since.clone())
1840 .expect("frontiers don't regress");
1841 }
1842
1843 self.send(ComputeCommand::AllowCompaction {
1845 id,
1846 frontier: new_since,
1847 });
1848 }
1849
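    /// Finishes a pending peek: sends the response to the waiting client, broadcasts
    /// `CancelPeek` so replicas can clean up, and drops the peek's read hold.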
1850 fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse) {
1859 let Some(peek) = self.peeks.remove(&uuid) else {
1860 return;
1861 };
1862
1863 let _ = peek.peek_response_tx.send(response);
1865
1866 self.send(ComputeCommand::CancelPeek { uuid });
1869
1870 drop(peek.read_hold);
1871 }
1872
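    /// Handles a response received from a replica, ignoring it if the replica is unknown or
    /// the response was produced by an old incarnation.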
1873 fn handle_response(&mut self, (replica_id, incarnation, response): ReplicaResponse<T>) {
1876 if self
1878 .replicas
1879 .get(&replica_id)
1880 .filter(|replica| replica.epoch.replica() == incarnation)
1881 .is_none()
1882 {
1883 return;
1884 }
1885
1886 match response {
1889 ComputeResponse::Frontiers(id, frontiers) => {
1890 self.handle_frontiers_response(id, frontiers, replica_id);
1891 }
1892 ComputeResponse::PeekResponse(uuid, peek_response, otel_ctx) => {
1893 self.handle_peek_response(uuid, peek_response, otel_ctx, replica_id);
1894 }
1895 ComputeResponse::CopyToResponse(id, response) => {
1896 self.handle_copy_to_response(id, response, replica_id);
1897 }
1898 ComputeResponse::SubscribeResponse(id, response) => {
1899 self.handle_subscribe_response(id, response, replica_id);
1900 }
1901 ComputeResponse::Status(response) => {
1902 self.handle_status_response(response, replica_id);
1903 }
1904 }
1905 }
1906
1907 fn handle_frontiers_response(
1910 &mut self,
1911 id: GlobalId,
1912 frontiers: FrontiersResponse<T>,
1913 replica_id: ReplicaId,
1914 ) {
1915 if !self.collections.contains_key(&id) {
1916 soft_panic_or_log!(
1917 "frontiers update for an unknown collection \
1918 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1919 );
1920 return;
1921 }
1922 let Some(replica) = self.replicas.get_mut(&replica_id) else {
1923 soft_panic_or_log!(
1924 "frontiers update for an unknown replica \
1925 (replica_id={replica_id}, frontiers={frontiers:?})"
1926 );
1927 return;
1928 };
1929 let Some(replica_collection) = replica.collections.get_mut(&id) else {
1930 soft_panic_or_log!(
1931 "frontiers update for an unknown replica collection \
1932 (id={id}, replica_id={replica_id}, frontiers={frontiers:?})"
1933 );
1934 return;
1935 };
1936
1937 if let Some(new_frontier) = frontiers.input_frontier {
1938 replica_collection.update_input_frontier(new_frontier.clone());
1939 }
1940 if let Some(new_frontier) = frontiers.output_frontier {
1941 replica_collection.update_output_frontier(new_frontier.clone());
1942 }
1943 if let Some(new_frontier) = frontiers.write_frontier {
1944 replica_collection.update_write_frontier(new_frontier.clone());
1945 self.maybe_update_global_write_frontier(id, new_frontier);
1946 }
1947 }
1948
1949 #[mz_ore::instrument(level = "debug")]
1950 fn handle_peek_response(
1951 &mut self,
1952 uuid: Uuid,
1953 response: PeekResponse,
1954 otel_ctx: OpenTelemetryContext,
1955 replica_id: ReplicaId,
1956 ) {
1957 otel_ctx.attach_as_parent();
1958
1959 let Some(peek) = self.peeks.get(&uuid) else {
1962 return;
1963 };
1964
1965 let target_replica = peek.target_replica.unwrap_or(replica_id);
1967 if target_replica != replica_id {
1968 return;
1969 }
1970
1971 let duration = peek.requested_at.elapsed();
1972 self.metrics.observe_peek_response(&response, duration);
1973
1974 let notification = PeekNotification::new(&response, peek.offset, peek.limit);
1975 self.deliver_response(ComputeControllerResponse::PeekNotification(
1978 uuid,
1979 notification,
1980 otel_ctx,
1981 ));
1982
1983 self.finish_peek(uuid, response)
1984 }
1985
1986 fn handle_copy_to_response(
1987 &mut self,
1988 sink_id: GlobalId,
1989 response: CopyToResponse,
1990 replica_id: ReplicaId,
1991 ) {
1992 if !self.copy_tos.remove(&sink_id) {
1995 return;
1996 }
1997
1998 let result = match response {
1999 CopyToResponse::RowCount(count) => Ok(count),
2000 CopyToResponse::Error(error) => Err(anyhow::anyhow!(error)),
2001 CopyToResponse::Dropped => {
2006 tracing::error!(
2007 %sink_id, %replica_id,
2008 "received `Dropped` response for a tracked copy to",
2009 );
2010 return;
2011 }
2012 };
2013
2014 self.deliver_response(ComputeControllerResponse::CopyToResponse(sink_id, result));
2015 }
2016
2017 fn handle_subscribe_response(
2018 &mut self,
2019 subscribe_id: GlobalId,
2020 response: SubscribeResponse<T>,
2021 replica_id: ReplicaId,
2022 ) {
2023 if !self.collections.contains_key(&subscribe_id) {
2024 soft_panic_or_log!(
2025 "received response for an unknown subscribe \
2026 (subscribe_id={subscribe_id}, replica_id={replica_id})",
2027 );
2028 return;
2029 }
2030 let Some(replica) = self.replicas.get_mut(&replica_id) else {
2031 soft_panic_or_log!(
2032 "subscribe response for an unknown replica (replica_id={replica_id})"
2033 );
2034 return;
2035 };
2036 let Some(replica_collection) = replica.collections.get_mut(&subscribe_id) else {
2037 soft_panic_or_log!(
2038 "subscribe response for an unknown replica collection \
2039 (subscribe_id={subscribe_id}, replica_id={replica_id})"
2040 );
2041 return;
2042 };
2043
2044 let write_frontier = match &response {
2048 SubscribeResponse::Batch(batch) => batch.upper.clone(),
2049 SubscribeResponse::DroppedAt(_) => Antichain::new(),
2050 };
2051
2052 replica_collection.update_write_frontier(write_frontier.clone());
2056 replica_collection.update_input_frontier(write_frontier.clone());
2057 replica_collection.update_output_frontier(write_frontier.clone());
2058
2059 let Some(mut subscribe) = self.subscribes.get(&subscribe_id).cloned() else {
2061 return;
2062 };
2063 let replica_targeted = subscribe.target_replica.unwrap_or(replica_id) == replica_id;
2064 if !replica_targeted {
2065 return;
2066 }
2067
2068 self.maybe_update_global_write_frontier(subscribe_id, write_frontier);
2074
2075 match response {
2076 SubscribeResponse::Batch(batch) => {
2077 let upper = batch.upper;
2078 let mut updates = batch.updates;
2079
2080 if PartialOrder::less_than(&subscribe.frontier, &upper) {
2083 let lower = std::mem::replace(&mut subscribe.frontier, upper.clone());
2084
2085 if upper.is_empty() {
2086 self.subscribes.remove(&subscribe_id);
2088 } else {
2089 self.subscribes.insert(subscribe_id, subscribe);
2091 }
2092
2093 if let Ok(updates) = updates.as_mut() {
2094 updates.retain(|(time, _data, _diff)| lower.less_equal(time));
2095 }
2096 self.deliver_response(ComputeControllerResponse::SubscribeResponse(
2097 subscribe_id,
2098 SubscribeBatch {
2099 lower,
2100 upper,
2101 updates,
2102 },
2103 ));
2104 }
2105 }
2106 SubscribeResponse::DroppedAt(frontier) => {
2107 tracing::error!(
2112 %subscribe_id,
2113 %replica_id,
2114 frontier = ?frontier.elements(),
2115 "received `DroppedAt` response for a tracked subscribe",
2116 );
2117 self.subscribes.remove(&subscribe_id);
2118 }
2119 }
2120 }
2121
2122 fn handle_status_response(&mut self, response: StatusResponse, replica_id: ReplicaId) {
2123 match response {
2124 StatusResponse::OperatorHydration(status) => {
2125 self.update_operator_hydration_status(replica_id, status)
2126 }
2127 }
2128 }
2129
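    /// Downgrades each collection's warmup read hold to a frontier derived from its
    /// dependencies' write frontiers (stepped back by one), and releases it entirely for
    /// write-only collections whose write frontier is empty.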
2130 fn downgrade_warmup_capabilities(&mut self) {
2143 let mut new_capabilities = BTreeMap::new();
2144 for (id, collection) in &self.collections {
2145 if collection.read_policy.is_none()
2149 && collection.shared.lock_write_frontier(|f| f.is_empty())
2150 {
2151 new_capabilities.insert(*id, Antichain::new());
2152 continue;
2153 }
2154
2155 let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
2156 let collection = self.collections.get(&dep_id);
2157 collection.map(|c| c.write_frontier())
2158 });
2159 let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2160 let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2161 frontiers.map(|f| f.write_frontier)
2162 });
2163
2164 let mut new_capability = Antichain::new();
2165 for frontier in compute_frontiers.chain(storage_frontiers) {
2166 for time in frontier.iter() {
2167 new_capability.insert(time.step_back().unwrap_or_else(|| time.clone()));
2168 }
2169 }
2170
2171 new_capabilities.insert(*id, new_capability);
2172 }
2173
2174 for (id, new_capability) in new_capabilities {
2175 let collection = self.expect_collection_mut(id);
2176 let _ = collection.warmup_read_hold.try_downgrade(new_capability);
2177 }
2178 }
2179
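    /// Performs periodic maintenance work: rehydrates failed replicas, downgrades warmup
    /// capabilities, schedules and cleans up collections, and refreshes introspection,
    /// metrics, and wallclock lag tracking.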
2180 #[mz_ore::instrument(level = "debug")]
2186 pub fn maintain(&mut self) {
2187 self.rehydrate_failed_replicas();
2188 self.downgrade_warmup_capabilities();
2189 self.schedule_collections();
2190 self.cleanup_collections();
2191 self.update_frontier_introspection();
2192 self.refresh_state_metrics();
2193 self.refresh_wallclock_lag();
2194 }
2195}
2196
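/// State maintained by an [`Instance`] for a single compute collection.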
2197#[derive(Debug)]
2202struct CollectionState<T: ComputeControllerTimestamp> {
2203 log_collection: bool,
2207 dropped: bool,
2213 scheduled: bool,
2216
2217 shared: SharedCollectionState<T>,
2219
2220 implied_read_hold: ReadHold<T>,
2227 warmup_read_hold: ReadHold<T>,
2235 read_policy: Option<ReadPolicy<T>>,
2241
2242 storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2245 compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2248
2249 introspection: CollectionIntrospection<T>,
2251
2252 wallclock_lag_histogram_stash: Option<
2259 BTreeMap<
2260 (
2261 WallclockLagHistogramPeriod,
2262 u64,
2263 BTreeMap<&'static str, String>,
2264 ),
2265 Diff,
2266 >,
2267 >,
2268}
2269
2270impl<T: ComputeControllerTimestamp> CollectionState<T> {
2271 fn new(
2273 collection_id: GlobalId,
2274 as_of: Antichain<T>,
2275 shared: SharedCollectionState<T>,
2276 storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2277 compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
2278 read_hold_tx: read_holds::ChangeTx<T>,
2279 introspection: CollectionIntrospection<T>,
2280 ) -> Self {
2281 let since = as_of.clone();
2283 let upper = as_of;
2285
2286 assert!(shared.lock_read_capabilities(|c| c.frontier() == since.borrow()));
2288 assert!(shared.lock_write_frontier(|f| f == &upper));
2289
2290 let implied_read_hold =
2294 ReadHold::new(collection_id, since.clone(), Arc::clone(&read_hold_tx));
2295 let warmup_read_hold = ReadHold::new(collection_id, since.clone(), read_hold_tx);
2296
2297 let updates = warmup_read_hold.since().iter().map(|t| (t.clone(), 1));
2298 shared.lock_read_capabilities(|c| {
2299 c.update_iter(updates);
2300 });
2301
2302 let wallclock_lag_histogram_stash = match collection_id.is_transient() {
2306 true => None,
2307 false => Some(Default::default()),
2308 };
2309
2310 Self {
2311 log_collection: false,
2312 dropped: false,
2313 scheduled: false,
2314 shared,
2315 implied_read_hold,
2316 warmup_read_hold,
2317 read_policy: Some(ReadPolicy::ValidFrom(since)),
2318 storage_dependencies,
2319 compute_dependencies,
2320 introspection,
2321 wallclock_lag_histogram_stash,
2322 }
2323 }
2324
2325 fn new_log_collection(
2327 id: GlobalId,
2328 shared: SharedCollectionState<T>,
2329 read_hold_tx: read_holds::ChangeTx<T>,
2330 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2331 ) -> Self {
2332 let since = Antichain::from_elem(T::minimum());
2333 let introspection =
2334 CollectionIntrospection::new(id, introspection_tx, since.clone(), false, None, None);
2335 let mut state = Self::new(
2336 id,
2337 since,
2338 shared,
2339 Default::default(),
2340 Default::default(),
2341 read_hold_tx,
2342 introspection,
2343 );
2344 state.log_collection = true;
2345 state.scheduled = true;
2347 state
2348 }
2349
2350 fn read_frontier(&self) -> Antichain<T> {
2352 self.shared
2353 .lock_read_capabilities(|c| c.frontier().to_owned())
2354 }
2355
2356 fn write_frontier(&self) -> Antichain<T> {
2358 self.shared.lock_write_frontier(|f| f.clone())
2359 }
2360
2361 fn storage_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2362 self.storage_dependencies.keys().copied()
2363 }
2364
2365 fn compute_dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2366 self.compute_dependencies.keys().copied()
2367 }
2368
2369 fn dependency_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2371 self.compute_dependency_ids()
2372 .chain(self.storage_dependency_ids())
2373 }
2374}
2375
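/// Collection state that can be shared across threads.
///
/// Wraps the collection's read capabilities and write frontier in `Arc<Mutex<_>>`, so clones
/// observe and update the same underlying state through the `lock_*` helpers.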
2376#[derive(Clone, Debug)]
2387pub(super) struct SharedCollectionState<T> {
2388 read_capabilities: Arc<Mutex<MutableAntichain<T>>>,
2400 write_frontier: Arc<Mutex<Antichain<T>>>,
2402}
2403
2404impl<T: Timestamp> SharedCollectionState<T> {
2405 pub fn new(as_of: Antichain<T>) -> Self {
2406 let since = as_of.clone();
2408 let upper = as_of;
2410
2411 let mut read_capabilities = MutableAntichain::new();
2415 read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2416
2417 Self {
2418 read_capabilities: Arc::new(Mutex::new(read_capabilities)),
2419 write_frontier: Arc::new(Mutex::new(upper)),
2420 }
2421 }
2422
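    /// Runs the given closure with the read capabilities locked, e.g. to inspect their
    /// frontier, as done by `CollectionState::read_frontier`:
    ///
    /// ```ignore
    /// let frontier = shared.lock_read_capabilities(|caps| caps.frontier().to_owned());
    /// ```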
2423 pub fn lock_read_capabilities<F, R>(&self, f: F) -> R
2424 where
2425 F: FnOnce(&mut MutableAntichain<T>) -> R,
2426 {
2427 let mut caps = self.read_capabilities.lock().expect("poisoned");
2428 f(&mut *caps)
2429 }
2430
2431 pub fn lock_write_frontier<F, R>(&self, f: F) -> R
2432 where
2433 F: FnOnce(&mut Antichain<T>) -> R,
2434 {
2435 let mut frontier = self.write_frontier.lock().expect("poisoned");
2436 f(&mut *frontier)
2437 }
2438}
2439
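/// Manages the introspection data reported for a single collection.
///
/// Sends `Frontiers` and `ComputeMaterializedViewRefreshes` updates through the introspection
/// channel and retracts previously reported rows when dropped.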
2440#[derive(Debug)]
2445struct CollectionIntrospection<T: ComputeControllerTimestamp> {
2446 collection_id: GlobalId,
2448 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2450 frontiers: Option<FrontiersIntrospectionState<T>>,
2455 refresh: Option<RefreshIntrospectionState<T>>,
2459}
2460
2461impl<T: ComputeControllerTimestamp> CollectionIntrospection<T> {
2462 fn new(
2463 collection_id: GlobalId,
2464 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2465 as_of: Antichain<T>,
2466 storage_sink: bool,
2467 initial_as_of: Option<Antichain<T>>,
2468 refresh_schedule: Option<RefreshSchedule>,
2469 ) -> Self {
2470 let refresh =
2471 match (refresh_schedule, initial_as_of) {
2472 (Some(refresh_schedule), Some(initial_as_of)) => Some(
2473 RefreshIntrospectionState::new(refresh_schedule, initial_as_of, &as_of),
2474 ),
2475 (refresh_schedule, _) => {
2476 soft_assert_or_log!(
2479 refresh_schedule.is_none(),
2480 "`refresh_schedule` without an `initial_as_of`: {collection_id}"
2481 );
2482 None
2483 }
2484 };
2485 let frontiers = (!storage_sink).then(|| FrontiersIntrospectionState::new(as_of));
2486
2487 let self_ = Self {
2488 collection_id,
2489 introspection_tx,
2490 frontiers,
2491 refresh,
2492 };
2493
2494 self_.report_initial_state();
2495 self_
2496 }
2497
2498 fn report_initial_state(&self) {
2500 if let Some(frontiers) = &self.frontiers {
2501 let row = frontiers.row_for_collection(self.collection_id);
2502 let updates = vec![(row, Diff::ONE)];
2503 self.send(IntrospectionType::Frontiers, updates);
2504 }
2505
2506 if let Some(refresh) = &self.refresh {
2507 let row = refresh.row_for_collection(self.collection_id);
2508 let updates = vec![(row, Diff::ONE)];
2509 self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2510 }
2511 }
2512
2513 fn observe_frontiers(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2516 self.update_frontier_introspection(read_frontier, write_frontier);
2517 self.update_refresh_introspection(write_frontier);
2518 }
2519
2520 fn update_frontier_introspection(
2521 &mut self,
2522 read_frontier: &Antichain<T>,
2523 write_frontier: &Antichain<T>,
2524 ) {
2525 let Some(frontiers) = &mut self.frontiers else {
2526 return;
2527 };
2528
        if &frontiers.read_frontier == read_frontier && &frontiers.write_frontier == write_frontier
        {
            return;
        }
2533
2534 let retraction = frontiers.row_for_collection(self.collection_id);
2535 frontiers.update(read_frontier, write_frontier);
2536 let insertion = frontiers.row_for_collection(self.collection_id);
2537 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2538 self.send(IntrospectionType::Frontiers, updates);
2539 }
2540
2541 fn update_refresh_introspection(&mut self, write_frontier: &Antichain<T>) {
2542 let Some(refresh) = &mut self.refresh else {
2543 return;
2544 };
2545
2546 let retraction = refresh.row_for_collection(self.collection_id);
2547 refresh.frontier_update(write_frontier);
2548 let insertion = refresh.row_for_collection(self.collection_id);
2549
        if retraction == insertion {
            return;
        }
2553
2554 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
2555 self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2556 }
2557
2558 fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
2559 let _ = self.introspection_tx.send((introspection_type, updates));
2562 }
2563}
2564
2565impl<T: ComputeControllerTimestamp> Drop for CollectionIntrospection<T> {
2566 fn drop(&mut self) {
2567 if let Some(frontiers) = &self.frontiers {
2569 let row = frontiers.row_for_collection(self.collection_id);
2570 let updates = vec![(row, Diff::MINUS_ONE)];
2571 self.send(IntrospectionType::Frontiers, updates);
2572 }
2573
2574 if let Some(refresh) = &self.refresh {
2576 let retraction = refresh.row_for_collection(self.collection_id);
2577 let updates = vec![(retraction, Diff::MINUS_ONE)];
2578 self.send(IntrospectionType::ComputeMaterializedViewRefreshes, updates);
2579 }
2580 }
2581}
2582
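/// The read and write frontiers most recently reported for a collection through the
/// `Frontiers` introspection type.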
2583#[derive(Debug)]
2584struct FrontiersIntrospectionState<T> {
2585 read_frontier: Antichain<T>,
2586 write_frontier: Antichain<T>,
2587}
2588
2589impl<T: ComputeControllerTimestamp> FrontiersIntrospectionState<T> {
2590 fn new(as_of: Antichain<T>) -> Self {
2591 Self {
2592 read_frontier: as_of.clone(),
2593 write_frontier: as_of,
2594 }
2595 }
2596
2597 fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2599 let read_frontier = self
2600 .read_frontier
2601 .as_option()
2602 .map_or(Datum::Null, |ts| ts.clone().into());
2603 let write_frontier = self
2604 .write_frontier
2605 .as_option()
2606 .map_or(Datum::Null, |ts| ts.clone().into());
2607 Row::pack_slice(&[
2608 Datum::String(&collection_id.to_string()),
2609 read_frontier,
2610 write_frontier,
2611 ])
2612 }
2613
2614 fn update(&mut self, read_frontier: &Antichain<T>, write_frontier: &Antichain<T>) {
2616 if read_frontier != &self.read_frontier {
2617 self.read_frontier.clone_from(read_frontier);
2618 }
2619 if write_frontier != &self.write_frontier {
2620 self.write_frontier.clone_from(write_frontier);
2621 }
2622 }
2623}
2624
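/// Refresh information for a materialized view with a refresh schedule, as reported through
/// the `ComputeMaterializedViewRefreshes` introspection type.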
2625#[derive(Debug)]
struct RefreshIntrospectionState<T> {
    refresh_schedule: RefreshSchedule,
    initial_as_of: Antichain<T>,
    next_refresh: Datum<'static>,
    last_completed_refresh: Datum<'static>,
}
2636
2637impl<T> RefreshIntrospectionState<T> {
2638 fn row_for_collection(&self, collection_id: GlobalId) -> Row {
2640 Row::pack_slice(&[
2641 Datum::String(&collection_id.to_string()),
2642 self.last_completed_refresh,
2643 self.next_refresh,
2644 ])
2645 }
2646}
2647
2648impl<T: ComputeControllerTimestamp> RefreshIntrospectionState<T> {
2649 fn new(
2652 refresh_schedule: RefreshSchedule,
2653 initial_as_of: Antichain<T>,
2654 upper: &Antichain<T>,
2655 ) -> Self {
2656 let mut self_ = Self {
2657 refresh_schedule: refresh_schedule.clone(),
2658 initial_as_of: initial_as_of.clone(),
2659 next_refresh: Datum::Null,
2660 last_completed_refresh: Datum::Null,
2661 };
2662 self_.frontier_update(upper);
2663 self_
2664 }
2665
2666 fn frontier_update(&mut self, write_frontier: &Antichain<T>) {
2669 if write_frontier.is_empty() {
2670 self.last_completed_refresh =
2671 if let Some(last_refresh) = self.refresh_schedule.last_refresh() {
2672 last_refresh.into()
2673 } else {
2674 T::maximum().into()
2677 };
2678 self.next_refresh = Datum::Null;
2679 } else {
2680 if PartialOrder::less_equal(write_frontier, &self.initial_as_of) {
2681 self.last_completed_refresh = Datum::Null;
2683 let initial_as_of = self.initial_as_of.as_option().expect(
2684 "initial_as_of can't be [], because then there would be no refreshes at all",
2685 );
2686 let first_refresh = initial_as_of
2687 .round_up(&self.refresh_schedule)
2688 .expect("sequencing makes sure that REFRESH MVs always have a first refresh");
2689 soft_assert_or_log!(
2690 first_refresh == *initial_as_of,
2691 "initial_as_of should be set to the first refresh"
2692 );
2693 self.next_refresh = first_refresh.into();
2694 } else {
2695 let write_frontier = write_frontier.as_option().expect("checked above");
2697 self.last_completed_refresh = write_frontier
2698 .round_down_minus_1(&self.refresh_schedule)
2699 .map_or_else(
2700 || {
2701 soft_panic_or_log!(
2702 "rounding down should have returned the first refresh or later"
2703 );
2704 Datum::Null
2705 },
2706 |last_completed_refresh| last_completed_refresh.into(),
2707 );
2708 self.next_refresh = write_frontier.clone().into();
2709 }
2710 }
2711 }
2712}
2713
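/// State the controller tracks for an in-flight peek, including the channel on which to
/// deliver the response and a read hold on the peeked collection.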
2714#[derive(Debug)]
2716struct PendingPeek<T: Timestamp> {
2717 target_replica: Option<ReplicaId>,
2721 otel_ctx: OpenTelemetryContext,
2723 requested_at: Instant,
2727 read_hold: ReadHold<T>,
2729 peek_response_tx: oneshot::Sender<PeekResponse>,
2731 limit: Option<usize>,
2733 offset: usize,
2735}
2736
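/// State the controller tracks for an active subscribe: the frontier reported so far and an
/// optional target replica.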
2737#[derive(Debug, Clone)]
2738struct ActiveSubscribe<T> {
2739 frontier: Antichain<T>,
2741 target_replica: Option<ReplicaId>,
2745}
2746
2747impl<T: ComputeControllerTimestamp> ActiveSubscribe<T> {
2748 fn new(target_replica: Option<ReplicaId>) -> Self {
2749 Self {
2750 frontier: Antichain::from_elem(T::minimum()),
2751 target_replica,
2752 }
2753 }
2754}
2755
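/// State the controller maintains for a single replica, including its client connection,
/// configuration, metrics, and per-collection state.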
2756#[derive(Debug)]
2758struct ReplicaState<T: ComputeControllerTimestamp> {
2759 id: ReplicaId,
2761 client: ReplicaClient<T>,
2763 config: ReplicaConfig,
2765 metrics: ReplicaMetrics,
2767 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2769 collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
2771 epoch: ClusterStartupEpoch,
2773}
2774
2775impl<T: ComputeControllerTimestamp> ReplicaState<T> {
2776 fn new(
2777 id: ReplicaId,
2778 client: ReplicaClient<T>,
2779 config: ReplicaConfig,
2780 metrics: ReplicaMetrics,
2781 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
2782 epoch: ClusterStartupEpoch,
2783 ) -> Self {
2784 Self {
2785 id,
2786 client,
2787 config,
2788 metrics,
2789 introspection_tx,
2790 epoch,
2791 collections: Default::default(),
2792 }
2793 }
2794
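    /// Registers a new collection with this replica.
    ///
    /// Sets up per-collection metrics and introspection, and panics if a collection with the
    /// same ID is already registered. Transient collections do not track a wallclock lag
    /// maximum.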
2795 fn add_collection(
2801 &mut self,
2802 id: GlobalId,
2803 as_of: Antichain<T>,
2804 input_read_holds: Vec<ReadHold<T>>,
2805 ) {
2806 let metrics = self.metrics.for_collection(id);
2807 let introspection = ReplicaCollectionIntrospection::new(
2808 self.id,
2809 id,
2810 self.introspection_tx.clone(),
2811 as_of.clone(),
2812 );
2813 let mut state =
2814 ReplicaCollectionState::new(metrics, as_of, introspection, input_read_holds);
2815
2816 if id.is_transient() {
2820 state.wallclock_lag_max = None;
2821 }
2822
        if let Some(previous) = self.collections.insert(id, state) {
            panic!("attempt to add a collection with existing ID {id} (previous={previous:?})");
        }
2826 }
2827
2828 fn remove_collection(&mut self, id: GlobalId) -> Option<ReplicaCollectionState<T>> {
2830 self.collections.remove(&id)
2831 }
2832
2833 fn collection_frontiers_empty(&self, id: GlobalId) -> bool {
2835 self.collections.get(&id).map_or(true, |c| {
2836 c.write_frontier.is_empty()
2837 && c.input_frontier.is_empty()
2838 && c.output_frontier.is_empty()
2839 })
2840 }
2841
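    /// Returns a JSON representation of this replica's state, for debugging.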
2842 #[mz_ore::instrument(level = "debug")]
2846 pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
2847 let Self {
2854 id,
2855 client: _,
2856 config: _,
2857 metrics: _,
2858 introspection_tx: _,
2859 epoch,
2860 collections,
2861 } = self;
2862
2863 fn field(
2864 key: &str,
2865 value: impl Serialize,
2866 ) -> Result<(String, serde_json::Value), anyhow::Error> {
2867 let value = serde_json::to_value(value)?;
2868 Ok((key.to_string(), value))
2869 }
2870
2871 let collections: BTreeMap<_, _> = collections
2872 .iter()
2873 .map(|(id, collection)| (id.to_string(), format!("{collection:?}")))
2874 .collect();
2875
2876 let map = serde_json::Map::from_iter([
2877 field("id", id.to_string())?,
2878 field("collections", collections)?,
2879 field("epoch", epoch)?,
2880 ]);
2881 Ok(serde_json::Value::Object(map))
2882 }
2883}
2884
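/// Per-replica state for a single installed collection.
///
/// Tracks the write, input, and output frontiers reported by the replica, read holds on the
/// collection's inputs, and the maximum observed wallclock lag.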
2885#[derive(Debug)]
2886struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
2887 write_frontier: Antichain<T>,
2891 input_frontier: Antichain<T>,
2895 output_frontier: Antichain<T>,
2899
2900 metrics: Option<ReplicaCollectionMetrics>,
2904 as_of: Antichain<T>,
2906 introspection: ReplicaCollectionIntrospection<T>,
2908 input_read_holds: Vec<ReadHold<T>>,
2914
2915 wallclock_lag_max: Option<Duration>,
2919}
2920
2921impl<T: ComputeControllerTimestamp> ReplicaCollectionState<T> {
2922 fn new(
2923 metrics: Option<ReplicaCollectionMetrics>,
2924 as_of: Antichain<T>,
2925 introspection: ReplicaCollectionIntrospection<T>,
2926 input_read_holds: Vec<ReadHold<T>>,
2927 ) -> Self {
2928 Self {
2929 write_frontier: as_of.clone(),
2930 input_frontier: as_of.clone(),
2931 output_frontier: as_of.clone(),
2932 metrics,
2933 as_of,
2934 introspection,
2935 input_read_holds,
2936 wallclock_lag_max: Some(Duration::ZERO),
2937 }
2938 }
2939
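    /// Reports whether the collection is hydrated on this replica, i.e., its output frontier
    /// has advanced beyond its as-of. Collections with an empty as-of count as hydrated
    /// immediately.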
2940 fn hydrated(&self) -> bool {
2942 self.as_of.is_empty() || PartialOrder::less_than(&self.as_of, &self.output_frontier)
2958 }
2959
2960 fn update_write_frontier(&mut self, new_frontier: Antichain<T>) {
2962 if PartialOrder::less_than(&new_frontier, &self.write_frontier) {
2963 soft_panic_or_log!(
2964 "replica collection write frontier regression (old={:?}, new={new_frontier:?})",
2965 self.write_frontier,
2966 );
2967 return;
2968 } else if new_frontier == self.write_frontier {
2969 return;
2970 }
2971
2972 self.write_frontier = new_frontier;
2973 }
2974
2975 fn update_input_frontier(&mut self, new_frontier: Antichain<T>) {
2977 if PartialOrder::less_than(&new_frontier, &self.input_frontier) {
2978 soft_panic_or_log!(
2979 "replica collection input frontier regression (old={:?}, new={new_frontier:?})",
2980 self.input_frontier,
2981 );
2982 return;
2983 } else if new_frontier == self.input_frontier {
2984 return;
2985 }
2986
2987 self.input_frontier = new_frontier;
2988
2989 for read_hold in &mut self.input_read_holds {
2991 let result = read_hold.try_downgrade(self.input_frontier.clone());
2992 soft_assert_or_log!(
2993 result.is_ok(),
2994 "read hold downgrade failed (read_hold={read_hold:?}, new_since={:?})",
2995 self.input_frontier,
2996 );
2997 }
2998 }
2999
3000 fn update_output_frontier(&mut self, new_frontier: Antichain<T>) {
3002 if PartialOrder::less_than(&new_frontier, &self.output_frontier) {
3003 soft_panic_or_log!(
3004 "replica collection output frontier regression (old={:?}, new={new_frontier:?})",
3005 self.output_frontier,
3006 );
3007 return;
3008 } else if new_frontier == self.output_frontier {
3009 return;
3010 }
3011
3012 self.output_frontier = new_frontier;
3013 }
3014}
3015
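/// Manages replica-level introspection for a single collection: per-operator hydration status
/// and the replica's write frontier. Reported rows are retracted when this value is dropped.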
3016#[derive(Debug)]
3019struct ReplicaCollectionIntrospection<T: ComputeControllerTimestamp> {
3020 replica_id: ReplicaId,
3022 collection_id: GlobalId,
3024 operators: BTreeMap<(LirId, usize), bool>,
3027 write_frontier: Antichain<T>,
3029 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3031}
3032
3033impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
3034 fn new(
3036 replica_id: ReplicaId,
3037 collection_id: GlobalId,
3038 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
3039 as_of: Antichain<T>,
3040 ) -> Self {
3041 let self_ = Self {
3042 replica_id,
3043 collection_id,
3044 operators: Default::default(),
3045 write_frontier: as_of,
3046 introspection_tx,
3047 };
3048
3049 self_.report_initial_state();
3050 self_
3051 }
3052
3053 fn report_initial_state(&self) {
3055 let row = self.write_frontier_row();
3056 let updates = vec![(row, Diff::ONE)];
3057 self.send(IntrospectionType::ReplicaFrontiers, updates);
3058 }
3059
3060 fn operator_hydrated(&mut self, lir_id: LirId, worker_id: usize, hydrated: bool) {
3062 let retraction = self.operator_hydration_row(lir_id, worker_id);
3063 self.operators.insert((lir_id, worker_id), hydrated);
3064 let insertion = self.operator_hydration_row(lir_id, worker_id);
3065
        if retraction == insertion {
            return;
        }
3069
3070 let updates = retraction
3071 .map(|r| (r, Diff::MINUS_ONE))
3072 .into_iter()
3073 .chain(insertion.map(|r| (r, Diff::ONE)))
3074 .collect();
3075 self.send(IntrospectionType::ComputeOperatorHydrationStatus, updates);
3076 }
3077
3078 fn operator_hydration_row(&self, lir_id: LirId, worker_id: usize) -> Option<Row> {
3082 self.operators.get(&(lir_id, worker_id)).map(|hydrated| {
3083 Row::pack_slice(&[
3084 Datum::String(&self.collection_id.to_string()),
3085 Datum::UInt64(lir_id.into()),
3086 Datum::String(&self.replica_id.to_string()),
3087 Datum::UInt64(u64::cast_from(worker_id)),
3088 Datum::from(*hydrated),
3089 ])
3090 })
3091 }
3092
3093 fn observe_frontier(&mut self, write_frontier: &Antichain<T>) {
        if self.write_frontier == *write_frontier {
            return;
        }
3098
3099 let retraction = self.write_frontier_row();
3100 self.write_frontier.clone_from(write_frontier);
3101 let insertion = self.write_frontier_row();
3102
3103 let updates = vec![(retraction, Diff::MINUS_ONE), (insertion, Diff::ONE)];
3104 self.send(IntrospectionType::ReplicaFrontiers, updates);
3105 }
3106
3107 fn write_frontier_row(&self) -> Row {
3109 let write_frontier = self
3110 .write_frontier
3111 .as_option()
3112 .map_or(Datum::Null, |ts| ts.clone().into());
3113 Row::pack_slice(&[
3114 Datum::String(&self.collection_id.to_string()),
3115 Datum::String(&self.replica_id.to_string()),
3116 write_frontier,
3117 ])
3118 }
3119
3120 fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
3121 let _ = self.introspection_tx.send((introspection_type, updates));
3124 }
3125}
3126
3127impl<T: ComputeControllerTimestamp> Drop for ReplicaCollectionIntrospection<T> {
3128 fn drop(&mut self) {
3129 let operators: Vec<_> = self.operators.keys().collect();
3131 let updates: Vec<_> = operators
3132 .into_iter()
3133 .flat_map(|(lir_id, worker_id)| self.operator_hydration_row(*lir_id, *worker_id))
3134 .map(|r| (r, Diff::MINUS_ONE))
3135 .collect();
3136 if !updates.is_empty() {
3137 self.send(IntrospectionType::ComputeOperatorHydrationStatus, updates);
3138 }
3139
3140 let row = self.write_frontier_row();
3142 let updates = vec![(row, Diff::MINUS_ONE)];
3143 self.send(IntrospectionType::ReplicaFrontiers, updates);
3144 }
3145}