1use std::collections::{BTreeMap, BTreeSet};
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use mz_build_info::BuildInfo;
40use mz_cluster_client::client::ClusterReplicaLocation;
41use mz_cluster_client::metrics::ControllerMetrics;
42use mz_cluster_client::{ReplicaId, WallclockLagFn};
43use mz_compute_types::ComputeInstanceId;
44use mz_compute_types::config::ComputeReplicaConfig;
45use mz_compute_types::dataflows::DataflowDescription;
46use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
47use mz_dyncfg::ConfigSet;
48use mz_expr::RowSetFinishing;
49use mz_expr::row::RowCollection;
50use mz_ore::cast::CastFrom;
51use mz_ore::metrics::MetricsRegistry;
52use mz_ore::now::NowFn;
53use mz_ore::tracing::OpenTelemetryContext;
54use mz_persist_types::PersistLocation;
55use mz_repr::{GlobalId, RelationDesc, Row, Timestamp};
56use mz_storage_client::controller::StorageController;
57use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
58use mz_storage_types::read_holds::ReadHold;
59use mz_storage_types::read_policy::ReadPolicy;
60use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
61use prometheus::proto::LabelPair;
62use serde::{Deserialize, Serialize};
63use timely::PartialOrder;
64use timely::progress::Antichain;
65use tokio::sync::{mpsc, oneshot};
66use tokio::time::{self, MissedTickBehavior};
67use uuid::Uuid;
68
69use crate::controller::error::{
70 CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
71 HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
72 ReplicaCreationError, ReplicaDropError,
73};
74use crate::controller::instance::{Instance, SharedCollectionState};
75use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
76use crate::controller::replica::ReplicaConfig;
77use crate::logging::{LogVariant, LoggingConfig};
78use crate::metrics::ComputeControllerMetrics;
79use crate::protocol::command::{ComputeParameters, PeekTarget};
80use crate::protocol::response::{PeekResponse, SubscribeBatch};
81
82mod instance;
83mod introspection;
84mod replica;
85mod sequential_hydration;
86
87pub mod error;
88pub mod instance_client;
89pub use instance_client::InstanceClient;
90
/// A shared handle to the storage layer's collections, used by the compute
/// controller to acquire read holds on storage-managed collections.
pub(crate) type StorageCollections =
    Arc<dyn mz_storage_client::storage_collections::StorageCollections + Send + Sync>;
93
/// Responses produced by the compute controller, to be handled by the caller
/// of [`ComputeController::process`].
#[derive(Debug)]
pub enum ComputeControllerResponse {
    /// A notification about the result of a peek, identified by its `Uuid`.
    /// See [`PeekNotification`].
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    /// A response batch for the subscribe exported from the identified
    /// collection.
    SubscribeResponse(GlobalId, SubscribeBatch),
    /// A response for a copy-to export of the identified collection.
    /// NOTE(review): the `u64` is presumably the number of rows copied —
    /// confirm against the producer of this response.
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    /// A notification reporting a new upper frontier for a compute collection.
    FrontierUpper {
        /// The ID of the compute collection.
        id: GlobalId,
        /// The new upper frontier of the collection.
        upper: Antichain<Timestamp>,
    },
}
123
/// A notification about the result of a peek, containing summary information
/// rather than the peek's actual result rows.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    /// The peek succeeded.
    Success {
        /// The number of result rows, after `offset` and `limit` are applied.
        rows: u64,
        /// The size of the result in bytes.
        result_size: u64,
    },
    /// The peek failed; contains the error message.
    Error(String),
    /// The peek was canceled.
    Canceled,
}
139
140impl PeekNotification {
141 fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
144 match peek_response {
145 PeekResponse::Rows(rows) => {
146 let num_rows = u64::cast_from(RowCollection::offset_limit(
147 rows.iter().map(|r| r.count()).sum(),
148 offset,
149 limit,
150 ));
151 let result_size = u64::cast_from(rows.iter().map(|r| r.byte_len()).sum::<usize>());
152
153 tracing::trace!(?num_rows, ?result_size, "inline peek result");
154
155 Self::Success {
156 rows: num_rows,
157 result_size,
158 }
159 }
160 PeekResponse::Stashed(stashed_response) => {
161 let rows = stashed_response.num_rows(offset, limit);
162 let result_size = stashed_response.size_bytes();
163
164 tracing::trace!(?rows, ?result_size, "stashed peek result");
165
166 Self::Success {
167 rows: u64::cast_from(rows),
168 result_size: u64::cast_from(result_size),
169 }
170 }
171 PeekResponse::Error(err) => Self::Error(err.clone()),
172 PeekResponse::Canceled => Self::Canceled,
173 }
174 }
175}
176
/// A controller for the compute layer, managing a set of compute instances.
pub struct ComputeController {
    /// The active compute instances, by ID.
    instances: BTreeMap<ComputeInstanceId, InstanceState>,
    /// The workload class of each instance, shared with the metrics
    /// postprocessor registered in [`ComputeController::new`].
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    /// Build information for the running binary.
    build_info: &'static BuildInfo,
    /// A handle to the storage layer's collections.
    storage_collections: StorageCollections,
    /// Whether [`ComputeController::initialization_complete`] has been called.
    initialized: bool,
    /// Whether the controller runs in read-only mode.
    read_only: bool,
    /// The current compute configuration; forwarded to newly created instances.
    config: ComputeParameters,
    /// The persist location used for stashing large peek responses.
    peek_stash_persist_location: PersistLocation,
    /// A response ready to be returned by the next `process` call.
    stashed_response: Option<ComputeControllerResponse>,
    /// Metrics maintained by the compute controller.
    metrics: ComputeControllerMetrics,
    /// A function returning the current wallclock time.
    now: NowFn,
    /// A function providing the wallclock lag for a given timestamp.
    wallclock_lag: WallclockLagFn<Timestamp>,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,

    /// Receiver for responses produced by instances.
    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse>,
    /// Sender for responses; a clone is handed to each new instance.
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
    /// Receiver for introspection updates; taken by
    /// [`ComputeController::start_introspection_sink`] when the sink is started.
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    /// Sender for introspection updates; a clone is handed to each new instance.
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    /// Ticker that schedules periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance was scheduled by `ready` and should be performed by
    /// the next `process` call.
    maintenance_scheduled: bool,
}
230
impl ComputeController {
    /// Construct a new [`ComputeController`].
    pub fn new(
        build_info: &'static BuildInfo,
        storage_collections: StorageCollections,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        peek_stash_persist_location: PersistLocation,
        controller_metrics: ControllerMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<Timestamp>,
    ) -> Self {
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();

        // Skip (rather than burst) missed ticks, so a stalled controller does
        // not run a backlog of maintenance rounds when it catches up.
        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
            ComputeInstanceId,
            Option<String>,
        >::new()));

        // Attach a `workload_class` label to every metric that carries an
        // `instance_id` label for an instance with a known workload class.
        metrics_registry.register_postprocessor({
            let instance_workload_classes = Arc::clone(&instance_workload_classes);
            move |metrics| {
                // Snapshot the map once per postprocessing pass, keyed by the
                // instance ID's string form to match the label values.
                let instance_workload_classes = instance_workload_classes
                    .lock()
                    .expect("lock poisoned")
                    .iter()
                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
                    .collect::<BTreeMap<String, Option<String>>>();
                for metric in metrics {
                    'metric: for metric in metric.mut_metric() {
                        for label in metric.get_label() {
                            if label.name() == "instance_id" {
                                if let Some(workload_class) = instance_workload_classes
                                    .get(label.value())
                                    .cloned()
                                    .flatten()
                                {
                                    let mut label = LabelPair::default();
                                    label.set_name("workload_class".into());
                                    label.set_value(workload_class.clone());

                                    let mut labels = metric.take_label();
                                    labels.push(label);
                                    metric.set_label(labels);
                                }
                                // Only one `instance_id` label is expected;
                                // move on to the next metric either way.
                                continue 'metric;
                            }
                        }
                    }
                }
            }
        });

        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

        Self {
            instances: BTreeMap::new(),
            instance_workload_classes,
            build_info,
            storage_collections,
            initialized: false,
            read_only,
            config: Default::default(),
            peek_stash_persist_location,
            stashed_response: None,
            metrics,
            now,
            wallclock_lag,
            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
            response_rx,
            response_tx,
            introspection_rx: Some(introspection_rx),
            introspection_tx,
            maintenance_ticker,
            maintenance_scheduled: false,
        }
    }

    /// Start sinking introspection updates into storage.
    ///
    /// A no-op if the sink has already been started (the receiver was taken).
    pub fn start_introspection_sink(&mut self, storage_controller: &dyn StorageController) {
        if let Some(rx) = self.introspection_rx.take() {
            spawn_introspection_sink(rx, storage_controller);
        }
    }

    /// Return whether the identified instance exists.
    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
        self.instances.contains_key(&id)
    }

    /// Return a reference to the state of the identified instance.
    fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState, InstanceMissing> {
        self.instances.get(&id).ok_or(InstanceMissing(id))
    }

    /// Return a client to the identified instance.
    pub fn instance_client(
        &self,
        id: ComputeInstanceId,
    ) -> Result<InstanceClient, InstanceMissing> {
        self.instance(id).map(|instance| instance.client.clone())
    }

    /// Return a mutable reference to the state of the identified instance.
    fn instance_mut(
        &mut self,
        id: ComputeInstanceId,
    ) -> Result<&mut InstanceState, InstanceMissing> {
        self.instances.get_mut(&id).ok_or(InstanceMissing(id))
    }

    /// List the IDs of all collections installed on the identified instance.
    pub fn collection_ids(
        &self,
        instance_id: ComputeInstanceId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let ids = instance.collections.keys().copied();
        Ok(ids)
    }

    /// Return the frontiers of the identified collection.
    ///
    /// If an `instance_id` is given, the collection is looked up on that
    /// instance only; otherwise all instances are searched for it.
    pub fn collection_frontiers(
        &self,
        collection_id: GlobalId,
        instance_id: Option<ComputeInstanceId>,
    ) -> Result<CollectionFrontiers, CollectionLookupError> {
        let collection = match instance_id {
            Some(id) => self.instance(id)?.collection(collection_id)?,
            None => self
                .instances
                .values()
                .find_map(|i| i.collections.get(&collection_id))
                .ok_or(CollectionMissing(collection_id))?,
        };

        Ok(collection.frontiers())
    }

    /// List the IDs of collections on the identified instance that have a
    /// compute dependency on the given collection.
    pub fn collection_reverse_dependencies(
        &self,
        instance_id: ComputeInstanceId,
        id: GlobalId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let collections = instance.collections.iter();
        let ids = collections
            .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
        Ok(ids)
    }

    /// Report whether the identified collection is hydrated, by asking the
    /// instance task.
    pub async fn collection_hydrated(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<bool, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        let res = instance
            .call_sync(move |i| i.collection_hydrated(collection_id))
            .await?;

        Ok(res)
    }

    /// Request a hydration check restricted to the given replicas, excluding
    /// the given collections. The result is delivered through the returned
    /// receiver once the instance task has computed it.
    ///
    /// Returns an error if the instance has replicas but none of the given
    /// replica IDs belong to it.
    pub fn collections_hydrated_for_replicas(
        &self,
        instance_id: ComputeInstanceId,
        replicas: Vec<ReplicaId>,
        exclude_collections: BTreeSet<GlobalId>,
    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        if !instance.replicas.is_empty()
            && !replicas.iter().any(|id| instance.replicas.contains(id))
        {
            return Err(HydrationCheckBadTarget(replicas).into());
        }

        let (tx, rx) = oneshot::channel();
        instance.call(move |i| {
            let result = i
                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
                .expect("validated");
            // The receiver may have been dropped; that's fine.
            let _ = tx.send(result);
        });

        Ok(rx)
    }

    /// Render the controller state as JSON, for debugging/introspection.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` so that adding a field forces a decision about
        // whether it should be included in the dump.
        let Self {
            instances,
            instance_workload_classes,
            build_info: _,
            storage_collections: _,
            initialized,
            read_only,
            config: _,
            peek_stash_persist_location: _,
            stashed_response,
            metrics: _,
            now: _,
            wallclock_lag: _,
            dyncfg: _,
            response_rx: _,
            response_tx: _,
            introspection_rx: _,
            introspection_tx: _,
            maintenance_ticker: _,
            maintenance_scheduled,
        } = self;

        let mut instances_dump = BTreeMap::new();
        for (id, instance) in instances {
            let dump = instance.dump().await?;
            instances_dump.insert(id.to_string(), dump);
        }

        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .iter()
            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
            .collect();

        Ok(serde_json::json!({
            "instances": instances_dump,
            "instance_workload_classes": instance_workload_classes,
            "initialized": initialized,
            "read_only": read_only,
            "stashed_response": format!("{stashed_response:?}"),
            "maintenance_scheduled": maintenance_scheduled,
        }))
    }
}
502
503impl ComputeController {
504 pub fn create_instance(
506 &mut self,
507 id: ComputeInstanceId,
508 arranged_logs: BTreeMap<LogVariant, GlobalId>,
509 workload_class: Option<String>,
510 ) -> Result<(), InstanceExists> {
511 if self.instances.contains_key(&id) {
512 return Err(InstanceExists(id));
513 }
514
515 let mut collections = BTreeMap::new();
516 let mut logs = Vec::with_capacity(arranged_logs.len());
517 for (&log, &id) in &arranged_logs {
518 let collection = Collection::new_log();
519 let shared = collection.shared.clone();
520 collections.insert(id, collection);
521 logs.push((log, id, shared));
522 }
523
524 let client = InstanceClient::spawn(
525 id,
526 self.build_info,
527 Arc::clone(&self.storage_collections),
528 self.peek_stash_persist_location.clone(),
529 logs,
530 self.metrics.for_instance(id),
531 self.now.clone(),
532 self.wallclock_lag.clone(),
533 Arc::clone(&self.dyncfg),
534 self.response_tx.clone(),
535 self.introspection_tx.clone(),
536 self.read_only,
537 );
538
539 let instance = InstanceState::new(client, collections);
540 self.instances.insert(id, instance);
541
542 self.instance_workload_classes
543 .lock()
544 .expect("lock poisoned")
545 .insert(id, workload_class.clone());
546
547 let instance = self.instances.get_mut(&id).expect("instance just added");
548 if self.initialized {
549 instance.call(Instance::initialization_complete);
550 }
551
552 let mut config_params = self.config.clone();
553 config_params.workload_class = Some(workload_class);
554 instance.call(|i| i.update_configuration(config_params));
555
556 Ok(())
557 }
558
559 pub fn update_instance_workload_class(
561 &mut self,
562 id: ComputeInstanceId,
563 workload_class: Option<String>,
564 ) -> Result<(), InstanceMissing> {
565 let _ = self.instance(id)?;
567
568 self.instance_workload_classes
569 .lock()
570 .expect("lock poisoned")
571 .insert(id, workload_class);
572
573 self.update_configuration(Default::default());
575
576 Ok(())
577 }
578
579 pub fn drop_instance(&mut self, id: ComputeInstanceId) {
585 if let Some(instance) = self.instances.remove(&id) {
586 instance.call(|i| i.shutdown());
587 }
588
589 self.instance_workload_classes
590 .lock()
591 .expect("lock poisoned")
592 .remove(&id);
593 }
594
595 pub fn dyncfg(&self) -> &Arc<ConfigSet> {
597 &self.dyncfg
598 }
599
600 pub fn update_configuration(&mut self, config_params: ComputeParameters) {
602 config_params.dyncfg_updates.apply(&self.dyncfg);
604
605 let instance_workload_classes = self
606 .instance_workload_classes
607 .lock()
608 .expect("lock poisoned");
609
610 for (id, instance) in self.instances.iter_mut() {
613 let mut params = config_params.clone();
614 params.workload_class = Some(instance_workload_classes[id].clone());
615 instance.call(|i| i.update_configuration(params));
616 }
617
618 let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
619 match overflowing_behavior.parse() {
620 Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
621 Err(err) => {
622 tracing::error!(
623 err,
624 overflowing_behavior,
625 "Invalid value for ore_overflowing_behavior"
626 );
627 }
628 }
629
630 self.config.update(config_params);
632 }
633
634 pub fn initialization_complete(&mut self) {
640 self.initialized = true;
641 for instance in self.instances.values_mut() {
642 instance.call(Instance::initialization_complete);
643 }
644 }
645
646 pub async fn ready(&mut self) {
654 if self.stashed_response.is_some() {
655 return;
657 }
658 if self.maintenance_scheduled {
659 return;
661 }
662
663 tokio::select! {
664 resp = self.response_rx.recv() => {
665 let resp = resp.expect("`self.response_tx` not dropped");
666 self.stashed_response = Some(resp);
667 }
668 _ = self.maintenance_ticker.tick() => {
669 self.maintenance_scheduled = true;
670 },
671 }
672 }
673
674 pub fn add_replica_to_instance(
676 &mut self,
677 instance_id: ComputeInstanceId,
678 replica_id: ReplicaId,
679 location: ClusterReplicaLocation,
680 config: ComputeReplicaConfig,
681 ) -> Result<(), ReplicaCreationError> {
682 use ReplicaCreationError::*;
683
684 let instance = self.instance(instance_id)?;
685
686 if instance.replicas.contains(&replica_id) {
688 return Err(ReplicaExists(replica_id));
689 }
690
691 let (enable_logging, interval) = match config.logging.interval {
692 Some(interval) => (true, interval),
693 None => (false, Duration::from_secs(1)),
694 };
695
696 let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);
697
698 let replica_config = ReplicaConfig {
699 location,
700 logging: LoggingConfig {
701 interval,
702 enable_logging,
703 log_logging: config.logging.log_logging,
704 index_logs: Default::default(),
705 },
706 grpc_client: self.config.grpc_client.clone(),
707 expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
708 };
709
710 let instance = self.instance_mut(instance_id).expect("validated");
711 instance.replicas.insert(replica_id);
712
713 instance.call(move |i| {
714 i.add_replica(replica_id, replica_config, None)
715 .expect("validated")
716 });
717
718 Ok(())
719 }
720
721 pub fn drop_replica(
723 &mut self,
724 instance_id: ComputeInstanceId,
725 replica_id: ReplicaId,
726 ) -> Result<(), ReplicaDropError> {
727 use ReplicaDropError::*;
728
729 let instance = self.instance_mut(instance_id)?;
730
731 if !instance.replicas.contains(&replica_id) {
733 return Err(ReplicaMissing(replica_id));
734 }
735
736 instance.replicas.remove(&replica_id);
737
738 instance.call(move |i| i.remove_replica(replica_id).expect("validated"));
739
740 Ok(())
741 }
742
743 pub fn create_dataflow(
750 &mut self,
751 instance_id: ComputeInstanceId,
752 mut dataflow: DataflowDescription<mz_compute_types::plan::Plan, ()>,
753 target_replica: Option<ReplicaId>,
754 ) -> Result<(), DataflowCreationError> {
755 use DataflowCreationError::*;
756
757 let instance = self.instance(instance_id)?;
758
759 if let Some(replica_id) = target_replica {
761 if !instance.replicas.contains(&replica_id) {
762 return Err(ReplicaMissing(replica_id));
763 }
764 assert!(
765 dataflow.exported_index_ids().next().is_none(),
766 "Replica-targeted indexes are not supported"
767 );
768 }
769
770 let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
772 if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
773 return Err(EmptyAsOfForSubscribe);
774 }
775 if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
776 return Err(EmptyAsOfForCopyTo);
777 }
778
779 let storage_ids = dataflow.imported_source_ids().collect();
781 let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
782 for id in dataflow.imported_index_ids() {
783 let read_hold = instance.acquire_read_hold(id)?;
784 import_read_holds.push(read_hold);
785 }
786 for hold in &import_read_holds {
787 if PartialOrder::less_than(as_of, hold.since()) {
788 return Err(SinceViolation(hold.id()));
789 }
790 }
791
792 for id in dataflow.persist_sink_ids() {
794 if self.storage_collections.check_exists(id).is_err() {
795 return Err(CollectionMissing(id));
796 }
797 }
798 let time_dependence = self
799 .determine_time_dependence(instance_id, &dataflow)
800 .expect("must exist");
801
802 let instance = self.instance_mut(instance_id).expect("validated");
803
804 let mut shared_collection_state = BTreeMap::new();
805 for id in dataflow.export_ids() {
806 let shared = SharedCollectionState::new(as_of.clone());
807 let collection = Collection {
808 write_only: dataflow.sink_exports.contains_key(&id),
809 compute_dependencies: dataflow.imported_index_ids().collect(),
810 shared: shared.clone(),
811 time_dependence: time_dependence.clone(),
812 };
813 instance.collections.insert(id, collection);
814 shared_collection_state.insert(id, shared);
815 }
816
817 dataflow.time_dependence = time_dependence;
818
819 instance.call(move |i| {
820 i.create_dataflow(
821 dataflow,
822 import_read_holds,
823 shared_collection_state,
824 target_replica,
825 )
826 .expect("validated")
827 });
828
829 Ok(())
830 }
831
832 pub fn drop_collections(
835 &mut self,
836 instance_id: ComputeInstanceId,
837 collection_ids: Vec<GlobalId>,
838 ) -> Result<(), CollectionUpdateError> {
839 let instance = self.instance_mut(instance_id)?;
840
841 for id in &collection_ids {
843 instance.collection(*id)?;
844 }
845
846 for id in &collection_ids {
847 instance.collections.remove(id);
848 }
849
850 instance.call(|i| i.drop_collections(collection_ids).expect("validated"));
851
852 Ok(())
853 }
854
855 pub fn peek(
862 &self,
863 instance_id: ComputeInstanceId,
864 peek_target: PeekTarget,
865 literal_constraints: Option<Vec<Row>>,
866 uuid: Uuid,
867 timestamp: Timestamp,
868 result_desc: RelationDesc,
869 finishing: RowSetFinishing,
870 map_filter_project: mz_expr::SafeMfpPlan,
871 read_hold: ReadHold,
872 target_replica: Option<ReplicaId>,
873 peek_response_tx: oneshot::Sender<PeekResponse>,
874 ) -> Result<(), PeekError> {
875 use PeekError::*;
876
877 let instance = self.instance(instance_id)?;
878
879 if let Some(replica_id) = target_replica {
881 if !instance.replicas.contains(&replica_id) {
882 return Err(ReplicaMissing(replica_id));
883 }
884 }
885
886 if read_hold.id() != peek_target.id() {
889 return Err(ReadHoldIdMismatch(read_hold.id()));
890 }
891 if !read_hold.since().less_equal(×tamp) {
892 return Err(SinceViolation(peek_target.id()));
893 }
894
895 instance.call(move |i| {
896 i.peek(
897 peek_target,
898 literal_constraints,
899 uuid,
900 timestamp,
901 result_desc,
902 finishing,
903 map_filter_project,
904 read_hold,
905 target_replica,
906 peek_response_tx,
907 )
908 .expect("validated")
909 });
910
911 Ok(())
912 }
913
914 pub fn cancel_peek(
924 &self,
925 instance_id: ComputeInstanceId,
926 uuid: Uuid,
927 reason: PeekResponse,
928 ) -> Result<(), InstanceMissing> {
929 self.instance(instance_id)?
930 .call(move |i| i.cancel_peek(uuid, reason));
931 Ok(())
932 }
933
934 pub fn set_read_policy(
946 &self,
947 instance_id: ComputeInstanceId,
948 policies: Vec<(GlobalId, ReadPolicy)>,
949 ) -> Result<(), ReadPolicyError> {
950 use ReadPolicyError::*;
951
952 let instance = self.instance(instance_id)?;
953
954 for (id, _) in &policies {
956 let collection = instance.collection(*id)?;
957 if collection.write_only {
958 return Err(WriteOnlyCollection(*id));
959 }
960 }
961
962 self.instance(instance_id)?
963 .call(|i| i.set_read_policy(policies).expect("validated"));
964
965 Ok(())
966 }
967
968 pub fn acquire_read_hold(
970 &self,
971 instance_id: ComputeInstanceId,
972 collection_id: GlobalId,
973 ) -> Result<ReadHold, CollectionUpdateError> {
974 let read_hold = self
975 .instance(instance_id)?
976 .acquire_read_hold(collection_id)?;
977 Ok(read_hold)
978 }
979
980 fn determine_time_dependence(
982 &self,
983 instance_id: ComputeInstanceId,
984 dataflow: &DataflowDescription<mz_compute_types::plan::Plan, ()>,
985 ) -> Result<Option<TimeDependence>, TimeDependenceError> {
986 let instance = self
987 .instance(instance_id)
988 .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
989 let mut time_dependencies = Vec::new();
990
991 for id in dataflow.imported_index_ids() {
992 let dependence = instance
993 .get_time_dependence(id)
994 .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
995 time_dependencies.push(dependence);
996 }
997
998 'source: for id in dataflow.imported_source_ids() {
999 for instance in self.instances.values() {
1002 if let Ok(dependence) = instance.get_time_dependence(id) {
1003 time_dependencies.push(dependence);
1004 continue 'source;
1005 }
1006 }
1007
1008 time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
1010 }
1011
1012 Ok(TimeDependence::merge(
1013 time_dependencies,
1014 dataflow.refresh_schedule.as_ref(),
1015 ))
1016 }
1017
1018 #[mz_ore::instrument(level = "debug")]
1020 pub fn process(&mut self) -> Option<ComputeControllerResponse> {
1021 if self.maintenance_scheduled {
1023 self.maintain();
1024 self.maintenance_scheduled = false;
1025 }
1026
1027 self.stashed_response.take()
1029 }
1030
1031 #[mz_ore::instrument(level = "debug")]
1032 fn maintain(&mut self) {
1033 for instance in self.instances.values_mut() {
1035 instance.call(Instance::maintain);
1036 }
1037 }
1038
1039 pub fn allow_writes(
1043 &mut self,
1044 instance_id: ComputeInstanceId,
1045 collection_id: GlobalId,
1046 ) -> Result<(), CollectionUpdateError> {
1047 if self.read_only {
1048 tracing::debug!("Skipping allow_writes in read-only mode");
1049 return Ok(());
1050 }
1051
1052 let instance = self.instance_mut(instance_id)?;
1053
1054 instance.collection(collection_id)?;
1056
1057 instance.call(move |i| i.allow_writes(collection_id).expect("validated"));
1058
1059 Ok(())
1060 }
1061}
1062
/// State the controller maintains for each active compute instance.
#[derive(Debug)]
struct InstanceState {
    /// A client to the instance task.
    client: InstanceClient,
    /// The IDs of replicas of this instance.
    replicas: BTreeSet<ReplicaId>,
    /// The compute collections installed on this instance.
    collections: BTreeMap<GlobalId, Collection>,
}
1069
impl InstanceState {
    /// Construct state for an instance with the given client and initial
    /// collections.
    fn new(client: InstanceClient, collections: BTreeMap<GlobalId, Collection>) -> Self {
        Self {
            client,
            replicas: Default::default(),
            collections,
        }
    }

    /// Return the identified collection, or an error if it is missing.
    fn collection(&self, id: GlobalId) -> Result<&Collection, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    /// Send a function to be applied to the instance, without waiting for a
    /// result.
    ///
    /// # Panics
    ///
    /// Panics if the instance task has been dropped.
    fn call<F>(&self, f: F)
    where
        F: FnOnce(&mut Instance) + Send + 'static,
    {
        self.client.call(f).expect("instance not dropped")
    }

    /// Send a function to be applied to the instance and await its result.
    ///
    /// # Panics
    ///
    /// Panics if the instance task has been dropped.
    async fn call_sync<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Instance) -> R + Send + 'static,
        R: Send + 'static,
    {
        self.client
            .call_sync(f)
            .await
            .expect("instance not dropped")
    }

    /// Acquire a [`ReadHold`] on the identified collection at its current
    /// since frontier.
    pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold, CollectionMissing> {
        let collection = self.collection(id)?;
        // Increment the read capabilities at the current since, inside the
        // same lock acquisition that reads the frontier, so the hold cannot
        // race with a since advancement.
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });

        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
        Ok(hold)
    }

    /// Return the time dependence of the identified collection, or an error
    /// if the collection is missing.
    fn get_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, CollectionMissing> {
        Ok(self.collection(id)?.time_dependence.clone())
    }

    /// Render the instance state as JSON, for debugging/introspection.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` so that adding a field forces a decision about
        // whether it should be included in the dump.
        let Self {
            client: _,
            replicas,
            collections,
        } = self;

        let instance = self.call_sync(|i| i.dump()).await?;
        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();

        Ok(serde_json::json!({
            "instance": instance,
            "replicas": replicas,
            "collections": collections,
        }))
    }
}
1164
/// State the controller maintains for each compute collection.
#[derive(Debug)]
struct Collection {
    /// Whether the collection is write-only (e.g. a sink export); write-only
    /// collections reject read policies.
    write_only: bool,
    /// The IDs of compute collections this collection depends on.
    compute_dependencies: BTreeSet<GlobalId>,
    /// Collection state shared with the instance task.
    shared: SharedCollectionState,
    /// The collection's time dependence, if any.
    time_dependence: Option<TimeDependence>,
}
1175
1176impl Collection {
1177 fn new_log() -> Self {
1178 let as_of = Antichain::from_elem(Timestamp::MIN);
1179 Self {
1180 write_only: false,
1181 compute_dependencies: Default::default(),
1182 shared: SharedCollectionState::new(as_of),
1183 time_dependence: Some(TimeDependence::default()),
1184 }
1185 }
1186
1187 fn frontiers(&self) -> CollectionFrontiers {
1188 let read_frontier = self
1189 .shared
1190 .lock_read_capabilities(|c| c.frontier().to_owned());
1191 let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
1192 CollectionFrontiers {
1193 read_frontier,
1194 write_frontier,
1195 }
1196 }
1197}
1198
/// A snapshot of a collection's frontiers.
#[derive(Clone, Debug)]
pub struct CollectionFrontiers {
    /// The collection's read frontier.
    pub read_frontier: Antichain<Timestamp>,
    /// The collection's write frontier.
    pub write_frontier: Antichain<Timestamp>,
}
1207
1208impl Default for CollectionFrontiers {
1209 fn default() -> Self {
1210 Self {
1211 read_frontier: Antichain::from_elem(Timestamp::MIN),
1212 write_frontier: Antichain::from_elem(Timestamp::MIN),
1213 }
1214 }
1215}