1use crate::CollectionMetadata;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::atomic::AtomicBool;
15use std::sync::{Arc, atomic};
16use std::time::{Duration, Instant};
17
18use anyhow::bail;
19use itertools::Itertools;
20use mz_build_info::BuildInfo;
21use mz_cluster_client::ReplicaId;
22use mz_cluster_client::client::ClusterReplicaLocation;
23use mz_ore::cast::CastFrom;
24use mz_ore::now::NowFn;
25use mz_ore::retry::{Retry, RetryState};
26use mz_ore::task::AbortOnDropHandle;
27use mz_repr::{GlobalId, Timestamp};
28use mz_service::client::{GenericClient, Partitioned};
29use mz_service::params::GrpcClientParameters;
30use mz_service::transport;
31use mz_storage_client::client::{
32 RunIngestionCommand, RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse,
33};
34use mz_storage_client::metrics::{InstanceMetrics, ReplicaMetrics};
35use mz_storage_types::sinks::StorageSinkDesc;
36use mz_storage_types::sources::{IngestionDescription, SourceConnection};
37use timely::progress::Antichain;
38use tokio::select;
39use tokio::sync::mpsc;
40use tracing::{debug, info, warn};
41use uuid::Uuid;
42
43use crate::history::CommandHistory;
44
/// Controller-side state for a single storage cluster instance.
///
/// Tracks the instance's replicas, which objects (ingestions and exports) are
/// actively running on which replicas, and the command history used to
/// hydrate newly added replicas.
#[derive(Debug)]
pub(crate) struct Instance {
    /// The workload class of this instance, if any.
    pub workload_class: Option<String>,
    /// The replicas of this instance, by replica ID.
    replicas: BTreeMap<ReplicaId, Replica>,
    /// Scheduling state for currently installed ingestions, by ingestion ID.
    active_ingestions: BTreeMap<GlobalId, ActiveIngestion>,
    /// Maps the ID of each ingestion (source) export to the ID of its parent
    /// ingestion.
    ingestion_exports: BTreeMap<GlobalId, GlobalId>,
    /// Scheduling state for currently installed exports (sinks), by export ID.
    active_exports: BTreeMap<GlobalId, ActiveExport>,
    /// The history of commands sent to this instance; replayed to hydrate
    /// newly added replicas.
    history: CommandHistory,
    /// Metrics tracked for this instance.
    metrics: InstanceMetrics,
    /// A function returning the current wall-clock time.
    now: NowFn,
    /// Sender for responses to the controller. The `Option<ReplicaId>` names
    /// the originating replica; `None` marks responses synthesized by the
    /// instance itself (e.g. paused-status updates).
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse)>,
}
90
/// Scheduling state for an installed ingestion.
#[derive(Debug)]
struct ActiveIngestion {
    /// The set of replicas this ingestion is currently scheduled on.
    active_replicas: BTreeSet<ReplicaId>,
}
96
/// Scheduling state for an installed export (sink).
#[derive(Debug)]
struct ActiveExport {
    /// The set of replicas this export is currently scheduled on.
    active_replicas: BTreeSet<ReplicaId>,
}
102
103impl Instance {
    /// Creates a new [`Instance`] with no replicas and records an initial
    /// `Hello` command in its history.
    pub fn new(
        workload_class: Option<String>,
        metrics: InstanceMetrics,
        now: NowFn,
        instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse)>,
    ) -> Self {
        let history = CommandHistory::new(metrics.for_history());

        let mut instance = Self {
            workload_class,
            replicas: Default::default(),
            active_ingestions: Default::default(),
            ingestion_exports: Default::default(),
            active_exports: BTreeMap::new(),
            history,
            metrics,
            now,
            response_tx: instance_response_tx,
        };

        // The nonce here is only a placeholder: each replica connection gets
        // a fresh UUID patched in by `ReplicaTask::specialize_command`.
        instance.send(StorageCommand::Hello {
            nonce: Default::default(),
        });

        instance
    }
133
134 pub fn replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_ {
136 self.replicas.keys().copied()
137 }
138
    /// Adds a new replica to this instance and hydrates it by replaying the
    /// (reduced) command history.
    pub fn add_replica(&mut self, id: ReplicaId, config: ReplicaConfig) {
        // Reduce the command history before replaying it to the new replica,
        // so we replay as few commands as possible.
        self.history.reduce();

        let metrics = self.metrics.for_replica(id);
        let replica = Replica::new(id, config, metrics, self.response_tx.clone());

        self.replicas.insert(id, replica);

        // Update scheduling decisions but don't send commands from here:
        // the replay below delivers commands for objects scheduled on the
        // new replica.
        self.update_scheduling(false);

        self.replay_commands(id);
    }
154
    /// Replays the command history to the given replica, forwarding
    /// object-specific commands only for objects that are scheduled on that
    /// replica.
    ///
    /// # Panics
    ///
    /// Panics if `replica_id` does not identify a known replica.
    pub fn replay_commands(&mut self, replica_id: ReplicaId) {
        let commands = self.history.iter().cloned();

        // `RunIngestion`, `RunSink`, and `AllowCompaction` are filtered by
        // whether the target replica is active for the object; all other
        // commands are forwarded unconditionally.
        let filtered_commands = commands
            .filter_map(|command| match command {
                StorageCommand::RunIngestion(ingestion) => {
                    if self.is_active_replica(&ingestion.id, &replica_id) {
                        Some(StorageCommand::RunIngestion(ingestion))
                    } else {
                        None
                    }
                }
                StorageCommand::RunSink(sink) => {
                    if self.is_active_replica(&sink.id, &replica_id) {
                        Some(StorageCommand::RunSink(sink))
                    } else {
                        None
                    }
                }
                StorageCommand::AllowCompaction(id, upper) => {
                    if self.is_active_replica(&id, &replica_id) {
                        Some(StorageCommand::AllowCompaction(id, upper))
                    } else {
                        None
                    }
                }
                command => Some(command),
            })
            // Collect eagerly: this ends the borrow of `self.history` before
            // we mutably borrow `self.replicas` below.
            .collect::<Vec<_>>();

        let replica = self
            .replicas
            .get_mut(&replica_id)
            .expect("replica must exist");

        for command in filtered_commands {
            replica.send(command);
        }
    }
196
    /// Removes a replica from this instance, updating scheduling decisions
    /// for any objects that were running on it.
    pub fn drop_replica(&mut self, id: ReplicaId) {
        let replica = self.replicas.remove(&id);

        // Un-schedule everything that was running on the dropped replica, so
        // `update_scheduling` can place it somewhere else.
        let mut needs_rescheduling = false;
        for (ingestion_id, ingestion) in self.active_ingestions.iter_mut() {
            let was_running = ingestion.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %ingestion_id,
                    replica_id = %id,
                    "ingestion was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }
        for (export_id, export) in self.active_exports.iter_mut() {
            let was_running = export.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %export_id,
                    replica_id = %id,
                    "export was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }

        tracing::info!(%id, %needs_rescheduling, "dropped replica");

        if needs_rescheduling {
            self.update_scheduling(true);
        }

        // If we just dropped the last replica, nothing runs the installed
        // objects anymore, so report them as paused.
        if replica.is_some() && self.replicas.is_empty() {
            self.update_paused_statuses();
        }
    }
235
236 pub fn rehydrate_failed_replicas(&mut self) {
238 let replicas = self.replicas.iter();
239 let failed_replicas: Vec<_> = replicas
240 .filter_map(|(id, replica)| replica.failed().then_some(*id))
241 .collect();
242
243 for id in failed_replicas {
244 let replica = self.replicas.remove(&id).expect("must exist");
245 self.add_replica(id, replica.config);
246 }
247 }
248
249 pub fn active_ingestions(&self) -> impl Iterator<Item = &GlobalId> {
252 self.active_ingestions.keys()
253 }
254
255 pub fn active_ingestion_exports(&self) -> impl Iterator<Item = &GlobalId> {
263 let ingestion_exports = self.ingestion_exports.keys();
264 self.active_ingestions.keys().chain(ingestion_exports)
265 }
266
267 pub fn active_exports(&self) -> impl Iterator<Item = &GlobalId> {
269 self.active_exports.keys()
270 }
271
    /// Emits a `Paused` status update for every object installed in the
    /// command history.
    ///
    /// Invoked when the instance has no replicas, so the installed objects
    /// cannot be running anywhere.
    fn update_paused_statuses(&mut self) {
        let now = mz_ore::now::to_datetime((self.now)());
        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: now,
            error: None,
            hints: BTreeSet::from([format!(
                "There is currently no replica running this {object_type}"
            )]),
            namespaced_errors: Default::default(),
            // `None`: this update is synthesized by the controller, not
            // reported by a replica.
            replica_id: None,
        };

        // Reduce first so we only report on commands that are still current.
        self.history.reduce();

        let mut status_updates = Vec::new();
        for command in self.history.iter() {
            match command {
                StorageCommand::RunIngestion(ingestion) => {
                    // An ingestion whose ID differs from its remap collection
                    // ID has a separate remap collection; skip that ID when
                    // reporting, so only the ingestion and its exports get a
                    // paused status.
                    let old_style_ingestion =
                        ingestion.id != ingestion.description.remap_collection_id;
                    let subsource_ids = ingestion.description.collection_ids().filter(|id| {
                        let should_discard =
                            old_style_ingestion && id == &ingestion.description.remap_collection_id;
                        !should_discard
                    });
                    for id in subsource_ids {
                        status_updates.push(make_update(id, "source"));
                    }
                }
                StorageCommand::RunSink(sink) => {
                    status_updates.push(make_update(sink.id, "sink"));
                }
                _ => (),
            }
        }

        for update in status_updates {
            // Ignore send errors: they mean the controller side has gone
            // away, in which case there is nobody left to inform.
            let _ = self
                .response_tx
                .send((None, StorageResponse::StatusUpdate(update)));
        }
    }
324
325 pub fn send(&mut self, command: StorageCommand) {
327 self.history.push(command.clone());
329
330 match command.clone() {
331 StorageCommand::RunIngestion(ingestion) => {
332 self.absorb_ingestion(*ingestion.clone());
336
337 for replica in self.active_replicas(&ingestion.id) {
338 replica.send(StorageCommand::RunIngestion(ingestion.clone()));
339 }
340 }
341 StorageCommand::RunSink(sink) => {
342 self.absorb_export(*sink.clone());
346
347 for replica in self.active_replicas(&sink.id) {
348 replica.send(StorageCommand::RunSink(sink.clone()));
349 }
350 }
351 StorageCommand::AllowCompaction(id, frontier) => {
352 for replica in self.active_replicas(&id) {
355 replica.send(StorageCommand::AllowCompaction(
356 id.clone(),
357 frontier.clone(),
358 ));
359 }
360
361 self.absorb_compaction(id, frontier);
362 }
363 command => {
364 for replica in self.replicas.values_mut() {
365 replica.send(command.clone());
366 }
367 }
368 }
369
370 if command.installs_objects() && self.replicas.is_empty() {
371 self.update_paused_statuses();
372 }
373 }
374
    /// Incorporates a `RunIngestion` command into this instance's state:
    /// records the ingestion's source exports and, for a new ingestion,
    /// creates scheduling state and updates scheduling decisions.
    fn absorb_ingestion(&mut self, ingestion: RunIngestionCommand) {
        let existing_ingestion_state = self.active_ingestions.get_mut(&ingestion.id);

        // Always (re-)record the mapping from source exports to their parent
        // ingestion, for updates as well as new ingestions.
        for id in ingestion.description.source_exports.keys() {
            self.ingestion_exports.insert(id.clone(), ingestion.id);
        }

        if let Some(ingestion_state) = existing_ingestion_state {
            // An update to an existing ingestion: scheduling decisions are
            // left untouched.
            tracing::debug!(
                ingestion_id = %ingestion.id,
                active_replicas = %ingestion_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating ingestion"
            );
        } else {
            // A new ingestion: start with no active replicas and let
            // `update_scheduling` decide where it runs. No commands are sent
            // from here; the caller forwards the command itself.
            let ingestion_state = ActiveIngestion {
                active_replicas: BTreeSet::new(),
            };
            self.active_ingestions.insert(ingestion.id, ingestion_state);

            self.update_scheduling(false);
        }
    }
408
    /// Incorporates a `RunSink` command into this instance's state: for a
    /// new export, creates scheduling state and updates scheduling
    /// decisions.
    fn absorb_export(&mut self, export: RunSinkCommand) {
        let existing_export_state = self.active_exports.get_mut(&export.id);

        if let Some(export_state) = existing_export_state {
            // An update to an existing export: scheduling decisions are left
            // untouched.
            tracing::debug!(
                export_id = %export.id,
                active_replicas = %export_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating export"
            );
        } else {
            // A new export: start with no active replicas and let
            // `update_scheduling` decide where it runs. No commands are sent
            // from here; the caller forwards the command itself.
            let export_state = ActiveExport {
                active_replicas: BTreeSet::new(),
            };
            self.active_exports.insert(export.id, export_state);

            self.update_scheduling(false);
        }
    }
437
438 fn update_scheduling(&mut self, send_commands: bool) {
457 #[derive(Debug)]
458 enum ObjectId {
459 Ingestion(GlobalId),
460 Export(GlobalId),
461 }
462 let mut scheduling_preferences: Vec<(ObjectId, bool)> = Vec::new();
467
468 for ingestion_id in self.active_ingestions.keys() {
469 let ingestion_description = self
470 .get_ingestion_description(ingestion_id)
471 .expect("missing ingestion description");
472
473 let prefers_single_replica = ingestion_description
474 .desc
475 .connection
476 .prefers_single_replica();
477
478 scheduling_preferences
479 .push((ObjectId::Ingestion(*ingestion_id), prefers_single_replica));
480 }
481
482 for export_id in self.active_exports.keys() {
483 scheduling_preferences.push((ObjectId::Export(*export_id), true));
485 }
486
487 let mut commands_by_replica: BTreeMap<ReplicaId, Vec<ObjectId>> = BTreeMap::new();
489
490 for (object_id, prefers_single_replica) in scheduling_preferences {
491 let active_replicas = match object_id {
492 ObjectId::Ingestion(ingestion_id) => {
493 &mut self
494 .active_ingestions
495 .get_mut(&ingestion_id)
496 .expect("missing ingestion state")
497 .active_replicas
498 }
499 ObjectId::Export(export_id) => {
500 &mut self
501 .active_exports
502 .get_mut(&export_id)
503 .expect("missing ingestion state")
504 .active_replicas
505 }
506 };
507
508 if prefers_single_replica {
509 if active_replicas.is_empty() {
511 let target_replica = self.replicas.keys().min().copied();
512 if let Some(first_replica_id) = target_replica {
513 tracing::info!(
514 object_id = ?object_id,
515 replica_id = %first_replica_id,
516 "scheduling single-replica object");
517 active_replicas.insert(first_replica_id);
518
519 commands_by_replica
520 .entry(first_replica_id)
521 .or_default()
522 .push(object_id);
523 }
524 } else {
525 tracing::info!(
526 ?object_id,
527 active_replicas = %active_replicas.iter().map(|id| id.to_string()).join(", "),
528 "single-replica object already running, not scheduling again",
529 );
530 }
531 } else {
532 let current_replica_ids: BTreeSet<_> = self.replicas.keys().copied().collect();
533 let unscheduled_replicas: Vec<_> = current_replica_ids
534 .difference(active_replicas)
535 .copied()
536 .collect();
537 for replica_id in unscheduled_replicas {
538 tracing::info!(
539 ?object_id,
540 %replica_id,
541 "scheduling multi-replica object"
542 );
543 active_replicas.insert(replica_id);
544 }
545 }
546 }
547
548 if send_commands {
549 for (replica_id, object_ids) in commands_by_replica {
550 let mut ingestion_commands = vec![];
551 let mut export_commands = vec![];
552 for object_id in object_ids {
553 match object_id {
554 ObjectId::Ingestion(id) => {
555 ingestion_commands.push(RunIngestionCommand {
556 id,
557 description: self
558 .get_ingestion_description(&id)
559 .expect("missing ingestion description")
560 .clone(),
561 });
562 }
563 ObjectId::Export(id) => {
564 export_commands.push(RunSinkCommand {
565 id,
566 description: self
567 .get_export_description(&id)
568 .expect("missing export description")
569 .clone(),
570 });
571 }
572 }
573 }
574 for ingestion in ingestion_commands {
575 let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
576 let ingestion = Box::new(ingestion);
577 replica.send(StorageCommand::RunIngestion(ingestion));
578 }
579 for export in export_commands {
580 let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
581 let export = Box::new(export);
582 replica.send(StorageCommand::RunSink(export));
583 }
584 }
585 }
586 }
587
588 pub fn get_ingestion_description(
596 &self,
597 id: &GlobalId,
598 ) -> Option<IngestionDescription<CollectionMetadata>> {
599 if !self.active_ingestions.contains_key(id) {
600 return None;
601 }
602
603 self.history.iter().rev().find_map(|command| {
604 if let StorageCommand::RunIngestion(ingestion) = command {
605 if &ingestion.id == id {
606 Some(ingestion.description.clone())
607 } else {
608 None
609 }
610 } else {
611 None
612 }
613 })
614 }
615
616 pub fn get_export_description(
624 &self,
625 id: &GlobalId,
626 ) -> Option<StorageSinkDesc<CollectionMetadata>> {
627 if !self.active_exports.contains_key(id) {
628 return None;
629 }
630
631 self.history.iter().rev().find_map(|command| {
632 if let StorageCommand::RunSink(sink) = command {
633 if &sink.id == id {
634 Some(sink.description.clone())
635 } else {
636 None
637 }
638 } else {
639 None
640 }
641 })
642 }
643
644 fn absorb_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
646 tracing::debug!(?self.active_ingestions, ?id, ?frontier, "allow_compaction");
647
648 if frontier.is_empty() {
649 self.active_ingestions.remove(&id);
650 self.ingestion_exports.remove(&id);
651 self.active_exports.remove(&id);
652 }
653 }
654
    /// Returns mutable references to the replicas that actively run the
    /// object identified by `id`.
    ///
    /// Ingestion exports resolve to the replicas of their parent ingestion.
    /// IDs that are neither ingestions, ingestion exports, nor exports yield
    /// *all* replicas.
    fn active_replicas(&mut self, id: &GlobalId) -> Box<dyn Iterator<Item = &mut Replica> + '_> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => Box::new(self.replicas.iter_mut().filter_map(
                    move |(replica_id, replica)| {
                        if ingestion.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    },
                )),
                None => {
                    // The parent ingestion is not active: nothing runs this
                    // export.
                    Box::new(std::iter::empty())
                }
            }
        } else if let Some(ingestion) = self.active_ingestions.get(id) {
            Box::new(
                self.replicas
                    .iter_mut()
                    .filter_map(move |(replica_id, replica)| {
                        if ingestion.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    }),
            )
        } else if let Some(export) = self.active_exports.get(id) {
            Box::new(
                self.replicas
                    .iter_mut()
                    .filter_map(move |(replica_id, replica)| {
                        if export.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    }),
            )
        } else {
            // Unknown objects are treated as running on every replica.
            Box::new(self.replicas.values_mut())
        }
    }
701
702 fn is_active_replica(&self, id: &GlobalId, replica_id: &ReplicaId) -> bool {
704 if let Some(ingestion_id) = self.ingestion_exports.get(id) {
705 match self.active_ingestions.get(ingestion_id) {
706 Some(ingestion) => ingestion.active_replicas.contains(replica_id),
707 None => {
708 false
710 }
711 }
712 } else if let Some(ingestion) = self.active_ingestions.get(id) {
713 ingestion.active_replicas.contains(replica_id)
714 } else if let Some(export) = self.active_exports.get(id) {
715 export.active_replicas.contains(replica_id)
716 } else {
717 true
719 }
720 }
721
722 pub(super) fn refresh_state_metrics(&self) {
731 let connected_replica_count = self.replicas.values().filter(|r| r.is_connected()).count();
732
733 self.metrics
734 .connected_replica_count
735 .set(u64::cast_from(connected_replica_count));
736 }
737
    /// Returns the set of replica IDs the object with the given ID is
    /// considered to run on.
    ///
    /// NOTE(review): only ingestion exports are resolved through scheduling
    /// state here; every other ID — including top-level ingestions and
    /// exports — falls through to "all replicas". This is asymmetric with
    /// `is_active_replica`; confirm it is intended.
    pub fn get_active_replicas_for_object(&self, id: &GlobalId) -> BTreeSet<ReplicaId> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => ingestion.active_replicas.clone(),
                None => {
                    // The parent ingestion is not active: the export runs
                    // nowhere.
                    BTreeSet::new()
                }
            }
        } else {
            self.replicas.keys().copied().collect()
        }
    }
755}
756
/// Configuration for a replica of a storage instance.
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    /// Build information; its semver version is used when connecting to the
    /// replica.
    pub build_info: &'static BuildInfo,
    /// The network location of the replica's processes.
    pub location: ClusterReplicaLocation,
    /// Parameters for the client connection to the replica.
    pub grpc_client: GrpcClientParameters,
}
764
/// A controller-side handle to a replica of a storage instance.
///
/// Communication with the replica happens through a background task; this
/// handle sends commands to that task.
#[derive(Debug)]
pub struct Replica {
    /// The configuration this replica was created with; kept so failed
    /// replicas can be re-created (see `Instance::rehydrate_failed_replicas`).
    config: ReplicaConfig,
    /// Sender for commands to the replica task.
    command_tx: mpsc::UnboundedSender<StorageCommand>,
    /// Handle to the replica task; the task is aborted when this `Replica`
    /// is dropped.
    task: AbortOnDropHandle<()>,
    /// Set to `true` by the replica task once it has connected.
    connected: Arc<AtomicBool>,
}
780
781impl Replica {
782 fn new(
784 id: ReplicaId,
785 config: ReplicaConfig,
786 metrics: ReplicaMetrics,
787 response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse)>,
788 ) -> Self {
789 let (command_tx, command_rx) = mpsc::unbounded_channel();
790 let connected = Arc::new(AtomicBool::new(false));
791
792 let task = mz_ore::task::spawn(
793 || "storage-replica-{id}",
794 ReplicaTask {
795 replica_id: id,
796 config: config.clone(),
797 metrics: metrics.clone(),
798 connected: Arc::clone(&connected),
799 command_rx,
800 response_tx,
801 }
802 .run(),
803 );
804
805 Self {
806 config,
807 command_tx,
808 task: task.abort_on_drop(),
809 connected,
810 }
811 }
812
813 fn send(&self, command: StorageCommand) {
815 let _ = self.command_tx.send(command);
817 }
818
819 fn failed(&self) -> bool {
822 self.task.is_finished()
823 }
824
825 pub(super) fn is_connected(&self) -> bool {
827 self.connected.load(atomic::Ordering::Relaxed)
828 }
829}
830
/// The transport client used to talk to a single storage replica process.
type StorageCtpClient = transport::Client<StorageCommand, StorageResponse>;
/// A client fanned out over all processes (partitions) of a storage replica.
type ReplicaClient = Partitioned<StorageCtpClient, StorageCommand, StorageResponse>;
833
/// The background task that handles communication with one replica.
struct ReplicaTask {
    /// The ID of the replica this task communicates with.
    replica_id: ReplicaId,
    /// Configuration used to locate and connect to the replica.
    config: ReplicaConfig,
    /// Metrics tracked for this replica.
    metrics: ReplicaMetrics,
    /// Flag this task sets once a connection has been established.
    connected: Arc<AtomicBool>,
    /// Receiver for commands from the controller-side `Replica` handle.
    command_rx: mpsc::UnboundedReceiver<StorageCommand>,
    /// Sender for responses to the controller, tagged with this replica's ID.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse)>,
}
849
impl ReplicaTask {
    /// Runs the replica task: connects to the replica, then shuttles
    /// commands and responses until either side disconnects.
    async fn run(self) {
        let replica_id = self.replica_id;
        info!(%replica_id, "starting replica task");

        let client = self.connect().await;
        match self.run_message_loop(client).await {
            Ok(()) => info!(%replica_id, "stopped replica task"),
            Err(error) => warn!(%replica_id, %error, "replica task failed"),
        }
    }

    /// Connects to the replica, retrying with clamped backoff until a
    /// connection is established. Never gives up.
    async fn connect(&self) -> ReplicaClient {
        let try_connect = async move |retry: RetryState| {
            let version = self.config.build_info.semver_version();
            let client_params = &self.config.grpc_client;

            let connect_start = Instant::now();
            let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
            let keepalive_timeout = client_params
                .http2_keep_alive_timeout
                .unwrap_or(Duration::MAX);

            let connect_result = StorageCtpClient::connect_partitioned(
                self.config.location.ctl_addrs.clone(),
                version,
                connect_timeout,
                keepalive_timeout,
                self.metrics.clone(),
            )
            .await;

            self.metrics.observe_connect_time(connect_start.elapsed());

            connect_result.inspect_err(|error| {
                // `next_backoff` is expected to be `Some` because the retry
                // below never gives up (see the `expect` on `retry_async`).
                let next_backoff = retry.next_backoff.unwrap();
                // Log the first few failures at debug only, to avoid noise
                // during ordinary replica startup.
                if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
                    info!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                } else {
                    debug!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                }
            })
        };

        let client = Retry::default()
            .clamp_backoff(Duration::from_secs(1))
            .retry_async(try_connect)
            .await
            .expect("retries forever");

        self.metrics.observe_connect();
        self.connected.store(true, atomic::Ordering::Relaxed);

        client
    }

    /// Shuttles commands from the controller to the replica and responses
    /// back, until the controller drops its channel ends or the replica
    /// connection fails.
    async fn run_message_loop(mut self, mut client: ReplicaClient) -> Result<(), anyhow::Error> {
        loop {
            select! {
                command = self.command_rx.recv() => {
                    // `None` means the `Replica` handle (sender) was dropped.
                    let Some(mut command) = command else {
                        tracing::debug!(%self.replica_id, "controller is no longer interested in this replica, shutting down message loop");
                        break;
                    };

                    // Patch in per-replica details before forwarding.
                    self.specialize_command(&mut command);
                    client.send(command).await?;
                },
                response = client.recv() => {
                    // A graceful `None` from the replica is unexpected and
                    // treated as an error.
                    let Some(response) = response? else {
                        bail!("replica unexpectedly gracefully terminated connection");
                    };

                    if self.response_tx.send((Some(self.replica_id), response)).is_err() {
                        tracing::debug!(%self.replica_id, "controller (receiver) is no longer interested in this replica, shutting down message loop");
                        break;
                    }
                }
            }
        }

        Ok(())
    }

    /// Patches per-replica details into a command before it is sent: each
    /// `Hello` gets a fresh nonce for this connection.
    fn specialize_command(&self, command: &mut StorageCommand) {
        if let StorageCommand::Hello { nonce } = command {
            *nonce = Uuid::new_v4();
        }
    }
}
963}