use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::config::ComputeReplicaConfig;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_types::PersistLocation;
use mz_repr::{Datum, GlobalId, RelationDesc, Row, TimestampManipulation};
use mz_storage_client::controller::StorageController;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use prometheus::proto::LabelPair;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::{self, MissedTickBehavior};
use uuid::Uuid;

use crate::controller::error::{
    CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
    HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
    ReplicaCreationError, ReplicaDropError,
};
use crate::controller::instance::{Instance, SharedCollectionState};
use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
use crate::controller::replica::ReplicaConfig;
use crate::logging::{LogVariant, LoggingConfig};
use crate::metrics::ComputeControllerMetrics;
use crate::protocol::command::{ComputeParameters, PeekTarget};
use crate::protocol::response::{PeekResponse, SubscribeBatch};

mod introspection;
mod replica;
mod sequential_hydration;

pub mod error;
pub mod instance;

pub(crate) type StorageCollections<T> = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T> + Send + Sync,
>;

/// A timestamp type usable by the compute controller.
pub trait ComputeControllerTimestamp: TimestampManipulation + Into<Datum<'static>> + Sync {}

impl ComputeControllerTimestamp for mz_repr::Timestamp {}

/// Responses from the compute controller.
#[derive(Debug)]
pub enum ComputeControllerResponse<T> {
    /// A notification about the outcome of the peek with the given UUID.
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// A response from a subscribe dataflow.
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// A response from a copy-to dataflow, reporting either the number of rows written or the
    /// error that caused it to fail.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// A notification that a collection's write frontier has advanced.
    FrontierUpper {
        /// The ID of the collection.
        id: GlobalId,
        /// The new upper frontier.
        upper: Antichain<T>,
    },
}

/// Notification of a peek's outcome.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    /// The peek succeeded.
    Success {
        /// The number of result rows, after applying `offset` and `limit`.
        rows: u64,
        /// The size of the result in bytes.
        result_size: u64,
    },
    /// The peek failed with the given error message.
    Error(String),
    /// The peek was canceled.
    Canceled,
}

impl PeekNotification {
    /// Builds a [`PeekNotification`] from a [`PeekResponse`], applying the given `offset` and
    /// `limit` when counting result rows.
    fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
        match peek_response {
            PeekResponse::Rows(rows) => {
                let num_rows = u64::cast_from(rows.count(offset, limit));
                let result_size = u64::cast_from(rows.byte_len());

                tracing::trace!(?num_rows, ?result_size, "inline peek result");

                Self::Success {
                    rows: num_rows,
                    result_size,
                }
            }
            PeekResponse::Stashed(stashed_response) => {
                let rows = stashed_response.num_rows(offset, limit);
                let result_size = stashed_response.size_bytes();

                tracing::trace!(?rows, ?result_size, "stashed peek result");

                Self::Success {
                    rows: u64::cast_from(rows),
                    result_size: u64::cast_from(result_size),
                }
            }
            PeekResponse::Error(err) => Self::Error(err.clone()),
            PeekResponse::Canceled => Self::Canceled,
        }
    }
}
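
// A minimal sketch of how a client of the compute controller might consume these
// notifications; the `handle_*` names are hypothetical, not part of this crate:
//
//     match notification {
//         PeekNotification::Success { rows, result_size } => handle_rows(rows, result_size),
//         PeekNotification::Error(err) => handle_error(err),
//         PeekNotification::Canceled => handle_cancel(),
//     }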

/// A controller for the compute layer.
pub struct ComputeController<T: ComputeControllerTimestamp> {
    /// The state of the compute instances, keyed by instance ID.
    instances: BTreeMap<ComputeInstanceId, InstanceState<T>>,
    /// The workload classes of the compute instances, shared with the metrics postprocessor
    /// registered in [`ComputeController::new`].
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    build_info: &'static BuildInfo,
    /// A handle to the storage layer's collections.
    storage_collections: StorageCollections<T>,
    /// Whether [`ComputeController::initialization_complete`] has been called.
    initialized: bool,
    /// Whether the controller runs in read-only mode.
    read_only: bool,
    /// Compute configuration, forwarded to instances and remembered for instances created later.
    config: ComputeParameters,
    /// The persist location used for stashing peek responses.
    peek_stash_persist_location: PersistLocation,
    /// A response buffered by [`ComputeController::ready`], to be returned by the next
    /// [`ComputeController::process`] call.
    stashed_response: Option<ComputeControllerResponse<T>>,
    metrics: ComputeControllerMetrics,
    now: NowFn,
    wallclock_lag: WallclockLagFn<T>,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// Receiver for responses produced by instances.
    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse<T>>,
    /// Response sender handed to new instances.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Receiver for introspection updates; consumed when the introspection sink is started.
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    /// Introspection update sender handed to new instances.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    /// Ticker that schedules periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance is due on the next [`ComputeController::process`] call.
    maintenance_scheduled: bool,
}

impl<T: ComputeControllerTimestamp> ComputeController<T> {
    /// Construct a new [`ComputeController`].
    pub fn new(
        build_info: &'static BuildInfo,
        storage_collections: StorageCollections<T>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        peek_stash_persist_location: PersistLocation,
        controller_metrics: ControllerMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
    ) -> Self {
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();

        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
            ComputeInstanceId,
            Option<String>,
        >::new()));

        // Apply a `workload_class` label to all metrics that have an `instance_id` label
        // naming an instance with a known workload class.
        metrics_registry.register_postprocessor({
            let instance_workload_classes = Arc::clone(&instance_workload_classes);
            move |metrics| {
                let instance_workload_classes = instance_workload_classes
                    .lock()
                    .expect("lock poisoned")
                    .iter()
                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
                    .collect::<BTreeMap<String, Option<String>>>();
                for metric in metrics {
                    'metric: for metric in metric.mut_metric() {
                        for label in metric.get_label() {
                            if label.name() == "instance_id" {
                                if let Some(workload_class) = instance_workload_classes
                                    .get(label.value())
                                    .cloned()
                                    .flatten()
                                {
                                    let mut label = LabelPair::default();
                                    label.set_name("workload_class".into());
                                    label.set_value(workload_class.clone());

                                    let mut labels = metric.take_label();
                                    labels.push(label);
                                    metric.set_label(labels);
                                }
                                continue 'metric;
                            }
                        }
                    }
                }
            }
        });

        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

        Self {
            instances: BTreeMap::new(),
            instance_workload_classes,
            build_info,
            storage_collections,
            initialized: false,
            read_only,
            config: Default::default(),
            peek_stash_persist_location,
            stashed_response: None,
            metrics,
            now,
            wallclock_lag,
            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
            response_rx,
            response_tx,
            introspection_rx: Some(introspection_rx),
            introspection_tx,
            maintenance_ticker,
            maintenance_scheduled: false,
        }
    }
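
    // A minimal construction sketch; it assumes the caller already has a metrics
    // registry and a storage collections handle in scope, and uses `SYSTEM_TIME`
    // from `mz_ore::now` as the clock:
    //
    //     let controller = ComputeController::new(
    //         build_info,
    //         storage_collections,
    //         /* read_only */ true,
    //         &metrics_registry,
    //         peek_stash_persist_location,
    //         controller_metrics,
    //         mz_ore::now::SYSTEM_TIME.clone(),
    //         wallclock_lag,
    //     );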

    /// Start sinking the compute controller's introspection data into storage.
    ///
    /// This method is a no-op when called a second time, since the first call consumes the
    /// introspection receiver.
    pub fn start_introspection_sink(
        &mut self,
        storage_controller: &dyn StorageController<Timestamp = T>,
    ) {
        if let Some(rx) = self.introspection_rx.take() {
            spawn_introspection_sink(rx, storage_controller);
        }
    }

    /// Returns whether an instance with the given ID exists.
    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
        self.instances.contains_key(&id)
    }

    /// Returns a reference to the indicated compute instance.
    fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState<T>, InstanceMissing> {
        self.instances.get(&id).ok_or(InstanceMissing(id))
    }

    /// Returns a client for the indicated compute instance.
    pub fn instance_client(
        &self,
        id: ComputeInstanceId,
    ) -> Result<instance::Client<T>, InstanceMissing> {
        self.instance(id).map(|instance| instance.client.clone())
    }

    /// Returns a mutable reference to the indicated compute instance.
    fn instance_mut(
        &mut self,
        id: ComputeInstanceId,
    ) -> Result<&mut InstanceState<T>, InstanceMissing> {
        self.instances.get_mut(&id).ok_or(InstanceMissing(id))
    }

    /// Returns the IDs of all collections on the given compute instance.
    pub fn collection_ids(
        &self,
        instance_id: ComputeInstanceId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let ids = instance.collections.keys().copied();
        Ok(ids)
    }

    /// Returns the frontiers of the given collection.
    ///
    /// If an `instance_id` is given, the collection is looked up on that instance; otherwise
    /// all instances are searched.
    pub fn collection_frontiers(
        &self,
        collection_id: GlobalId,
        instance_id: Option<ComputeInstanceId>,
    ) -> Result<CollectionFrontiers<T>, CollectionLookupError> {
        let collection = match instance_id {
            Some(id) => self.instance(id)?.collection(collection_id)?,
            None => self
                .instances
                .values()
                .find_map(|i| i.collections.get(&collection_id))
                .ok_or(CollectionMissing(collection_id))?,
        };

        Ok(collection.frontiers())
    }
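
    // Sketch: deciding whether a collection can serve a read at time `ts`
    // (`controller`, `id`, and `ts` are assumed to be in scope). A time is
    // readable iff it is beyond the read frontier but not yet beyond the
    // write frontier:
    //
    //     let f = controller.collection_frontiers(id, None)?;
    //     let readable = f.read_frontier.less_equal(&ts) && !f.write_frontier.less_equal(&ts);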

    /// Returns the IDs of all collections on the given instance that directly depend on the
    /// collection with the given ID.
    pub fn collection_reverse_dependencies(
        &self,
        instance_id: ComputeInstanceId,
        id: GlobalId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let collections = instance.collections.iter();
        let ids = collections
            .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
        Ok(ids)
    }

    /// Returns whether the given collection is hydrated.
    pub async fn collection_hydrated(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<bool, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        let res = instance
            .call_sync(move |i| i.collection_hydrated(collection_id))
            .await?;

        Ok(res)
    }

    /// Returns a receiver that reports whether all collections on the given instance, excluding
    /// `exclude_collections`, are hydrated on any of the given replicas.
    pub fn collections_hydrated_for_replicas(
        &self,
        instance_id: ComputeInstanceId,
        replicas: Vec<ReplicaId>,
        exclude_collections: BTreeSet<GlobalId>,
    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        // Reject targets that can never answer the check: an instance without any replicas, or
        // a target set containing none of the instance's replicas.
        if instance.replicas.is_empty() || !replicas.iter().any(|id| instance.replicas.contains(id))
        {
            return Err(HydrationCheckBadTarget(replicas).into());
        }

        let (tx, rx) = oneshot::channel();
        instance.call(move |i| {
            let result = i
                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
                .expect("validated");
            let _ = tx.send(result);
        });

        Ok(rx)
    }
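
    // Usage sketch (`instance_id` and `replica_id` are hypothetical): resolve
    // once every collection is hydrated on one of the targeted replicas.
    //
    //     let rx = controller.collections_hydrated_for_replicas(
    //         instance_id,
    //         vec![replica_id],
    //         BTreeSet::new(),
    //     )?;
    //     let hydrated: bool = rx.await?;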

    /// Returns a JSON-serializable dump of the compute controller state, for debugging.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` so newly added fields show up here as compile errors and every
        // field is either dumped or explicitly ignored.
        let Self {
            instances,
            instance_workload_classes,
            build_info: _,
            storage_collections: _,
            initialized,
            read_only,
            config: _,
            peek_stash_persist_location: _,
            stashed_response,
            metrics: _,
            now: _,
            wallclock_lag: _,
            dyncfg: _,
            response_rx: _,
            response_tx: _,
            introspection_rx: _,
            introspection_tx: _,
            maintenance_ticker: _,
            maintenance_scheduled,
        } = self;

        let mut instances_dump = BTreeMap::new();
        for (id, instance) in instances {
            let dump = instance.dump().await?;
            instances_dump.insert(id.to_string(), dump);
        }

        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .iter()
            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
            .collect();

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let map = serde_json::Map::from_iter([
            field("instances", instances_dump)?,
            field("instance_workload_classes", instance_workload_classes)?,
            field("initialized", initialized)?,
            field("read_only", read_only)?,
            field("stashed_response", format!("{stashed_response:?}"))?,
            field("maintenance_scheduled", maintenance_scheduled)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

impl<T> ComputeController<T>
where
    T: ComputeControllerTimestamp,
{
    /// Create a compute instance.
    pub fn create_instance(
        &mut self,
        id: ComputeInstanceId,
        arranged_logs: BTreeMap<LogVariant, GlobalId>,
        workload_class: Option<String>,
    ) -> Result<(), InstanceExists> {
        if self.instances.contains_key(&id) {
            return Err(InstanceExists(id));
        }

        // Pre-register the arranged log collections with the new instance.
        let mut collections = BTreeMap::new();
        let mut logs = Vec::with_capacity(arranged_logs.len());
        for (&log, &id) in &arranged_logs {
            let collection = Collection::new_log();
            let shared = collection.shared.clone();
            collections.insert(id, collection);
            logs.push((log, id, shared));
        }

        let client = instance::Client::spawn(
            id,
            self.build_info,
            Arc::clone(&self.storage_collections),
            self.peek_stash_persist_location.clone(),
            logs,
            self.metrics.for_instance(id),
            self.now.clone(),
            self.wallclock_lag.clone(),
            Arc::clone(&self.dyncfg),
            self.response_tx.clone(),
            self.introspection_tx.clone(),
            self.read_only,
        );

        let instance = InstanceState::new(client, collections);
        self.instances.insert(id, instance);

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class.clone());

        let instance = self.instances.get_mut(&id).expect("instance just added");
        if self.initialized {
            instance.call(Instance::initialization_complete);
        }

        let mut config_params = self.config.clone();
        config_params.workload_class = Some(workload_class);
        instance.call(|i| i.update_configuration(config_params));

        Ok(())
    }
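
    // Sketch of standing up a new cluster; the IDs, location, and configs are
    // hypothetical values provided by the caller:
    //
    //     controller.create_instance(instance_id, arranged_logs, Some("prod".into()))?;
    //     controller.add_replica_to_instance(instance_id, replica_id, location, replica_config)?;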

    /// Update a compute instance's workload class.
    pub fn update_instance_workload_class(
        &mut self,
        id: ComputeInstanceId,
        workload_class: Option<String>,
    ) -> Result<(), InstanceMissing> {
        // Ensure that the instance exists first.
        let _ = self.instance(id)?;

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class);

        // Trigger a configuration update to notify the instance of its new workload class.
        self.update_configuration(Default::default());

        Ok(())
    }

    /// Remove a compute instance. Removing an instance that does not exist is a no-op.
    pub fn drop_instance(&mut self, id: ComputeInstanceId) {
        if let Some(instance) = self.instances.remove(&id) {
            instance.call(|i| i.shutdown());
        }

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .remove(&id);
    }

    /// Returns the compute controller's dynamic configuration.
    pub fn dyncfg(&self) -> &Arc<ConfigSet> {
        &self.dyncfg
    }

    /// Update the compute configuration.
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        // Apply dyncfg updates.
        config_params.dyncfg_updates.apply(&self.dyncfg);

        let instance_workload_classes = self
            .instance_workload_classes
            .lock()
            .expect("lock poisoned");

        // Forward the configuration to the instances, each patched with its own workload class.
        for (id, instance) in self.instances.iter_mut() {
            let mut params = config_params.clone();
            params.workload_class = Some(instance_workload_classes[id].clone());
            instance.call(|i| i.update_configuration(params));
        }

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                tracing::error!(
                    err,
                    overflowing_behavior,
                    "Invalid value for ore_overflowing_behavior"
                );
            }
        }

        // Remember the configuration for instances created in the future.
        self.config.update(config_params);
    }

    /// Signal that initialization is complete, both to the controller and to all existing
    /// instances.
    pub fn initialization_complete(&mut self) {
        self.initialized = true;
        for instance in self.instances.values_mut() {
            instance.call(Instance::initialization_complete);
        }
    }

    /// Wait until the controller is ready to do some processing.
    ///
    /// This method may block for an arbitrarily long time. When it returns, the caller should
    /// call [`ComputeController::process`] to handle whatever became ready.
    pub async fn ready(&mut self) {
        if self.stashed_response.is_some() {
            // A response is already stashed and ready to be processed.
            return;
        }
        if self.maintenance_scheduled {
            // Maintenance work is due.
            return;
        }

        tokio::select! {
            resp = self.response_rx.recv() => {
                let resp = resp.expect("`self.response_tx` not dropped");
                self.stashed_response = Some(resp);
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        }
    }
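
    // The intended driver loop, as a sketch (`handle_response` is a hypothetical
    // caller-provided function):
    //
    //     loop {
    //         controller.ready().await;
    //         if let Some(response) = controller.process() {
    //             handle_response(response);
    //         }
    //     }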

    /// Add a replica to an instance.
    pub fn add_replica_to_instance(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
        config: ComputeReplicaConfig,
    ) -> Result<(), ReplicaCreationError> {
        use ReplicaCreationError::*;

        let instance = self.instance(instance_id)?;

        if instance.replicas.contains(&replica_id) {
            return Err(ReplicaExists(replica_id));
        }

        let (enable_logging, interval) = match config.logging.interval {
            Some(interval) => (true, interval),
            None => (false, Duration::from_secs(1)),
        };

        let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);

        let replica_config = ReplicaConfig {
            location,
            logging: LoggingConfig {
                interval,
                enable_logging,
                log_logging: config.logging.log_logging,
                index_logs: Default::default(),
            },
            grpc_client: self.config.grpc_client.clone(),
            expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
        };

        let instance = self.instance_mut(instance_id).expect("validated");
        instance.replicas.insert(replica_id);

        instance.call(move |i| {
            i.add_replica(replica_id, replica_config, None)
                .expect("validated")
        });

        Ok(())
    }

    /// Remove a replica from an instance.
    pub fn drop_replica(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
    ) -> Result<(), ReplicaDropError> {
        use ReplicaDropError::*;

        let instance = self.instance_mut(instance_id)?;

        if !instance.replicas.contains(&replica_id) {
            return Err(ReplicaMissing(replica_id));
        }

        instance.replicas.remove(&replica_id);

        instance.call(move |i| i.remove_replica(replica_id).expect("validated"));

        Ok(())
    }

    /// Create and maintain the described dataflow on the given instance.
    ///
    /// This method validates the dataflow's `as_of` against the `since` frontiers of its imports
    /// and acquires read holds that keep those imports readable while the dataflow is installed.
    pub fn create_dataflow(
        &mut self,
        instance_id: ComputeInstanceId,
        mut dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        subscribe_target_replica: Option<ReplicaId>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        let instance = self.instance(instance_id)?;

        // If a subscribe targets a specific replica, that replica must exist.
        if let Some(replica_id) = subscribe_target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // Simple sanity checks around `as_of`: it must be present, and it must not be empty for
        // subscribes and copy-tos.
        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        // Acquire read holds on the imports and verify that the dataflow's as-of doesn't lie
        // before any import's since frontier.
        let storage_ids = dataflow.imported_source_ids().collect();
        let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
        for id in dataflow.imported_index_ids() {
            let read_hold = instance.acquire_read_hold(id)?;
            import_read_holds.push(read_hold);
        }
        for hold in &import_read_holds {
            if PartialOrder::less_than(as_of, hold.since()) {
                return Err(SinceViolation(hold.id()));
            }
        }

        // Exported sinks must write to existing storage collections.
        for id in dataflow.persist_sink_ids() {
            if self.storage_collections.check_exists(id).is_err() {
                return Err(CollectionMissing(id));
            }
        }

        let time_dependence = self
            .determine_time_dependence(instance_id, &dataflow)
            .expect("must exist");

        let instance = self.instance_mut(instance_id).expect("validated");

        // Register the dataflow's exports as collections on the instance.
        let mut shared_collection_state = BTreeMap::new();
        for id in dataflow.export_ids() {
            let shared = SharedCollectionState::new(as_of.clone());
            let collection = Collection {
                write_only: dataflow.sink_exports.contains_key(&id),
                compute_dependencies: dataflow.imported_index_ids().collect(),
                shared: shared.clone(),
                time_dependence: time_dependence.clone(),
            };
            instance.collections.insert(id, collection);
            shared_collection_state.insert(id, shared);
        }

        dataflow.time_dependence = time_dependence;

        instance.call(move |i| {
            i.create_dataflow(
                dataflow,
                import_read_holds,
                subscribe_target_replica,
                shared_collection_state,
            )
            .expect("validated")
        });

        Ok(())
    }

    /// Drop the given collections from the given instance.
    pub fn drop_collections(
        &mut self,
        instance_id: ComputeInstanceId,
        collection_ids: Vec<GlobalId>,
    ) -> Result<(), CollectionUpdateError> {
        let instance = self.instance_mut(instance_id)?;

        // Validate that all collections exist before dropping any of them.
        for id in &collection_ids {
            instance.collection(*id)?;
        }

        for id in &collection_ids {
            instance.collections.remove(id);
        }

        instance.call(|i| i.drop_collections(collection_ids).expect("validated"));

        Ok(())
    }

    /// Initiate a peek request for the contents of the given collection at `timestamp`.
    pub fn peek(
        &self,
        instance_id: ComputeInstanceId,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let instance = self.instance(instance_id)?;

        // If the peek targets a specific replica, that replica must exist.
        if let Some(replica_id) = target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // Acquire a read hold on the peeked collection, to ensure its since frontier doesn't
        // advance past the peek timestamp while the peek is in flight.
        let read_hold = match &peek_target {
            PeekTarget::Index { id } => instance.acquire_read_hold(*id)?,
            PeekTarget::Persist { id, .. } => self
                .storage_collections
                .acquire_read_holds(vec![*id])?
                .into_element(),
        };
        if !read_hold.since().less_equal(&timestamp) {
            return Err(SinceViolation(peek_target.id()));
        }

        instance.call(move |i| {
            i.peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing,
                map_filter_project,
                read_hold,
                target_replica,
                peek_response_tx,
            )
            .expect("validated")
        });

        Ok(())
    }
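
    // Sketch of issuing a peek and awaiting its result; the identifiers are
    // hypothetical, and the response arrives on the provided oneshot channel:
    //
    //     let (tx, rx) = oneshot::channel();
    //     controller.peek(
    //         instance_id,
    //         PeekTarget::Index { id: index_id },
    //         None,
    //         Uuid::new_v4(),
    //         timestamp,
    //         result_desc,
    //         finishing,
    //         map_filter_project,
    //         None,
    //         tx,
    //     )?;
    //     let response: PeekResponse = rx.await?;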

    /// Cancel an in-progress peek.
    ///
    /// The peek is answered with the given `reason`, e.g. [`PeekResponse::Canceled`].
    pub fn cancel_peek(
        &self,
        instance_id: ComputeInstanceId,
        uuid: Uuid,
        reason: PeekResponse,
    ) -> Result<(), InstanceMissing> {
        self.instance(instance_id)?
            .call(move |i| i.cancel_peek(uuid, reason));
        Ok(())
    }

    /// Assign read policies to the identified collections.
    ///
    /// A read policy determines how a collection's read frontier advances in response to its
    /// write frontier. Write-only collections (sinks) do not accept read policies.
    pub fn set_read_policy(
        &self,
        instance_id: ComputeInstanceId,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) -> Result<(), ReadPolicyError> {
        use ReadPolicyError::*;

        let instance = self.instance(instance_id)?;

        // Validate that none of the listed collections is write-only.
        for (id, _) in &policies {
            let collection = instance.collection(*id)?;
            if collection.write_only {
                return Err(WriteOnlyCollection(*id));
            }
        }

        instance.call(|i| i.set_read_policy(policies).expect("validated"));

        Ok(())
    }
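
    // Sketch: keep a collection readable at timestamps up to ~1s behind its
    // write frontier. This assumes `T = mz_repr::Timestamp` and the
    // `ReadPolicy::lag_writes_by(lag, granularity)` constructor; the values
    // are illustrative only:
    //
    //     controller.set_read_policy(
    //         instance_id,
    //         vec![(collection_id, ReadPolicy::lag_writes_by(1000.into(), 1.into()))],
    //     )?;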

    /// Acquire a read hold on the given collection, at its current read frontier.
    pub fn acquire_read_hold(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<ReadHold<T>, CollectionUpdateError> {
        let read_hold = self
            .instance(instance_id)?
            .acquire_read_hold(collection_id)?;
        Ok(read_hold)
    }

    /// Determine the given dataflow's time dependence, i.e. how its timestamps relate to
    /// wall-clock time.
    fn determine_time_dependence(
        &self,
        instance_id: ComputeInstanceId,
        dataflow: &DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        // Continual tasks have no time dependence.
        let is_continual_task = dataflow.continual_task_ids().next().is_some();
        if is_continual_task {
            return Ok(None);
        }

        let instance = self
            .instance(instance_id)
            .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
        let mut time_dependencies = Vec::new();

        for id in dataflow.imported_index_ids() {
            let dependence = instance
                .get_time_dependence(id)
                .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
            time_dependencies.push(dependence);
        }

        'source: for id in dataflow.imported_source_ids() {
            // An imported source might be a collection exported by some compute instance;
            // check the instances before asking the storage layer.
            for instance in self.instances.values() {
                if let Ok(dependence) = instance.get_time_dependence(id) {
                    time_dependencies.push(dependence);
                    continue 'source;
                }
            }

            time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
        }

        Ok(TimeDependence::merge(
            time_dependencies,
            dataflow.refresh_schedule.as_ref(),
        ))
    }

    /// Process a pending response from a compute instance, or perform scheduled maintenance.
    ///
    /// This method is meant to be called after [`ComputeController::ready`] returns.
    #[mz_ore::instrument(level = "debug")]
    pub fn process(&mut self) -> Option<ComputeControllerResponse<T>> {
        // Perform any scheduled maintenance work.
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        // Return a ready response, if any.
        self.stashed_response.take()
    }

    #[mz_ore::instrument(level = "debug")]
    fn maintain(&mut self) {
        // Perform instance maintenance work.
        for instance in self.instances.values_mut() {
            instance.call(Instance::maintain);
        }
    }

    /// Allow the given collection to write to external systems.
    ///
    /// This is a no-op while the controller is in read-only mode.
    pub fn allow_writes(
        &mut self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<(), CollectionUpdateError> {
        if self.read_only {
            tracing::debug!("Skipping allow_writes in read-only mode");
            return Ok(());
        }

        let instance = self.instance_mut(instance_id)?;

        // Validate that the collection exists.
        instance.collection(collection_id)?;

        instance.call(move |i| i.allow_writes(collection_id).expect("validated"));

        Ok(())
    }
}

/// The controller's state for a single compute instance.
#[derive(Debug)]
struct InstanceState<T: ComputeControllerTimestamp> {
    /// A client for the task that runs the instance.
    client: instance::Client<T>,
    /// The IDs of the instance's replicas.
    replicas: BTreeSet<ReplicaId>,
    /// The instance's collections.
    collections: BTreeMap<GlobalId, Collection<T>>,
}

impl<T: ComputeControllerTimestamp> InstanceState<T> {
    fn new(client: instance::Client<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
        Self {
            client,
            replicas: Default::default(),
            collections,
        }
    }

    fn collection(&self, id: GlobalId) -> Result<&Collection<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    /// Call the given function on the instance task, without awaiting a result.
    fn call<F>(&self, f: F)
    where
        F: FnOnce(&mut Instance<T>) + Send + 'static,
    {
        self.client.call(f).expect("instance not dropped")
    }

    /// Call the given function on the instance task and await its result.
    async fn call_sync<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
        R: Send + 'static,
    {
        self.client
            .call_sync(f)
            .await
            .expect("instance not dropped")
    }

    /// Acquire a read hold on the given collection at its current since frontier.
    pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
        let collection = self.collection(id)?;

        // Increment the read capabilities at the current since, so the frontier cannot advance
        // while the returned `ReadHold` is live. Dropping the hold releases the capability
        // through the `read_hold_tx` channel.
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });

        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
        Ok(hold)
    }
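
    // Usage sketch: a read hold pins the collection's since frontier until the
    // hold is dropped (`desired_as_of` is hypothetical):
    //
    //     let hold = instance.acquire_read_hold(id)?;
    //     assert!(PartialOrder::less_equal(hold.since(), &desired_as_of));
    //     // ... install a dataflow reading at `desired_as_of` ...
    //     drop(hold); // releases the capability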

    /// Return the time dependence of the given collection, if known.
    fn get_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, CollectionMissing> {
        Ok(self.collection(id)?.time_dependence.clone())
    }

    /// Returns a JSON-serializable dump of the instance state, for debugging.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` to ensure every field is considered.
        let Self {
            client: _,
            replicas,
            collections,
        } = self;

        let instance = self.call_sync(|i| i.dump()).await?;
        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let map = serde_json::Map::from_iter([
            field("instance", instance)?,
            field("replicas", replicas)?,
            field("collections", collections)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

/// The controller's metadata about a compute collection.
#[derive(Debug)]
struct Collection<T> {
    /// Whether the collection is write-only, i.e. a sink that cannot serve reads.
    write_only: bool,
    /// The compute collections this collection depends on.
    compute_dependencies: BTreeSet<GlobalId>,
    /// State shared with the instance task.
    shared: SharedCollectionState<T>,
    /// The collection's time dependence, if known.
    time_dependence: Option<TimeDependence>,
}

impl<T: Timestamp> Collection<T> {
    /// Creates state for a new log collection.
    fn new_log() -> Self {
        let as_of = Antichain::from_elem(T::minimum());
        Self {
            write_only: false,
            compute_dependencies: Default::default(),
            shared: SharedCollectionState::new(as_of),
            // Logs follow wall-clock time.
            time_dependence: Some(TimeDependence::default()),
        }
    }

    /// Returns the current frontiers of the collection.
    fn frontiers(&self) -> CollectionFrontiers<T> {
        let read_frontier = self
            .shared
            .lock_read_capabilities(|c| c.frontier().to_owned());
        let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
        CollectionFrontiers {
            read_frontier,
            write_frontier,
        }
    }
}

/// A collection's read and write frontiers.
#[derive(Clone, Debug)]
pub struct CollectionFrontiers<T> {
    /// The read frontier (since): the collection can be read only at times at or beyond it.
    pub read_frontier: Antichain<T>,
    /// The write frontier (upper): times at or beyond it may still receive updates.
    pub write_frontier: Antichain<T>,
}

impl<T: Timestamp> Default for CollectionFrontiers<T> {
    fn default() -> Self {
        Self {
            read_frontier: Antichain::from_elem(T::minimum()),
            write_frontier: Antichain::from_elem(T::minimum()),
        }
    }
}