1use std::collections::{BTreeMap, BTreeSet};
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use mz_build_info::BuildInfo;
40use mz_cluster_client::client::ClusterReplicaLocation;
41use mz_cluster_client::metrics::ControllerMetrics;
42use mz_cluster_client::{ReplicaId, WallclockLagFn};
43use mz_compute_types::ComputeInstanceId;
44use mz_compute_types::config::ComputeReplicaConfig;
45use mz_compute_types::dataflows::DataflowDescription;
46use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
47use mz_dyncfg::ConfigSet;
48use mz_expr::RowSetFinishing;
49use mz_expr::row::RowCollection;
50use mz_ore::cast::CastFrom;
51use mz_ore::collections::CollectionExt;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::NowFn;
54use mz_ore::tracing::OpenTelemetryContext;
55use mz_persist_types::PersistLocation;
56use mz_repr::{GlobalId, RelationDesc, Row, Timestamp};
57use mz_storage_client::controller::StorageController;
58use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
59use mz_storage_types::read_holds::ReadHold;
60use mz_storage_types::read_policy::ReadPolicy;
61use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
62use prometheus::proto::LabelPair;
63use serde::{Deserialize, Serialize};
64use timely::PartialOrder;
65use timely::progress::Antichain;
66use tokio::sync::{mpsc, oneshot};
67use tokio::time::{self, MissedTickBehavior};
68use uuid::Uuid;
69
70use crate::controller::error::{
71 CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
72 HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
73 ReplicaCreationError, ReplicaDropError,
74};
75use crate::controller::instance::{Instance, SharedCollectionState};
76use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
77use crate::controller::replica::ReplicaConfig;
78use crate::logging::{LogVariant, LoggingConfig};
79use crate::metrics::ComputeControllerMetrics;
80use crate::protocol::command::{ComputeParameters, PeekTarget};
81use crate::protocol::response::{PeekResponse, SubscribeBatch};
82
83mod instance;
84mod introspection;
85mod replica;
86mod sequential_hydration;
87
88pub mod error;
89pub mod instance_client;
90pub use instance_client::InstanceClient;
91
/// Shared handle to the storage-collections facility, used by the compute
/// controller to acquire read holds on imported storage collections and to
/// check their existence.
pub(crate) type StorageCollections =
    Arc<dyn mz_storage_client::storage_collections::StorageCollections + Send + Sync>;
94
/// Responses the compute controller surfaces to its client via [`ComputeController::process`].
#[derive(Debug)]
pub enum ComputeControllerResponse {
    /// Notification that a peek completed, carrying the peek's UUID, the
    /// notification payload, and the tracing context the peek was issued in.
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// A batch of updates for the subscribe identified by the `GlobalId`.
    SubscribeResponse(GlobalId, SubscribeBatch),
    /// Completion of a `COPY TO` export: a `u64` count on success, or the
    /// error that caused it to fail.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// Notification that a collection's write frontier changed.
    FrontierUpper {
        /// The collection whose frontier changed.
        id: GlobalId,
        /// The new write frontier.
        upper: Antichain<Timestamp>,
    },
}
124
/// Summary of a peek's outcome, delivered to the controller client.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    /// The peek produced rows.
    Success {
        /// Number of result rows, after offset/limit have been applied.
        rows: u64,
        /// Total size of the result rows in bytes.
        result_size: u64,
    },
    /// The peek failed; the payload is the error message.
    Error(String),
    /// The peek was canceled.
    Canceled,
}
140
141impl PeekNotification {
142 fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
145 match peek_response {
146 PeekResponse::Rows(rows) => {
147 let num_rows = u64::cast_from(RowCollection::offset_limit(
148 rows.iter().map(|r| r.count()).sum(),
149 offset,
150 limit,
151 ));
152 let result_size = u64::cast_from(rows.iter().map(|r| r.byte_len()).sum::<usize>());
153
154 tracing::trace!(?num_rows, ?result_size, "inline peek result");
155
156 Self::Success {
157 rows: num_rows,
158 result_size,
159 }
160 }
161 PeekResponse::Stashed(stashed_response) => {
162 let rows = stashed_response.num_rows(offset, limit);
163 let result_size = stashed_response.size_bytes();
164
165 tracing::trace!(?rows, ?result_size, "stashed peek result");
166
167 Self::Success {
168 rows: u64::cast_from(rows),
169 result_size: u64::cast_from(result_size),
170 }
171 }
172 PeekResponse::Error(err) => Self::Error(err.clone()),
173 PeekResponse::Canceled => Self::Canceled,
174 }
175 }
176}
177
/// The compute controller: tracks per-instance state and relays commands and
/// responses between its client and the per-instance controller tasks.
pub struct ComputeController {
    /// State for each known compute instance, keyed by instance ID.
    instances: BTreeMap<ComputeInstanceId, InstanceState>,
    /// Workload class per instance; shared with the metrics postprocessor
    /// registered in `Self::new`, which uses it to label metrics.
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    /// Build information forwarded to spawned instances.
    build_info: &'static BuildInfo,
    /// Handle to storage collections, for read holds and existence checks.
    storage_collections: StorageCollections,
    /// Whether `initialization_complete` has been called.
    initialized: bool,
    /// Whether the controller (and its instances) run in read-only mode.
    read_only: bool,
    /// The most recent configuration parameters; sent to new instances.
    config: ComputeParameters,
    /// Persist location used for stashing large peek results.
    peek_stash_persist_location: PersistLocation,
    /// A response stashed by `ready`, returned by the next `process` call.
    stashed_response: Option<ComputeControllerResponse>,
    /// Metrics for this controller and its instances.
    metrics: ComputeControllerMetrics,
    /// Clock function forwarded to spawned instances.
    now: NowFn,
    /// Wallclock-lag function forwarded to spawned instances.
    wallclock_lag: WallclockLagFn<Timestamp>,
    /// Dynamic configuration, updated via `update_configuration`.
    dyncfg: Arc<ConfigSet>,

    /// Receiver for responses produced by instance tasks.
    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse>,
    /// Sender cloned into each spawned instance.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
    /// Receiver for introspection updates; `Some` until handed off to the
    /// introspection sink task in `start_introspection_sink`.
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    /// Sender cloned into each spawned instance.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    /// Ticker driving periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether a maintenance pass is due; set by `ready`, consumed by `process`.
    maintenance_scheduled: bool,
}
231
232impl ComputeController {
    /// Construct a new [`ComputeController`] with no instances.
    ///
    /// Registers a metrics postprocessor that attaches a `workload_class`
    /// label to metrics carrying an `instance_id` label, and sets up the
    /// response/introspection channels shared with future instance tasks.
    pub fn new(
        build_info: &'static BuildInfo,
        storage_collections: StorageCollections,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        peek_stash_persist_location: PersistLocation,
        controller_metrics: ControllerMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<Timestamp>,
    ) -> Self {
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();

        // Skip missed ticks so a stalled controller doesn't run a burst of
        // maintenance passes to catch up.
        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
            ComputeInstanceId,
            Option<String>,
        >::new()));

        // Postprocessor: for every metric with an `instance_id` label whose
        // instance has a known workload class, append a `workload_class` label.
        metrics_registry.register_postprocessor({
            let instance_workload_classes = Arc::clone(&instance_workload_classes);
            move |metrics| {
                // Snapshot the map under the lock, keyed by the stringified
                // instance ID to match the metric label values.
                let instance_workload_classes = instance_workload_classes
                    .lock()
                    .expect("lock poisoned")
                    .iter()
                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
                    .collect::<BTreeMap<String, Option<String>>>();
                for metric in metrics {
                    'metric: for metric in metric.mut_metric() {
                        for label in metric.get_label() {
                            if label.name() == "instance_id" {
                                if let Some(workload_class) = instance_workload_classes
                                    .get(label.value())
                                    .cloned()
                                    .flatten()
                                {
                                    let mut label = LabelPair::default();
                                    label.set_name("workload_class".into());
                                    label.set_value(workload_class.clone());

                                    let mut labels = metric.take_label();
                                    labels.push(label);
                                    metric.set_label(labels);
                                }
                                // Only the first `instance_id` label matters;
                                // continue with the next metric.
                                continue 'metric;
                            }
                        }
                    }
                }
            }
        });

        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

        Self {
            instances: BTreeMap::new(),
            instance_workload_classes,
            build_info,
            storage_collections,
            initialized: false,
            read_only,
            config: Default::default(),
            peek_stash_persist_location,
            stashed_response: None,
            metrics,
            now,
            wallclock_lag,
            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
            response_rx,
            response_tx,
            introspection_rx: Some(introspection_rx),
            introspection_tx,
            maintenance_ticker,
            maintenance_scheduled: false,
        }
    }
316
317 pub fn start_introspection_sink(&mut self, storage_controller: &dyn StorageController) {
322 if let Some(rx) = self.introspection_rx.take() {
323 spawn_introspection_sink(rx, storage_controller);
324 }
325 }
326
    /// Report whether an instance with the given ID exists.
    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
        self.instances.contains_key(&id)
    }
331
332 fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState, InstanceMissing> {
334 self.instances.get(&id).ok_or(InstanceMissing(id))
335 }
336
337 pub fn instance_client(
339 &self,
340 id: ComputeInstanceId,
341 ) -> Result<InstanceClient, InstanceMissing> {
342 self.instance(id).map(|instance| instance.client.clone())
343 }
344
345 fn instance_mut(
347 &mut self,
348 id: ComputeInstanceId,
349 ) -> Result<&mut InstanceState, InstanceMissing> {
350 self.instances.get_mut(&id).ok_or(InstanceMissing(id))
351 }
352
353 pub fn collection_ids(
355 &self,
356 instance_id: ComputeInstanceId,
357 ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
358 let instance = self.instance(instance_id)?;
359 let ids = instance.collections.keys().copied();
360 Ok(ids)
361 }
362
363 pub fn collection_frontiers(
368 &self,
369 collection_id: GlobalId,
370 instance_id: Option<ComputeInstanceId>,
371 ) -> Result<CollectionFrontiers, CollectionLookupError> {
372 let collection = match instance_id {
373 Some(id) => self.instance(id)?.collection(collection_id)?,
374 None => self
375 .instances
376 .values()
377 .find_map(|i| i.collections.get(&collection_id))
378 .ok_or(CollectionMissing(collection_id))?,
379 };
380
381 Ok(collection.frontiers())
382 }
383
384 pub fn collection_reverse_dependencies(
386 &self,
387 instance_id: ComputeInstanceId,
388 id: GlobalId,
389 ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
390 let instance = self.instance(instance_id)?;
391 let collections = instance.collections.iter();
392 let ids = collections
393 .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
394 Ok(ids)
395 }
396
397 pub async fn collection_hydrated(
403 &self,
404 instance_id: ComputeInstanceId,
405 collection_id: GlobalId,
406 ) -> Result<bool, anyhow::Error> {
407 let instance = self.instance(instance_id)?;
408
409 let res = instance
410 .call_sync(move |i| i.collection_hydrated(collection_id))
411 .await?;
412
413 Ok(res)
414 }
415
    /// Ask the instance whether its collections are hydrated on the given
    /// replicas, ignoring `exclude_collections`; the answer arrives on the
    /// returned receiver.
    ///
    /// # Errors
    /// Fails with `InstanceMissing` if the instance does not exist, and with
    /// `HydrationCheckBadTarget` if the instance has replicas but none of the
    /// targeted ones exist on it.
    pub fn collections_hydrated_for_replicas(
        &self,
        instance_id: ComputeInstanceId,
        replicas: Vec<ReplicaId>,
        exclude_collections: BTreeSet<GlobalId>,
    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        // If the instance has replicas at all, at least one of the targeted
        // replicas must actually exist on it; this validation backs the
        // `expect("validated")` below.
        if !instance.replicas.is_empty()
            && !replicas.iter().any(|id| instance.replicas.contains(id))
        {
            return Err(HydrationCheckBadTarget(replicas).into());
        }

        let (tx, rx) = oneshot::channel();
        instance.call(move |i| {
            let result = i
                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
                .expect("validated");
            // The caller may have dropped the receiver; ignore send errors.
            let _ = tx.send(result);
        });

        Ok(rx)
    }
447
    /// Render the controller's state as JSON, for debugging/introspection.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` exhaustively, so that adding a field forces a
        // decision about whether to include it in the dump.
        let Self {
            instances,
            instance_workload_classes,
            build_info: _,
            storage_collections: _,
            initialized,
            read_only,
            config: _,
            peek_stash_persist_location: _,
            stashed_response,
            metrics: _,
            now: _,
            wallclock_lag: _,
            dyncfg: _,
            response_rx: _,
            response_tx: _,
            introspection_rx: _,
            introspection_tx: _,
            maintenance_ticker: _,
            maintenance_scheduled,
        } = self;

        // Each instance dump requires a round-trip to its task.
        let mut instances_dump = BTreeMap::new();
        for (id, instance) in instances {
            let dump = instance.dump().await?;
            instances_dump.insert(id.to_string(), dump);
        }

        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .iter()
            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
            .collect();

        Ok(serde_json::json!({
            "instances": instances_dump,
            "instance_workload_classes": instance_workload_classes,
            "initialized": initialized,
            "read_only": read_only,
            "stashed_response": format!("{stashed_response:?}"),
            "maintenance_scheduled": maintenance_scheduled,
        }))
    }
502}
503
504impl ComputeController {
    /// Create a new compute instance with the given arranged log collections
    /// and optional workload class.
    ///
    /// Spawns the instance task, registers controller-side state for the log
    /// collections, records the workload class for metrics labeling, and
    /// sends the current configuration to the new instance.
    ///
    /// # Errors
    /// Fails with `InstanceExists` if an instance with this ID already exists.
    pub fn create_instance(
        &mut self,
        id: ComputeInstanceId,
        arranged_logs: BTreeMap<LogVariant, GlobalId>,
        workload_class: Option<String>,
    ) -> Result<(), InstanceExists> {
        if self.instances.contains_key(&id) {
            return Err(InstanceExists(id));
        }

        // Set up controller-side state for each log collection and collect
        // the shared state to hand to the instance task.
        let mut collections = BTreeMap::new();
        let mut logs = Vec::with_capacity(arranged_logs.len());
        for (&log, &id) in &arranged_logs {
            let collection = Collection::new_log();
            let shared = collection.shared.clone();
            collections.insert(id, collection);
            logs.push((log, id, shared));
        }

        let client = InstanceClient::spawn(
            id,
            self.build_info,
            Arc::clone(&self.storage_collections),
            self.peek_stash_persist_location.clone(),
            logs,
            self.metrics.for_instance(id),
            self.now.clone(),
            self.wallclock_lag.clone(),
            Arc::clone(&self.dyncfg),
            self.response_tx.clone(),
            self.introspection_tx.clone(),
            self.read_only,
        );

        let instance = InstanceState::new(client, collections);
        self.instances.insert(id, instance);

        // Record the workload class so the metrics postprocessor can label
        // this instance's metrics.
        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class.clone());

        let instance = self.instances.get_mut(&id).expect("instance just added");
        if self.initialized {
            instance.call(Instance::initialization_complete);
        }

        // Send the current configuration, annotated with this instance's
        // workload class.
        let mut config_params = self.config.clone();
        config_params.workload_class = Some(workload_class);
        instance.call(|i| i.update_configuration(config_params));

        Ok(())
    }
559
560 pub fn update_instance_workload_class(
562 &mut self,
563 id: ComputeInstanceId,
564 workload_class: Option<String>,
565 ) -> Result<(), InstanceMissing> {
566 let _ = self.instance(id)?;
568
569 self.instance_workload_classes
570 .lock()
571 .expect("lock poisoned")
572 .insert(id, workload_class);
573
574 self.update_configuration(Default::default());
576
577 Ok(())
578 }
579
580 pub fn drop_instance(&mut self, id: ComputeInstanceId) {
586 if let Some(instance) = self.instances.remove(&id) {
587 instance.call(|i| i.shutdown());
588 }
589
590 self.instance_workload_classes
591 .lock()
592 .expect("lock poisoned")
593 .remove(&id);
594 }
595
    /// Return the controller's dynamic configuration set.
    pub fn dyncfg(&self) -> &Arc<ConfigSet> {
        &self.dyncfg
    }
600
    /// Apply the given configuration parameters to the controller and to all
    /// existing instances, and remember them for instances created later.
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        // Apply dyncfg updates first, so the new values are visible to the
        // reads below (e.g. `ORE_OVERFLOWING_BEHAVIOR`).
        config_params.dyncfg_updates.apply(&self.dyncfg);

        let instance_workload_classes = self
            .instance_workload_classes
            .lock()
            .expect("lock poisoned");

        // Forward the parameters to every instance, annotated with that
        // instance's current workload class.
        for (id, instance) in self.instances.iter_mut() {
            let mut params = config_params.clone();
            params.workload_class = Some(instance_workload_classes[id].clone());
            instance.call(|i| i.update_configuration(params));
        }

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                // Invalid config values are logged and ignored, keeping the
                // previous behavior in place.
                tracing::error!(
                    err,
                    overflowing_behavior,
                    "Invalid value for ore_overflowing_behavior"
                );
            }
        }

        // Merge into the stored config so newly created instances see it.
        self.config.update(config_params);
    }
634
635 pub fn initialization_complete(&mut self) {
641 self.initialized = true;
642 for instance in self.instances.values_mut() {
643 instance.call(Instance::initialization_complete);
644 }
645 }
646
    /// Wait until there is work for [`ComputeController::process`] to do.
    ///
    /// Returns immediately if a response is already stashed or a maintenance
    /// pass is already scheduled; otherwise waits for either the next
    /// response from an instance task or the next maintenance tick.
    pub async fn ready(&mut self) {
        if self.stashed_response.is_some() {
            return;
        }
        if self.maintenance_scheduled {
            return;
        }

        tokio::select! {
            resp = self.response_rx.recv() => {
                // `recv` only yields `None` if all senders are dropped; the
                // controller keeps a sender alive in `self.response_tx`.
                let resp = resp.expect("`self.response_tx` not dropped");
                self.stashed_response = Some(resp);
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        }
    }
674
    /// Add a replica to an existing instance.
    ///
    /// # Errors
    /// Fails if the instance is missing or a replica with the given ID
    /// already exists on it.
    pub fn add_replica_to_instance(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
        config: ComputeReplicaConfig,
    ) -> Result<(), ReplicaCreationError> {
        use ReplicaCreationError::*;

        let instance = self.instance(instance_id)?;

        if instance.replicas.contains(&replica_id) {
            return Err(ReplicaExists(replica_id));
        }

        // Logging is enabled iff an interval was configured; the fallback
        // interval is unused when logging is disabled.
        let (enable_logging, interval) = match config.logging.interval {
            Some(interval) => (true, interval),
            None => (false, Duration::from_secs(1)),
        };

        let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);

        let replica_config = ReplicaConfig {
            location,
            logging: LoggingConfig {
                interval,
                enable_logging,
                log_logging: config.logging.log_logging,
                index_logs: Default::default(),
            },
            grpc_client: self.config.grpc_client.clone(),
            // A zero offset disables replica expiration.
            expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
        };

        // Re-borrow mutably; existence was validated above.
        let instance = self.instance_mut(instance_id).expect("validated");
        instance.replicas.insert(replica_id);

        instance.call(move |i| {
            i.add_replica(replica_id, replica_config, None)
                .expect("validated")
        });

        Ok(())
    }
721
722 pub fn drop_replica(
724 &mut self,
725 instance_id: ComputeInstanceId,
726 replica_id: ReplicaId,
727 ) -> Result<(), ReplicaDropError> {
728 use ReplicaDropError::*;
729
730 let instance = self.instance_mut(instance_id)?;
731
732 if !instance.replicas.contains(&replica_id) {
734 return Err(ReplicaMissing(replica_id));
735 }
736
737 instance.replicas.remove(&replica_id);
738
739 instance.call(move |i| i.remove_replica(replica_id).expect("validated"));
740
741 Ok(())
742 }
743
    /// Create a dataflow on the given instance, optionally targeted at a
    /// single replica.
    ///
    /// # Errors
    /// Fails if the instance or targeted replica is missing, the dataflow has
    /// no as-of, an empty as-of is combined with subscribe or copy-to
    /// exports, an imported collection cannot serve reads at the as-of, or a
    /// persist sink target does not exist in storage.
    pub fn create_dataflow(
        &mut self,
        instance_id: ComputeInstanceId,
        mut dataflow: DataflowDescription<mz_compute_types::plan::Plan, ()>,
        target_replica: Option<ReplicaId>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        let instance = self.instance(instance_id)?;

        if let Some(replica_id) = target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
            assert!(
                dataflow.exported_index_ids().next().is_none(),
                "Replica-targeted indexes are not supported"
            );
        }

        // An empty as-of is rejected for dataflows with subscribe or copy-to
        // exports.
        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        // Acquire read holds on all imported collections (storage sources and
        // compute indexes) and verify they can serve reads at the as-of.
        let storage_ids = dataflow.imported_source_ids().collect();
        let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
        for id in dataflow.imported_index_ids() {
            let read_hold = instance.acquire_read_hold(id)?;
            import_read_holds.push(read_hold);
        }
        for hold in &import_read_holds {
            if PartialOrder::less_than(as_of, hold.since()) {
                return Err(SinceViolation(hold.id()));
            }
        }

        // Persist sink targets must already exist in storage.
        for id in dataflow.persist_sink_ids() {
            if self.storage_collections.check_exists(id).is_err() {
                return Err(CollectionMissing(id));
            }
        }
        let time_dependence = self
            .determine_time_dependence(instance_id, &dataflow)
            .expect("must exist");

        // Re-borrow mutably; existence was validated above.
        let instance = self.instance_mut(instance_id).expect("validated");

        // Register controller-side state for every exported collection and
        // collect the shared state to hand to the instance task.
        let mut shared_collection_state = BTreeMap::new();
        for id in dataflow.export_ids() {
            let shared = SharedCollectionState::new(as_of.clone());
            let collection = Collection {
                write_only: dataflow.sink_exports.contains_key(&id),
                compute_dependencies: dataflow.imported_index_ids().collect(),
                shared: shared.clone(),
                time_dependence: time_dependence.clone(),
            };
            instance.collections.insert(id, collection);
            shared_collection_state.insert(id, shared);
        }

        dataflow.time_dependence = time_dependence;

        instance.call(move |i| {
            i.create_dataflow(
                dataflow,
                import_read_holds,
                shared_collection_state,
                target_replica,
            )
            .expect("validated")
        });

        Ok(())
    }
832
833 pub fn drop_collections(
836 &mut self,
837 instance_id: ComputeInstanceId,
838 collection_ids: Vec<GlobalId>,
839 ) -> Result<(), CollectionUpdateError> {
840 let instance = self.instance_mut(instance_id)?;
841
842 for id in &collection_ids {
844 instance.collection(*id)?;
845 }
846
847 for id in &collection_ids {
848 instance.collections.remove(id);
849 }
850
851 instance.call(|i| i.drop_collections(collection_ids).expect("validated"));
852
853 Ok(())
854 }
855
    /// Initiate a peek against the given target, delivering the result
    /// through `peek_response_tx`.
    ///
    /// # Errors
    /// Fails if the instance or targeted replica is missing, or if the peeked
    /// collection cannot serve reads at `timestamp`.
    pub fn peek(
        &self,
        instance_id: ComputeInstanceId,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: Timestamp,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let instance = self.instance(instance_id)?;

        if let Some(replica_id) = target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        // Acquire a read hold on the peeked collection — from the instance
        // for index peeks, from storage for persist peeks — and verify it
        // allows reads at `timestamp`.
        let read_hold = match &peek_target {
            PeekTarget::Index { id } => instance.acquire_read_hold(*id)?,
            PeekTarget::Persist { id, .. } => self
                .storage_collections
                .acquire_read_holds(vec![*id])?
                .into_element(),
        };
        if !read_hold.since().less_equal(&timestamp) {
            return Err(SinceViolation(peek_target.id()));
        }

        instance.call(move |i| {
            i.peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing,
                map_filter_project,
                read_hold,
                target_replica,
                peek_response_tx,
            )
            .expect("validated")
        });

        Ok(())
    }
911
912 pub fn cancel_peek(
922 &self,
923 instance_id: ComputeInstanceId,
924 uuid: Uuid,
925 reason: PeekResponse,
926 ) -> Result<(), InstanceMissing> {
927 self.instance(instance_id)?
928 .call(move |i| i.cancel_peek(uuid, reason));
929 Ok(())
930 }
931
932 pub fn set_read_policy(
944 &self,
945 instance_id: ComputeInstanceId,
946 policies: Vec<(GlobalId, ReadPolicy)>,
947 ) -> Result<(), ReadPolicyError> {
948 use ReadPolicyError::*;
949
950 let instance = self.instance(instance_id)?;
951
952 for (id, _) in &policies {
954 let collection = instance.collection(*id)?;
955 if collection.write_only {
956 return Err(WriteOnlyCollection(*id));
957 }
958 }
959
960 self.instance(instance_id)?
961 .call(|i| i.set_read_policy(policies).expect("validated"));
962
963 Ok(())
964 }
965
966 pub fn acquire_read_hold(
968 &self,
969 instance_id: ComputeInstanceId,
970 collection_id: GlobalId,
971 ) -> Result<ReadHold, CollectionUpdateError> {
972 let read_hold = self
973 .instance(instance_id)?
974 .acquire_read_hold(collection_id)?;
975 Ok(read_hold)
976 }
977
    /// Determine the time dependence of a dataflow from its imports, merged
    /// with its refresh schedule.
    ///
    /// Continual tasks yield `None`. Imported indexes are resolved on the
    /// target instance; imported sources are resolved on any instance that
    /// knows them, falling back to storage.
    fn determine_time_dependence(
        &self,
        instance_id: ComputeInstanceId,
        dataflow: &DataflowDescription<mz_compute_types::plan::Plan, ()>,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        let is_continual_task = dataflow.continual_task_ids().next().is_some();
        if is_continual_task {
            return Ok(None);
        }

        let instance = self
            .instance(instance_id)
            .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
        let mut time_dependencies = Vec::new();

        for id in dataflow.imported_index_ids() {
            let dependence = instance
                .get_time_dependence(id)
                .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
            time_dependencies.push(dependence);
        }

        'source: for id in dataflow.imported_source_ids() {
            // Prefer a compute collection on any instance; the first hit wins.
            for instance in self.instances.values() {
                if let Ok(dependence) = instance.get_time_dependence(id) {
                    time_dependencies.push(dependence);
                    continue 'source;
                }
            }

            // Not a compute collection anywhere; ask storage.
            time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
        }

        Ok(TimeDependence::merge(
            time_dependencies,
            dataflow.refresh_schedule.as_ref(),
        ))
    }
1022
1023 #[mz_ore::instrument(level = "debug")]
1025 pub fn process(&mut self) -> Option<ComputeControllerResponse> {
1026 if self.maintenance_scheduled {
1028 self.maintain();
1029 self.maintenance_scheduled = false;
1030 }
1031
1032 self.stashed_response.take()
1034 }
1035
1036 #[mz_ore::instrument(level = "debug")]
1037 fn maintain(&mut self) {
1038 for instance in self.instances.values_mut() {
1040 instance.call(Instance::maintain);
1041 }
1042 }
1043
1044 pub fn allow_writes(
1048 &mut self,
1049 instance_id: ComputeInstanceId,
1050 collection_id: GlobalId,
1051 ) -> Result<(), CollectionUpdateError> {
1052 if self.read_only {
1053 tracing::debug!("Skipping allow_writes in read-only mode");
1054 return Ok(());
1055 }
1056
1057 let instance = self.instance_mut(instance_id)?;
1058
1059 instance.collection(collection_id)?;
1061
1062 instance.call(move |i| i.allow_writes(collection_id).expect("validated"));
1063
1064 Ok(())
1065 }
1066}
1067
/// Controller-side state kept per compute instance.
#[derive(Debug)]
struct InstanceState {
    /// Client for the instance's task.
    client: InstanceClient,
    /// IDs of the instance's replicas.
    replicas: BTreeSet<ReplicaId>,
    /// Controller-side collection state, keyed by collection ID.
    collections: BTreeMap<GlobalId, Collection>,
}
1074
1075impl InstanceState {
1076 fn new(client: InstanceClient, collections: BTreeMap<GlobalId, Collection>) -> Self {
1077 Self {
1078 client,
1079 replicas: Default::default(),
1080 collections,
1081 }
1082 }
1083
1084 fn collection(&self, id: GlobalId) -> Result<&Collection, CollectionMissing> {
1085 self.collections.get(&id).ok_or(CollectionMissing(id))
1086 }
1087
    /// Send a closure to run on the instance task, fire-and-forget.
    ///
    /// # Panics
    /// Panics if the instance task can no longer receive calls.
    fn call<F>(&self, f: F)
    where
        F: FnOnce(&mut Instance) + Send + 'static,
    {
        self.client.call(f).expect("instance not dropped")
    }
1099
    /// Send a closure to run on the instance task and await its result.
    ///
    /// # Panics
    /// Panics if the instance task can no longer receive calls.
    async fn call_sync<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Instance) -> R + Send + 'static,
        R: Send + 'static,
    {
        self.client
            .call_sync(f)
            .await
            .expect("instance not dropped")
    }
1115
    /// Acquire a read hold on the identified collection, at the collection's
    /// current read frontier.
    pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold, CollectionMissing> {
        let collection = self.collection(id)?;
        // Read the current frontier and increment the capability count at
        // each of its elements in a single locked section, so the frontier
        // cannot move between the read and the increment.
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });

        // The hold releases its capabilities through the instance client's
        // read-hold channel when dropped.
        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
        Ok(hold)
    }
1137
1138 fn get_time_dependence(
1140 &self,
1141 id: GlobalId,
1142 ) -> Result<Option<TimeDependence>, CollectionMissing> {
1143 Ok(self.collection(id)?.time_dependence.clone())
1144 }
1145
    /// Render this instance's state as JSON, for debugging/introspection.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure exhaustively so new fields force a decision here.
        let Self {
            client: _,
            replicas,
            collections,
        } = self;

        // The detailed dump comes from the instance task itself.
        let instance = self.call_sync(|i| i.dump()).await?;
        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();

        Ok(serde_json::json!({
            "instance": instance,
            "replicas": replicas,
            "collections": collections,
        }))
    }
1168}
1169
/// Controller-side state for a single compute collection.
#[derive(Debug)]
struct Collection {
    /// Whether the collection is only written (it is a sink export); such
    /// collections reject read policies.
    write_only: bool,
    /// Compute collections (imported indexes) this collection depends on.
    compute_dependencies: BTreeSet<GlobalId>,
    /// Frontier state shared with the instance task.
    shared: SharedCollectionState,
    /// How the collection's timestamps relate to wall-clock time, if known.
    time_dependence: Option<TimeDependence>,
}
1180
1181impl Collection {
1182 fn new_log() -> Self {
1183 let as_of = Antichain::from_elem(Timestamp::MIN);
1184 Self {
1185 write_only: false,
1186 compute_dependencies: Default::default(),
1187 shared: SharedCollectionState::new(as_of),
1188 time_dependence: Some(TimeDependence::default()),
1189 }
1190 }
1191
1192 fn frontiers(&self) -> CollectionFrontiers {
1193 let read_frontier = self
1194 .shared
1195 .lock_read_capabilities(|c| c.frontier().to_owned());
1196 let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
1197 CollectionFrontiers {
1198 read_frontier,
1199 write_frontier,
1200 }
1201 }
1202}
1203
/// A snapshot of a collection's frontiers.
#[derive(Clone, Debug)]
pub struct CollectionFrontiers {
    /// The frontier of the collection's read capabilities.
    pub read_frontier: Antichain<Timestamp>,
    /// The collection's write frontier.
    pub write_frontier: Antichain<Timestamp>,
}
1212
1213impl Default for CollectionFrontiers {
1214 fn default() -> Self {
1215 Self {
1216 read_frontier: Antichain::from_elem(Timestamp::MIN),
1217 write_frontier: Antichain::from_elem(Timestamp::MIN),
1218 }
1219 }
1220}