//! A controller for the compute layer: it manages compute instances, their replicas, and
//! the collections they maintain.

use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::config::ComputeReplicaConfig;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_types::PersistLocation;
use mz_repr::{Datum, GlobalId, RelationDesc, Row, TimestampManipulation};
use mz_storage_client::controller::StorageController;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use prometheus::proto::LabelPair;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::{self, MissedTickBehavior};
use tracing::debug_span;
use uuid::Uuid;

use crate::controller::error::{
    CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
    HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
    ReplicaCreationError, ReplicaDropError,
};
use crate::controller::instance::{Instance, SharedCollectionState};
use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
use crate::controller::replica::ReplicaConfig;
use crate::logging::{LogVariant, LoggingConfig};
use crate::metrics::ComputeControllerMetrics;
use crate::protocol::command::{ComputeParameters, PeekTarget};
use crate::protocol::response::{PeekResponse, SubscribeBatch};
use crate::service::{ComputeClient, ComputeGrpcClient};

mod instance;
mod introspection;
mod replica;
mod sequential_hydration;

pub mod error;

/// A shared handle to the storage layer's collections.
pub(crate) type StorageCollections<T> = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T> + Send + Sync,
>;

/// The bounds a timestamp type must satisfy to be usable by the compute controller.
pub trait ComputeControllerTimestamp: TimestampManipulation + Into<Datum<'static>> + Sync {}

impl ComputeControllerTimestamp for mz_repr::Timestamp {}

/// A response piped through the compute controller to the client that issued the
/// corresponding command.
#[derive(Debug)]
pub enum ComputeControllerResponse<T> {
    /// Notification of a completed peek, identified by its UUID.
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// A response from a dataflow containing a `Subscribe` sink.
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// A response from a dataflow containing a `CopyTo` sink: the number of rows copied,
    /// or an error.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// Notification that a collection's write frontier has advanced.
    FrontierUpper {
        /// The ID of the collection.
        id: GlobalId,
        /// The new upper frontier.
        upper: Antichain<T>,
    },
}

/// Notification of the completion of a peek, to be forwarded to the client that issued it.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    /// The peek succeeded.
    Success {
        /// The number of rows in the result, after applying `offset` and `limit`.
        rows: u64,
        /// The size of the result in bytes.
        result_size: u64,
    },
    /// The peek failed with the given error message.
    Error(String),
    /// The peek was canceled.
    Canceled,
}

impl PeekNotification {
    /// Construct a [`PeekNotification`] from a [`PeekResponse`]. The `offset` and `limit`
    /// parameters are used to compute the number of rows in the result.
    fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
        match peek_response {
            PeekResponse::Rows(rows) => {
                let num_rows = u64::cast_from(rows.count(offset, limit));
                let result_size = u64::cast_from(rows.byte_len());

                tracing::trace!(?num_rows, ?result_size, "inline peek result");

                Self::Success {
                    rows: num_rows,
                    result_size,
                }
            }
            PeekResponse::Stashed(stashed_response) => {
                let rows = stashed_response.num_rows(offset, limit);
                let result_size = stashed_response.size_bytes();

                tracing::trace!(?rows, ?result_size, "stashed peek result");

                Self::Success {
                    rows: u64::cast_from(rows),
                    result_size: u64::cast_from(result_size),
                }
            }
            PeekResponse::Error(err) => Self::Error(err.clone()),
            PeekResponse::Canceled => Self::Canceled,
        }
    }
}

/// A controller for the compute layer.
pub struct ComputeController<T: ComputeControllerTimestamp> {
    /// The active compute instances, by ID.
    instances: BTreeMap<ComputeInstanceId, InstanceState<T>>,
    /// The workload class of each instance, shared with the metrics postprocessor.
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    build_info: &'static BuildInfo,
    /// A handle to the storage layer's collections.
    storage_collections: StorageCollections<T>,
    /// Whether the controller has been initialized.
    initialized: bool,
    /// Whether the controller is in read-only mode.
    read_only: bool,
    /// Compute configuration to apply to new instances.
    config: ComputeParameters,
    /// The persist location where peek results can be stashed.
    peek_stash_persist_location: PersistLocation,
    /// A response to handle on the next call to [`ComputeController::process`].
    stashed_response: Option<ComputeControllerResponse<T>>,
    metrics: ComputeControllerMetrics,
    now: NowFn,
    wallclock_lag: WallclockLagFn<T>,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// Receiver for responses produced by the instances.
    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse<T>>,
    /// Response sender that's passed to new instances.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Receiver for introspection updates produced by the instances; consumed by the
    /// introspection sink once it is started.
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    /// Introspection updates sender that's passed to new instances.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,
}

impl<T: ComputeControllerTimestamp> ComputeController<T> {
    /// Construct a new [`ComputeController`].
    pub fn new(
        build_info: &'static BuildInfo,
        storage_collections: StorageCollections<T>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        peek_stash_persist_location: PersistLocation,
        controller_metrics: ControllerMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
    ) -> Self {
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();

        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
            ComputeInstanceId,
            Option<String>,
        >::new()));

        // Apply a `workload_class` label to all metrics that carry an `instance_id` label
        // for an instance with a known workload class.
        metrics_registry.register_postprocessor({
            let instance_workload_classes = Arc::clone(&instance_workload_classes);
            move |metrics| {
                let instance_workload_classes = instance_workload_classes
                    .lock()
                    .expect("lock poisoned")
                    .iter()
                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
                    .collect::<BTreeMap<String, Option<String>>>();
                for metric in metrics {
                    'metric: for metric in metric.mut_metric() {
                        for label in metric.get_label() {
                            if label.get_name() == "instance_id" {
                                if let Some(workload_class) = instance_workload_classes
                                    .get(label.get_value())
                                    .cloned()
                                    .flatten()
                                {
                                    let mut label = LabelPair::default();
                                    label.set_name("workload_class".into());
                                    label.set_value(workload_class.clone());

                                    let mut labels = metric.take_label();
                                    labels.push(label);
                                    metric.set_label(labels);
                                }
                                continue 'metric;
                            }
                        }
                    }
                }
            }
        });

        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

        Self {
            instances: BTreeMap::new(),
            instance_workload_classes,
            build_info,
            storage_collections,
            initialized: false,
            read_only,
            config: Default::default(),
            peek_stash_persist_location,
            stashed_response: None,
            metrics,
            now,
            wallclock_lag,
            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
            response_rx,
            response_tx,
            introspection_rx: Some(introspection_rx),
            introspection_tx,
            maintenance_ticker,
            maintenance_scheduled: false,
        }
    }

    /// Start sinking the compute controller's introspection data into storage.
    ///
    /// Calling this method a second time has no effect.
    pub fn start_introspection_sink(
        &mut self,
        storage_controller: &dyn StorageController<Timestamp = T>,
    ) {
        if let Some(rx) = self.introspection_rx.take() {
            spawn_introspection_sink(rx, storage_controller);
        }
    }

    /// Return whether the instance with the given ID exists.
    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
        self.instances.contains_key(&id)
    }

    /// Return a reference to the indicated compute instance.
    fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState<T>, InstanceMissing> {
        self.instances.get(&id).ok_or(InstanceMissing(id))
    }

    /// Return a mutable reference to the indicated compute instance.
    fn instance_mut(
        &mut self,
        id: ComputeInstanceId,
    ) -> Result<&mut InstanceState<T>, InstanceMissing> {
        self.instances.get_mut(&id).ok_or(InstanceMissing(id))
    }

    /// List the IDs of all collections in the identified compute instance.
    pub fn collection_ids(
        &self,
        instance_id: ComputeInstanceId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let ids = instance.collections.keys().copied();
        Ok(ids)
    }

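    /// Return the frontiers of the given collection.
    ///
    /// If an `instance_id` is given, the collection is looked up only on that instance;
    /// otherwise all instances are searched.
    ///
    /// A minimal usage sketch (hypothetical `controller`, `collection_id`, and
    /// `instance_id` bindings; not compiled as a doctest):
    ///
    /// ```ignore
    /// let frontiers = controller.collection_frontiers(collection_id, Some(instance_id))?;
    /// // Reads are possible at times between the read frontier (inclusive) and the
    /// // write frontier (exclusive).
    /// println!("{:?} .. {:?}", frontiers.read_frontier, frontiers.write_frontier);
    /// ```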
    pub fn collection_frontiers(
        &self,
        collection_id: GlobalId,
        instance_id: Option<ComputeInstanceId>,
    ) -> Result<CollectionFrontiers<T>, CollectionLookupError> {
        let collection = match instance_id {
            Some(id) => self.instance(id)?.collection(collection_id)?,
            None => self
                .instances
                .values()
                .find_map(|i| i.collections.get(&collection_id))
                .ok_or(CollectionMissing(collection_id))?,
        };

        Ok(collection.frontiers())
    }

    /// List compute collections that depend on the given collection.
    pub fn collection_reverse_dependencies(
        &self,
        instance_id: ComputeInstanceId,
        id: GlobalId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let collections = instance.collections.iter();
        let ids = collections
            .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
        Ok(ids)
    }

    /// Return whether all compute instances are hydrated, ignoring the given collections.
    pub async fn clusters_hydrated(&self, exclude_collections: &BTreeSet<GlobalId>) -> bool {
        let instances = self.instances.iter();
        let mut pending: FuturesUnordered<_> = instances
            .map(|(id, instance)| {
                let exclude_collections = exclude_collections.clone();
                instance
                    .call_sync(move |i| i.collections_hydrated(&exclude_collections))
                    .map(move |x| (id, x))
            })
            .collect();

        let mut result = true;
        while let Some((id, hydrated)) = pending.next().await {
            if !hydrated {
                result = false;

                tracing::info!("cluster {id} is not hydrated");
            }
        }

        result
    }

    /// Return whether the identified collection is hydrated on the identified instance.
    pub async fn collection_hydrated(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<bool, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        let res = instance
            .call_sync(move |i| i.collection_hydrated(collection_id))
            .await?;

        Ok(res)
    }

    /// Return a receiver reporting whether all collections are hydrated on at least one of
    /// the given replicas, ignoring the given collections.
    pub fn collections_hydrated_for_replicas(
        &self,
        instance_id: ComputeInstanceId,
        replicas: Vec<ReplicaId>,
        exclude_collections: BTreeSet<GlobalId>,
    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        // Validate the targets upfront, so the `expect` below cannot fail: at least one of
        // the requested replicas must exist on the instance. (This also rejects instances
        // without any replicas.)
        if !replicas.iter().any(|id| instance.replicas.contains(id)) {
            return Err(HydrationCheckBadTarget(replicas).into());
        }

        let (tx, rx) = oneshot::channel();
        instance.call(move |i| {
            let result = i
                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
                .expect("validated");
            let _ = tx.send(result);
        });

        Ok(rx)
    }

    /// Return a JSON dump of the controller state, for debugging.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` to produce a compile error when new fields are added.
        let Self {
            instances,
            instance_workload_classes,
            build_info: _,
            storage_collections: _,
            initialized,
            read_only,
            config: _,
            peek_stash_persist_location: _,
            stashed_response,
            metrics: _,
            now: _,
            wallclock_lag: _,
            dyncfg: _,
            response_rx: _,
            response_tx: _,
            introspection_rx: _,
            introspection_tx: _,
            maintenance_ticker: _,
            maintenance_scheduled,
        } = self;

        let mut instances_dump = BTreeMap::new();
        for (id, instance) in instances {
            let dump = instance.dump().await?;
            instances_dump.insert(id.to_string(), dump);
        }

        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .iter()
            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
            .collect();

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let map = serde_json::Map::from_iter([
            field("instances", instances_dump)?,
            field("instance_workload_classes", instance_workload_classes)?,
            field("initialized", initialized)?,
            field("read_only", read_only)?,
            field("stashed_response", format!("{stashed_response:?}"))?,
            field("maintenance_scheduled", maintenance_scheduled)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

impl<T> ComputeController<T>
where
    T: ComputeControllerTimestamp,
    ComputeGrpcClient: ComputeClient<T>,
{
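    /// Create a compute instance.
    ///
    /// A minimal sketch of instance setup (hypothetical `controller` and `instance_id`
    /// bindings, an empty log mapping, and an illustrative workload class; not compiled as
    /// a doctest):
    ///
    /// ```ignore
    /// controller.create_instance(instance_id, BTreeMap::new(), Some("production".into()))?;
    /// assert!(controller.instance_exists(instance_id));
    /// ```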
    pub fn create_instance(
        &mut self,
        id: ComputeInstanceId,
        arranged_logs: BTreeMap<LogVariant, GlobalId>,
        workload_class: Option<String>,
    ) -> Result<(), InstanceExists> {
        if self.instances.contains_key(&id) {
            return Err(InstanceExists(id));
        }

        // Register the arranged log collections upfront, sharing their state with the
        // instance task.
        let mut collections = BTreeMap::new();
        let mut logs = Vec::with_capacity(arranged_logs.len());
        for (&log, &id) in &arranged_logs {
            let collection = Collection::new_log();
            let shared = collection.shared.clone();
            collections.insert(id, collection);
            logs.push((log, id, shared));
        }

        let client = instance::Client::spawn(
            id,
            self.build_info,
            Arc::clone(&self.storage_collections),
            self.peek_stash_persist_location.clone(),
            logs,
            self.metrics.for_instance(id),
            self.now.clone(),
            self.wallclock_lag.clone(),
            Arc::clone(&self.dyncfg),
            self.response_tx.clone(),
            self.introspection_tx.clone(),
        );

        let instance = InstanceState::new(client, collections);
        self.instances.insert(id, instance);

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class.clone());

        let instance = self.instances.get_mut(&id).expect("instance just added");
        if self.initialized {
            instance.call(Instance::initialization_complete);
        }

        if !self.read_only {
            instance.call(Instance::allow_writes);
        }

        let mut config_params = self.config.clone();
        config_params.workload_class = Some(workload_class);
        instance.call(|i| i.update_configuration(config_params));

        Ok(())
    }

    /// Update the workload class of a compute instance.
    pub fn update_instance_workload_class(
        &mut self,
        id: ComputeInstanceId,
        workload_class: Option<String>,
    ) -> Result<(), InstanceMissing> {
        // Ensure that the instance exists first.
        let _ = self.instance(id)?;

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class);

        // Cause a config update, to notify the instance about its new workload class.
        self.update_configuration(Default::default());

        Ok(())
    }

    /// Remove a compute instance.
    pub fn drop_instance(&mut self, id: ComputeInstanceId) {
        if let Some(instance) = self.instances.remove(&id) {
            instance.call(|i| i.shutdown());
        }

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .remove(&id);
    }

    /// Return the compute controller's dynamic configuration.
    pub fn dyncfg(&self) -> &Arc<ConfigSet> {
        &self.dyncfg
    }

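    /// Update compute configuration: apply dyncfg updates locally and forward the
    /// parameters to all instances.
    ///
    /// A minimal sketch (hypothetical `controller` binding and an otherwise-default
    /// parameter set; not compiled as a doctest):
    ///
    /// ```ignore
    /// // Propagate a fresh set of parameters to all instances and the local dyncfg.
    /// controller.update_configuration(ComputeParameters::default());
    /// ```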
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        // Apply dyncfg updates.
        config_params.dyncfg_updates.apply(&self.dyncfg);

        let instance_workload_classes = self
            .instance_workload_classes
            .lock()
            .expect("lock poisoned");

        // Forward the updates to the individual instances, annotating each with its
        // workload class.
        for (id, instance) in self.instances.iter_mut() {
            let mut params = config_params.clone();
            params.workload_class = Some(instance_workload_classes[id].clone());
            instance.call(|i| i.update_configuration(params));
        }

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                tracing::error!(
                    err,
                    overflowing_behavior,
                    "Invalid value for ore_overflowing_behavior"
                );
            }
        }

        // Remember the updates, for future instances.
        self.config.update(config_params);
    }

    /// Mark the end of any initialization commands and forward the notification to all
    /// instances.
    pub fn initialization_complete(&mut self) {
        self.initialized = true;
        for instance in self.instances.values_mut() {
            instance.call(Instance::initialization_complete);
        }
    }

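    /// Wait until the controller is ready to do some processing.
    ///
    /// This method may block for an arbitrarily long time. Once it returns, the caller
    /// should call [`ComputeController::process`]. A minimal sketch of the intended driver
    /// loop (hypothetical `controller` binding and `handle_response` helper; not compiled
    /// as a doctest):
    ///
    /// ```ignore
    /// loop {
    ///     controller.ready().await;
    ///     if let Some(response) = controller.process() {
    ///         handle_response(response); // hypothetical handler
    ///     }
    /// }
    /// ```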
    pub async fn ready(&mut self) {
        if self.stashed_response.is_some() {
            // We still have a response stashed, which we are immediately ready to process.
            return;
        }
        if self.maintenance_scheduled {
            // Maintenance work has been scheduled.
            return;
        }

        tokio::select! {
            resp = self.response_rx.recv() => {
                let resp = resp.expect("`self.response_tx` not dropped");
                self.stashed_response = Some(resp);
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        }
    }

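    /// Add a new replica to an existing compute instance.
    ///
    /// A minimal sketch of the call (hypothetical `controller`, `instance_id`,
    /// `replica_id`, `location`, and `config` bindings; not compiled as a doctest):
    ///
    /// ```ignore
    /// controller.add_replica_to_instance(instance_id, replica_id, location, config, false)?;
    /// ```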
    pub fn add_replica_to_instance(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
        config: ComputeReplicaConfig,
        enable_ctp: bool,
    ) -> Result<(), ReplicaCreationError> {
        use ReplicaCreationError::*;

        let instance = self.instance(instance_id)?;

        if instance.replicas.contains(&replica_id) {
            return Err(ReplicaExists(replica_id));
        }

        // Even when logging is disabled, the logging dataflows need an interval; fall back
        // to a reasonable default.
        let (enable_logging, interval) = match config.logging.interval {
            Some(interval) => (true, interval),
            None => (false, Duration::from_secs(1)),
        };

        let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);

        let replica_config = ReplicaConfig {
            location,
            logging: LoggingConfig {
                interval,
                enable_logging,
                log_logging: config.logging.log_logging,
                index_logs: Default::default(),
            },
            grpc_client: self.config.grpc_client.clone(),
            // A zero offset means that replica expiration is disabled.
            expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
            enable_ctp,
        };

        let instance = self.instance_mut(instance_id).expect("validated");
        instance.replicas.insert(replica_id);

        instance.call(move |i| {
            i.add_replica(replica_id, replica_config, None)
                .expect("validated")
        });

        Ok(())
    }

    /// Remove an existing replica from an existing compute instance.
    pub fn drop_replica(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
    ) -> Result<(), ReplicaDropError> {
        use ReplicaDropError::*;

        let instance = self.instance_mut(instance_id)?;

        if !instance.replicas.contains(&replica_id) {
            return Err(ReplicaMissing(replica_id));
        }

        instance.replicas.remove(&replica_id);

        instance.call(move |i| i.remove_replica(replica_id).expect("validated"));

        Ok(())
    }

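    /// Create and maintain the described dataflow, and initialize controller state for its
    /// exported collections.
    ///
    /// Validation (replica targets, as-of bounds, read holds on imports) happens here,
    /// before the dataflow is handed to the instance task. A minimal sketch of the call
    /// (hypothetical `controller`, `instance_id`, and `dataflow` bindings; not compiled as
    /// a doctest):
    ///
    /// ```ignore
    /// // `dataflow.as_of` must be set and must not precede the since of any import.
    /// controller.create_dataflow(instance_id, dataflow, None)?;
    /// ```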
    pub fn create_dataflow(
        &mut self,
        instance_id: ComputeInstanceId,
        mut dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        subscribe_target_replica: Option<ReplicaId>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        let instance = self.instance(instance_id)?;

        if let Some(replica_id) = subscribe_target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // Simple sanity checks around the dataflow's as-of.
        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        // Acquire read holds on all imports and validate that the as-of does not precede
        // any import's since.
        let storage_ids = dataflow.imported_source_ids().collect();
        let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
        for id in dataflow.imported_index_ids() {
            let read_hold = instance.acquire_read_hold(id)?;
            import_read_holds.push(read_hold);
        }
        for hold in &import_read_holds {
            if PartialOrder::less_than(as_of, hold.since()) {
                return Err(SinceViolation(hold.id()));
            }
        }

        // Ensure that persist sink targets exist in the storage layer.
        for id in dataflow.persist_sink_ids() {
            if self.storage_collections.check_exists(id).is_err() {
                return Err(CollectionMissing(id));
            }
        }
        let time_dependence = self
            .determine_time_dependence(instance_id, &dataflow)
            .expect("must exist");

        let instance = self.instance_mut(instance_id).expect("validated");

        // Initialize controller-side state for each exported collection.
        let mut shared_collection_state = BTreeMap::new();
        for id in dataflow.export_ids() {
            let shared = SharedCollectionState::new(as_of.clone());
            let collection = Collection {
                write_only: dataflow.sink_exports.contains_key(&id),
                compute_dependencies: dataflow.imported_index_ids().collect(),
                shared: shared.clone(),
                time_dependence: time_dependence.clone(),
            };
            instance.collections.insert(id, collection);
            shared_collection_state.insert(id, shared);
        }

        dataflow.time_dependence = time_dependence;

        instance.call(move |i| {
            i.create_dataflow(
                dataflow,
                import_read_holds,
                subscribe_target_replica,
                shared_collection_state,
            )
            .expect("validated")
        });

        Ok(())
    }

    /// Drop the given collections and allow their resources to be reclaimed.
    pub fn drop_collections(
        &mut self,
        instance_id: ComputeInstanceId,
        collection_ids: Vec<GlobalId>,
    ) -> Result<(), CollectionUpdateError> {
        let instance = self.instance_mut(instance_id)?;

        // Validate that all collections exist, so the `expect` below cannot fail.
        for id in &collection_ids {
            instance.collection(*id)?;
        }

        for id in &collection_ids {
            instance.collections.remove(id);
        }

        instance.call(|i| i.drop_collections(collection_ids).expect("validated"));

        Ok(())
    }

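    /// Initiate a peek request for the contents of the given collection at `timestamp`.
    ///
    /// A minimal sketch of issuing a peek against an index and awaiting its result
    /// (hypothetical bindings throughout, including the `RowSetFinishing` default; not
    /// compiled as a doctest):
    ///
    /// ```ignore
    /// let (tx, rx) = oneshot::channel();
    /// controller.peek(
    ///     instance_id,
    ///     PeekTarget::Index { id: index_id },
    ///     None,                       // no literal constraints
    ///     Uuid::new_v4(),
    ///     timestamp,
    ///     result_desc,
    ///     RowSetFinishing::default(), // hypothetical default finishing
    ///     map_filter_project,
    ///     None,                       // any replica may respond
    ///     tx,
    /// )?;
    /// let response = rx.await?;
    /// ```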
    pub fn peek(
        &self,
        instance_id: ComputeInstanceId,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let instance = self.instance(instance_id)?;

        if let Some(replica_id) = target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // Acquire a read hold on the peeked collection, to keep it readable at the peek
        // timestamp for the duration of the peek.
        let read_hold = match &peek_target {
            PeekTarget::Index { id } => instance.acquire_read_hold(*id)?,
            PeekTarget::Persist { id, .. } => self
                .storage_collections
                .acquire_read_holds(vec![*id])?
                .into_element(),
        };
        if !read_hold.since().less_equal(&timestamp) {
            return Err(SinceViolation(peek_target.id()));
        }

        instance.call(move |i| {
            i.peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing,
                map_filter_project,
                read_hold,
                target_replica,
                peek_response_tx,
            )
            .expect("validated")
        });

        Ok(())
    }

    /// Cancel an in-progress peek, providing the response to return to the client.
    pub fn cancel_peek(
        &self,
        instance_id: ComputeInstanceId,
        uuid: Uuid,
        reason: PeekResponse,
    ) -> Result<(), InstanceMissing> {
        self.instance(instance_id)?
            .call(move |i| i.cancel_peek(uuid, reason));
        Ok(())
    }

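    /// Assign read policies to specific collections.
    ///
    /// The policy determines how each collection's read frontier follows its write
    /// frontier. Write-only collections (sinks) reject read policies. A minimal sketch of
    /// installing a policy that lags the write frontier (hypothetical `controller`,
    /// `instance_id`, and `index_id` bindings; the `lag_writes_by` constructor and its
    /// arguments are illustrative; not compiled as a doctest):
    ///
    /// ```ignore
    /// let policy = ReadPolicy::lag_writes_by(1000.into(), 1000.into());
    /// controller.set_read_policy(instance_id, vec![(index_id, policy)])?;
    /// ```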
    pub fn set_read_policy(
        &self,
        instance_id: ComputeInstanceId,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) -> Result<(), ReadPolicyError> {
        use ReadPolicyError::*;

        let instance = self.instance(instance_id)?;

        // Validate that the collections exist and are readable, so the `expect` below
        // cannot fail.
        for (id, _) in &policies {
            let collection = instance.collection(*id)?;
            if collection.write_only {
                return Err(WriteOnlyCollection(*id));
            }
        }

        self.instance(instance_id)?
            .call(|i| i.set_read_policy(policies).expect("validated"));

        Ok(())
    }

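    /// Acquire a read hold on the given collection, at its current read frontier.
    ///
    /// A minimal sketch (hypothetical `controller`, `instance_id`, and `collection_id`
    /// bindings; not compiled as a doctest):
    ///
    /// ```ignore
    /// let hold = controller.acquire_read_hold(instance_id, collection_id)?;
    /// // The collection cannot compact beyond `hold.since()` until the hold is dropped.
    /// drop(hold);
    /// ```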
    pub fn acquire_read_hold(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<ReadHold<T>, CollectionUpdateError> {
        let read_hold = self
            .instance(instance_id)?
            .acquire_read_hold(collection_id)?;
        Ok(read_hold)
    }

    /// Determine the time dependence for a dataflow.
    fn determine_time_dependence(
        &self,
        instance_id: ComputeInstanceId,
        dataflow: &DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        // Continual tasks don't have a time dependence.
        let is_continual_task = dataflow.continual_task_ids().next().is_some();
        if is_continual_task {
            return Ok(None);
        }

        let instance = self
            .instance(instance_id)
            .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
        let mut time_dependencies = Vec::new();

        for id in dataflow.imported_index_ids() {
            let dependence = instance
                .get_time_dependence(id)
                .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
            time_dependencies.push(dependence);
        }

        'source: for id in dataflow.imported_source_ids() {
            // An imported source may be a collection maintained by another compute
            // instance; check all instances before falling back to storage.
            for instance in self.instances.values() {
                if let Ok(dependence) = instance.get_time_dependence(id) {
                    time_dependencies.push(dependence);
                    continue 'source;
                }
            }

            time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
        }

        Ok(TimeDependence::merge(
            time_dependencies,
            dataflow.refresh_schedule.as_ref(),
        ))
    }

    /// Process a pending response from a compute instance, or scheduled maintenance work.
    #[mz_ore::instrument(level = "debug")]
    pub fn process(&mut self) -> Option<ComputeControllerResponse<T>> {
        // Perform any scheduled maintenance work.
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        // Return a stashed response, if any.
        self.stashed_response.take()
    }

    /// Perform periodic maintenance work on all instances.
    #[mz_ore::instrument(level = "debug")]
    fn maintain(&mut self) {
        for instance in self.instances.values_mut() {
            instance.call(Instance::maintain);
        }
    }
}


/// The controller-side state of a compute instance, and a client to the task that manages
/// it.
#[derive(Debug)]
struct InstanceState<T: ComputeControllerTimestamp> {
    client: instance::Client<T>,
    replicas: BTreeSet<ReplicaId>,
    collections: BTreeMap<GlobalId, Collection<T>>,
}

impl<T: ComputeControllerTimestamp> InstanceState<T> {
    fn new(client: instance::Client<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
        Self {
            client,
            replicas: Default::default(),
            collections,
        }
    }

    fn collection(&self, id: GlobalId) -> Result<&Collection<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

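    /// Call a method on the instance task, without waiting for a response.
    ///
    /// `call` is fire-and-forget, while [`InstanceState::call_sync`] below returns the
    /// closure's result. A minimal sketch of both (hypothetical `instance` binding; not
    /// compiled as a doctest):
    ///
    /// ```ignore
    /// instance.call(Instance::maintain);
    /// let dump = instance.call_sync(|i| i.dump()).await;
    /// ```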
    fn call<F>(&self, f: F)
    where
        F: FnOnce(&mut Instance<T>) + Send + 'static,
    {
        let otel_ctx = OpenTelemetryContext::obtain();
        self.client
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call").entered();
                otel_ctx.attach_as_parent();

                f(instance)
            }))
            .expect("instance not dropped");
    }

    /// Call a method on the instance task and await its result.
    async fn call_sync<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let otel_ctx = OpenTelemetryContext::obtain();
        self.client
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call_sync").entered();
                otel_ctx.attach_as_parent();

                let result = f(instance);
                let _ = tx.send(result);
            }))
            .expect("instance not dropped");

        rx.await.expect("instance not dropped")
    }

    /// Acquire a read hold on the identified collection, at its current read frontier.
    pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
        // Take a snapshot of the current since and increment the corresponding read
        // capabilities, so the frontier cannot advance while the hold is in place.
        let collection = self.collection(id)?;
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });

        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
        Ok(hold)
    }

    /// Return the time dependence of the identified collection.
    fn get_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, CollectionMissing> {
        Ok(self.collection(id)?.time_dependence.clone())
    }

    /// Return a JSON dump of the instance state, for debugging.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` to produce a compile error when new fields are added.
        let Self {
            client: _,
            replicas,
            collections,
        } = self;

        let instance = self.call_sync(|i| i.dump()).await?;
        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let map = serde_json::Map::from_iter([
            field("instance", instance)?,
            field("replicas", replicas)?,
            field("collections", collections)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}


/// The controller-side state of a compute collection.
#[derive(Debug)]
struct Collection<T> {
    /// Whether the collection is write-only (a sink).
    write_only: bool,
    /// The compute collections this collection depends on.
    compute_dependencies: BTreeSet<GlobalId>,
    /// State shared with the instance task.
    shared: SharedCollectionState<T>,
    /// The collection's time dependence, if any.
    time_dependence: Option<TimeDependence>,
}

impl<T: Timestamp> Collection<T> {
    /// Construct state for a new log collection.
    fn new_log() -> Self {
        let as_of = Antichain::from_elem(T::minimum());
        Self {
            write_only: false,
            compute_dependencies: Default::default(),
            shared: SharedCollectionState::new(as_of),
            time_dependence: Some(TimeDependence::default()),
        }
    }

    /// Return the current frontiers of the collection.
    fn frontiers(&self) -> CollectionFrontiers<T> {
        let read_frontier = self
            .shared
            .lock_read_capabilities(|c| c.frontier().to_owned());
        let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
        CollectionFrontiers {
            read_frontier,
            write_frontier,
        }
    }
}

/// The read and write frontiers of a compute collection.
#[derive(Clone, Debug)]
pub struct CollectionFrontiers<T> {
    /// The read frontier; reads are possible only at times beyond it.
    pub read_frontier: Antichain<T>,
    /// The write frontier; contents at times beyond it may still change.
    pub write_frontier: Antichain<T>,
}

impl<T: Timestamp> Default for CollectionFrontiers<T> {
    fn default() -> Self {
        Self {
            read_frontier: Antichain::from_elem(T::minimum()),
            write_frontier: Antichain::from_elem(T::minimum()),
        }
    }
}