use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroI64;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, atomic};
use std::time::{Duration, Instant};

use anyhow::bail;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::{ClusterReplicaLocation, ClusterStartupEpoch, TimelyConfig};
use mz_controller_types::dyncfgs::ENABLE_CREATE_SOCKETS_V2;
use mz_dyncfg::ConfigSet;
use mz_ore::cast::CastFrom;
use mz_ore::now::NowFn;
use mz_ore::retry::{Retry, RetryState};
use mz_ore::task::AbortOnDropHandle;
use mz_repr::GlobalId;
use mz_service::client::{GenericClient, Partitioned};
use mz_service::params::GrpcClientParameters;
use mz_storage_client::client::{
    RunIngestionCommand, RunSinkCommand, Status, StatusUpdate, StorageClient, StorageCommand,
    StorageGrpcClient, StorageResponse,
};
use mz_storage_client::metrics::{InstanceMetrics, ReplicaMetrics};
use mz_storage_types::dyncfgs::STORAGE_SINK_SNAPSHOT_FRONTIER;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{IngestionDescription, SourceConnection};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::select;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};

use crate::CollectionMetadata;
use crate::history::CommandHistory;

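/// The controller's state for a single storage cluster instance: its replicas,
/// the ingestions and exports running on them, and the command history used to
/// (re)hydrate replicas.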
#[derive(Debug)]
pub(crate) struct Instance<T> {
    pub workload_class: Option<String>,
    replicas: BTreeMap<ReplicaId, Replica<T>>,
    active_ingestions: BTreeMap<GlobalId, ActiveIngestion>,
    ingestion_exports: BTreeMap<GlobalId, GlobalId>,
    active_exports: BTreeMap<GlobalId, ActiveExport>,
    history: CommandHistory<T>,
    epoch: ClusterStartupEpoch,
    metrics: InstanceMetrics,
    dyncfg: Arc<ConfigSet>,
    now: NowFn,
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}

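/// Bookkeeping for an ingestion that is currently installed on this instance,
/// in particular the set of replicas it is scheduled on.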
#[derive(Debug)]
struct ActiveIngestion {
    active_replicas: BTreeSet<ReplicaId>,
}

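/// Bookkeeping for an export (sink) that is currently installed on this
/// instance, in particular the set of replicas it is scheduled on.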
#[derive(Debug)]
struct ActiveExport {
    active_replicas: BTreeSet<ReplicaId>,
}

impl<T> Instance<T>
where
    T: Timestamp + Lattice + TotalOrder,
    StorageGrpcClient: StorageClient<T>,
{
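    /// Creates a new [`Instance`] and records a `CreateTimely` command in its
    /// history, to be replayed to every replica that is later added.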
    pub fn new(
        envd_epoch: NonZeroI64,
        metrics: InstanceMetrics,
        dyncfg: Arc<ConfigSet>,
        now: NowFn,
        instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    ) -> Self {
        let enable_snapshot_frontier = STORAGE_SINK_SNAPSHOT_FRONTIER.handle(&dyncfg);
        let history = CommandHistory::new(metrics.for_history(), enable_snapshot_frontier);
        let epoch = ClusterStartupEpoch::new(envd_epoch, 0);

        let mut instance = Self {
            workload_class: None,
            replicas: Default::default(),
            active_ingestions: Default::default(),
            ingestion_exports: Default::default(),
            active_exports: BTreeMap::new(),
            history,
            epoch,
            metrics,
            dyncfg,
            now,
            response_tx: instance_response_tx,
        };

        instance.send(StorageCommand::CreateTimely {
            config: Default::default(),
            epoch,
        });

        instance
    }

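    /// Returns the IDs of all replicas of this instance.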
    pub fn replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_ {
        self.replicas.keys().copied()
    }

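    /// Adds a new replica, updates scheduling decisions, and replays the
    /// (reduced) command history to it.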
    pub fn add_replica(&mut self, id: ReplicaId, config: ReplicaConfig) {
        self.history.reduce();

        self.epoch.bump_replica();
        let metrics = self.metrics.for_replica(id);
        let replica = Replica::new(
            id,
            config,
            self.epoch,
            metrics,
            Arc::clone(&self.dyncfg),
            self.response_tx.clone(),
        );

        self.replicas.insert(id, replica);

        self.update_scheduling(false);

        self.replay_commands(id);
    }

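    /// Replays the command history to the given replica, skipping commands for
    /// objects that are not scheduled on it.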
    pub fn replay_commands(&mut self, replica_id: ReplicaId) {
        let commands = self.history.iter().cloned();

        let filtered_commands = commands
            .filter_map(|command| match command {
                StorageCommand::RunIngestion(ingestion) => {
                    if self.is_active_replica(&ingestion.id, &replica_id) {
                        Some(StorageCommand::RunIngestion(ingestion))
                    } else {
                        None
                    }
                }
                StorageCommand::RunSink(sink) => {
                    if self.is_active_replica(&sink.id, &replica_id) {
                        Some(StorageCommand::RunSink(sink))
                    } else {
                        None
                    }
                }
                StorageCommand::AllowCompaction(id, upper) => {
                    if self.is_active_replica(&id, &replica_id) {
                        Some(StorageCommand::AllowCompaction(id, upper))
                    } else {
                        None
                    }
                }
                command => Some(command),
            })
            .collect::<Vec<_>>();

        let replica = self
            .replicas
            .get_mut(&replica_id)
            .expect("replica must exist");

        for command in filtered_commands {
            replica.send(command);
        }
    }

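    /// Removes a replica, updates scheduling decisions for any objects that were
    /// running on it, and reports paused statuses if no replicas remain.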
    pub fn drop_replica(&mut self, id: ReplicaId) {
        let replica = self.replicas.remove(&id);

        let mut needs_rescheduling = false;
        for (ingestion_id, ingestion) in self.active_ingestions.iter_mut() {
            let was_running = ingestion.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %ingestion_id,
                    replica_id = %id,
                    "ingestion was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }
        for (export_id, export) in self.active_exports.iter_mut() {
            let was_running = export.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %export_id,
                    replica_id = %id,
                    "export was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }

        tracing::info!(%id, %needs_rescheduling, "dropped replica");

        if needs_rescheduling {
            self.update_scheduling(true);
        }

        if replica.is_some() && self.replicas.is_empty() {
            self.update_paused_statuses();
        }
    }

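    /// Drops and re-adds any replicas whose tasks have failed, rehydrating them
    /// from the command history.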
    pub fn rehydrate_failed_replicas(&mut self) {
        let replicas = self.replicas.iter();
        let failed_replicas: Vec<_> = replicas
            .filter_map(|(id, replica)| replica.failed().then_some(*id))
            .collect();

        for id in failed_replicas {
            let replica = self.replicas.remove(&id).expect("must exist");
            self.add_replica(id, replica.config);
        }
    }

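    /// Returns the IDs of all ingestions active on this instance.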
    pub fn active_ingestions(&self) -> impl Iterator<Item = &GlobalId> {
        self.active_ingestions.keys()
    }

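    /// Returns the IDs of all exports active on this instance.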
    pub fn active_exports(&self) -> impl Iterator<Item = &GlobalId> {
        self.active_exports.keys()
    }

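    /// Reports a `Paused` status update for the installed objects (ingestion
    /// source exports and sinks) in the command history, to reflect that no
    /// replica is currently running them.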
    fn update_paused_statuses(&mut self) {
        let now = mz_ore::now::to_datetime((self.now)());
        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: now,
            error: None,
            hints: BTreeSet::from([format!(
                "There is currently no replica running this {object_type}"
            )]),
            namespaced_errors: Default::default(),
            replica_id: None,
        };

        self.history.reduce();

        let mut status_updates = Vec::new();
        for command in self.history.iter() {
            match command {
                StorageCommand::RunIngestion(ingestion) => {
                    let subsource_ids = ingestion
                        .description
                        .collection_ids()
                        .filter(|id| id != &ingestion.description.remap_collection_id);
                    for id in subsource_ids {
                        status_updates.push(make_update(id, "source"));
                    }
                }
                StorageCommand::RunSink(sink) => {
                    status_updates.push(make_update(sink.id, "sink"));
                }
                _ => (),
            }
        }

        for update in status_updates {
            let _ = self
                .response_tx
                .send((None, StorageResponse::StatusUpdate(update)));
        }
    }

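    /// Records the given command in the history and forwards it to the replicas
    /// responsible for the affected object, or to all replicas for commands that
    /// are not object-specific.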
    pub fn send(&mut self, command: StorageCommand<T>) {
        self.history.push(command.clone());

        match command.clone() {
            StorageCommand::RunIngestion(ingestion) => {
                self.absorb_ingestion(*ingestion.clone());

                for replica in self.active_replicas(&ingestion.id) {
                    replica.send(StorageCommand::RunIngestion(ingestion.clone()));
                }
            }
            StorageCommand::RunSink(sink) => {
                self.absorb_export(*sink.clone());

                for replica in self.active_replicas(&sink.id) {
                    replica.send(StorageCommand::RunSink(sink.clone()));
                }
            }
            StorageCommand::AllowCompaction(id, frontier) => {
                for replica in self.active_replicas(&id) {
                    replica.send(StorageCommand::AllowCompaction(
                        id.clone(),
                        frontier.clone(),
                    ));
                }

                self.absorb_compaction(id, frontier);
            }
            command => {
                for replica in self.replicas.values_mut() {
                    replica.send(command.clone());
                }
            }
        }

        if command.installs_objects() && self.replicas.is_empty() {
            self.update_paused_statuses();
        }
    }

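    /// Updates internal state to reflect a `RunIngestion` command: registers the
    /// ingestion's source exports and, for new ingestions, updates scheduling
    /// decisions.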
    fn absorb_ingestion(&mut self, ingestion: RunIngestionCommand) {
        let existing_ingestion_state = self.active_ingestions.get_mut(&ingestion.id);

        for id in ingestion.description.source_exports.keys() {
            self.ingestion_exports.insert(id.clone(), ingestion.id);
        }

        if let Some(ingestion_state) = existing_ingestion_state {
            tracing::debug!(
                ingestion_id = %ingestion.id,
                active_replicas = %ingestion_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating ingestion"
            );
        } else {
            let ingestion_state = ActiveIngestion {
                active_replicas: BTreeSet::new(),
            };
            self.active_ingestions.insert(ingestion.id, ingestion_state);

            self.update_scheduling(false);
        }
    }

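    /// Updates internal state to reflect a `RunSink` command: registers new
    /// exports and updates scheduling decisions for them.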
    fn absorb_export(&mut self, export: RunSinkCommand<T>) {
        let existing_export_state = self.active_exports.get_mut(&export.id);

        if let Some(export_state) = existing_export_state {
            tracing::debug!(
                export_id = %export.id,
                active_replicas = %export_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating export"
            );
        } else {
            let export_state = ActiveExport {
                active_replicas: BTreeSet::new(),
            };
            self.active_exports.insert(export.id, export_state);

            self.update_scheduling(false);
        }
    }

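    /// (Re)assigns active objects to replicas: objects that prefer a single
    /// replica are pinned to the smallest replica ID, all other objects run on
    /// every replica. When `send_commands` is true, run commands for newly
    /// pinned single-replica objects are sent to their target replica.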
    fn update_scheduling(&mut self, send_commands: bool) {
        #[derive(Debug)]
        enum ObjectId {
            Ingestion(GlobalId),
            Export(GlobalId),
        }

        let mut scheduling_preferences: Vec<(ObjectId, bool)> = Vec::new();

        for ingestion_id in self.active_ingestions.keys() {
            let ingestion_description = self
                .get_ingestion_description(ingestion_id)
                .expect("missing ingestion description");

            let prefers_single_replica = ingestion_description
                .desc
                .connection
                .prefers_single_replica();

            scheduling_preferences
                .push((ObjectId::Ingestion(*ingestion_id), prefers_single_replica));
        }

        for export_id in self.active_exports.keys() {
            scheduling_preferences.push((ObjectId::Export(*export_id), true));
        }

        let mut commands_by_replica: BTreeMap<ReplicaId, Vec<ObjectId>> = BTreeMap::new();

        for (object_id, prefers_single_replica) in scheduling_preferences {
            let active_replicas = match object_id {
                ObjectId::Ingestion(ingestion_id) => {
                    &mut self
                        .active_ingestions
                        .get_mut(&ingestion_id)
                        .expect("missing ingestion state")
                        .active_replicas
                }
                ObjectId::Export(export_id) => {
                    &mut self
                        .active_exports
                        .get_mut(&export_id)
                        .expect("missing export state")
                        .active_replicas
                }
            };

            if prefers_single_replica {
                if active_replicas.is_empty() {
                    let target_replica = self.replicas.keys().min().copied();
                    if let Some(first_replica_id) = target_replica {
                        tracing::info!(
                            object_id = ?object_id,
                            replica_id = %first_replica_id,
                            "scheduling single-replica object");
                        active_replicas.insert(first_replica_id);

                        commands_by_replica
                            .entry(first_replica_id)
                            .or_default()
                            .push(object_id);
                    }
                } else {
                    tracing::info!(
                        ?object_id,
                        active_replicas = %active_replicas.iter().map(|id| id.to_string()).join(", "),
                        "single-replica object already running, not scheduling again",
                    );
                }
            } else {
                let current_replica_ids: BTreeSet<_> = self.replicas.keys().copied().collect();
                let unscheduled_replicas: Vec<_> = current_replica_ids
                    .difference(active_replicas)
                    .copied()
                    .collect();
                for replica_id in unscheduled_replicas {
                    tracing::info!(
                        ?object_id,
                        %replica_id,
                        "scheduling multi-replica object"
                    );
                    active_replicas.insert(replica_id);
                }
            }
        }

        if send_commands {
            for (replica_id, object_ids) in commands_by_replica {
                let mut ingestion_commands = vec![];
                let mut export_commands = vec![];
                for object_id in object_ids {
                    match object_id {
                        ObjectId::Ingestion(id) => {
                            ingestion_commands.push(RunIngestionCommand {
                                id,
                                description: self
                                    .get_ingestion_description(&id)
                                    .expect("missing ingestion description")
                                    .clone(),
                            });
                        }
                        ObjectId::Export(id) => {
                            export_commands.push(RunSinkCommand {
                                id,
                                description: self
                                    .get_export_description(&id)
                                    .expect("missing export description")
                                    .clone(),
                            });
                        }
                    }
                }
                for ingestion in ingestion_commands {
                    let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
                    let ingestion = Box::new(ingestion);
                    replica.send(StorageCommand::RunIngestion(ingestion));
                }
                for export in export_commands {
                    let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
                    let export = Box::new(export);
                    replica.send(StorageCommand::RunSink(export));
                }
            }
        }
    }

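    /// Returns the most recent description of the given ingestion, if it is
    /// active, by searching the command history.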
    pub fn get_ingestion_description(
        &self,
        id: &GlobalId,
    ) -> Option<IngestionDescription<CollectionMetadata>> {
        if !self.active_ingestions.contains_key(id) {
            return None;
        }

        self.history.iter().rev().find_map(|command| {
            if let StorageCommand::RunIngestion(ingestion) = command {
                if &ingestion.id == id {
                    Some(ingestion.description.clone())
                } else {
                    None
                }
            } else {
                None
            }
        })
    }

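    /// Returns the most recent description of the given export, if it is active,
    /// by searching the command history.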
    pub fn get_export_description(
        &self,
        id: &GlobalId,
    ) -> Option<StorageSinkDesc<CollectionMetadata, T>> {
        if !self.active_exports.contains_key(id) {
            return None;
        }

        self.history.iter().rev().find_map(|command| {
            if let StorageCommand::RunSink(sink) = command {
                if &sink.id == id {
                    Some(sink.description.clone())
                } else {
                    None
                }
            } else {
                None
            }
        })
    }

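    /// Updates internal state to reflect an `AllowCompaction` command; an empty
    /// frontier means the object is being dropped.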
    fn absorb_compaction(&mut self, id: GlobalId, frontier: Antichain<T>) {
        tracing::debug!(?self.active_ingestions, ?id, ?frontier, "allow_compaction");

        if frontier.is_empty() {
            self.active_ingestions.remove(&id);
            self.ingestion_exports.remove(&id);
            self.active_exports.remove(&id);
        }
    }

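    /// Returns the replicas on which the given object is scheduled. Objects not
    /// tracked as ingestions or exports are assumed to run on all replicas.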
    fn active_replicas(&mut self, id: &GlobalId) -> Box<dyn Iterator<Item = &mut Replica<T>> + '_> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => Box::new(self.replicas.iter_mut().filter_map(
                    move |(replica_id, replica)| {
                        if ingestion.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    },
                )),
                None => Box::new(std::iter::empty()),
            }
        } else if let Some(export) = self.active_exports.get(id) {
            Box::new(
                self.replicas
                    .iter_mut()
                    .filter_map(move |(replica_id, replica)| {
                        if export.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    }),
            )
        } else {
            Box::new(self.replicas.values_mut())
        }
    }

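    /// Reports whether the given replica is responsible for running the given
    /// object.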
    fn is_active_replica(&self, id: &GlobalId, replica_id: &ReplicaId) -> bool {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => ingestion.active_replicas.contains(replica_id),
                None => false,
            }
        } else if let Some(export) = self.active_exports.get(id) {
            export.active_replicas.contains(replica_id)
        } else {
            true
        }
    }

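    /// Refreshes instance-level metrics, such as the number of connected
    /// replicas.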
    pub(super) fn refresh_state_metrics(&self) {
        let connected_replica_count = self.replicas.values().filter(|r| r.is_connected()).count();

        self.metrics
            .connected_replica_count
            .set(u64::cast_from(connected_replica_count));
    }

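    /// Returns the IDs of the replicas on which the given (ingestion) object is
    /// scheduled; for other objects, all replica IDs are returned.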
    pub fn get_active_replicas_for_object(&self, id: &GlobalId) -> BTreeSet<ReplicaId> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => ingestion.active_replicas.clone(),
                None => BTreeSet::new(),
            }
        } else {
            self.replicas.keys().copied().collect()
        }
    }
}

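/// Connection configuration for a storage replica.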
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    pub build_info: &'static BuildInfo,
    pub location: ClusterReplicaLocation,
    pub grpc_client: GrpcClientParameters,
}

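/// A handle to a storage replica: commands are forwarded to a background
/// [`ReplicaTask`] that owns the connection to the replica.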
#[derive(Debug)]
pub struct Replica<T> {
    config: ReplicaConfig,
    command_tx: mpsc::UnboundedSender<StorageCommand<T>>,
    task: AbortOnDropHandle<()>,
    connected: Arc<AtomicBool>,
}

impl<T> Replica<T>
where
    T: Timestamp + Lattice,
    StorageGrpcClient: StorageClient<T>,
{
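    /// Spawns a [`ReplicaTask`] for the given replica and returns a handle for
    /// sending commands to it.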
    fn new(
        id: ReplicaId,
        config: ReplicaConfig,
        epoch: ClusterStartupEpoch,
        metrics: ReplicaMetrics,
        dyncfg: Arc<ConfigSet>,
        response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    ) -> Self {
        let (command_tx, command_rx) = mpsc::unbounded_channel();
        let connected = Arc::new(AtomicBool::new(false));

        let task = mz_ore::task::spawn(
            || format!("storage-replica-{id}"),
            ReplicaTask {
                replica_id: id,
                config: config.clone(),
                epoch,
                metrics: metrics.clone(),
                connected: Arc::clone(&connected),
                command_rx,
                response_tx,
                dyncfg,
            }
            .run(),
        );

        Self {
            config,
            command_tx,
            task: task.abort_on_drop(),
            connected,
        }
    }

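    /// Sends a command to the replica task; send errors (a stopped task) are
    /// ignored.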
    fn send(&self, command: StorageCommand<T>) {
        let _ = self.command_tx.send(command);
    }

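    /// Reports whether this replica has failed, i.e. its task has terminated.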
    fn failed(&self) -> bool {
        self.task.is_finished()
    }

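    /// Reports whether the replica task has established a connection to the
    /// replica.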
    pub(super) fn is_connected(&self) -> bool {
        self.connected.load(atomic::Ordering::Relaxed)
    }
}

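/// A gRPC client that is partitioned across the processes of a replica.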
type ReplicaClient<T> = Partitioned<StorageGrpcClient, StorageCommand<T>, StorageResponse<T>>;

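/// The background task that maintains the connection to a replica and shuttles
/// commands and responses between the controller and the replica.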
struct ReplicaTask<T> {
    replica_id: ReplicaId,
    config: ReplicaConfig,
    epoch: ClusterStartupEpoch,
    metrics: ReplicaMetrics,
    connected: Arc<AtomicBool>,
    command_rx: mpsc::UnboundedReceiver<StorageCommand<T>>,
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    dyncfg: Arc<ConfigSet>,
}

impl<T> ReplicaTask<T>
where
    T: Timestamp + Lattice,
    StorageGrpcClient: StorageClient<T>,
{
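    /// Runs the replica task: connects to the replica, then drives the message
    /// loop until it exits.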
    async fn run(self) {
        let replica_id = self.replica_id;
        info!(%replica_id, "starting replica task");

        let client = self.connect().await;
        match self.run_message_loop(client).await {
            Ok(()) => info!(%replica_id, "stopped replica task"),
            Err(error) => warn!(%replica_id, %error, "replica task failed"),
        }
    }

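    /// Connects to the replica, retrying forever (with a capped backoff) until
    /// the connection succeeds.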
    async fn connect(&self) -> ReplicaClient<T> {
        let try_connect = |retry: RetryState| {
            let addrs = &self.config.location.ctl_addrs;
            let dests = addrs
                .iter()
                .map(|addr| (addr.clone(), self.metrics.clone()))
                .collect();
            let version = self.config.build_info.semver_version();
            let client_params = &self.config.grpc_client;

            async move {
                let connect_start = Instant::now();
                let connect_result =
                    StorageGrpcClient::connect_partitioned(dests, version, client_params).await;
                self.metrics.observe_connect_time(connect_start.elapsed());

                connect_result.inspect_err(|error| {
                    let next_backoff = retry.next_backoff.unwrap();
                    if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
                        info!(
                            replica_id = %self.replica_id, ?next_backoff,
                            "error connecting to replica: {error:#}",
                        );
                    } else {
                        debug!(
                            replica_id = %self.replica_id, ?next_backoff,
                            "error connecting to replica: {error:#}",
                        );
                    }
                })
            }
        };

        let client = Retry::default()
            .clamp_backoff(Duration::from_secs(1))
            .retry_async(try_connect)
            .await
            .expect("retries forever");

        self.metrics.observe_connect();
        self.connected.store(true, atomic::Ordering::Relaxed);

        client
    }

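    /// Forwards commands from the controller to the replica and responses back,
    /// until either side disconnects or an error occurs.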
    async fn run_message_loop(mut self, mut client: ReplicaClient<T>) -> Result<(), anyhow::Error> {
        loop {
            select! {
                command = self.command_rx.recv() => {
                    let Some(mut command) = command else {
                        tracing::debug!(%self.replica_id, "controller is no longer interested in this replica, shutting down message loop");
                        break;
                    };

                    self.specialize_command(&mut command);
                    client.send(command).await?;
                },
                response = client.recv() => {
                    let Some(response) = response? else {
                        bail!("replica unexpectedly gracefully terminated connection");
                    };

                    if self.response_tx.send((Some(self.replica_id), response)).is_err() {
                        tracing::debug!(%self.replica_id, "controller (receiver) is no longer interested in this replica, shutting down message loop");
                        break;
                    }
                }
            }
        }

        Ok(())
    }

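    /// Specializes a command for this replica; in particular, fills in the
    /// replica-specific Timely configuration of `CreateTimely` commands.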
    fn specialize_command(&self, command: &mut StorageCommand<T>) {
        if let StorageCommand::CreateTimely { config, epoch } = command {
            **config = TimelyConfig {
                workers: self.config.location.workers,
                process: 0,
                addresses: self.config.location.dataflow_addrs.clone(),
                arrangement_exert_proportionality: 1337,
                enable_zero_copy: false,
                enable_zero_copy_lgalloc: false,
                zero_copy_limit: None,
                enable_create_sockets_v2: ENABLE_CREATE_SOCKETS_V2.get(&self.dyncfg),
            };
            *epoch = self.epoch;
        }
    }
}