use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroI64;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::config::ComputeReplicaConfig;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
use mz_controller_types::dyncfgs::{
    ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC, TIMELY_ZERO_COPY_LIMIT,
};
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_repr::{Datum, GlobalId, Row, TimestampManipulation};
use mz_storage_client::controller::StorageController;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use prometheus::proto::LabelPair;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::{self, MissedTickBehavior};
use tracing::debug_span;
use uuid::Uuid;

use crate::controller::error::{
    CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
    HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
    ReplicaCreationError, ReplicaDropError,
};
use crate::controller::instance::{Instance, SharedCollectionState};
use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
use crate::controller::replica::ReplicaConfig;
use crate::logging::{LogVariant, LoggingConfig};
use crate::metrics::ComputeControllerMetrics;
use crate::protocol::command::{ComputeParameters, InitialComputeParameters, PeekTarget};
use crate::protocol::response::{PeekResponse, SubscribeBatch};
use crate::service::{ComputeClient, ComputeGrpcClient};

mod instance;
mod introspection;
mod replica;
mod sequential_hydration;

pub mod error;

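/// A pointer to the `StorageCollections` trait object the compute controller uses to inquire
/// about the state of storage collections.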
pub(crate) type StorageCollections<T> = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T> + Send + Sync,
>;

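/// Abstraction over the timestamp types the compute controller can work with.
///
/// Besides the usual timestamp manipulation, controller timestamps must be convertible into
/// [`Datum`]s, e.g. so they can be reported through introspection collections.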
pub trait ComputeControllerTimestamp: TimestampManipulation + Into<Datum<'static>> + Sync {}

impl ComputeControllerTimestamp for mz_repr::Timestamp {}

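/// Responses the compute controller returns to its caller via [`ComputeController::process`].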
#[derive(Debug)]
pub enum ComputeControllerResponse<T> {
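    /// A notification that a peek has finished; see [`PeekNotification`].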
    PeekNotification(Uuid, PeekNotification, OpenTelemetryContext),
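    /// A batch of updates produced by a subscribe dataflow.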
    SubscribeResponse(GlobalId, SubscribeBatch<T>),
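    /// The result of a copy-to dataflow: the number of rows written, or an error.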
    CopyToResponse(GlobalId, Result<u64, anyhow::Error>),
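    /// A notification that a collection's write frontier has advanced.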
    FrontierUpper {
        id: GlobalId,
        upper: Antichain<T>,
    },
}

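/// A notification about the result of a peek, containing metadata about the result rather than
/// the result itself.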
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PeekNotification {
    Success {
        rows: u64,
        result_size: u64,
    },
    Error(String),
    Canceled,
}

impl PeekNotification {
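    /// Construct a new [`PeekNotification`] from a [`PeekResponse`], counting result rows with
    /// the given `offset` and `limit` applied.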
    fn new(peek_response: &PeekResponse, offset: usize, limit: Option<usize>) -> Self {
        match peek_response {
            PeekResponse::Rows(rows) => Self::Success {
                rows: u64::cast_from(rows.count(offset, limit)),
                result_size: u64::cast_from(rows.byte_len()),
            },
            PeekResponse::Error(err) => Self::Error(err.clone()),
            PeekResponse::Canceled => Self::Canceled,
        }
    }
}

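/// A controller for the compute layer, managing the provisioned compute instances and the
/// collections installed on them.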
pub struct ComputeController<T: ComputeControllerTimestamp> {
    instances: BTreeMap<ComputeInstanceId, InstanceState<T>>,
    instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
    build_info: &'static BuildInfo,
    storage_collections: StorageCollections<T>,
    initialized: bool,
    read_only: bool,
    config: ComputeParameters,
    initial_config: InitialComputeParameters,
    stashed_response: Option<ComputeControllerResponse<T>>,
    envd_epoch: NonZeroI64,
    metrics: ComputeControllerMetrics,
    now: NowFn,
    wallclock_lag: WallclockLagFn<T>,
    dyncfg: Arc<ConfigSet>,

    response_rx: mpsc::UnboundedReceiver<ComputeControllerResponse<T>>,
    response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
    introspection_rx: Option<mpsc::UnboundedReceiver<IntrospectionUpdates>>,
    introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,

    maintenance_ticker: tokio::time::Interval,
    maintenance_scheduled: bool,
}

impl<T: ComputeControllerTimestamp> ComputeController<T> {
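    /// Construct a new [`ComputeController`].
    ///
    /// A minimal construction sketch; the build info, storage collections handle, metrics
    /// registry, and clock/lag functions are assumed to be provided by the caller, and the
    /// names used here are illustrative:
    ///
    /// ```ignore
    /// let mut compute = ComputeController::new(
    ///     build_info,
    ///     storage_collections,
    ///     envd_epoch,
    ///     /* read_only */ true,
    ///     &metrics_registry,
    ///     controller_metrics,
    ///     now_fn,
    ///     wallclock_lag_fn,
    /// );
    /// ```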
    pub fn new(
        build_info: &'static BuildInfo,
        storage_collections: StorageCollections<T>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        controller_metrics: ControllerMetrics,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
    ) -> Self {
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let (introspection_tx, introspection_rx) = mpsc::unbounded_channel();

        let mut maintenance_ticker = time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let instance_workload_classes = Arc::new(Mutex::new(BTreeMap::<
            ComputeInstanceId,
            Option<String>,
        >::new()));

        // Apply a `workload_class` label to every metric that carries an `instance_id` label,
        // for instances with a known workload class.
        metrics_registry.register_postprocessor({
            let instance_workload_classes = Arc::clone(&instance_workload_classes);
            move |metrics| {
                let instance_workload_classes = instance_workload_classes
                    .lock()
                    .expect("lock poisoned")
                    .iter()
                    .map(|(id, workload_class)| (id.to_string(), workload_class.clone()))
                    .collect::<BTreeMap<String, Option<String>>>();
                for metric in metrics {
                    'metric: for metric in metric.mut_metric() {
                        for label in metric.get_label() {
                            if label.get_name() == "instance_id" {
                                if let Some(workload_class) = instance_workload_classes
                                    .get(label.get_value())
                                    .cloned()
                                    .flatten()
                                {
                                    let mut label = LabelPair::default();
                                    label.set_name("workload_class".into());
                                    label.set_value(workload_class.clone());

                                    let mut labels = metric.take_label();
                                    labels.push(label);
                                    metric.set_label(labels);
                                }
                                continue 'metric;
                            }
                        }
                    }
                }
            }
        });

        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

        let initial_config = InitialComputeParameters {
            arrangement_exert_proportionality: 16,
            enable_zero_copy: false,
            enable_zero_copy_lgalloc: false,
            zero_copy_limit: None,
        };

        Self {
            instances: BTreeMap::new(),
            instance_workload_classes,
            build_info,
            storage_collections,
            initialized: false,
            read_only,
            config: Default::default(),
            initial_config,
            stashed_response: None,
            envd_epoch,
            metrics,
            now,
            wallclock_lag,
            dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
            response_rx,
            response_tx,
            introspection_rx: Some(introspection_rx),
            introspection_tx,
            maintenance_ticker,
            maintenance_scheduled: false,
        }
    }

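    /// Start sinking the compute controller's introspection data into storage.
    ///
    /// Introspection updates accumulate in the controller until this is called, so it should be
    /// invoked as soon as the storage controller is available. Subsequent calls are no-ops.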
    pub fn start_introspection_sink(
        &mut self,
        storage_controller: &dyn StorageController<Timestamp = T>,
    ) {
        if let Some(rx) = self.introspection_rx.take() {
            spawn_introspection_sink(rx, storage_controller);
        }
    }

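    /// Return whether the identified compute instance exists.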
    pub fn instance_exists(&self, id: ComputeInstanceId) -> bool {
        self.instances.contains_key(&id)
    }

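    /// Return a reference to the indicated compute instance, if it exists.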
    fn instance(&self, id: ComputeInstanceId) -> Result<&InstanceState<T>, InstanceMissing> {
        self.instances.get(&id).ok_or(InstanceMissing(id))
    }

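    /// Return a mutable reference to the indicated compute instance, if it exists.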
    fn instance_mut(
        &mut self,
        id: ComputeInstanceId,
    ) -> Result<&mut InstanceState<T>, InstanceMissing> {
        self.instances.get_mut(&id).ok_or(InstanceMissing(id))
    }

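    /// List the IDs of all collections installed on the identified compute instance.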
    pub fn collection_ids(
        &self,
        instance_id: ComputeInstanceId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let ids = instance.collections.keys().copied();
        Ok(ids)
    }

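    /// Return the frontiers of the identified compute collection.
    ///
    /// If an `instance_id` is given, the collection is looked up only on that instance;
    /// otherwise all instances are searched.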
    pub fn collection_frontiers(
        &self,
        collection_id: GlobalId,
        instance_id: Option<ComputeInstanceId>,
    ) -> Result<CollectionFrontiers<T>, CollectionLookupError> {
        let collection = match instance_id {
            Some(id) => self.instance(id)?.collection(collection_id)?,
            None => self
                .instances
                .values()
                .find_map(|i| i.collections.get(&collection_id))
                .ok_or(CollectionMissing(collection_id))?,
        };

        Ok(collection.frontiers())
    }

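    /// List the IDs of all collections on the identified instance that directly depend on the
    /// given collection.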
    pub fn collection_reverse_dependencies(
        &self,
        instance_id: ComputeInstanceId,
        id: GlobalId,
    ) -> Result<impl Iterator<Item = GlobalId> + '_, InstanceMissing> {
        let instance = self.instance(instance_id)?;
        let collections = instance.collections.iter();
        let ids = collections
            .filter_map(move |(cid, c)| c.compute_dependencies.contains(&id).then_some(*cid));
        Ok(ids)
    }

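    /// Update the `arrangement_exert_proportionality` value used to configure new replicas.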
    pub fn set_arrangement_exert_proportionality(&mut self, value: u32) {
        self.initial_config.arrangement_exert_proportionality = value;
    }

    pub fn set_enable_zero_copy(&mut self, value: bool) {
        self.initial_config.enable_zero_copy = value;
    }

    pub fn set_enable_zero_copy_lgalloc(&mut self, value: bool) {
        self.initial_config.enable_zero_copy_lgalloc = value;
    }

    pub fn set_zero_copy_limit(&mut self, value: Option<usize>) {
        self.initial_config.zero_copy_limit = value;
    }

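    /// Return whether all collections on all clusters are hydrated, ignoring the excluded
    /// collections. Clusters that are not hydrated are logged.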
    pub async fn clusters_hydrated(&self, exclude_collections: &BTreeSet<GlobalId>) -> bool {
        let instances = self.instances.iter();
        let mut pending: FuturesUnordered<_> = instances
            .map(|(id, instance)| {
                let exclude_collections = exclude_collections.clone();
                instance
                    .call_sync(move |i| i.collections_hydrated(&exclude_collections))
                    .map(move |x| (id, x))
            })
            .collect();

        let mut result = true;
        while let Some((id, hydrated)) = pending.next().await {
            if !hydrated {
                result = false;

                tracing::info!("cluster {id} is not hydrated");
            }
        }

        result
    }

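    /// Return whether the identified compute collection is hydrated.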
    pub async fn collection_hydrated(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<bool, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        let res = instance
            .call_sync(move |i| i.collection_hydrated(collection_id))
            .await?;

        Ok(res)
    }

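    /// Return a receiver that reports whether all collections, apart from the excluded ones,
    /// are hydrated on at least one of the given replicas.
    ///
    /// Returns an error if the given replicas are not a valid hydration target for the
    /// instance.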
    pub fn collections_hydrated_for_replicas(
        &self,
        instance_id: ComputeInstanceId,
        replicas: Vec<ReplicaId>,
        exclude_collections: BTreeSet<GlobalId>,
    ) -> Result<oneshot::Receiver<bool>, anyhow::Error> {
        let instance = self.instance(instance_id)?;

        if instance.replicas.is_empty()
            && !replicas.iter().any(|id| instance.replicas.contains(id))
        {
            return Err(HydrationCheckBadTarget(replicas).into());
        }

        let (tx, rx) = oneshot::channel();
        instance.call(move |i| {
            let result = i
                .collections_hydrated_on_replicas(Some(replicas), &exclude_collections)
                .expect("validated");
            let _ = tx.send(result);
        });

        Ok(rx)
    }

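    /// Return a JSON-serializable dump of the compute controller state, for debugging.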
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            instances,
            instance_workload_classes,
            build_info: _,
            storage_collections: _,
            initialized,
            read_only,
            config: _,
            initial_config,
            stashed_response,
            envd_epoch,
            metrics: _,
            now: _,
            wallclock_lag: _,
            dyncfg: _,
            response_rx: _,
            response_tx: _,
            introspection_rx: _,
            introspection_tx: _,
            maintenance_ticker: _,
            maintenance_scheduled,
        } = self;

        let mut instances_dump = BTreeMap::new();
        for (id, instance) in instances {
            let dump = instance.dump().await?;
            instances_dump.insert(id.to_string(), dump);
        }

        let instance_workload_classes: BTreeMap<_, _> = instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .iter()
            .map(|(id, wc)| (id.to_string(), format!("{wc:?}")))
            .collect();

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let map = serde_json::Map::from_iter([
            field("instances", instances_dump)?,
            field("instance_workload_classes", instance_workload_classes)?,
            field("initialized", initialized)?,
            field("read_only", read_only)?,
            field("initial_config", initial_config)?,
            field("stashed_response", format!("{stashed_response:?}"))?,
            field("envd_epoch", envd_epoch)?,
            field("maintenance_scheduled", maintenance_scheduled)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

impl<T> ComputeController<T>
where
    T: ComputeControllerTimestamp,
    ComputeGrpcClient: ComputeClient<T>,
{
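    /// Create a compute instance with the given ID and the given arranged introspection logs.
    ///
    /// A usage sketch with illustrative IDs and replica configuration; the surrounding setup is
    /// assumed:
    ///
    /// ```ignore
    /// compute.create_instance(instance_id, BTreeMap::new(), Some("analytics".into()))?;
    /// compute.add_replica_to_instance(instance_id, replica_id, location, replica_config)?;
    /// ```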
    pub fn create_instance(
        &mut self,
        id: ComputeInstanceId,
        arranged_logs: BTreeMap<LogVariant, GlobalId>,
        workload_class: Option<String>,
    ) -> Result<(), InstanceExists> {
        if self.instances.contains_key(&id) {
            return Err(InstanceExists(id));
        }

        let mut collections = BTreeMap::new();
        let mut logs = Vec::with_capacity(arranged_logs.len());
        for (&log, &id) in &arranged_logs {
            let collection = Collection::new_log();
            let shared = collection.shared.clone();
            collections.insert(id, collection);
            logs.push((log, id, shared));
        }

        let client = instance::Client::spawn(
            id,
            self.build_info,
            Arc::clone(&self.storage_collections),
            logs,
            self.envd_epoch,
            self.metrics.for_instance(id),
            self.now.clone(),
            self.wallclock_lag.clone(),
            Arc::clone(&self.dyncfg),
            self.response_tx.clone(),
            self.introspection_tx.clone(),
        );

        let instance = InstanceState::new(client, collections);
        self.instances.insert(id, instance);

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class.clone());

        let instance = self.instances.get_mut(&id).expect("instance just added");
        if self.initialized {
            instance.call(Instance::initialization_complete);
        }

        if !self.read_only {
            instance.call(Instance::allow_writes);
        }

        let mut config_params = self.config.clone();
        config_params.workload_class = Some(workload_class);
        instance.call(|i| i.update_configuration(config_params));

        Ok(())
    }

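    /// Update the workload class of the identified compute instance, and propagate the change
    /// to its configuration.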
    pub fn update_instance_workload_class(
        &mut self,
        id: ComputeInstanceId,
        workload_class: Option<String>,
    ) -> Result<(), InstanceMissing> {
        let _ = self.instance(id)?;

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .insert(id, workload_class);

        self.update_configuration(Default::default());

        Ok(())
    }

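    /// Remove a compute instance, shutting down its controller task.
    ///
    /// Removing an instance that does not exist is a no-op.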
    pub fn drop_instance(&mut self, id: ComputeInstanceId) {
        if let Some(instance) = self.instances.remove(&id) {
            instance.call(|i| i.shutdown());
        }

        self.instance_workload_classes
            .lock()
            .expect("lock poisoned")
            .remove(&id);
    }

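    /// Return the dynamic configuration of this controller.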
    pub fn dyncfg(&self) -> &Arc<ConfigSet> {
        &self.dyncfg
    }

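    /// Update the compute controller's configuration and propagate the changes to all
    /// instances.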
    pub fn update_configuration(&mut self, config_params: ComputeParameters) {
        config_params.dyncfg_updates.apply(&self.dyncfg);

        self.set_enable_zero_copy(ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg));
        self.set_enable_zero_copy_lgalloc(ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg));
        self.set_zero_copy_limit(TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg));

        let instance_workload_classes = self
            .instance_workload_classes
            .lock()
            .expect("lock poisoned");

        for (id, instance) in self.instances.iter_mut() {
            let mut params = config_params.clone();
            params.workload_class = Some(instance_workload_classes[id].clone());
            instance.call(|i| i.update_configuration(params));
        }

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(&self.dyncfg);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                tracing::error!(
                    err,
                    overflowing_behavior,
                    "Invalid value for ore_overflowing_behavior"
                );
            }
        }

        self.config.update(config_params);
    }

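    /// Mark the end of initialization, notifying all existing instances.
    ///
    /// Instances created afterwards are notified upon creation.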
    pub fn initialization_complete(&mut self) {
        self.initialized = true;
        for instance in self.instances.values_mut() {
            instance.call(Instance::initialization_complete);
        }
    }

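    /// Wait until the controller is ready to do some processing.
    ///
    /// This returns as soon as a response is available to be returned or maintenance work is
    /// scheduled; [`ComputeController::process`] should then be called to perform the pending
    /// work. A sketch of the intended driver loop, with an illustrative `handle_response`:
    ///
    /// ```ignore
    /// loop {
    ///     compute.ready().await;
    ///     while let Some(response) = compute.process() {
    ///         handle_response(response);
    ///     }
    /// }
    /// ```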
    pub async fn ready(&mut self) {
        if self.stashed_response.is_some() {
            return;
        }
        if self.maintenance_scheduled {
            return;
        }

        tokio::select! {
            resp = self.response_rx.recv() => {
                let resp = resp.expect("`self.response_tx` not dropped");
                self.stashed_response = Some(resp);
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        }
    }

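    /// Add a new replica to the identified compute instance.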
    pub fn add_replica_to_instance(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
        config: ComputeReplicaConfig,
    ) -> Result<(), ReplicaCreationError> {
        use ReplicaCreationError::*;

        let instance = self.instance(instance_id)?;

        if instance.replicas.contains(&replica_id) {
            return Err(ReplicaExists(replica_id));
        }

        let (enable_logging, interval) = match config.logging.interval {
            Some(interval) => (true, interval),
            None => (false, Duration::from_secs(1)),
        };

        let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);

        let replica_config = ReplicaConfig {
            location,
            logging: LoggingConfig {
                interval,
                enable_logging,
                log_logging: config.logging.log_logging,
                index_logs: Default::default(),
            },
            grpc_client: self.config.grpc_client.clone(),
            expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
            initial_config: self.initial_config.clone(),
        };

        let instance = self.instance_mut(instance_id).expect("validated");
        instance.replicas.insert(replica_id);

        instance.call(move |i| {
            i.add_replica(replica_id, replica_config)
                .expect("validated")
        });

        Ok(())
    }

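    /// Remove a replica from the identified compute instance.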
    pub fn drop_replica(
        &mut self,
        instance_id: ComputeInstanceId,
        replica_id: ReplicaId,
    ) -> Result<(), ReplicaDropError> {
        use ReplicaDropError::*;

        let instance = self.instance_mut(instance_id)?;

        if !instance.replicas.contains(&replica_id) {
            return Err(ReplicaMissing(replica_id));
        }

        instance.replicas.remove(&replica_id);

        instance.call(move |i| i.remove_replica(replica_id).expect("validated"));

        Ok(())
    }

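    /// Create the described dataflow on the identified compute instance.
    ///
    /// This validates the dataflow's as-of against the sinces of its imports, acquires read
    /// holds on the imported collections, and registers controller state for each export.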
    pub fn create_dataflow(
        &mut self,
        instance_id: ComputeInstanceId,
        mut dataflow: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        subscribe_target_replica: Option<ReplicaId>,
    ) -> Result<(), DataflowCreationError> {
        use DataflowCreationError::*;

        let instance = self.instance(instance_id)?;

        if let Some(replica_id) = subscribe_target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        let as_of = dataflow.as_of.as_ref().ok_or(MissingAsOf)?;
        if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() {
            return Err(EmptyAsOfForSubscribe);
        }
        if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() {
            return Err(EmptyAsOfForCopyTo);
        }

        let storage_ids = dataflow.imported_source_ids().collect();
        let mut import_read_holds = self.storage_collections.acquire_read_holds(storage_ids)?;
        for id in dataflow.imported_index_ids() {
            let read_hold = instance.acquire_read_hold(id)?;
            import_read_holds.push(read_hold);
        }
        for hold in &import_read_holds {
            if PartialOrder::less_than(as_of, hold.since()) {
                return Err(SinceViolation(hold.id()));
            }
        }

        for id in dataflow.persist_sink_ids() {
            if self.storage_collections.check_exists(id).is_err() {
                return Err(CollectionMissing(id));
            }
        }

        let time_dependence = self
            .determine_time_dependence(instance_id, &dataflow)
            .expect("must exist");

        let instance = self.instance_mut(instance_id).expect("validated");

        let mut shared_collection_state = BTreeMap::new();
        for id in dataflow.export_ids() {
            let shared = SharedCollectionState::new(as_of.clone());
            let collection = Collection {
                write_only: dataflow.sink_exports.contains_key(&id),
                compute_dependencies: dataflow.imported_index_ids().collect(),
                shared: shared.clone(),
                time_dependence: time_dependence.clone(),
            };
            instance.collections.insert(id, collection);
            shared_collection_state.insert(id, shared);
        }

        dataflow.time_dependence = time_dependence;

        instance.call(move |i| {
            i.create_dataflow(
                dataflow,
                import_read_holds,
                subscribe_target_replica,
                shared_collection_state,
            )
            .expect("validated")
        });

        Ok(())
    }

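    /// Drop the identified collections from the identified compute instance.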
    pub fn drop_collections(
        &mut self,
        instance_id: ComputeInstanceId,
        collection_ids: Vec<GlobalId>,
    ) -> Result<(), CollectionUpdateError> {
        let instance = self.instance_mut(instance_id)?;

        for id in &collection_ids {
            instance.collection(*id)?;
        }

        for id in &collection_ids {
            instance.collections.remove(id);
        }

        instance.call(|i| i.drop_collections(collection_ids).expect("validated"));

        Ok(())
    }

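    /// Initiate a peek at the given collection at `timestamp`, delivering the result through
    /// `peek_response_tx`.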
    pub fn peek(
        &self,
        instance_id: ComputeInstanceId,
        peek_target: PeekTarget,
        literal_constraints: Option<Vec<Row>>,
        uuid: Uuid,
        timestamp: T,
        finishing: RowSetFinishing,
        map_filter_project: mz_expr::SafeMfpPlan,
        target_replica: Option<ReplicaId>,
        peek_response_tx: oneshot::Sender<PeekResponse>,
    ) -> Result<(), PeekError> {
        use PeekError::*;

        let instance = self.instance(instance_id)?;

        if let Some(replica_id) = target_replica {
            if !instance.replicas.contains(&replica_id) {
                return Err(ReplicaMissing(replica_id));
            }
        }

        let read_hold = match &peek_target {
            PeekTarget::Index { id } => instance.acquire_read_hold(*id)?,
            PeekTarget::Persist { id, .. } => self
                .storage_collections
                .acquire_read_holds(vec![*id])?
                .into_element(),
        };
        if !read_hold.since().less_equal(&timestamp) {
            return Err(SinceViolation(peek_target.id()));
        }

        instance.call(move |i| {
            i.peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                finishing,
                map_filter_project,
                read_hold,
                target_replica,
                peek_response_tx,
            )
            .expect("validated")
        });

        Ok(())
    }

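    /// Cancel an in-progress peek with the given `uuid`, resolving it with the given `reason`.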
    pub fn cancel_peek(
        &self,
        instance_id: ComputeInstanceId,
        uuid: Uuid,
        reason: PeekResponse,
    ) -> Result<(), InstanceMissing> {
        self.instance(instance_id)?
            .call(move |i| i.cancel_peek(uuid, reason));
        Ok(())
    }

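    /// Assign read policies to the identified collections.
    ///
    /// Policies cannot be assigned to write-only collections; attempting to do so returns an
    /// error.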
    pub fn set_read_policy(
        &self,
        instance_id: ComputeInstanceId,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) -> Result<(), ReadPolicyError> {
        use ReadPolicyError::*;

        let instance = self.instance(instance_id)?;

        for (id, _) in &policies {
            let collection = instance.collection(*id)?;
            if collection.write_only {
                return Err(WriteOnlyCollection(*id));
            }
        }

        self.instance(instance_id)?
            .call(|i| i.set_read_policy(policies).expect("validated"));

        Ok(())
    }

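    /// Acquire a [`ReadHold`] on the identified compute collection at its current read
    /// frontier.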
    pub fn acquire_read_hold(
        &self,
        instance_id: ComputeInstanceId,
        collection_id: GlobalId,
    ) -> Result<ReadHold<T>, CollectionUpdateError> {
        let read_hold = self
            .instance(instance_id)?
            .acquire_read_hold(collection_id)?;
        Ok(read_hold)
    }

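    /// Determine the time dependence of the described dataflow by merging the time dependences
    /// of its imports. Continual tasks are exempt and have no time dependence.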
    fn determine_time_dependence(
        &self,
        instance_id: ComputeInstanceId,
        dataflow: &DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        let is_continual_task = dataflow.continual_task_ids().next().is_some();
        if is_continual_task {
            return Ok(None);
        }

        let instance = self
            .instance(instance_id)
            .map_err(|err| TimeDependenceError::InstanceMissing(err.0))?;
        let mut time_dependencies = Vec::new();

        for id in dataflow.imported_index_ids() {
            let dependence = instance
                .get_time_dependence(id)
                .map_err(|err| TimeDependenceError::CollectionMissing(err.0))?;
            time_dependencies.push(dependence);
        }

        'source: for id in dataflow.imported_source_ids() {
            for instance in self.instances.values() {
                if let Ok(dependence) = instance.get_time_dependence(id) {
                    time_dependencies.push(dependence);
                    continue 'source;
                }
            }

            time_dependencies.push(self.storage_collections.determine_time_dependence(id)?);
        }

        Ok(TimeDependence::merge(
            time_dependencies,
            dataflow.refresh_schedule.as_ref(),
        ))
    }

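    /// Process pending work: perform scheduled maintenance and return a stashed response, if
    /// one exists.
    ///
    /// This method is expected to be called after [`ComputeController::ready`] returns.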
    #[mz_ore::instrument(level = "debug")]
    pub fn process(&mut self) -> Option<ComputeControllerResponse<T>> {
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        self.stashed_response.take()
    }

    #[mz_ore::instrument(level = "debug")]
    fn maintain(&mut self) {
        for instance in self.instances.values_mut() {
            instance.call(Instance::maintain);
        }
    }
}

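/// State maintained by the compute controller for each compute instance.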
#[derive(Debug)]
struct InstanceState<T: ComputeControllerTimestamp> {
    client: instance::Client<T>,
    replicas: BTreeSet<ReplicaId>,
    collections: BTreeMap<GlobalId, Collection<T>>,
}

impl<T: ComputeControllerTimestamp> InstanceState<T> {
    fn new(client: instance::Client<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
        Self {
            client,
            replicas: Default::default(),
            collections,
        }
    }

    fn collection(&self, id: GlobalId) -> Result<&Collection<T>, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

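    /// Execute the given function on the instance task, without waiting for the result.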
    fn call<F>(&self, f: F)
    where
        F: FnOnce(&mut Instance<T>) + Send + 'static,
    {
        let otel_ctx = OpenTelemetryContext::obtain();
        self.client
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call").entered();
                otel_ctx.attach_as_parent();

                f(instance)
            }))
            .expect("instance not dropped");
    }

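    /// Execute the given function on the instance task and await its result.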
    async fn call_sync<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let otel_ctx = OpenTelemetryContext::obtain();
        self.client
            .send(Box::new(move |instance| {
                let _span = debug_span!("instance::call_sync").entered();
                otel_ctx.attach_as_parent();

                let result = f(instance);
                let _ = tx.send(result);
            }))
            .expect("instance not dropped");

        rx.await.expect("instance not dropped")
    }

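    /// Acquire a [`ReadHold`] on the identified collection, by incrementing its read
    /// capabilities at the current read frontier.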
    pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
        let collection = self.collection(id)?;
        let since = collection.shared.lock_read_capabilities(|caps| {
            let since = caps.frontier().to_owned();
            caps.update_iter(since.iter().map(|t| (t.clone(), 1)));
            since
        });

        let hold = ReadHold::new(id, since, self.client.read_hold_tx());
        Ok(hold)
    }

    fn get_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, CollectionMissing> {
        Ok(self.collection(id)?.time_dependence.clone())
    }

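    /// Return a JSON-serializable dump of the instance state, for debugging.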
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            client: _,
            replicas,
            collections,
        } = self;

        let instance = self.call_sync(|i| i.dump()).await?;
        let replicas: Vec<_> = replicas.iter().map(|id| id.to_string()).collect();
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();

        fn field(
            key: &str,
            value: impl Serialize,
        ) -> Result<(String, serde_json::Value), anyhow::Error> {
            let value = serde_json::to_value(value)?;
            Ok((key.to_string(), value))
        }

        let map = serde_json::Map::from_iter([
            field("instance", instance)?,
            field("replicas", replicas)?,
            field("collections", collections)?,
        ]);
        Ok(serde_json::Value::Object(map))
    }
}

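/// State maintained by the compute controller for each compute collection.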
#[derive(Debug)]
struct Collection<T> {
    write_only: bool,
    compute_dependencies: BTreeSet<GlobalId>,
    shared: SharedCollectionState<T>,
    time_dependence: Option<TimeDependence>,
}

impl<T: Timestamp> Collection<T> {
    fn new_log() -> Self {
        let as_of = Antichain::from_elem(T::minimum());
        Self {
            write_only: false,
            compute_dependencies: Default::default(),
            shared: SharedCollectionState::new(as_of),
            time_dependence: Some(TimeDependence::default()),
        }
    }

    fn frontiers(&self) -> CollectionFrontiers<T> {
        let read_frontier = self
            .shared
            .lock_read_capabilities(|c| c.frontier().to_owned());
        let write_frontier = self.shared.lock_write_frontier(|f| f.clone());
        CollectionFrontiers {
            read_frontier,
            write_frontier,
        }
    }
}

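/// The read and write frontiers of a compute collection.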
#[derive(Clone, Debug)]
pub struct CollectionFrontiers<T> {
    pub read_frontier: Antichain<T>,
    pub write_frontier: Antichain<T>,
}

impl<T: Timestamp> Default for CollectionFrontiers<T> {
    fn default() -> Self {
        Self {
            read_frontier: Antichain::from_elem(T::minimum()),
            write_frontier: Antichain::from_elem(T::minimum()),
        }
    }
}