use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::config::ComputeReplicaConfig;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_types::PersistLocation;
use mz_repr::{Datum, GlobalId, RelationDesc, Row, TimestampManipulation};
use mz_storage_client::controller::StorageController;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use prometheus::proto::LabelPair;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::{self, MissedTickBehavior};
use uuid::Uuid;

use crate::controller::error::{
    CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
    HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
    ReplicaCreationError, ReplicaDropError,
};
use crate::controller::instance::{Instance, SharedCollectionState};
use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
use crate::controller::replica::ReplicaConfig;
use crate::logging::{LogVariant, LoggingConfig};
use crate::metrics::ComputeControllerMetrics;
use crate::protocol::command::{ComputeParameters, PeekTarget};
use crate::protocol::response::{PeekResponse, SubscribeBatch};

mod instance;
mod introspection;
mod replica;
mod sequential_hydration;

pub mod error;
pub mod instance_client;
pub use instance_client::InstanceClient;

pub(crate) type StorageCollections<T> = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T> + Send + Sync,
>;

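/// A timestamp type usable by the compute controller: manipulable as a timestamp,
/// convertible into a [`Datum`], and shareable across threads.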
pub trait ComputeControllerTimestamp: TimestampManipulation + Into<Datum<'static>> + Sync {}

impl ComputeControllerTimestamp for mz_repr::Timestamp {}

/// Responses from the compute controller.
#[derive(Debug)]
pub enum ComputeControllerResponse<T> {
    /// A notification that a peek has finished; see [`PeekNotification`].
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// A batch of updates from a subscribe dataflow.
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    /// A response from a one-shot `COPY TO` dataflow, reporting either the number of
    /// rows copied or an error.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// A notification that a collection's upper frontier has advanced.
    FrontierUpper {
        /// The ID of the collection.
        id: GlobalId,
        /// The new upper frontier.
        upper: Antichain<T>,
    },
}

/// Notification of a peek's outcome, sent to the peek issuer.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    /// The peek succeeded.
    Success {
        /// The number of result rows, after applying `offset` and `limit`.
        rows: u64,
        /// The size of the result in bytes.
        result_size: u64,
    },
    /// The peek failed with the given error message.
    Error(String),
    /// The peek was canceled.
    Canceled,
}

impl PeekNotification {
    /// Construct a [`PeekNotification`] from a [`PeekResponse`], applying the given
    /// `offset` and `limit` when counting result rows.
    fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
        match peek_response {
            PeekResponse::Rows(rows) => {
                let num_rows = u64::cast_from(rows.count(offset, limit));
                let result_size = u64::cast_from(rows.byte_len());

                tracing::trace!(?num_rows, ?result_size, "inline peek result");

                Self::Success {
                    rows: num_rows,
                    result_size,
                }
            }
            PeekResponse::Stashed(stashed_response) => {
                let rows = stashed_response.num_rows(offset, limit);
                let result_size = stashed_response.size_bytes();

                tracing::trace!(?rows, ?result_size, "stashed peek result");

                Self::Success {
                    rows: u64::cast_from(rows),
                    result_size: u64::cast_from(result_size),
                }
            }
            PeekResponse::Error(err) => Self::Error(err.clone()),
            PeekResponse::Canceled => Self::Canceled,
        }
    }
}

/// A controller for the compute layer, managing all compute instances.
pub struct ComputeController<T: ComputeControllerTimestamp> {
    instances: BTreeMap<ComputeInstanceId, InstanceState<T>>,
    /// The workload class of each instance, shared with the metrics postprocessor.
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    build_info: &'static BuildInfo,
    storage_collections: StorageCollections<T>,
    /// Whether or not this controller has been initialized.
    initialized: bool,
    /// Whether the controller runs in read-only mode.
    read_only: bool,
    /// Compute configuration to apply to new instances.
    config: ComputeParameters,
    /// The persist location where peek results are stashed.
    peek_stash_persist_location: PersistLocation,
    /// A response to handle on the next call to [`ComputeController::process`].
    stashed_response: Option<ComputeControllerResponse<T>>,
    metrics: ComputeControllerMetrics,
    now: NowFn,
    wallclock_lag: WallclockLagFn<T>,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// Receiver for responses produced by instances.
    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse<T>>,
    /// Response sender that's passed to new instances.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    /// Receiver for introspection updates produced by instances; `None` once the
    /// introspection sink has been started.
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    /// Introspection updates sender that's passed to new instances.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,
}

impl<T: ComputeControllerTimestamp> ComputeController<T> {
    /// Construct a new [`ComputeController`].
    pub fn new(
        build_info: &'static BuildInfo,
        storage_collections: StorageCollections<T>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        peek_stash_persist_location: PersistLocation,
        controller_metrics: ControllerMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
    ) -> Self {
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();

        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
            ComputeInstanceId,
            Option<String>,
        >::new()));

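        // Apply a `workload_class` label to all metrics in the registry that have an
        // `instance_id` label, using the workload class currently recorded for that
        // instance.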
        metrics_registry.register_postprocessor({
            let instance_workload_classes = Arc::clone(&instance_workload_classes);
            move |metrics| {
                let instance_workload_classes = instance_workload_classes
                    .lock()
                    .expect("lock poisoned")
                    .iter()
                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
                    .collect::<BTreeMap<String, Option<String>>>();
                for metric in metrics {
                    'metric: for metric in metric.mut_metric() {
                        for label in metric.get_label() {
                            if label.name() == "instance_id" {
                                if let Some(workload_class) = instance_workload_classes
                                    .get(label.value())
                                    .cloned()
                                    .flatten()
                                {
                                    let mut label = LabelPair::default();
                                    label.set_name("workload_class".into());
                                    label.set_value(workload_class.clone());

                                    let mut labels = metric.take_label();
                                    labels.push(label);
                                    metric.set_label(labels);
                                }
                                continue 'metric;
                            }
                        }
                    }
                }
            }
        });

        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

        Self {
            instances: BTreeMap::new(),
            instance_workload_classes,
            build_info,
            storage_collections,
            initialized: false,
            read_only,
            config: Default::default(),
            peek_stash_persist_location,
            stashed_response: None,
            metrics,
            now,
            wallclock_lag,
            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
            response_rx,
            response_tx,
            introspection_rx: Some(introspection_rx),
            introspection_tx,
            maintenance_ticker,
            maintenance_scheduled: false,
        }
    }

    /// Start sinking the compute controller's introspection data into storage.
    ///
    /// This method should be called once the storage controller is ready; repeated
    /// calls are no-ops.
    pub fn start_introspection_sink(
        &mut self,
        storage_controller: &dyn StorageController<Timestamp = T>,
    ) {
        if let Some(rx) = self.introspection_rx.take() {
            spawn_introspection_sink(rx, storage_controller);
        }
    }

    /// Return whether the instance with the given ID exists.
    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
        self.instances.contains_key(&id)
    }

    /// Return a reference to the indicated compute instance.
    fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState<T>, InstanceMissing> {
        self.instances.get(&id).ok_or(InstanceMissing(id))
    }

    /// Return a client for the indicated compute instance.
    pub fn instance_client(
        &self,
        id: ComputeInstanceId,
    ) -> Result<InstanceClient<T>, InstanceMissing> {
        self.instance(id).map(|instance| instance.client.clone())
    }

    /// Return a mutable reference to the indicated compute instance.
    fn instance_mut(
        &mut self,
        id: ComputeInstanceId,
    ) -> Result<&mut InstanceState<T>, InstanceMissing> {
        self.instances.get_mut(&id).ok_or(InstanceMissing(id))
    }

    /// Return the IDs of all collections on the given compute instance.
    pub fn collection_ids(
        &self,
        instance_id: ComputeInstanceId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let ids = instance.collections.keys().copied();
        Ok(ids)
    }

    /// Return the frontiers of the identified collection.
    ///
    /// If an `instance_id` is given, the collection is looked up on that instance only;
    /// otherwise all instances are searched.
    pub fn collection_frontiers(
        &self,
        collection_id: GlobalId,
        instance_id: Option<ComputeInstanceId>,
    ) -> Result<CollectionFrontiers<T>, CollectionLookupError> {
        let collection = match instance_id {
            Some(id) => self.instance(id)?.collection(collection_id)?,
            None => self
                .instances
                .values()
                .find_map(|i| i.collections.get(&collection_id))
                .ok_or(CollectionMissing(collection_id))?,
        };

        Ok(collection.frontiers())
    }

    /// Return the IDs of all collections on the given instance that have a compute
    /// dependency on the collection identified by `id`.
    pub fn collection_reverse_dependencies(
        &self,
        instance_id: ComputeInstanceId,
        id: GlobalId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let collections = instance.collections.iter();
        let ids = collections
            .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
        Ok(ids)
    }

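    /// Return whether the identified collection on the given instance is hydrated.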
    pub async fn collection_hydrated(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<bool, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        let res = instance
            .call_sync(move |i| i.collection_hydrated(collection_id))
            .await?;

        Ok(res)
    }

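    /// Return a receiver that reports whether the collections on the given instance are
    /// hydrated on the target replicas, ignoring the excluded collections.
    ///
    /// Returns an error if the target replicas are not a valid hydration check target
    /// for the instance.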
    pub fn collections_hydrated_for_replicas(
        &self,
        instance_id: ComputeInstanceId,
        replicas: Vec<ReplicaId>,
        exclude_collections: BTreeSet<GlobalId>,
    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        if instance.replicas.is_empty() && !replicas.iter().any(|id| instance.replicas.contains(id))
        {
            return Err(HydrationCheckBadTarget(replicas).into());
        }

        let (tx, rx) = oneshot::channel();
        instance.call(move |i| {
            let result = i
                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
                .expect("validated");
            let _ = tx.send(result);
        });

        Ok(rx)
    }

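    /// Return the state of this controller rendered as JSON, for introspection and
    /// debugging.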
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` to ensure newly added fields are not forgotten here.
        let Self {
            instances,
            instance_workload_classes,
            build_info: _,
            storage_collections: _,
            initialized,
            read_only,
            config: _,
            peek_stash_persist_location: _,
            stashed_response,
            metrics: _,
            now: _,
            wallclock_lag: _,
            dyncfg: _,
            response_rx: _,
            response_tx: _,
            introspection_rx: _,
            introspection_tx: _,
            maintenance_ticker: _,
            maintenance_scheduled,
        } = self;

        let mut instances_dump = BTreeMap::new();
        for (id, instance) in instances {
            let dump = instance.dump().await?;
            instances_dump.insert(id.to_string(), dump);
        }

        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .iter()
            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
            .collect();

        Ok(serde_json::json!({
            "instances": instances_dump,
            "instance_workload_classes": instance_workload_classes,
            "initialized": initialized,
            "read_only": read_only,
            "stashed_response": format!("{stashed_response:?}"),
            "maintenance_scheduled": maintenance_scheduled,
        }))
    }
}

impl<T> ComputeController<T>
where
    T: ComputeControllerTimestamp,
{
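    /// Create a compute instance with the given ID and set of arranged introspection
    /// logs, and register it with this controller.
    ///
    /// A minimal call-site sketch (the identifiers here are assumptions for
    /// illustration, not part of this API):
    ///
    /// ```ignore
    /// controller.create_instance(instance_id, BTreeMap::new(), Some("production".into()))?;
    /// controller.add_replica_to_instance(instance_id, replica_id, location, replica_config)?;
    /// ```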
    pub fn create_instance(
        &mut self,
        id: ComputeInstanceId,
        arranged_logs: BTreeMap<LogVariant, GlobalId>,
        workload_class: Option<String>,
    ) -> Result<(), InstanceExists> {
        if self.instances.contains_key(&id) {
            return Err(InstanceExists(id));
        }

        let mut collections = BTreeMap::new();
        let mut logs = Vec::with_capacity(arranged_logs.len());
        for (&log, &id) in &arranged_logs {
            let collection = Collection::new_log();
            let shared = collection.shared.clone();
            collections.insert(id, collection);
            logs.push((log, id, shared));
        }

        let client = InstanceClient::spawn(
            id,
            self.build_info,
            Arc::clone(&self.storage_collections),
            self.peek_stash_persist_location.clone(),
            logs,
            self.metrics.for_instance(id),
            self.now.clone(),
            self.wallclock_lag.clone(),
            Arc::clone(&self.dyncfg),
            self.response_tx.clone(),
            self.introspection_tx.clone(),
            self.read_only,
        );

        let instance = InstanceState::new(client, collections);
        self.instances.insert(id, instance);

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class.clone());

        let instance = self.instances.get_mut(&id).expect("instance just added");
        if self.initialized {
            instance.call(Instance::initialization_complete);
        }

        let mut config_params = self.config.clone();
        config_params.workload_class = Some(workload_class);
        instance.call(|i| i.update_configuration(config_params));

        Ok(())
    }

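    /// Update the workload class assigned to the given instance and propagate the
    /// change to the instance through a configuration update.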
    pub fn update_instance_workload_class(
        &mut self,
        id: ComputeInstanceId,
        workload_class: Option<String>,
    ) -> Result<(), InstanceMissing> {
        // Ensure that the instance exists first.
        let _ = self.instance(id)?;

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class);

        // Cause a config update to notify the instance about its new workload class.
        self.update_configuration(Default::default());

        Ok(())
    }

    /// Remove an instance from this controller, shutting it down.
    pub fn drop_instance(&mut self, id: ComputeInstanceId) {
        if let Some(instance) = self.instances.remove(&id) {
            instance.call(|i| i.shutdown());
        }

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .remove(&id);
    }

    /// Return the compute controller's dynamic configuration.
    pub fn dyncfg(&self) -> &Arc<ConfigSet> {
        &self.dyncfg
    }

    /// Update the compute controller's configuration and forward it to all instances.
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        // Apply dyncfg updates first, so they are visible to the code below.
        config_params.dyncfg_updates.apply(&self.dyncfg);

        let instance_workload_classes = self
            .instance_workload_classes
            .lock()
            .expect("lock poisoned");

        // Forward the configuration to the instances, amended with their workload class.
        for (id, instance) in self.instances.iter_mut() {
            let mut params = config_params.clone();
            params.workload_class = Some(instance_workload_classes[id].clone());
            instance.call(|i| i.update_configuration(params));
        }

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                tracing::error!(
                    err,
                    overflowing_behavior,
                    "Invalid value for ore_overflowing_behavior"
                );
            }
        }

        // Remember the configuration for instances created in the future.
        self.config.update(config_params);
    }

    /// Mark the end of any initialization commands and notify all instances.
    pub fn initialization_complete(&mut self) {
        self.initialized = true;
        for instance in self.instances.values_mut() {
            instance.call(Instance::initialization_complete);
        }
    }

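    /// Wait until the controller is ready to process a response or to perform
    /// maintenance work.
    ///
    /// When this method returns, the caller should call [`ComputeController::process`].
    /// A minimal driver-loop sketch (`controller` and `handle_response` are assumptions
    /// for illustration):
    ///
    /// ```ignore
    /// loop {
    ///     controller.ready().await;
    ///     if let Some(response) = controller.process() {
    ///         handle_response(response);
    ///     }
    /// }
    /// ```
    ///
    /// This method is cancellation safe: a response received by the `select!` below is
    /// stashed in `self.stashed_response` before the next await point, so it is not
    /// lost if the future is dropped.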
    pub async fn ready(&mut self) {
        if self.stashed_response.is_some() {
            // We still have a response stashed, which we are immediately ready to process.
            return;
        }
        if self.maintenance_scheduled {
            // Maintenance work has been scheduled, and we are ready to perform it.
            return;
        }

        tokio::select! {
            resp = self.response_rx.recv() => {
                let resp = resp.expect("`self.response_tx` not dropped");
                self.stashed_response = Some(resp);
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        }
    }

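    /// Add a new replica to the given compute instance.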
    pub fn add_replica_to_instance(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
        config: ComputeReplicaConfig,
    ) -> Result<(), ReplicaCreationError> {
        use ReplicaCreationError::*;

        let instance = self.instance(instance_id)?;

        if instance.replicas.contains(&replica_id) {
            return Err(ReplicaExists(replica_id));
        }

        let (enable_logging, interval) = match config.logging.interval {
            Some(interval) => (true, interval),
            None => (false, Duration::from_secs(1)),
        };

        let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);

        let replica_config = ReplicaConfig {
            location,
            logging: LoggingConfig {
                interval,
                enable_logging,
                log_logging: config.logging.log_logging,
                index_logs: Default::default(),
            },
            grpc_client: self.config.grpc_client.clone(),
            expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
        };

        let instance = self.instance_mut(instance_id).expect("validated");
        instance.replicas.insert(replica_id);

        instance.call(move |i| {
            i.add_replica(replica_id, replica_config, None)
                .expect("validated")
        });

        Ok(())
    }

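    /// Remove an existing replica from the given compute instance.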
    pub fn drop_replica(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
    ) -> Result<(), ReplicaDropError> {
        use ReplicaDropError::*;

        let instance = self.instance_mut(instance_id)?;

        if !instance.replicas.contains(&replica_id) {
            return Err(ReplicaMissing(replica_id));
        }

        instance.replicas.remove(&replica_id);

        instance.call(move |i| i.remove_replica(replica_id).expect("validated"));

        Ok(())
    }

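    /// Create and maintain the described dataflow on the given instance.
    ///
    /// This validates the dataflow's `as_of` against the sinces of its imports,
    /// acquires read holds on all imported storage and compute collections, and
    /// registers the exported collections with the controller.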
    pub fn create_dataflow(
        &mut self,
        instance_id: ComputeInstanceId,
        mut dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        subscribe_target_replica: Option<ReplicaId>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        let instance = self.instance(instance_id)?;

        // If a target replica is requested, it must exist on the instance.
        if let Some(replica_id) = subscribe_target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // Validate the dataflow's `as_of`.
        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        // Acquire read holds on the imports and validate that they are held back to at
        // least the dataflow's `as_of`.
        let storage_ids = dataflow.imported_source_ids().collect();
        let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
        for id in dataflow.imported_index_ids() {
            let read_hold = instance.acquire_read_hold(id)?;
            import_read_holds.push(read_hold);
        }
        for hold in &import_read_holds {
            if PartialOrder::less_than(as_of, hold.since()) {
                return Err(SinceViolation(hold.id()));
            }
        }

        // Exported sinks must target existing storage collections.
        for id in dataflow.persist_sink_ids() {
            if self.storage_collections.check_exists(id).is_err() {
                return Err(CollectionMissing(id));
            }
        }

        let time_dependence = self
            .determine_time_dependence(instance_id, &dataflow)
            .expect("must exist");

        let instance = self.instance_mut(instance_id).expect("validated");

        let mut shared_collection_state = BTreeMap::new();
        for id in dataflow.export_ids() {
            let shared = SharedCollectionState::new(as_of.clone());
            let collection = Collection {
                write_only: dataflow.sink_exports.contains_key(&id),
                compute_dependencies: dataflow.imported_index_ids().collect(),
                shared: shared.clone(),
                time_dependence: time_dependence.clone(),
            };
            instance.collections.insert(id, collection);
            shared_collection_state.insert(id, shared);
        }

        dataflow.time_dependence = time_dependence;

        instance.call(move |i| {
            i.create_dataflow(
                dataflow,
                import_read_holds,
                subscribe_target_replica,
                shared_collection_state,
            )
            .expect("validated")
        });

        Ok(())
    }

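    /// Drop the read capabilities of the given collections and allow their resources
    /// to be reclaimed.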
    pub fn drop_collections(
        &mut self,
        instance_id: ComputeInstanceId,
        collection_ids: Vec<GlobalId>,
    ) -> Result<(), CollectionUpdateError> {
        let instance = self.instance_mut(instance_id)?;

        // Validate that all collections exist before removing any of them.
        for id in &collection_ids {
            instance.collection(*id)?;
        }

        for id in &collection_ids {
            instance.collections.remove(id);
        }

        instance.call(|i| i.drop_collections(collection_ids).expect("validated"));

        Ok(())
    }

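    /// Initiate a peek request for the contents of the given collection at `timestamp`.
    ///
    /// The result is sent on `peek_response_tx` once it is available; until then, the
    /// peek can be canceled with [`ComputeController::cancel_peek`].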
    pub fn peek(
        &self,
        instance_id: ComputeInstanceId,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let instance = self.instance(instance_id)?;

        // If a target replica is requested, it must exist on the instance.
        if let Some(replica_id) = target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // Acquire a read hold on the peeked collection and verify that it is readable
        // at the requested timestamp.
        let read_hold = match &peek_target {
            PeekTarget::Index { id } => instance.acquire_read_hold(*id)?,
            PeekTarget::Persist { id, .. } => self
                .storage_collections
                .acquire_read_holds(vec![*id])?
                .into_element(),
        };
        if !read_hold.since().less_equal(&timestamp) {
            return Err(SinceViolation(peek_target.id()));
        }

        instance.call(move |i| {
            i.peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing,
                map_filter_project,
                read_hold,
                target_replica,
                peek_response_tx,
            )
            .expect("validated")
        });

        Ok(())
    }

    /// Cancel an in-progress peek, resolving it with the given response (e.g.
    /// [`PeekResponse::Canceled`]).
    pub fn cancel_peek(
        &self,
        instance_id: ComputeInstanceId,
        uuid: Uuid,
        reason: PeekResponse,
    ) -> Result<(), InstanceMissing> {
        self.instance(instance_id)?
            .call(move |i| i.cancel_peek(uuid, reason));
        Ok(())
    }

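    /// Assign a read policy to the given collections, controlling how their read
    /// frontiers advance relative to their write frontiers.
    ///
    /// It is an error to set a read policy for a write-only collection.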
    pub fn set_read_policy(
        &self,
        instance_id: ComputeInstanceId,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) -> Result<(), ReadPolicyError> {
        use ReadPolicyError::*;

        let instance = self.instance(instance_id)?;

        // Validate that the policies target readable collections.
        for (id, _) in &policies {
            let collection = instance.collection(*id)?;
            if collection.write_only {
                return Err(WriteOnlyCollection(*id));
            }
        }

        self.instance(instance_id)?
            .call(|i| i.set_read_policy(policies).expect("validated"));

        Ok(())
    }

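    /// Acquire a [`ReadHold`] on the given compute collection, preventing its since
    /// frontier from advancing past the hold.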
    pub fn acquire_read_hold(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<ReadHold<T>, CollectionUpdateError> {
        let read_hold = self
            .instance(instance_id)?
            .acquire_read_hold(collection_id)?;
        Ok(read_hold)
    }

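    /// Determine the [`TimeDependence`] of the given dataflow, i.e. how its timestamps
    /// relate to wall-clock time. Dataflows implementing continual tasks report no time
    /// dependence; all other dataflows merge the time dependences of their imports.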
    fn determine_time_dependence(
        &self,
        instance_id: ComputeInstanceId,
        dataflow: &DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        // Continual tasks are not handled here; they report no time dependence.
        let is_continual_task = dataflow.continual_task_ids().next().is_some();
        if is_continual_task {
            return Ok(None);
        }

        let instance = self
            .instance(instance_id)
            .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
        let mut time_dependencies = Vec::new();

        for id in dataflow.imported_index_ids() {
            let dependence = instance
                .get_time_dependence(id)
                .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
            time_dependencies.push(dependence);
        }

        'source: for id in dataflow.imported_source_ids() {
            // Consult the compute instances first, in case the imported collection is
            // compute-maintained, before falling back to the storage collections.
            for instance in self.instances.values() {
                if let Ok(dependence) = instance.get_time_dependence(id) {
                    time_dependencies.push(dependence);
                    continue 'source;
                }
            }

            time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
        }

        Ok(TimeDependence::merge(
            time_dependencies,
            dataflow.refresh_schedule.as_ref(),
        ))
    }

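    /// Process work queued by a previous call to [`ComputeController::ready`],
    /// performing scheduled maintenance and returning a stashed response, if any.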
    #[mz_ore::instrument(level = "debug")]
    pub fn process(&mut self) -> Option<ComputeControllerResponse<T>> {
        // Perform any scheduled maintenance work.
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        // Return a stashed response, if any.
        self.stashed_response.take()
    }

    /// Perform periodic maintenance work on all instances.
    #[mz_ore::instrument(level = "debug")]
    fn maintain(&mut self) {
        for instance in self.instances.values_mut() {
            instance.call(Instance::maintain);
        }
    }

    /// Instruct the given instance to allow writes to the identified collection.
    ///
    /// This is a no-op if the controller itself is in read-only mode.
    pub fn allow_writes(
        &mut self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<(), CollectionUpdateError> {
        if self.read_only {
            tracing::debug!("Skipping allow_writes in read-only mode");
            return Ok(());
        }

        let instance = self.instance_mut(instance_id)?;

        // Validate that the collection exists.
        instance.collection(collection_id)?;

        instance.call(move |i| i.allow_writes(collection_id).expect("validated"));

        Ok(())
    }
}
1069
1070#[derive(Debug)]
1071struct InstanceState<T: ComputeControllerTimestamp> {
1072 client: InstanceClient<T>,
1073 replicas: BTreeSet<ReplicaId>,
1074 collections: BTreeMap<GlobalId, Collection<T>>,
1075}
1076
1077impl<T: ComputeControllerTimestamp> InstanceState<T> {
1078 fn new(client: InstanceClient<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
1079 Self {
1080 client,
1081 replicas: Default::default(),
1082 collections,
1083 }
1084 }
1085
1086 fn collection(&self, id: GlobalId) -> Result<&Collection<T>, CollectionMissing> {
1087 self.collections.get(&id).ok_or(CollectionMissing(id))
1088 }
1089
1090 fn call<F>(&self, f: F)
1096 where
1097 F: FnOnce(&mut Instance<T>) + Send + 'static,
1098 {
1099 self.client.call(f).expect("instance not dropped")
1100 }
1101
1102 async fn call_sync<F, R>(&self, f: F) -> R
1108 where
1109 F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
1110 R: Send + 'static,
1111 {
1112 self.client
1113 .call_sync(f)
1114 .await
1115 .expect("instance not dropped")
1116 }
1117
1118 pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
1120 let collection = self.collection(id)?;
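        // Take the hold by incrementing the read capabilities at the collection's
        // current since frontier, ensuring the frontier cannot advance past the hold.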
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });

        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
        Ok(hold)
    }

    fn get_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, CollectionMissing> {
        Ok(self.collection(id)?.time_dependence.clone())
    }

    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` to ensure newly added fields are not forgotten here.
        let Self {
            client: _,
            replicas,
            collections,
        } = self;

        let instance = self.call_sync(|i| i.dump()).await?;
        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();

        Ok(serde_json::json!({
            "instance": instance,
            "replicas": replicas,
            "collections": collections,
        }))
    }
}

/// The controller-side state of a compute collection.
#[derive(Debug)]
struct Collection<T> {
    /// Whether the collection is write-only (a sink) and therefore not readable.
    write_only: bool,
    /// The compute collections this collection depends on.
    compute_dependencies: BTreeSet<GlobalId>,
    /// State shared with the collection's `Instance`.
    shared: SharedCollectionState<T>,
    /// The collection's time dependence, if any.
    time_dependence: Option<TimeDependence>,
}

impl<T: Timestamp> Collection<T> {
    /// Create state for a new log collection.
    fn new_log() -> Self {
        let as_of = Antichain::from_elem(T::minimum());
        Self {
            write_only: false,
            compute_dependencies: Default::default(),
            shared: SharedCollectionState::new(as_of),
            time_dependence: Some(TimeDependence::default()),
        }
    }

    /// Return the current frontiers of this collection.
    fn frontiers(&self) -> CollectionFrontiers<T> {
        let read_frontier = self
            .shared
            .lock_read_capabilities(|c| c.frontier().to_owned());
        let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
        CollectionFrontiers {
            read_frontier,
            write_frontier,
        }
    }
}

/// The read and write frontiers of a compute collection.
#[derive(Clone, Debug)]
pub struct CollectionFrontiers<T> {
    /// The read frontier.
    pub read_frontier: Antichain<T>,
    /// The write frontier.
    pub write_frontier: Antichain<T>,
}

impl<T: Timestamp> Default for CollectionFrontiers<T> {
    fn default() -> Self {
        Self {
            read_frontier: Antichain::from_elem(T::minimum()),
            write_frontier: Antichain::from_elem(T::minimum()),
        }
    }
}