1use std::collections::{BTreeMap, BTreeSet};
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use mz_build_info::BuildInfo;
40use mz_cluster_client::client::ClusterReplicaLocation;
41use mz_cluster_client::metrics::ControllerMetrics;
42use mz_cluster_client::{ReplicaId, WallclockLagFn};
43use mz_compute_types::ComputeInstanceId;
44use mz_compute_types::config::ComputeReplicaConfig;
45use mz_compute_types::dataflows::DataflowDescription;
46use mz_compute_types::dyncfgs::{
47 COMPUTE_REPLICA_EXPIRATION_OFFSET, ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA,
48};
49use mz_dyncfg::{ConfigSet, ConfigUpdates};
50use mz_expr::RowSetFinishing;
51use mz_expr::row::RowCollection;
52use mz_ore::cast::CastFrom;
53use mz_ore::metrics::MetricsRegistry;
54use mz_ore::now::NowFn;
55use mz_ore::tracing::OpenTelemetryContext;
56use mz_persist_types::PersistLocation;
57use mz_repr::{GlobalId, RelationDesc, Row, Timestamp};
58use mz_storage_client::controller::StorageController;
59use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
60use mz_storage_types::read_holds::ReadHold;
61use mz_storage_types::read_policy::ReadPolicy;
62use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
63use prometheus::proto::LabelPair;
64use serde::{Deserialize, Serialize};
65use timely::PartialOrder;
66use timely::progress::Antichain;
67use tokio::sync::{mpsc, oneshot};
68use tokio::time::{self, MissedTickBehavior};
69use uuid::Uuid;
70
71use crate::controller::error::{
72 CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
73 HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
74 ReplicaCreationError, ReplicaDropError,
75};
76use crate::controller::instance::{Instance, SharedCollectionState};
77use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
78use crate::controller::replica::ReplicaConfig;
79use crate::logging::{LogVariant, LoggingConfig};
80use crate::metrics::ComputeControllerMetrics;
81use crate::protocol::command::{ComputeParameters, PeekTarget};
82use crate::protocol::response::{PeekResponse, SubscribeBatch};
83
84mod instance;
85mod introspection;
86mod replica;
87mod sequential_hydration;
88
89pub mod error;
90pub mod instance_client;
91pub use instance_client::InstanceClient;
92
/// Type-erased, shareable handle (`Arc`, `Send + Sync`) to the storage layer's
/// `StorageCollections` implementation, used by the compute controller for storage-side
/// read holds, existence checks, and time-dependence lookups.
93pub(crate) type StorageCollections =
94 Arc<dyn mz_storage_client::storage_collections::StorageCollections + Send + Sync>;
95
/// Responses produced by compute instances and surfaced to the controller's user through
/// `ComputeController::process`.
96#[derive(Debug)]
98pub enum ComputeControllerResponse {
    /// Notification about the outcome of the peek identified by the `Uuid`, together with the
    /// tracing context to attach to its handling.
99 PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// A batch of updates for the subscribe with the given ID.
101 SubscribeResponse(GlobalId, SubscribeBatch),
    /// The result of the COPY TO with the given ID.
    /// NOTE(review): the `u64` is presumably the number of rows written — confirm against the
    /// instance code that produces this response.
103 CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// Reports the upper (write) frontier of a collection.
114 FrontierUpper {
        /// The ID of the collection.
119 id: GlobalId,
        /// The collection's upper frontier.
121 upper: Antichain<Timestamp>,
123 },
124}
125
/// A compact summary of a peek's outcome, derived from a [`PeekResponse`] by
/// `PeekNotification::new`.
126#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
128pub enum PeekNotification {
    /// The peek succeeded.
129 Success {
        /// Number of result rows remaining after the peek's offset and limit are applied.
131 rows: u64,
        /// Size of the result in bytes, as reported by the response (computed over the whole
        /// response, i.e. without applying offset/limit).
133 result_size: u64,
135 },
    /// The peek failed with the given error message.
136 Error(String),
    /// The peek was canceled.
138 Canceled,
140}
141
142impl PeekNotification {
143 fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
146 match peek_response {
147 PeekResponse::Rows(rows) => {
148 let num_rows = u64::cast_from(RowCollection::offset_limit(
149 rows.iter().map(|r| r.count()).sum(),
150 offset,
151 limit,
152 ));
153 let result_size = u64::cast_from(rows.iter().map(|r| r.byte_len()).sum::<usize>());
154
155 tracing::trace!(?num_rows, ?result_size, "inline peek result");
156
157 Self::Success {
158 rows: num_rows,
159 result_size,
160 }
161 }
162 PeekResponse::Stashed(stashed_response) => {
163 let rows = stashed_response.num_rows(offset, limit);
164 let result_size = stashed_response.size_bytes();
165
166 tracing::trace!(?rows, ?result_size, "stashed peek result");
167
168 Self::Success {
169 rows: u64::cast_from(rows),
170 result_size: u64::cast_from(result_size),
171 }
172 }
173 PeekResponse::Error(err) => Self::Error(err.clone()),
174 PeekResponse::Canceled => Self::Canceled,
175 }
176 }
177}
178
/// The compute controller: manages compute instances, their replicas, and the collections
/// (dataflow exports) installed on them.
///
/// Commands are validated against the controller-side state kept here and then forwarded to
/// a per-instance task through an [`InstanceClient`]. Responses from instances arrive on
/// `response_rx` and are surfaced through `ready` and `process`.
179pub struct ComputeController {
    /// Controller-side state of each compute instance.
181 instances: BTreeMap<ComputeInstanceId, InstanceState>,
    /// Workload class of each instance.
    ///
    /// Shared (via `Arc`) with the metrics postprocessor registered in `new`, which reads it
    /// to add a `workload_class` label to metrics that carry an `instance_id` label.
182 instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    /// Build information, passed to each spawned instance.
186 build_info: &'static BuildInfo,
    /// Storage collections handle. It is used to acquire read holds on storage inputs, check
    /// that persist-sink targets exist, and determine time dependences.
187 storage_collections: StorageCollections,
    /// Whether `initialization_complete` has been called. Instances created afterwards are
    /// notified as part of `create_instance`.
189 initialized: bool,
    /// Whether the controller runs in read-only mode. It is passed to each instance, and in
    /// this mode `allow_writes` is a no-op.
191 read_only: bool,
    /// The compute parameters applied so far. New instances are configured from a clone.
197 config: ComputeParameters,
    /// Persist location for stashed peek results, passed to each instance.
199 peek_stash_persist_location: PersistLocation,
    /// A response received by `ready` that has not yet been returned by `process`.
201 stashed_response: Option<ComputeControllerResponse>,
    /// Controller metrics. A per-instance view is handed to each spawned instance.
203 metrics: ComputeControllerMetrics,
    /// Source of the current time, passed to each instance.
205 now: NowFn,
    /// Wallclock-lag function, passed to each instance.
207 wallclock_lag: WallclockLagFn<Timestamp>,
    /// Dynamic configuration. It is shared (via `Arc`) with all instances and updated by
    /// `update_configuration`.
209 dyncfg: Arc<ConfigSet>,
214
    /// Receiving end of the channel on which instances send responses to the controller.
215 response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse>,
    /// Sending end of the response channel, cloned into each instance. Keeping it here means
    /// `response_rx` never observes a closed channel (`ready` relies on this).
217 response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
    /// Receiving end of the introspection channel. `start_introspection_sink` takes it, so
    /// it is `None` afterwards.
219 introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    /// Sending end of the introspection channel, cloned into each instance.
224 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
226
    /// Ticker that schedules periodic maintenance (1s period, missed ticks skipped).
227 maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance is due. `ready` sets it, and `process` clears it after running
    /// maintenance.
229 maintenance_scheduled: bool,
231}
232
233impl ComputeController {
234 pub fn new(
236 build_info: &'static BuildInfo,
237 storage_collections: StorageCollections,
238 read_only: bool,
239 metrics_registry: &MetricsRegistry,
240 peek_stash_persist_location: PersistLocation,
241 controller_metrics: ControllerMetrics,
242 now: NowFn,
243 wallclock_lag: WallclockLagFn<Timestamp>,
244 ) -> Self {
245 let (response_tx, response_rx) = mpsc::unbounded_channel();
246 let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();
247
248 let mut maintenance_ticker = time::interval(Duration::from_secs(1));
249 maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
250
251 let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
252 ComputeInstanceId,
253 Option<String>,
254 >::new()));
255
256 metrics_registry.register_postprocessor({
260 let instance_workload_classes = Arc::clone(&instance_workload_classes);
261 move |metrics| {
262 let instance_workload_classes = instance_workload_classes
263 .lock()
264 .expect("lock poisoned")
265 .iter()
266 .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
267 .collect::<BTreeMap<String, Option<String>>>();
268 for metric in metrics {
269 'metric: for metric in metric.mut_metric() {
270 for label in metric.get_label() {
271 if label.name() == "instance_id" {
272 if let Some(workload_class) = instance_workload_classes
273 .get(label.value())
274 .cloned()
275 .flatten()
276 {
277 let mut label = LabelPair::default();
278 label.set_name("workload_class".into());
279 label.set_value(workload_class.clone());
280
281 let mut labels = metric.take_label();
282 labels.push(label);
283 metric.set_label(labels);
284 }
285 continue 'metric;
286 }
287 }
288 }
289 }
290 }
291 });
292
293 let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);
294
295 Self {
296 instances: BTreeMap::new(),
297 instance_workload_classes,
298 build_info,
299 storage_collections,
300 initialized: false,
301 read_only,
302 config: Default::default(),
303 peek_stash_persist_location,
304 stashed_response: None,
305 metrics,
306 now,
307 wallclock_lag,
308 dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
309 response_rx,
310 response_tx,
311 introspection_rx: Some(introspection_rx),
312 introspection_tx,
313 maintenance_ticker,
314 maintenance_scheduled: false,
315 }
316 }
317
318 pub fn start_introspection_sink(&mut self, storage_controller: &dyn StorageController) {
323 if let Some(rx) = self.introspection_rx.take() {
324 spawn_introspection_sink(rx, storage_controller);
325 }
326 }
327
328 pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
330 self.instances.contains_key(&id)
331 }
332
333 fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState, InstanceMissing> {
335 self.instances.get(&id).ok_or(InstanceMissing(id))
336 }
337
338 pub fn instance_client(
340 &self,
341 id: ComputeInstanceId,
342 ) -> Result<InstanceClient, InstanceMissing> {
343 self.instance(id).map(|instance| instance.client.clone())
344 }
345
346 fn instance_mut(
348 &mut self,
349 id: ComputeInstanceId,
350 ) -> Result<&mut InstanceState, InstanceMissing> {
351 self.instances.get_mut(&id).ok_or(InstanceMissing(id))
352 }
353
354 pub fn collection_ids(
356 &self,
357 instance_id: ComputeInstanceId,
358 ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
359 let instance = self.instance(instance_id)?;
360 let ids = instance.collections.keys().copied();
361 Ok(ids)
362 }
363
364 pub fn collection_frontiers(
369 &self,
370 collection_id: GlobalId,
371 instance_id: Option<ComputeInstanceId>,
372 ) -> Result<CollectionFrontiers, CollectionLookupError> {
373 let collection = match instance_id {
374 Some(id) => self.instance(id)?.collection(collection_id)?,
375 None => self
376 .instances
377 .values()
378 .find_map(|i| i.collections.get(&collection_id))
379 .ok_or(CollectionMissing(collection_id))?,
380 };
381
382 Ok(collection.frontiers())
383 }
384
385 pub fn collection_reverse_dependencies(
387 &self,
388 instance_id: ComputeInstanceId,
389 id: GlobalId,
390 ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
391 let instance = self.instance(instance_id)?;
392 let collections = instance.collections.iter();
393 let ids = collections
394 .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
395 Ok(ids)
396 }
397
398 pub async fn collection_hydrated(
404 &self,
405 instance_id: ComputeInstanceId,
406 collection_id: GlobalId,
407 ) -> Result<bool, anyhow::Error> {
408 let instance = self.instance(instance_id)?;
409
410 let res = instance
411 .call_sync(move |i| i.collection_hydrated(collection_id))
412 .await?;
413
414 Ok(res)
415 }
416
417 pub fn collections_hydrated_for_replicas(
424 &self,
425 instance_id: ComputeInstanceId,
426 replicas: Vec<ReplicaId>,
427 exclude_collections: BTreeSet<GlobalId>,
428 ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
429 let instance = self.instance(instance_id)?;
430
431 if !instance.replicas.is_empty()
433 && !replicas.iter().any(|id| instance.replicas.contains(id))
434 {
435 return Err(HydrationCheckBadTarget(replicas).into());
436 }
437
438 let (tx, rx) = oneshot::channel();
439 instance.call(move |i| {
440 let result = i
441 .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
442 .expect("validated");
443 let _ = tx.send(result);
444 });
445
446 Ok(rx)
447 }
448
449 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
453 let Self {
460 instances,
461 instance_workload_classes,
462 build_info: _,
463 storage_collections: _,
464 initialized,
465 read_only,
466 config: _,
467 peek_stash_persist_location: _,
468 stashed_response,
469 metrics: _,
470 now: _,
471 wallclock_lag: _,
472 dyncfg: _,
473 response_rx: _,
474 response_tx: _,
475 introspection_rx: _,
476 introspection_tx: _,
477 maintenance_ticker: _,
478 maintenance_scheduled,
479 } = self;
480
481 let mut instances_dump = BTreeMap::new();
482 for (id, instance) in instances {
483 let dump = instance.dump().await?;
484 instances_dump.insert(id.to_string(), dump);
485 }
486
487 let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
488 .lock()
489 .expect("lock poisoned")
490 .iter()
491 .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
492 .collect();
493
494 Ok(serde_json::json!({
495 "instances": instances_dump,
496 "instance_workload_classes": instance_workload_classes,
497 "initialized": initialized,
498 "read_only": read_only,
499 "stashed_response": format!("{stashed_response:?}"),
500 "maintenance_scheduled": maintenance_scheduled,
501 }))
502 }
503}
504
505impl ComputeController {
506 pub fn create_instance(
508 &mut self,
509 id: ComputeInstanceId,
510 arranged_logs: BTreeMap<LogVariant, GlobalId>,
511 workload_class: Option<String>,
512 ) -> Result<(), InstanceExists> {
513 if self.instances.contains_key(&id) {
514 return Err(InstanceExists(id));
515 }
516
517 let mut collections = BTreeMap::new();
518 let mut logs = Vec::with_capacity(arranged_logs.len());
519 for (&log, &id) in &arranged_logs {
520 let collection = Collection::new_log();
521 let shared = collection.shared.clone();
522 collections.insert(id, collection);
523 logs.push((log, id, shared));
524 }
525
526 let client = InstanceClient::spawn(
527 id,
528 self.build_info,
529 Arc::clone(&self.storage_collections),
530 self.peek_stash_persist_location.clone(),
531 logs,
532 self.metrics.for_instance(id),
533 self.now.clone(),
534 self.wallclock_lag.clone(),
535 Arc::clone(&self.dyncfg),
536 self.response_tx.clone(),
537 self.introspection_tx.clone(),
538 self.read_only,
539 );
540
541 let instance = InstanceState::new(client, collections);
542 self.instances.insert(id, instance);
543
544 self.instance_workload_classes
545 .lock()
546 .expect("lock poisoned")
547 .insert(id, workload_class.clone());
548
549 let instance = self.instances.get_mut(&id).expect("instance just added");
550 if self.initialized {
551 instance.call(Instance::initialization_complete);
552 }
553
554 let mut config_params = self.config.clone();
555 config_params.workload_class = Some(workload_class);
556 instance.call(|i| i.update_configuration(config_params));
557
558 Ok(())
559 }
560
561 pub fn update_instance_workload_class(
563 &mut self,
564 id: ComputeInstanceId,
565 workload_class: Option<String>,
566 ) -> Result<(), InstanceMissing> {
567 let _ = self.instance(id)?;
569
570 self.instance_workload_classes
571 .lock()
572 .expect("lock poisoned")
573 .insert(id, workload_class);
574
575 self.update_configuration(Default::default());
577
578 Ok(())
579 }
580
581 pub fn drop_instance(&mut self, id: ComputeInstanceId) {
587 if let Some(instance) = self.instances.remove(&id) {
588 instance.call(|i| i.shutdown());
589 }
590
591 self.instance_workload_classes
592 .lock()
593 .expect("lock poisoned")
594 .remove(&id);
595 }
596
597 pub fn dyncfg(&self) -> &Arc<ConfigSet> {
599 &self.dyncfg
600 }
601
602 pub fn update_configuration(&mut self, config_params: ComputeParameters) {
604 config_params.dyncfg_updates.apply(&self.dyncfg);
606
607 let instance_workload_classes = self
608 .instance_workload_classes
609 .lock()
610 .expect("lock poisoned");
611
612 for (id, instance) in self.instances.iter_mut() {
615 let mut params = config_params.clone();
616 params.workload_class = Some(instance_workload_classes[id].clone());
617 instance.call(|i| i.update_configuration(params));
618 }
619
620 let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
621 match overflowing_behavior.parse() {
622 Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
623 Err(err) => {
624 tracing::error!(
625 err,
626 overflowing_behavior,
627 "Invalid value for ore_overflowing_behavior"
628 );
629 }
630 }
631
632 self.config.update(config_params);
634 }
635
636 pub fn update_replica_dyncfg_overrides(
645 &mut self,
646 mut overrides: BTreeMap<ComputeInstanceId, BTreeMap<ReplicaId, ConfigUpdates>>,
647 ) {
648 for (id, instance) in self.instances.iter_mut() {
649 let instance_overrides = overrides.remove(id).unwrap_or_default();
650 instance.call(move |i| i.update_replica_dyncfg_overrides(instance_overrides));
651 }
652 }
653
654 pub fn initialization_complete(&mut self) {
660 self.initialized = true;
661 for instance in self.instances.values_mut() {
662 instance.call(Instance::initialization_complete);
663 }
664 }
665
666 pub async fn ready(&mut self) {
674 if self.stashed_response.is_some() {
675 return;
677 }
678 if self.maintenance_scheduled {
679 return;
681 }
682
683 tokio::select! {
684 resp = self.response_rx.recv() => {
685 let resp = resp.expect("`self.response_tx` not dropped");
686 self.stashed_response = Some(resp);
687 }
688 _ = self.maintenance_ticker.tick() => {
689 self.maintenance_scheduled = true;
690 },
691 }
692 }
693
694 pub fn add_replica_to_instance(
696 &mut self,
697 instance_id: ComputeInstanceId,
698 replica_id: ReplicaId,
699 location: ClusterReplicaLocation,
700 config: ComputeReplicaConfig,
701 ) -> Result<(), ReplicaCreationError> {
702 use ReplicaCreationError::*;
703
704 let instance = self.instance(instance_id)?;
705
706 if instance.replicas.contains(&replica_id) {
708 return Err(ReplicaExists(replica_id));
709 }
710
711 let (enable_logging, interval) = match config.logging.interval {
712 Some(interval) => (true, interval),
713 None => (false, Duration::from_secs(1)),
714 };
715
716 let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);
717
718 let arrangement_dictionary_compression =
722 ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA.get(&self.dyncfg);
723
724 let replica_config = ReplicaConfig {
725 location,
726 logging: LoggingConfig {
727 interval,
728 enable_logging,
729 log_logging: config.logging.log_logging,
730 index_logs: Default::default(),
731 },
732 grpc_client: self.config.grpc_client.clone(),
733 expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
734 arrangement_dictionary_compression,
735 };
736
737 let instance = self.instance_mut(instance_id).expect("validated");
738 instance.replicas.insert(replica_id);
739
740 instance.call(move |i| {
741 i.add_replica(replica_id, replica_config, None)
742 .expect("validated")
743 });
744
745 Ok(())
746 }
747
748 pub fn drop_replica(
750 &mut self,
751 instance_id: ComputeInstanceId,
752 replica_id: ReplicaId,
753 ) -> Result<(), ReplicaDropError> {
754 use ReplicaDropError::*;
755
756 let instance = self.instance_mut(instance_id)?;
757
758 if !instance.replicas.contains(&replica_id) {
760 return Err(ReplicaMissing(replica_id));
761 }
762
763 instance.replicas.remove(&replica_id);
764
765 instance.call(move |i| i.remove_replica(replica_id).expect("validated"));
766
767 Ok(())
768 }
769
770 pub fn create_dataflow(
777 &mut self,
778 instance_id: ComputeInstanceId,
779 mut dataflow: DataflowDescription<mz_compute_types::plan::Plan, ()>,
780 target_replica: Option<ReplicaId>,
781 ) -> Result<(), DataflowCreationError> {
782 use DataflowCreationError::*;
783
784 let instance = self.instance(instance_id)?;
785
786 if let Some(replica_id) = target_replica {
788 if !instance.replicas.contains(&replica_id) {
789 return Err(ReplicaMissing(replica_id));
790 }
791 assert!(
792 dataflow.exported_index_ids().next().is_none(),
793 "Replica-targeted indexes are not supported"
794 );
795 }
796
797 let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
799 if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
800 return Err(EmptyAsOfForSubscribe);
801 }
802 if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
803 return Err(EmptyAsOfForCopyTo);
804 }
805
806 let storage_ids = dataflow.imported_source_ids().collect();
808 let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
809 for id in dataflow.imported_index_ids() {
810 let read_hold = instance.acquire_read_hold(id)?;
811 import_read_holds.push(read_hold);
812 }
813 for hold in &import_read_holds {
814 if PartialOrder::less_than(as_of, hold.since()) {
815 return Err(SinceViolation(hold.id()));
816 }
817 }
818
819 for id in dataflow.persist_sink_ids() {
821 if self.storage_collections.check_exists(id).is_err() {
822 return Err(CollectionMissing(id));
823 }
824 }
825 let time_dependence = self
826 .determine_time_dependence(instance_id, &dataflow)
827 .expect("must exist");
828
829 let instance = self.instance_mut(instance_id).expect("validated");
830
831 let mut shared_collection_state = BTreeMap::new();
832 for id in dataflow.export_ids() {
833 let shared = SharedCollectionState::new(as_of.clone());
834 let collection = Collection {
835 write_only: dataflow.sink_exports.contains_key(&id),
836 compute_dependencies: dataflow.imported_index_ids().collect(),
837 shared: shared.clone(),
838 time_dependence: time_dependence.clone(),
839 };
840 instance.collections.insert(id, collection);
841 shared_collection_state.insert(id, shared);
842 }
843
844 dataflow.time_dependence = time_dependence;
845
846 instance.call(move |i| {
847 i.create_dataflow(
848 dataflow,
849 import_read_holds,
850 shared_collection_state,
851 target_replica,
852 )
853 .expect("validated")
854 });
855
856 Ok(())
857 }
858
859 pub fn drop_collections(
862 &mut self,
863 instance_id: ComputeInstanceId,
864 collection_ids: Vec<GlobalId>,
865 ) -> Result<(), CollectionUpdateError> {
866 let instance = self.instance_mut(instance_id)?;
867
868 for id in &collection_ids {
870 instance.collection(*id)?;
871 }
872
873 for id in &collection_ids {
874 instance.collections.remove(id);
875 }
876
877 instance.call(|i| i.drop_collections(collection_ids).expect("validated"));
878
879 Ok(())
880 }
881
882 pub fn peek(
889 &self,
890 instance_id: ComputeInstanceId,
891 peek_target: PeekTarget,
892 literal_constraints: Option<Vec<Row>>,
893 uuid: Uuid,
894 timestamp: Timestamp,
895 result_desc: RelationDesc,
896 finishing: RowSetFinishing,
897 map_filter_project: mz_expr::SafeMfpPlan,
898 read_hold: ReadHold,
899 target_replica: Option<ReplicaId>,
900 peek_response_tx: oneshot::Sender<PeekResponse>,
901 ) -> Result<(), PeekError> {
902 use PeekError::*;
903
904 let instance = self.instance(instance_id)?;
905
906 if let Some(replica_id) = target_replica {
908 if !instance.replicas.contains(&replica_id) {
909 return Err(ReplicaMissing(replica_id));
910 }
911 }
912
913 if read_hold.id() != peek_target.id() {
916 return Err(ReadHoldIdMismatch(read_hold.id()));
917 }
918 if !read_hold.since().less_equal(×tamp) {
919 return Err(SinceViolation(peek_target.id()));
920 }
921
922 instance.call(move |i| {
923 i.peek(
924 peek_target,
925 literal_constraints,
926 uuid,
927 timestamp,
928 result_desc,
929 finishing,
930 map_filter_project,
931 read_hold,
932 target_replica,
933 peek_response_tx,
934 )
935 .expect("validated")
936 });
937
938 Ok(())
939 }
940
941 pub fn cancel_peek(
951 &self,
952 instance_id: ComputeInstanceId,
953 uuid: Uuid,
954 reason: PeekResponse,
955 ) -> Result<(), InstanceMissing> {
956 self.instance(instance_id)?
957 .call(move |i| i.cancel_peek(uuid, reason));
958 Ok(())
959 }
960
961 pub fn set_read_policy(
973 &self,
974 instance_id: ComputeInstanceId,
975 policies: Vec<(GlobalId, ReadPolicy)>,
976 ) -> Result<(), ReadPolicyError> {
977 use ReadPolicyError::*;
978
979 let instance = self.instance(instance_id)?;
980
981 for (id, _) in &policies {
983 let collection = instance.collection(*id)?;
984 if collection.write_only {
985 return Err(WriteOnlyCollection(*id));
986 }
987 }
988
989 self.instance(instance_id)?
990 .call(|i| i.set_read_policy(policies).expect("validated"));
991
992 Ok(())
993 }
994
995 pub fn acquire_read_hold(
997 &self,
998 instance_id: ComputeInstanceId,
999 collection_id: GlobalId,
1000 ) -> Result<ReadHold, CollectionUpdateError> {
1001 let read_hold = self
1002 .instance(instance_id)?
1003 .acquire_read_hold(collection_id)?;
1004 Ok(read_hold)
1005 }
1006
1007 fn determine_time_dependence(
1009 &self,
1010 instance_id: ComputeInstanceId,
1011 dataflow: &DataflowDescription<mz_compute_types::plan::Plan, ()>,
1012 ) -> Result<Option<TimeDependence>, TimeDependenceError> {
1013 let instance = self
1014 .instance(instance_id)
1015 .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
1016 let mut time_dependencies = Vec::new();
1017
1018 for id in dataflow.imported_index_ids() {
1019 let dependence = instance
1020 .get_time_dependence(id)
1021 .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
1022 time_dependencies.push(dependence);
1023 }
1024
1025 'source: for id in dataflow.imported_source_ids() {
1026 for instance in self.instances.values() {
1029 if let Ok(dependence) = instance.get_time_dependence(id) {
1030 time_dependencies.push(dependence);
1031 continue 'source;
1032 }
1033 }
1034
1035 time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
1037 }
1038
1039 Ok(TimeDependence::merge(
1040 time_dependencies,
1041 dataflow.refresh_schedule.as_ref(),
1042 ))
1043 }
1044
1045 #[mz_ore::instrument(level = "debug")]
1047 pub fn process(&mut self) -> Option<ComputeControllerResponse> {
1048 if self.maintenance_scheduled {
1050 self.maintain();
1051 self.maintenance_scheduled = false;
1052 }
1053
1054 self.stashed_response.take()
1056 }
1057
1058 #[mz_ore::instrument(level = "debug")]
1059 fn maintain(&mut self) {
1060 for instance in self.instances.values_mut() {
1062 instance.call(Instance::maintain);
1063 }
1064 }
1065
1066 pub fn allow_writes(
1070 &mut self,
1071 instance_id: ComputeInstanceId,
1072 collection_id: GlobalId,
1073 ) -> Result<(), CollectionUpdateError> {
1074 if self.read_only {
1075 tracing::debug!("Skipping allow_writes in read-only mode");
1076 return Ok(());
1077 }
1078
1079 let instance = self.instance_mut(instance_id)?;
1080
1081 instance.collection(collection_id)?;
1083
1084 instance.call(move |i| i.allow_writes(collection_id).expect("validated"));
1085
1086 Ok(())
1087 }
1088}
1089
/// Controller-side view of a compute instance.
///
/// It is used to validate commands before they are forwarded to the instance task.
1090#[derive(Debug)]
1091struct InstanceState {
    /// Handle used to send closures to the instance task (see `call` / `call_sync`).
1092 client: InstanceClient,
    /// IDs of the replicas currently added to the instance.
1093 replicas: BTreeSet<ReplicaId>,
    /// Controller-side state of the instance's collections (log collections and dataflow
    /// exports), keyed by ID.
1094 collections: BTreeMap<GlobalId, Collection>,
1095}
1096
1097impl InstanceState {
1098 fn new(client: InstanceClient, collections: BTreeMap<GlobalId, Collection>) -> Self {
1099 Self {
1100 client,
1101 replicas: Default::default(),
1102 collections,
1103 }
1104 }
1105
1106 fn collection(&self, id: GlobalId) -> Result<&Collection, CollectionMissing> {
1107 self.collections.get(&id).ok_or(CollectionMissing(id))
1108 }
1109
1110 fn call<F>(&self, f: F)
1116 where
1117 F: FnOnce(&mut Instance) + Send + 'static,
1118 {
1119 self.client.call(f).expect("instance not dropped")
1120 }
1121
1122 async fn call_sync<F, R>(&self, f: F) -> R
1128 where
1129 F: FnOnce(&mut Instance) -> R + Send + 'static,
1130 R: Send + 'static,
1131 {
1132 self.client
1133 .call_sync(f)
1134 .await
1135 .expect("instance not dropped")
1136 }
1137
1138 pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold, CollectionMissing> {
1140 let collection = self.collection(id)?;
1150 let since = collection.shared.lock_read_capabilities(|caps| {
1151 let since = caps.frontier().to_owned();
1152 caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
1153 since
1154 });
1155
1156 let hold = ReadHold::new(id, since, self.client.read_hold_tx());
1157 Ok(hold)
1158 }
1159
1160 fn get_time_dependence(
1162 &self,
1163 id: GlobalId,
1164 ) -> Result<Option<TimeDependence>, CollectionMissing> {
1165 Ok(self.collection(id)?.time_dependence.clone())
1166 }
1167
1168 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
1170 let Self {
1172 client: _,
1173 replicas,
1174 collections,
1175 } = self;
1176
1177 let instance = self.call_sync(|i| i.dump()).await?;
1178 let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
1179 let collections: BTreeMap<_, _> = collections
1180 .iter()
1181 .map(|(id, c)| (id.to_string(), format!("{c:?}")))
1182 .collect();
1183
1184 Ok(serde_json::json!({
1185 "instance": instance,
1186 "replicas": replicas,
1187 "collections": collections,
1188 }))
1189 }
1190}
1191
/// Controller-side state of a compute collection.
1192#[derive(Debug)]
1193struct Collection {
    /// Whether the collection is write-only. This is set for sink exports, and
    /// `set_read_policy` rejects such collections.
1194 write_only: bool,
    /// IDs of the indexes imported by the dataflow that created this collection. They are
    /// used by `collection_reverse_dependencies`.
1196 compute_dependencies: BTreeSet<GlobalId>,
    /// State shared with the instance: read capabilities and write frontier.
1197 shared: SharedCollectionState,
    /// The collection's time dependence, as determined when it was created.
1198 time_dependence: Option<TimeDependence>,
1201}
1202
1203impl Collection {
1204 fn new_log() -> Self {
1205 let as_of = Antichain::from_elem(Timestamp::MIN);
1206 Self {
1207 write_only: false,
1208 compute_dependencies: Default::default(),
1209 shared: SharedCollectionState::new(as_of),
1210 time_dependence: Some(TimeDependence::default()),
1211 }
1212 }
1213
1214 fn frontiers(&self) -> CollectionFrontiers {
1215 let read_frontier = self
1216 .shared
1217 .lock_read_capabilities(|c| c.frontier().to_owned());
1218 let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
1219 CollectionFrontiers {
1220 read_frontier,
1221 write_frontier,
1222 }
1223 }
1224}
1225
/// A snapshot of a compute collection's frontiers, as returned by
/// `ComputeController::collection_frontiers`.
1226#[derive(Clone, Debug)]
1228pub struct CollectionFrontiers {
    /// The frontier of the collection's read capabilities.
1229 pub read_frontier: Antichain<Timestamp>,
    /// The collection's write frontier.
1231 pub write_frontier: Antichain<Timestamp>,
1233}
1234
1235impl Default for CollectionFrontiers {
1236 fn default() -> Self {
1237 Self {
1238 read_frontier: Antichain::from_elem(Timestamp::MIN),
1239 write_frontier: Antichain::from_elem(Timestamp::MIN),
1240 }
1241 }
1242}