1use std::collections::{BTreeMap, BTreeSet};
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use mz_build_info::BuildInfo;
40use mz_cluster_client::client::ClusterReplicaLocation;
41use mz_cluster_client::metrics::ControllerMetrics;
42use mz_cluster_client::{ReplicaId, WallclockLagFn};
43use mz_compute_types::ComputeInstanceId;
44use mz_compute_types::config::ComputeReplicaConfig;
45use mz_compute_types::dataflows::DataflowDescription;
46use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
47use mz_dyncfg::ConfigSet;
48use mz_expr::RowSetFinishing;
49use mz_expr::row::RowCollection;
50use mz_ore::cast::CastFrom;
51use mz_ore::collections::CollectionExt;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::NowFn;
54use mz_ore::tracing::OpenTelemetryContext;
55use mz_persist_types::PersistLocation;
56use mz_repr::{Datum, GlobalId, RelationDesc, Row, TimestampManipulation};
57use mz_storage_client::controller::StorageController;
58use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
59use mz_storage_types::read_holds::ReadHold;
60use mz_storage_types::read_policy::ReadPolicy;
61use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
62use prometheus::proto::LabelPair;
63use serde::{Deserialize, Serialize};
64use timely::PartialOrder;
65use timely::progress::{Antichain, Timestamp};
66use tokio::sync::{mpsc, oneshot};
67use tokio::time::{self, MissedTickBehavior};
68use uuid::Uuid;
69
70use crate::controller::error::{
71 CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
72 HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
73 ReplicaCreationError, ReplicaDropError,
74};
75use crate::controller::instance::{Instance, SharedCollectionState};
76use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
77use crate::controller::replica::ReplicaConfig;
78use crate::logging::{LogVariant, LoggingConfig};
79use crate::metrics::ComputeControllerMetrics;
80use crate::protocol::command::{ComputeParameters, PeekTarget};
81use crate::protocol::response::{PeekResponse, SubscribeBatch};
82
// Submodules implementing the controller's moving parts.
mod instance;
mod introspection;
mod replica;
mod sequential_hydration;

pub mod error;
pub mod instance_client;
// Re-export so callers can name the client without the submodule path.
pub use instance_client::InstanceClient;
91
/// Shareable handle to the storage collections the compute controller reads
/// frontiers from and acquires read holds through.
pub(crate) type StorageCollections<T> = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T> + Send + Sync,
>;
95
/// Bound for timestamp types usable by the compute controller.
///
/// Adds to [`TimestampManipulation`] a conversion into a [`Datum`], so timestamps
/// can be rendered into rows (e.g. for introspection output).
pub trait ComputeControllerTimestamp: TimestampManipulation + Into<Datum<'static>> + Sync {}

impl ComputeControllerTimestamp for mz_repr::Timestamp {}
101
/// Responses produced by the compute controller for its client.
#[derive(Debug)]
pub enum ComputeControllerResponse<T> {
    /// Notification that a peek (identified by its UUID) has finished, together
    /// with the tracing context it was issued under.
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// A batch of updates for the identified subscribe.
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// Outcome of a `COPY TO` for the identified sink: a count on success
    /// (presumably rows/records written — confirm against the producer), or the error.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// Notification that a collection's write frontier advanced.
    FrontierUpper {
        /// The collection whose frontier advanced.
        id: GlobalId,
        /// The new write frontier.
        upper: Antichain<T>,
    },
}
131
/// Summary of a peek's outcome, carrying sizes rather than the result rows themselves.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    /// The peek returned rows.
    Success {
        /// Number of result rows, after `offset`/`limit` are applied.
        rows: u64,
        /// Total size of the result in bytes.
        result_size: u64,
    },
    /// The peek failed; the payload is the error message.
    Error(String),
    /// The peek was canceled.
    Canceled,
}
147
148impl PeekNotification {
149 fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
152 match peek_response {
153 PeekResponse::Rows(rows) => {
154 let num_rows = u64::cast_from(RowCollection::offset_limit(
155 rows.iter().map(|r| r.count()).sum(),
156 offset,
157 limit,
158 ));
159 let result_size = u64::cast_from(rows.iter().map(|r| r.byte_len()).sum::<usize>());
160
161 tracing::trace!(?num_rows, ?result_size, "inline peek result");
162
163 Self::Success {
164 rows: num_rows,
165 result_size,
166 }
167 }
168 PeekResponse::Stashed(stashed_response) => {
169 let rows = stashed_response.num_rows(offset, limit);
170 let result_size = stashed_response.size_bytes();
171
172 tracing::trace!(?rows, ?result_size, "stashed peek result");
173
174 Self::Success {
175 rows: u64::cast_from(rows),
176 result_size: u64::cast_from(result_size),
177 }
178 }
179 PeekResponse::Error(err) => Self::Error(err.clone()),
180 PeekResponse::Canceled => Self::Canceled,
181 }
182 }
183}
184
/// The compute controller: tracks compute instances and brokers commands and
/// responses between its client and the per-instance tasks.
pub struct ComputeController<T: ComputeControllerTimestamp> {
    /// Controller-side state for each active compute instance.
    instances: BTreeMap<ComputeInstanceId, InstanceState<T>>,
    /// Workload class per instance; shared with the metrics postprocessor
    /// registered in `new`, which labels metrics with it.
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    /// Build info handed to newly spawned instances.
    build_info: &'static BuildInfo,
    /// Handle to storage collections, used for storage read holds and
    /// time-dependence queries.
    storage_collections: StorageCollections<T>,
    /// Whether `initialization_complete` has been called.
    initialized: bool,
    /// Whether this controller runs in read-only mode (also forwarded to instances).
    read_only: bool,
    /// Current compute configuration; forwarded to instances on creation and update.
    config: ComputeParameters,
    /// Persist location used for stashing large peek results.
    peek_stash_persist_location: PersistLocation,
    /// A response waiting to be returned from `process`.
    stashed_response: Option<ComputeControllerResponse<T>>,
    /// Metrics handed out to instances.
    metrics: ComputeControllerMetrics,
    /// Wall-clock time source, forwarded to instances.
    now: NowFn,
    /// Wallclock lag function, forwarded to instances.
    wallclock_lag: WallclockLagFn<T>,
    /// Dynamic configuration, shared with instances and updated in
    /// `update_configuration`.
    dyncfg: Arc<ConfigSet>,

    /// Receiving end for instance responses.
    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse<T>>,
    /// Sender cloned into each spawned instance.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Receiving end for introspection updates; `Some` until the introspection
    /// sink is started, then taken.
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    /// Sender cloned into each spawned instance.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    /// Ticker driving periodic maintenance (1s interval, missed ticks skipped).
    maintenance_ticker: tokio::time::Interval,
    /// Whether a maintenance pass is due on the next `process` call.
    maintenance_scheduled: bool,
}
238
impl<T: ComputeControllerTimestamp> ComputeController<T> {
    /// Constructs a new compute controller with no instances.
    ///
    /// Also registers a metrics postprocessor that attaches a `workload_class`
    /// label to any metric carrying an `instance_id` label, based on the shared
    /// `instance_workload_classes` map.
    pub fn new(
        build_info: &'static BuildInfo,
        storage_collections: StorageCollections<T>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        peek_stash_persist_location: PersistLocation,
        controller_metrics: ControllerMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
    ) -> Self {
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();

        // Skip missed ticks so a stalled controller doesn't run a burst of
        // maintenance passes to catch up.
        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
            ComputeInstanceId,
            Option<String>,
        >::new()));

        // Label every metric that has an `instance_id` label with the
        // corresponding instance's workload class, if one is known.
        metrics_registry.register_postprocessor({
            let instance_workload_classes = Arc::clone(&instance_workload_classes);
            move |metrics| {
                // Snapshot the map once per scrape, keyed by stringified ID to
                // match the label values.
                let instance_workload_classes = instance_workload_classes
                    .lock()
                    .expect("lock poisoned")
                    .iter()
                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
                    .collect::<BTreeMap<String, Option<String>>>();
                for metric in metrics {
                    'metric: for metric in metric.mut_metric() {
                        for label in metric.get_label() {
                            if label.name() == "instance_id" {
                                if let Some(workload_class) = instance_workload_classes
                                    .get(label.value())
                                    .cloned()
                                    .flatten()
                                {
                                    let mut label = LabelPair::default();
                                    label.set_name("workload_class".into());
                                    label.set_value(workload_class.clone());

                                    let mut labels = metric.take_label();
                                    labels.push(label);
                                    metric.set_label(labels);
                                }
                                // At most one `instance_id` label per metric;
                                // move on to the next metric.
                                continue 'metric;
                            }
                        }
                    }
                }
            }
        });

        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

        Self {
            instances: BTreeMap::new(),
            instance_workload_classes,
            build_info,
            storage_collections,
            initialized: false,
            read_only,
            config: Default::default(),
            peek_stash_persist_location,
            stashed_response: None,
            metrics,
            now,
            wallclock_lag,
            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
            response_rx,
            response_tx,
            introspection_rx: Some(introspection_rx),
            introspection_tx,
            maintenance_ticker,
            maintenance_scheduled: false,
        }
    }

    /// Starts the introspection sink task, forwarding buffered introspection
    /// updates to the given storage controller.
    ///
    /// Idempotent: only the first call takes the receiver and spawns the sink.
    pub fn start_introspection_sink(
        &mut self,
        storage_controller: &dyn StorageController<Timestamp = T>,
    ) {
        if let Some(rx) = self.introspection_rx.take() {
            spawn_introspection_sink(rx, storage_controller);
        }
    }

    /// Reports whether an instance with the given ID exists.
    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
        self.instances.contains_key(&id)
    }

    /// Returns the state of the identified instance, if it exists.
    fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState<T>, InstanceMissing> {
        self.instances.get(&id).ok_or(InstanceMissing(id))
    }

    /// Returns a client for the identified instance, if it exists.
    pub fn instance_client(
        &self,
        id: ComputeInstanceId,
    ) -> Result<InstanceClient<T>, InstanceMissing> {
        self.instance(id).map(|instance| instance.client.clone())
    }

    /// Returns mutable state of the identified instance, if it exists.
    fn instance_mut(
        &mut self,
        id: ComputeInstanceId,
    ) -> Result<&mut InstanceState<T>, InstanceMissing> {
        self.instances.get_mut(&id).ok_or(InstanceMissing(id))
    }

    /// Returns the IDs of all collections installed on the given instance.
    pub fn collection_ids(
        &self,
        instance_id: ComputeInstanceId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let ids = instance.collections.keys().copied();
        Ok(ids)
    }

    /// Returns the frontiers of the given collection.
    ///
    /// With an `instance_id`, only that instance is consulted; otherwise all
    /// instances are searched and the first match wins.
    pub fn collection_frontiers(
        &self,
        collection_id: GlobalId,
        instance_id: Option<ComputeInstanceId>,
    ) -> Result<CollectionFrontiers<T>, CollectionLookupError> {
        let collection = match instance_id {
            Some(id) => self.instance(id)?.collection(collection_id)?,
            None => self
                .instances
                .values()
                .find_map(|i| i.collections.get(&collection_id))
                .ok_or(CollectionMissing(collection_id))?,
        };

        Ok(collection.frontiers())
    }

    /// Returns the IDs of collections on the given instance that have a compute
    /// dependency on `id`.
    pub fn collection_reverse_dependencies(
        &self,
        instance_id: ComputeInstanceId,
        id: GlobalId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let collections = instance.collections.iter();
        let ids = collections
            .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
        Ok(ids)
    }

    /// Reports, by asking the instance task, whether the given collection is
    /// hydrated.
    pub async fn collection_hydrated(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<bool, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        let res = instance
            .call_sync(move |i| i.collection_hydrated(collection_id))
            .await?;

        Ok(res)
    }

    /// Returns a receiver that yields whether the instance's collections
    /// (minus `exclude_collections`) are hydrated on the given replicas.
    pub fn collections_hydrated_for_replicas(
        &self,
        instance_id: ComputeInstanceId,
        replicas: Vec<ReplicaId>,
        exclude_collections: BTreeSet<GlobalId>,
    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        // NOTE(review): when `instance.replicas` is empty, `any(..contains..)` is
        // necessarily false, so the second clause is dead and this errors for every
        // `replicas` value; conversely, when the instance has replicas, no target
        // validation happens at all. Possibly the first clause was meant to be
        // negated (`!instance.replicas.is_empty()`) — confirm intent before relying
        // on this check.
        if instance.replicas.is_empty() && !replicas.iter().any(|id| instance.replicas.contains(id))
        {
            return Err(HydrationCheckBadTarget(replicas).into());
        }

        let (tx, rx) = oneshot::channel();
        instance.call(move |i| {
            let result = i
                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
                .expect("validated");
            let _ = tx.send(result);
        });

        Ok(rx)
    }

    /// Produces a JSON dump of the controller state, for debugging.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` so adding a field without considering it here
        // becomes a compile error.
        let Self {
            instances,
            instance_workload_classes,
            build_info: _,
            storage_collections: _,
            initialized,
            read_only,
            config: _,
            peek_stash_persist_location: _,
            stashed_response,
            metrics: _,
            now: _,
            wallclock_lag: _,
            dyncfg: _,
            response_rx: _,
            response_tx: _,
            introspection_rx: _,
            introspection_tx: _,
            maintenance_ticker: _,
            maintenance_scheduled,
        } = self;

        let mut instances_dump = BTreeMap::new();
        for (id, instance) in instances {
            let dump = instance.dump().await?;
            instances_dump.insert(id.to_string(), dump);
        }

        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .iter()
            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
            .collect();

        Ok(serde_json::json!({
            "instances": instances_dump,
            "instance_workload_classes": instance_workload_classes,
            "initialized": initialized,
            "read_only": read_only,
            "stashed_response": format!("{stashed_response:?}"),
            "maintenance_scheduled": maintenance_scheduled,
        }))
    }
}
512
impl<T> ComputeController<T>
where
    T: ComputeControllerTimestamp,
{
    /// Creates a new compute instance, spawning its task and installing its
    /// arranged log collections.
    ///
    /// # Errors
    /// Returns `InstanceExists` if an instance with `id` already exists.
    pub fn create_instance(
        &mut self,
        id: ComputeInstanceId,
        arranged_logs: BTreeMap<LogVariant, GlobalId>,
        workload_class: Option<String>,
    ) -> Result<(), InstanceExists> {
        if self.instances.contains_key(&id) {
            return Err(InstanceExists(id));
        }

        // Pre-install the log collections and collect their shared state to
        // hand to the instance task.
        let mut collections = BTreeMap::new();
        let mut logs = Vec::with_capacity(arranged_logs.len());
        for (&log, &id) in &arranged_logs {
            let collection = Collection::new_log();
            let shared = collection.shared.clone();
            collections.insert(id, collection);
            logs.push((log, id, shared));
        }

        let client = InstanceClient::spawn(
            id,
            self.build_info,
            Arc::clone(&self.storage_collections),
            self.peek_stash_persist_location.clone(),
            logs,
            self.metrics.for_instance(id),
            self.now.clone(),
            self.wallclock_lag.clone(),
            Arc::clone(&self.dyncfg),
            self.response_tx.clone(),
            self.introspection_tx.clone(),
            self.read_only,
        );

        let instance = InstanceState::new(client, collections);
        self.instances.insert(id, instance);

        // Record the workload class so the metrics postprocessor can label
        // this instance's metrics.
        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class.clone());

        let instance = self.instances.get_mut(&id).expect("instance just added");
        if self.initialized {
            instance.call(Instance::initialization_complete);
        }

        // Push the current configuration, with this instance's workload class,
        // to the new instance.
        let mut config_params = self.config.clone();
        config_params.workload_class = Some(workload_class);
        instance.call(|i| i.update_configuration(config_params));

        Ok(())
    }

    /// Updates the workload class recorded for an instance and propagates it.
    pub fn update_instance_workload_class(
        &mut self,
        id: ComputeInstanceId,
        workload_class: Option<String>,
    ) -> Result<(), InstanceMissing> {
        // Validate the instance exists before mutating the shared map.
        let _ = self.instance(id)?;

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class);

        // An empty parameter update still re-sends each instance's workload
        // class (see `update_configuration`), which is what propagates the change.
        self.update_configuration(Default::default());

        Ok(())
    }

    /// Drops an instance, shutting down its task if it exists.
    pub fn drop_instance(&mut self, id: ComputeInstanceId) {
        if let Some(instance) = self.instances.remove(&id) {
            instance.call(|i| i.shutdown());
        }

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .remove(&id);
    }

    /// Returns the controller's dynamic configuration.
    pub fn dyncfg(&self) -> &Arc<ConfigSet> {
        &self.dyncfg
    }

    /// Applies a configuration update to the controller and all instances.
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        // Apply dyncfg updates first; they are shared with the instances.
        config_params.dyncfg_updates.apply(&self.dyncfg);

        let instance_workload_classes = self
            .instance_workload_classes
            .lock()
            .expect("lock poisoned");

        // Forward the parameters to each instance, with its own workload class
        // filled in.
        for (id, instance) in self.instances.iter_mut() {
            let mut params = config_params.clone();
            params.workload_class = Some(instance_workload_classes[id].clone());
            instance.call(|i| i.update_configuration(params));
        }

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                tracing::error!(
                    err,
                    overflowing_behavior,
                    "Invalid value for ore_overflowing_behavior"
                );
            }
        }

        // Remember the merged configuration for instances created later.
        self.config.update(config_params);
    }

    /// Marks the end of recovery: new instances are told immediately, existing
    /// ones are notified here.
    pub fn initialization_complete(&mut self) {
        self.initialized = true;
        for instance in self.instances.values_mut() {
            instance.call(Instance::initialization_complete);
        }
    }

    /// Waits until `process` has work to do: either a response arrived or a
    /// maintenance tick fired. Returns immediately if work is already pending.
    pub async fn ready(&mut self) {
        if self.stashed_response.is_some() {
            return;
        }
        if self.maintenance_scheduled {
            return;
        }

        tokio::select! {
            resp = self.response_rx.recv() => {
                // The sender half lives in `self`, so the channel can't close.
                let resp = resp.expect("`self.response_tx` not dropped");
                self.stashed_response = Some(resp);
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        }
    }

    /// Adds a replica to an instance.
    ///
    /// # Errors
    /// `InstanceMissing` / `ReplicaExists` on bad targets.
    pub fn add_replica_to_instance(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
        config: ComputeReplicaConfig,
    ) -> Result<(), ReplicaCreationError> {
        use ReplicaCreationError::*;

        let instance = self.instance(instance_id)?;

        if instance.replicas.contains(&replica_id) {
            return Err(ReplicaExists(replica_id));
        }

        // Logging is enabled iff an interval was configured; the 1s default is
        // only a placeholder when logging is disabled.
        let (enable_logging, interval) = match config.logging.interval {
            Some(interval) => (true, interval),
            None => (false, Duration::from_secs(1)),
        };

        let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);

        let replica_config = ReplicaConfig {
            location,
            logging: LoggingConfig {
                interval,
                enable_logging,
                log_logging: config.logging.log_logging,
                index_logs: Default::default(),
            },
            grpc_client: self.config.grpc_client.clone(),
            // A zero offset disables expiration.
            expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
        };

        let instance = self.instance_mut(instance_id).expect("validated");
        instance.replicas.insert(replica_id);

        instance.call(move |i| {
            i.add_replica(replica_id, replica_config, None)
                .expect("validated")
        });

        Ok(())
    }

    /// Removes a replica from an instance.
    pub fn drop_replica(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
    ) -> Result<(), ReplicaDropError> {
        use ReplicaDropError::*;

        let instance = self.instance_mut(instance_id)?;

        if !instance.replicas.contains(&replica_id) {
            return Err(ReplicaMissing(replica_id));
        }

        instance.replicas.remove(&replica_id);

        instance.call(move |i| i.remove_replica(replica_id).expect("validated"));

        Ok(())
    }

    /// Creates a dataflow on the given instance, after validating its as-of
    /// against the read holds of all imported collections.
    ///
    /// Registers controller-side state for each exported collection before
    /// handing the dataflow to the instance task.
    pub fn create_dataflow(
        &mut self,
        instance_id: ComputeInstanceId,
        mut dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        target_replica: Option<ReplicaId>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        let instance = self.instance(instance_id)?;

        if let Some(replica_id) = target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
            assert!(
                dataflow.exported_index_ids().next().is_none(),
                "Replica-targeted indexes are not supported"
            );
        }

        // An empty as-of means the output is known to be empty, which is
        // incompatible with sinks that expect to observe progress.
        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        // Acquire read holds on all imports, then check that the as-of is not
        // behind any import's since.
        let storage_ids = dataflow.imported_source_ids().collect();
        let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
        for id in dataflow.imported_index_ids() {
            let read_hold = instance.acquire_read_hold(id)?;
            import_read_holds.push(read_hold);
        }
        for hold in &import_read_holds {
            if PartialOrder::less_than(as_of, hold.since()) {
                return Err(SinceViolation(hold.id()));
            }
        }

        for id in dataflow.persist_sink_ids() {
            if self.storage_collections.check_exists(id).is_err() {
                return Err(CollectionMissing(id));
            }
        }
        let time_dependence = self
            .determine_time_dependence(instance_id, &dataflow)
            .expect("must exist");

        let instance = self.instance_mut(instance_id).expect("validated");

        // Install controller-side collection state for each export, sharing
        // frontier state with the instance task.
        let mut shared_collection_state = BTreeMap::new();
        for id in dataflow.export_ids() {
            let shared = SharedCollectionState::new(as_of.clone());
            let collection = Collection {
                write_only: dataflow.sink_exports.contains_key(&id),
                compute_dependencies: dataflow.imported_index_ids().collect(),
                shared: shared.clone(),
                time_dependence: time_dependence.clone(),
            };
            instance.collections.insert(id, collection);
            shared_collection_state.insert(id, shared);
        }

        dataflow.time_dependence = time_dependence;

        instance.call(move |i| {
            i.create_dataflow(
                dataflow,
                import_read_holds,
                shared_collection_state,
                target_replica,
            )
            .expect("validated")
        });

        Ok(())
    }

    /// Drops the given collections from an instance.
    ///
    /// All IDs are validated before any are removed, so the call is all-or-nothing.
    pub fn drop_collections(
        &mut self,
        instance_id: ComputeInstanceId,
        collection_ids: Vec<GlobalId>,
    ) -> Result<(), CollectionUpdateError> {
        let instance = self.instance_mut(instance_id)?;

        for id in &collection_ids {
            instance.collection(*id)?;
        }

        for id in &collection_ids {
            instance.collections.remove(id);
        }

        instance.call(|i| i.drop_collections(collection_ids).expect("validated"));

        Ok(())
    }

    /// Issues a peek against an index or a persist shard on the given instance.
    ///
    /// Acquires a read hold on the peek target and validates that the peek
    /// timestamp is not beyond the target's since.
    pub fn peek(
        &self,
        instance_id: ComputeInstanceId,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let instance = self.instance(instance_id)?;

        if let Some(replica_id) = target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        let read_hold = match &peek_target {
            PeekTarget::Index { id } => instance.acquire_read_hold(*id)?,
            PeekTarget::Persist { id, .. } => self
                .storage_collections
                .acquire_read_holds(vec![*id])?
                .into_element(),
        };
        if !read_hold.since().less_equal(&timestamp) {
            return Err(SinceViolation(peek_target.id()));
        }

        instance.call(move |i| {
            i.peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing,
                map_filter_project,
                read_hold,
                target_replica,
                peek_response_tx,
            )
            .expect("validated")
        });

        Ok(())
    }

    /// Cancels the identified peek, resolving it with the given response.
    pub fn cancel_peek(
        &self,
        instance_id: ComputeInstanceId,
        uuid: Uuid,
        reason: PeekResponse,
    ) -> Result<(), InstanceMissing> {
        self.instance(instance_id)?
            .call(move |i| i.cancel_peek(uuid, reason));
        Ok(())
    }

    /// Installs read policies for collections on the given instance.
    ///
    /// Write-only collections (sinks) reject read policies.
    pub fn set_read_policy(
        &self,
        instance_id: ComputeInstanceId,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) -> Result<(), ReadPolicyError> {
        use ReadPolicyError::*;

        let instance = self.instance(instance_id)?;

        for (id, _) in &policies {
            let collection = instance.collection(*id)?;
            if collection.write_only {
                return Err(WriteOnlyCollection(*id));
            }
        }

        self.instance(instance_id)?
            .call(|i| i.set_read_policy(policies).expect("validated"));

        Ok(())
    }

    /// Acquires a read hold on the given compute collection.
    pub fn acquire_read_hold(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<ReadHold<T>, CollectionUpdateError> {
        let read_hold = self
            .instance(instance_id)?
            .acquire_read_hold(collection_id)?;
        Ok(read_hold)
    }

    /// Determines the time dependence of a dataflow from its imports.
    ///
    /// Continual tasks have no time dependence. Otherwise the dependences of
    /// all imported indexes and sources are merged, consulting other instances
    /// and finally storage for imported sources.
    fn determine_time_dependence(
        &self,
        instance_id: ComputeInstanceId,
        dataflow: &DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        let is_continual_task = dataflow.continual_task_ids().next().is_some();
        if is_continual_task {
            return Ok(None);
        }

        let instance = self
            .instance(instance_id)
            .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
        let mut time_dependencies = Vec::new();

        for id in dataflow.imported_index_ids() {
            let dependence = instance
                .get_time_dependence(id)
                .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
            time_dependencies.push(dependence);
        }

        // A source import may be a collection maintained by any instance;
        // fall back to storage if no instance knows it.
        'source: for id in dataflow.imported_source_ids() {
            for instance in self.instances.values() {
                if let Ok(dependence) = instance.get_time_dependence(id) {
                    time_dependencies.push(dependence);
                    continue 'source;
                }
            }

            time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
        }

        Ok(TimeDependence::merge(
            time_dependencies,
            dataflow.refresh_schedule.as_ref(),
        ))
    }

    /// Performs any work signaled by `ready` and returns a pending response,
    /// if there is one.
    #[mz_ore::instrument(level = "debug")]
    pub fn process(&mut self) -> Option<ComputeControllerResponse<T>> {
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        self.stashed_response.take()
    }

    /// Runs a maintenance pass on every instance.
    #[mz_ore::instrument(level = "debug")]
    fn maintain(&mut self) {
        for instance in self.instances.values_mut() {
            instance.call(Instance::maintain);
        }
    }

    /// Allows writes for the given collection; a no-op in read-only mode.
    pub fn allow_writes(
        &mut self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<(), CollectionUpdateError> {
        if self.read_only {
            tracing::debug!("Skipping allow_writes in read-only mode");
            return Ok(());
        }

        let instance = self.instance_mut(instance_id)?;

        instance.collection(collection_id)?;

        instance.call(move |i| i.allow_writes(collection_id).expect("validated"));

        Ok(())
    }
}
1079
/// Controller-side state kept for a single compute instance.
#[derive(Debug)]
struct InstanceState<T: ComputeControllerTimestamp> {
    /// Client to the instance task.
    client: InstanceClient<T>,
    /// IDs of the instance's replicas.
    replicas: BTreeSet<ReplicaId>,
    /// State for the collections installed on the instance.
    collections: BTreeMap<GlobalId, Collection<T>>,
}
1086
impl<T: ComputeControllerTimestamp> InstanceState<T> {
    /// Creates instance state with no replicas and the given initial collections.
    fn new(client: InstanceClient<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
        Self {
            client,
            replicas: Default::default(),
            collections,
        }
    }

    /// Returns the state of the identified collection, if it exists.
    fn collection(&self, id: GlobalId) -> Result<&Collection<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    /// Sends a fire-and-forget closure to the instance task.
    ///
    /// # Panics
    /// Panics if the instance task has been dropped.
    fn call<F>(&self, f: F)
    where
        F: FnOnce(&mut Instance<T>) + Send + 'static,
    {
        self.client.call(f).expect("instance not dropped")
    }

    /// Sends a closure to the instance task and awaits its result.
    ///
    /// # Panics
    /// Panics if the instance task has been dropped.
    async fn call_sync<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
        R: Send + 'static,
    {
        self.client
            .call_sync(f)
            .await
            .expect("instance not dropped")
    }

    /// Acquires a read hold on the given collection at its current since.
    pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
        let collection = self.collection(id)?;
        // Under the capabilities lock: read the current frontier and install a
        // new capability at exactly that frontier, backing the returned hold.
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });

        // Releasing the hold sends its changes back through `read_hold_tx`.
        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
        Ok(hold)
    }

    /// Returns the time dependence of the identified collection, if it exists.
    fn get_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, CollectionMissing> {
        Ok(self.collection(id)?.time_dependence.clone())
    }

    /// Produces a JSON dump of this instance's state, for debugging.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` so new fields cannot be silently forgotten here.
        let Self {
            client: _,
            replicas,
            collections,
        } = self;

        let instance = self.call_sync(|i| i.dump()).await?;
        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();

        Ok(serde_json::json!({
            "instance": instance,
            "replicas": replicas,
            "collections": collections,
        }))
    }
}
1181
/// Controller-side state for a single compute collection.
#[derive(Debug)]
struct Collection<T> {
    /// Whether the collection is write-only (a sink export); such collections
    /// reject read policies.
    write_only: bool,
    /// IDs of compute collections this one depends on.
    compute_dependencies: BTreeSet<GlobalId>,
    /// Frontier state shared with the instance task.
    shared: SharedCollectionState<T>,
    /// The collection's time dependence, if any.
    time_dependence: Option<TimeDependence>,
}
1192
1193impl<T: Timestamp> Collection<T> {
1194 fn new_log() -> Self {
1195 let as_of = Antichain::from_elem(T::minimum());
1196 Self {
1197 write_only: false,
1198 compute_dependencies: Default::default(),
1199 shared: SharedCollectionState::new(as_of),
1200 time_dependence: Some(TimeDependence::default()),
1201 }
1202 }
1203
1204 fn frontiers(&self) -> CollectionFrontiers<T> {
1205 let read_frontier = self
1206 .shared
1207 .lock_read_capabilities(|c| c.frontier().to_owned());
1208 let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
1209 CollectionFrontiers {
1210 read_frontier,
1211 write_frontier,
1212 }
1213 }
1214}
1215
/// A snapshot of a collection's frontiers.
#[derive(Clone, Debug)]
pub struct CollectionFrontiers<T> {
    /// The read frontier ("since").
    pub read_frontier: Antichain<T>,
    /// The write frontier ("upper").
    pub write_frontier: Antichain<T>,
}
1224
1225impl<T: Timestamp> Default for CollectionFrontiers<T> {
1226 fn default() -> Self {
1227 Self {
1228 read_frontier: Antichain::from_elem(T::minimum()),
1229 write_frontier: Antichain::from_elem(T::minimum()),
1230 }
1231 }
1232}