use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::config::ComputeReplicaConfig;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_types::PersistLocation;
use mz_repr::{Datum, GlobalId, RelationDesc, Row, TimestampManipulation};
use mz_storage_client::controller::StorageController;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use prometheus::proto::LabelPair;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::{self, MissedTickBehavior};
use tracing::debug_span;
use uuid::Uuid;

use crate::controller::error::{
    CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
    HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
    ReplicaCreationError, ReplicaDropError,
};
use crate::controller::instance::{Instance, SharedCollectionState};
use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
use crate::controller::replica::ReplicaConfig;
use crate::logging::{LogVariant, LoggingConfig};
use crate::metrics::ComputeControllerMetrics;
use crate::protocol::command::{ComputeParameters, PeekTarget};
use crate::protocol::response::{PeekResponse, SubscribeBatch};

mod instance;
mod introspection;
mod replica;
mod sequential_hydration;

pub mod error;

pub(crate) type StorageCollections<T> = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T> + Send + Sync,
>;

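/// A timestamp type usable by the compute controller: it must support the usual timestamp
/// manipulation operations and be convertible into a [`Datum`].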
pub trait ComputeControllerTimestamp: TimestampManipulation + Into<Datum<'static>> + Sync {}

impl ComputeControllerTimestamp for mz_repr::Timestamp {}

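/// Responses produced by the compute controller and returned from [`ComputeController::process`].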
#[derive(Debug)]
pub enum ComputeControllerResponse<T> {
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
    FrontierUpper {
        id: GlobalId,
        upper: Antichain<T>,
    },
}

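/// A notification about the outcome of a peek: the number of rows and result size on success,
/// the error message on failure, or the fact that the peek was canceled.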
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    Success {
        rows: u64,
        result_size: u64,
    },
    Error(String),
    Canceled,
}

impl PeekNotification {
    fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
        match peek_response {
            PeekResponse::Rows(rows) => {
                let num_rows = u64::cast_from(rows.count(offset, limit));
                let result_size = u64::cast_from(rows.byte_len());

                tracing::trace!(?num_rows, ?result_size, "inline peek result");

                Self::Success {
                    rows: num_rows,
                    result_size,
                }
            }
            PeekResponse::Stashed(stashed_response) => {
                let rows = stashed_response.num_rows(offset, limit);
                let result_size = stashed_response.size_bytes();

                tracing::trace!(?rows, ?result_size, "stashed peek result");

                Self::Success {
                    rows: u64::cast_from(rows),
                    result_size: u64::cast_from(result_size),
                }
            }
            PeekResponse::Error(err) => Self::Error(err.clone()),
            PeekResponse::Canceled => Self::Canceled,
        }
    }
}

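/// A controller for the compute layer.
///
/// The controller tracks a set of compute instances, each of which is driven by a background
/// task through its [`instance::Client`]. Responses and frontier updates flow back through
/// `response_rx` and are surfaced via [`ComputeController::process`].
///
/// A rough usage sketch (setup values elided; this is not a runnable example):
///
/// ```ignore
/// let mut compute = ComputeController::new(/* build info, storage collections, ... */);
/// compute.create_instance(instance_id, arranged_logs, None)?;
/// compute.add_replica_to_instance(instance_id, replica_id, location, replica_config)?;
/// compute.create_dataflow(instance_id, dataflow, None)?;
/// loop {
///     compute.ready().await;
///     while let Some(response) = compute.process() {
///         // handle `ComputeControllerResponse`s
///     }
/// }
/// ```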
pub struct ComputeController<T: ComputeControllerTimestamp> {
    instances: BTreeMap<ComputeInstanceId, InstanceState<T>>,
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    build_info: &'static BuildInfo,
    storage_collections: StorageCollections<T>,
    initialized: bool,
    read_only: bool,
    config: ComputeParameters,
    peek_stash_persist_location: PersistLocation,
    stashed_response: Option<ComputeControllerResponse<T>>,
    metrics: ComputeControllerMetrics,
    now: NowFn,
    wallclock_lag: WallclockLagFn<T>,
    dyncfg: Arc<ConfigSet>,

    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse<T>>,
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    maintenance_ticker: tokio::time::Interval,
    maintenance_scheduled: bool,
}

impl<T: ComputeControllerTimestamp> ComputeController<T> {
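    /// Construct a new [`ComputeController`].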
    pub fn new(
        build_info: &'static BuildInfo,
        storage_collections: StorageCollections<T>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        peek_stash_persist_location: PersistLocation,
        controller_metrics: ControllerMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
    ) -> Self {
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();

        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
            ComputeInstanceId,
            Option<String>,
        >::new()));

        metrics_registry.register_postprocessor({
            let instance_workload_classes = Arc::clone(&instance_workload_classes);
            move |metrics| {
                let instance_workload_classes = instance_workload_classes
                    .lock()
                    .expect("lock poisoned")
                    .iter()
                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
                    .collect::<BTreeMap<String, Option<String>>>();
                for metric in metrics {
                    'metric: for metric in metric.mut_metric() {
                        for label in metric.get_label() {
                            if label.get_name() == "instance_id" {
                                if let Some(workload_class) = instance_workload_classes
                                    .get(label.get_value())
                                    .cloned()
                                    .flatten()
                                {
                                    let mut label = LabelPair::default();
                                    label.set_name("workload_class".into());
                                    label.set_value(workload_class.clone());

                                    let mut labels = metric.take_label();
                                    labels.push(label);
                                    metric.set_label(labels);
                                }
                                continue 'metric;
                            }
                        }
                    }
                }
            }
        });

        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

        Self {
            instances: BTreeMap::new(),
            instance_workload_classes,
            build_info,
            storage_collections,
            initialized: false,
            read_only,
            config: Default::default(),
            peek_stash_persist_location,
            stashed_response: None,
            metrics,
            now,
            wallclock_lag,
            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
            response_rx,
            response_tx,
            introspection_rx: Some(introspection_rx),
            introspection_tx,
            maintenance_ticker,
            maintenance_scheduled: false,
        }
    }

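    /// Start sinking the compute controller's introspection data into storage.
    ///
    /// This is a no-op if the introspection sink has already been started.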
    pub fn start_introspection_sink(
        &mut self,
        storage_controller: &dyn StorageController<Timestamp = T>,
    ) {
        if let Some(rx) = self.introspection_rx.take() {
            spawn_introspection_sink(rx, storage_controller);
        }
    }

    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
        self.instances.contains_key(&id)
    }

    fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState<T>, InstanceMissing> {
        self.instances.get(&id).ok_or(InstanceMissing(id))
    }

    fn instance_mut(
        &mut self,
        id: ComputeInstanceId,
    ) -> Result<&mut InstanceState<T>, InstanceMissing> {
        self.instances.get_mut(&id).ok_or(InstanceMissing(id))
    }

    pub fn collection_ids(
        &self,
        instance_id: ComputeInstanceId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let ids = instance.collections.keys().copied();
        Ok(ids)
    }

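    /// Returns the frontiers of the identified collection.
    ///
    /// If an `instance_id` is provided, the collection is looked up only on that instance;
    /// otherwise all instances are searched.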
    pub fn collection_frontiers(
        &self,
        collection_id: GlobalId,
        instance_id: Option<ComputeInstanceId>,
    ) -> Result<CollectionFrontiers<T>, CollectionLookupError> {
        let collection = match instance_id {
            Some(id) => self.instance(id)?.collection(collection_id)?,
            None => self
                .instances
                .values()
                .find_map(|i| i.collections.get(&collection_id))
                .ok_or(CollectionMissing(collection_id))?,
        };

        Ok(collection.frontiers())
    }

    pub fn collection_reverse_dependencies(
        &self,
        instance_id: ComputeInstanceId,
        id: GlobalId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let collections = instance.collections.iter();
        let ids = collections
            .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
        Ok(ids)
    }

    pub async fn collection_hydrated(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<bool, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        let res = instance
            .call_sync(move |i| i.collection_hydrated(collection_id))
            .await?;

        Ok(res)
    }

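    /// Returns a receiver that yields the result of a hydration check for the given instance,
    /// restricted to the given replicas and excluding the given collections.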
    pub fn collections_hydrated_for_replicas(
        &self,
        instance_id: ComputeInstanceId,
        replicas: Vec<ReplicaId>,
        exclude_collections: BTreeSet<GlobalId>,
    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        if instance.replicas.is_empty() && !replicas.iter().any(|id| instance.replicas.contains(id))
        {
            return Err(HydrationCheckBadTarget(replicas).into());
        }

        let (tx, rx) = oneshot::channel();
        instance.call(move |i| {
            let result = i
                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
                .expect("validated");
            let _ = tx.send(result);
        });

        Ok(rx)
    }

    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            instances,
            instance_workload_classes,
            build_info: _,
            storage_collections: _,
            initialized,
            read_only,
            config: _,
            peek_stash_persist_location: _,
            stashed_response,
            metrics: _,
            now: _,
            wallclock_lag: _,
            dyncfg: _,
            response_rx: _,
            response_tx: _,
            introspection_rx: _,
            introspection_tx: _,
            maintenance_ticker: _,
            maintenance_scheduled,
        } = self;

        let mut instances_dump = BTreeMap::new();
        for (id, instance) in instances {
            let dump = instance.dump().await?;
            instances_dump.insert(id.to_string(), dump);
        }

        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .iter()
            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
            .collect();

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let map = serde_json::Map::from_iter([
            field("instances", instances_dump)?,
            field("instance_workload_classes", instance_workload_classes)?,
            field("initialized", initialized)?,
            field("read_only", read_only)?,
            field("stashed_response", format!("{stashed_response:?}"))?,
            field("maintenance_scheduled", maintenance_scheduled)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

impl<T> ComputeController<T>
where
    T: ComputeControllerTimestamp,
{
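    /// Create a compute instance with the given logging collections and workload class.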
    pub fn create_instance(
        &mut self,
        id: ComputeInstanceId,
        arranged_logs: BTreeMap<LogVariant, GlobalId>,
        workload_class: Option<String>,
    ) -> Result<(), InstanceExists> {
        if self.instances.contains_key(&id) {
            return Err(InstanceExists(id));
        }

        let mut collections = BTreeMap::new();
        let mut logs = Vec::with_capacity(arranged_logs.len());
        for (&log, &id) in &arranged_logs {
            let collection = Collection::new_log();
            let shared = collection.shared.clone();
            collections.insert(id, collection);
            logs.push((log, id, shared));
        }

        let client = instance::Client::spawn(
            id,
            self.build_info,
            Arc::clone(&self.storage_collections),
            self.peek_stash_persist_location.clone(),
            logs,
            self.metrics.for_instance(id),
            self.now.clone(),
            self.wallclock_lag.clone(),
            Arc::clone(&self.dyncfg),
            self.response_tx.clone(),
            self.introspection_tx.clone(),
        );

        let instance = InstanceState::new(client, collections);
        self.instances.insert(id, instance);

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class.clone());

        let instance = self.instances.get_mut(&id).expect("instance just added");
        if self.initialized {
            instance.call(Instance::initialization_complete);
        }

        if !self.read_only {
            instance.call(Instance::allow_writes);
        }

        let mut config_params = self.config.clone();
        config_params.workload_class = Some(workload_class);
        instance.call(|i| i.update_configuration(config_params));

        Ok(())
    }

    pub fn update_instance_workload_class(
        &mut self,
        id: ComputeInstanceId,
        workload_class: Option<String>,
    ) -> Result<(), InstanceMissing> {
        let _ = self.instance(id)?;

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class);

        self.update_configuration(Default::default());

        Ok(())
    }

    pub fn drop_instance(&mut self, id: ComputeInstanceId) {
        if let Some(instance) = self.instances.remove(&id) {
            instance.call(|i| i.shutdown());
        }

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .remove(&id);
    }

    pub fn dyncfg(&self) -> &Arc<ConfigSet> {
        &self.dyncfg
    }

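    /// Update the configuration of the controller and of all running instances, applying any
    /// dynamic configuration updates contained in the given parameters.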
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        config_params.dyncfg_updates.apply(&self.dyncfg);

        let instance_workload_classes = self
            .instance_workload_classes
            .lock()
            .expect("lock poisoned");

        for (id, instance) in self.instances.iter_mut() {
            let mut params = config_params.clone();
            params.workload_class = Some(instance_workload_classes[id].clone());
            instance.call(|i| i.update_configuration(params));
        }

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                tracing::error!(
                    err,
                    overflowing_behavior,
                    "Invalid value for ore_overflowing_behavior"
                );
            }
        }

        self.config.update(config_params);
    }

    pub fn initialization_complete(&mut self) {
        self.initialized = true;
        for instance in self.instances.values_mut() {
            instance.call(Instance::initialization_complete);
        }
    }

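    /// Waits until the controller is ready to do some processing.
    ///
    /// This method may block for an arbitrarily long time. When the caller is ready to handle
    /// the pending work, it must call [`ComputeController::process`].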
    pub async fn ready(&mut self) {
        if self.stashed_response.is_some() {
            return;
        }
        if self.maintenance_scheduled {
            return;
        }

        tokio::select! {
            resp = self.response_rx.recv() => {
                let resp = resp.expect("`self.response_tx` not dropped");
                self.stashed_response = Some(resp);
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        }
    }

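    /// Adds a replica to the identified compute instance.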
    pub fn add_replica_to_instance(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
        config: ComputeReplicaConfig,
    ) -> Result<(), ReplicaCreationError> {
        use ReplicaCreationError::*;

        let instance = self.instance(instance_id)?;

        if instance.replicas.contains(&replica_id) {
            return Err(ReplicaExists(replica_id));
        }

        let (enable_logging, interval) = match config.logging.interval {
            Some(interval) => (true, interval),
            None => (false, Duration::from_secs(1)),
        };

        let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);

        let replica_config = ReplicaConfig {
            location,
            logging: LoggingConfig {
                interval,
                enable_logging,
                log_logging: config.logging.log_logging,
                index_logs: Default::default(),
            },
            grpc_client: self.config.grpc_client.clone(),
            expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
        };

        let instance = self.instance_mut(instance_id).expect("validated");
        instance.replicas.insert(replica_id);

        instance.call(move |i| {
            i.add_replica(replica_id, replica_config, None)
                .expect("validated")
        });

        Ok(())
    }

    pub fn drop_replica(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
    ) -> Result<(), ReplicaDropError> {
        use ReplicaDropError::*;

        let instance = self.instance_mut(instance_id)?;

        if !instance.replicas.contains(&replica_id) {
            return Err(ReplicaMissing(replica_id));
        }

        instance.replicas.remove(&replica_id);

        instance.call(move |i| i.remove_replica(replica_id).expect("validated"));

        Ok(())
    }

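    /// Creates the described dataflow on the given instance.
    ///
    /// A `subscribe_target_replica` may be provided to target any subscribes exported by the
    /// dataflow at a specific, existing replica of the instance.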
    pub fn create_dataflow(
        &mut self,
        instance_id: ComputeInstanceId,
        mut dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        subscribe_target_replica: Option<ReplicaId>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        let instance = self.instance(instance_id)?;

        if let Some(replica_id) = subscribe_target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        let storage_ids = dataflow.imported_source_ids().collect();
        let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
        for id in dataflow.imported_index_ids() {
            let read_hold = instance.acquire_read_hold(id)?;
            import_read_holds.push(read_hold);
        }
        for hold in &import_read_holds {
            if PartialOrder::less_than(as_of, hold.since()) {
                return Err(SinceViolation(hold.id()));
            }
        }

        for id in dataflow.persist_sink_ids() {
            if self.storage_collections.check_exists(id).is_err() {
                return Err(CollectionMissing(id));
            }
        }
        let time_dependence = self
            .determine_time_dependence(instance_id, &dataflow)
            .expect("must exist");

        let instance = self.instance_mut(instance_id).expect("validated");

        let mut shared_collection_state = BTreeMap::new();
        for id in dataflow.export_ids() {
            let shared = SharedCollectionState::new(as_of.clone());
            let collection = Collection {
                write_only: dataflow.sink_exports.contains_key(&id),
                compute_dependencies: dataflow.imported_index_ids().collect(),
                shared: shared.clone(),
                time_dependence: time_dependence.clone(),
            };
            instance.collections.insert(id, collection);
            shared_collection_state.insert(id, shared);
        }

        dataflow.time_dependence = time_dependence;

        instance.call(move |i| {
            i.create_dataflow(
                dataflow,
                import_read_holds,
                subscribe_target_replica,
                shared_collection_state,
            )
            .expect("validated")
        });

        Ok(())
    }

    pub fn drop_collections(
        &mut self,
        instance_id: ComputeInstanceId,
        collection_ids: Vec<GlobalId>,
    ) -> Result<(), CollectionUpdateError> {
        let instance = self.instance_mut(instance_id)?;

        for id in &collection_ids {
            instance.collection(*id)?;
        }

        for id in &collection_ids {
            instance.collections.remove(id);
        }

        instance.call(|i| i.drop_collections(collection_ids).expect("validated"));

        Ok(())
    }

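    /// Issues a peek against the given peek target at the given timestamp.
    ///
    /// The peek result is delivered through `peek_response_tx`; the outcome is additionally
    /// reported as a [`ComputeControllerResponse::PeekNotification`].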
    pub fn peek(
        &self,
        instance_id: ComputeInstanceId,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        result_desc: RelationDesc,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let instance = self.instance(instance_id)?;

        if let Some(replica_id) = target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        let read_hold = match &peek_target {
            PeekTarget::Index { id } => instance.acquire_read_hold(*id)?,
            PeekTarget::Persist { id, .. } => self
                .storage_collections
                .acquire_read_holds(vec![*id])?
                .into_element(),
        };
        if !read_hold.since().less_equal(&timestamp) {
            return Err(SinceViolation(peek_target.id()));
        }

        instance.call(move |i| {
            i.peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing,
                map_filter_project,
                read_hold,
                target_replica,
                peek_response_tx,
            )
            .expect("validated")
        });

        Ok(())
    }

    pub fn cancel_peek(
        &self,
        instance_id: ComputeInstanceId,
        uuid: Uuid,
        reason: PeekResponse,
    ) -> Result<(), InstanceMissing> {
        self.instance(instance_id)?
            .call(move |i| i.cancel_peek(uuid, reason));
        Ok(())
    }

    pub fn set_read_policy(
        &self,
        instance_id: ComputeInstanceId,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) -> Result<(), ReadPolicyError> {
        use ReadPolicyError::*;

        let instance = self.instance(instance_id)?;

        for (id, _) in &policies {
            let collection = instance.collection(*id)?;
            if collection.write_only {
                return Err(WriteOnlyCollection(*id));
            }
        }

        self.instance(instance_id)?
            .call(|i| i.set_read_policy(policies).expect("validated"));

        Ok(())
    }

    pub fn acquire_read_hold(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<ReadHold<T>, CollectionUpdateError> {
        let read_hold = self
            .instance(instance_id)?
            .acquire_read_hold(collection_id)?;
        Ok(read_hold)
    }

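    /// Determines the [`TimeDependence`] for a dataflow by merging the time dependences of all
    /// of its imported indexes and sources. Continual tasks are exempt and yield `None`.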
    fn determine_time_dependence(
        &self,
        instance_id: ComputeInstanceId,
        dataflow: &DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        let is_continual_task = dataflow.continual_task_ids().next().is_some();
        if is_continual_task {
            return Ok(None);
        }

        let instance = self
            .instance(instance_id)
            .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
        let mut time_dependencies = Vec::new();

        for id in dataflow.imported_index_ids() {
            let dependence = instance
                .get_time_dependence(id)
                .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
            time_dependencies.push(dependence);
        }

        'source: for id in dataflow.imported_source_ids() {
            for instance in self.instances.values() {
                if let Ok(dependence) = instance.get_time_dependence(id) {
                    time_dependencies.push(dependence);
                    continue 'source;
                }
            }

            time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
        }

        Ok(TimeDependence::merge(
            time_dependencies,
            dataflow.refresh_schedule.as_ref(),
        ))
    }

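    /// Processes work readied by a prior call to [`ComputeController::ready`], performing
    /// scheduled maintenance and returning a stashed response, if any.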
    #[mz_ore::instrument(level = "debug")]
    pub fn process(&mut self) -> Option<ComputeControllerResponse<T>> {
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        self.stashed_response.take()
    }

    #[mz_ore::instrument(level = "debug")]
    fn maintain(&mut self) {
        for instance in self.instances.values_mut() {
            instance.call(Instance::maintain);
        }
    }
}

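/// State the `ComputeController` tracks for each compute instance, along with the client handle
/// used to send commands to the instance task.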
#[derive(Debug)]
struct InstanceState<T: ComputeControllerTimestamp> {
    client: instance::Client<T>,
    replicas: BTreeSet<ReplicaId>,
    collections: BTreeMap<GlobalId, Collection<T>>,
}

impl<T: ComputeControllerTimestamp> InstanceState<T> {
    fn new(client: instance::Client<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
        Self {
            client,
            replicas: Default::default(),
            collections,
        }
    }

    fn collection(&self, id: GlobalId) -> Result<&Collection<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    fn call<F>(&self, f: F)
    where
        F: FnOnce(&mut Instance<T>) + Send + 'static,
    {
        let otel_ctx = OpenTelemetryContext::obtain();
        self.client
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call").entered();
                otel_ctx.attach_as_parent();

                f(instance)
            }))
            .expect("instance not dropped");
    }

    async fn call_sync<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let otel_ctx = OpenTelemetryContext::obtain();
        self.client
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call_sync").entered();
                otel_ctx.attach_as_parent();

                let result = f(instance);
                let _ = tx.send(result);
            }))
            .expect("instance not dropped");

        rx.await.expect("instance not dropped")
    }

    pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
        let collection = self.collection(id)?;
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });

        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
        Ok(hold)
    }

    fn get_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, CollectionMissing> {
        Ok(self.collection(id)?.time_dependence.clone())
    }

    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            client: _,
            replicas,
            collections,
        } = self;

        let instance = self.call_sync(|i| i.dump()).await?;
        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let map = serde_json::Map::from_iter([
            field("instance", instance)?,
            field("replicas", replicas)?,
            field("collections", collections)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

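/// State tracked by the controller for each compute collection on an instance.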
#[derive(Debug)]
struct Collection<T> {
    write_only: bool,
    compute_dependencies: BTreeSet<GlobalId>,
    shared: SharedCollectionState<T>,
    time_dependence: Option<TimeDependence>,
}

impl<T: Timestamp> Collection<T> {
    fn new_log() -> Self {
        let as_of = Antichain::from_elem(T::minimum());
        Self {
            write_only: false,
            compute_dependencies: Default::default(),
            shared: SharedCollectionState::new(as_of),
            time_dependence: Some(TimeDependence::default()),
        }
    }

    fn frontiers(&self) -> CollectionFrontiers<T> {
        let read_frontier = self
            .shared
            .lock_read_capabilities(|c| c.frontier().to_owned());
        let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
        CollectionFrontiers {
            read_frontier,
            write_frontier,
        }
    }
}

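/// A pair of frontiers describing the current read and write progress of a collection.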
#[derive(Clone, Debug)]
pub struct CollectionFrontiers<T> {
    pub read_frontier: Antichain<T>,
    pub write_frontier: Antichain<T>,
}

impl<T: Timestamp> Default for CollectionFrontiers<T> {
    fn default() -> Self {
        Self {
            read_frontier: Antichain::from_elem(T::minimum()),
            write_frontier: Antichain::from_elem(T::minimum()),
        }
    }
}