use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, atomic};
use std::time::{Duration, Instant};

use anyhow::bail;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_ore::cast::CastFrom;
use mz_ore::now::NowFn;
use mz_ore::retry::{Retry, RetryState};
use mz_ore::task::AbortOnDropHandle;
use mz_repr::GlobalId;
use mz_service::client::{GenericClient, Partitioned};
use mz_service::params::GrpcClientParameters;
use mz_service::transport;
use mz_storage_client::client::{
    RunIngestionCommand, RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse,
};
use mz_storage_client::metrics::{InstanceMetrics, ReplicaMetrics};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{IngestionDescription, SourceConnection};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::select;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::CollectionMetadata;
use crate::history::CommandHistory;

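/// A storage cluster instance: holds the set of replicas, tracks which
/// objects (ingestions and exports) are scheduled on which replicas, and
/// records the command history used to (re)hydrate replicas.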
#[derive(Debug)]
pub(crate) struct Instance<T> {
    /// The workload class of this instance, if any.
    pub workload_class: Option<String>,
    /// The replicas connected to this storage instance.
    replicas: BTreeMap<ReplicaId, Replica<T>>,
    /// The ingestions currently running on this instance.
    active_ingestions: BTreeMap<GlobalId, ActiveIngestion>,
    /// A map from ingestion export ID to the ID of its parent ingestion.
    ingestion_exports: BTreeMap<GlobalId, GlobalId>,
    /// The exports (sinks) currently running on this instance.
    active_exports: BTreeMap<GlobalId, ActiveExport>,
    /// The command history, used to replay commands to newly added replicas.
    history: CommandHistory<T>,
    /// Metrics tracked for this storage instance.
    metrics: InstanceMetrics,
    /// A function that returns the current time.
    now: NowFn,
    /// A sender for responses received from replicas.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}

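/// Tracks the replicas on which an active ingestion is scheduled.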
#[derive(Debug)]
struct ActiveIngestion {
    active_replicas: BTreeSet<ReplicaId>,
}

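/// Tracks the replicas on which an active export (sink) is scheduled.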
#[derive(Debug)]
struct ActiveExport {
    active_replicas: BTreeSet<ReplicaId>,
}

impl<T> Instance<T>
where
    T: Timestamp + Lattice + TotalOrder + Sync,
{
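    /// Creates a new [`Instance`].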
    pub fn new(
        workload_class: Option<String>,
        metrics: InstanceMetrics,
        now: NowFn,
        instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    ) -> Self {
        let history = CommandHistory::new(metrics.for_history());

        let mut instance = Self {
            workload_class,
            replicas: Default::default(),
            active_ingestions: Default::default(),
            ingestion_exports: Default::default(),
            active_exports: BTreeMap::new(),
            history,
            metrics,
            now,
            response_tx: instance_response_tx,
        };

        // The placeholder nonce is replaced with a fresh one per replica, in
        // `ReplicaTask::specialize_command`.
        instance.send(StorageCommand::Hello {
            nonce: Default::default(),
        });

        instance
    }

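    /// Returns the IDs of all replicas of this instance.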
    pub fn replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_ {
        self.replicas.keys().copied()
    }

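    /// Adds a new replica to this instance and replays the (reduced) command
    /// history to it.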
    pub fn add_replica(&mut self, id: ReplicaId, config: ReplicaConfig) {
        // Reduce the history to limit the number of commands sent to the new
        // replica.
        self.history.reduce();

        let metrics = self.metrics.for_replica(id);
        let replica = Replica::new(id, config, metrics, self.response_tx.clone());

        self.replicas.insert(id, replica);

        self.update_scheduling(false);

        self.replay_commands(id);
    }

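    /// Replays the command history to the given replica, filtering out
    /// commands for objects that are not scheduled on it.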
    pub fn replay_commands(&mut self, replica_id: ReplicaId) {
        let commands = self.history.iter().cloned();

        // Only replay commands for objects that are scheduled on this replica.
        let filtered_commands = commands
            .filter_map(|command| match command {
                StorageCommand::RunIngestion(ingestion) => {
                    if self.is_active_replica(&ingestion.id, &replica_id) {
                        Some(StorageCommand::RunIngestion(ingestion))
                    } else {
                        None
                    }
                }
                StorageCommand::RunSink(sink) => {
                    if self.is_active_replica(&sink.id, &replica_id) {
                        Some(StorageCommand::RunSink(sink))
                    } else {
                        None
                    }
                }
                StorageCommand::AllowCompaction(id, upper) => {
                    if self.is_active_replica(&id, &replica_id) {
                        Some(StorageCommand::AllowCompaction(id, upper))
                    } else {
                        None
                    }
                }
                command => Some(command),
            })
            .collect::<Vec<_>>();

        let replica = self
            .replicas
            .get_mut(&replica_id)
            .expect("replica must exist");

        for command in filtered_commands {
            replica.send(command);
        }
    }

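    /// Removes the identified replica from this instance and updates
    /// scheduling decisions for any objects that were running on it.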
    pub fn drop_replica(&mut self, id: ReplicaId) {
        let replica = self.replicas.remove(&id);

        let mut needs_rescheduling = false;
        for (ingestion_id, ingestion) in self.active_ingestions.iter_mut() {
            let was_running = ingestion.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %ingestion_id,
                    replica_id = %id,
                    "ingestion was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }
        for (export_id, export) in self.active_exports.iter_mut() {
            let was_running = export.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %export_id,
                    replica_id = %id,
                    "export was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }

        tracing::info!(%id, %needs_rescheduling, "dropped replica");

        if needs_rescheduling {
            self.update_scheduling(true);
        }

        // With no replicas left, all installed objects are effectively paused.
        if replica.is_some() && self.replicas.is_empty() {
            self.update_paused_statuses();
        }
    }

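    /// Rehydrates any replicas that have failed, by dropping and re-adding
    /// them with their existing configuration.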
    pub fn rehydrate_failed_replicas(&mut self) {
        let replicas = self.replicas.iter();
        let failed_replicas: Vec<_> = replicas
            .filter_map(|(id, replica)| replica.failed().then_some(*id))
            .collect();

        for id in failed_replicas {
            let replica = self.replicas.remove(&id).expect("must exist");
            self.add_replica(id, replica.config);
        }
    }

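    /// Returns the IDs of all active ingestions.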
    pub fn active_ingestions(&self) -> impl Iterator<Item = &GlobalId> {
        self.active_ingestions.keys()
    }

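    /// Returns the IDs of all active ingestions together with the IDs of
    /// their ingestion exports.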
    pub fn active_ingestion_exports(&self) -> impl Iterator<Item = &GlobalId> {
        let ingestion_exports = self.ingestion_exports.keys();
        self.active_ingestions.keys().chain(ingestion_exports)
    }

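    /// Returns the IDs of all active exports.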
    pub fn active_exports(&self) -> impl Iterator<Item = &GlobalId> {
        self.active_exports.keys()
    }

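    /// Generates a `Paused` status update for every object installed in the
    /// command history, reflecting that no replica is currently running them.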
    fn update_paused_statuses(&mut self) {
        let now = mz_ore::now::to_datetime((self.now)());
        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: now,
            error: None,
            hints: BTreeSet::from([format!(
                "There is currently no replica running this {object_type}"
            )]),
            namespaced_errors: Default::default(),
            replica_id: None,
        };

        // Reduce the history so we only report on the current set of objects.
        self.history.reduce();

        let mut status_updates = Vec::new();
        for command in self.history.iter() {
            match command {
                StorageCommand::RunIngestion(ingestion) => {
                    // Emit updates for all subsources, but skip the remap
                    // collection: only subsources get status updates.
                    let subsource_ids = ingestion
                        .description
                        .collection_ids()
                        .filter(|id| id != &ingestion.description.remap_collection_id);
                    for id in subsource_ids {
                        status_updates.push(make_update(id, "source"));
                    }
                }
                StorageCommand::RunSink(sink) => {
                    status_updates.push(make_update(sink.id, "sink"));
                }
                _ => (),
            }
        }

        for update in status_updates {
            let _ = self
                .response_tx
                .send((None, StorageResponse::StatusUpdate(update)));
        }
    }

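    /// Sends a command to this instance: pushes it onto the command history
    /// and forwards it to the replicas responsible for the affected objects.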
    pub fn send(&mut self, command: StorageCommand<T>) {
        // Record the command so that new replicas can be brought up to speed.
        self.history.push(command.clone());

        match command.clone() {
            StorageCommand::RunIngestion(ingestion) => {
                // Absorb the command into our state first, which makes the
                // scheduling decisions that `active_replicas` reports on.
                self.absorb_ingestion(*ingestion.clone());

                for replica in self.active_replicas(&ingestion.id) {
                    replica.send(StorageCommand::RunIngestion(ingestion.clone()));
                }
            }
            StorageCommand::RunSink(sink) => {
                self.absorb_export(*sink.clone());

                for replica in self.active_replicas(&sink.id) {
                    replica.send(StorageCommand::RunSink(sink.clone()));
                }
            }
            StorageCommand::AllowCompaction(id, frontier) => {
                // Forward the command before absorbing it: absorbing an empty
                // frontier drops our scheduling state for the object, which
                // `active_replicas` relies on.
                for replica in self.active_replicas(&id) {
                    replica.send(StorageCommand::AllowCompaction(
                        id.clone(),
                        frontier.clone(),
                    ));
                }

                self.absorb_compaction(id, frontier);
            }
            command => {
                for replica in self.replicas.values_mut() {
                    replica.send(command.clone());
                }
            }
        }

        if command.installs_objects() && self.replicas.is_empty() {
            self.update_paused_statuses();
        }
    }

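    /// Updates the instance state to reflect a new or updated ingestion, and
    /// makes initial scheduling decisions for it.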
    fn absorb_ingestion(&mut self, ingestion: RunIngestionCommand) {
        let existing_ingestion_state = self.active_ingestions.get_mut(&ingestion.id);

        // Record the mapping from each of the ingestion's exports back to the
        // ingestion itself.
        for id in ingestion.description.source_exports.keys() {
            self.ingestion_exports.insert(id.clone(), ingestion.id);
        }

        if let Some(ingestion_state) = existing_ingestion_state {
            // Scheduling decisions for existing ingestions stay as they are.
            tracing::debug!(
                ingestion_id = %ingestion.id,
                active_replicas = %ingestion_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating ingestion"
            );
        } else {
            let ingestion_state = ActiveIngestion {
                active_replicas: BTreeSet::new(),
            };
            self.active_ingestions.insert(ingestion.id, ingestion_state);

            // New ingestions need to be scheduled onto replicas.
            self.update_scheduling(false);
        }
    }

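    /// Updates the instance state to reflect a new or updated export, and
    /// makes initial scheduling decisions for it.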
    fn absorb_export(&mut self, export: RunSinkCommand<T>) {
        let existing_export_state = self.active_exports.get_mut(&export.id);

        if let Some(export_state) = existing_export_state {
            // Scheduling decisions for existing exports stay as they are.
            tracing::debug!(
                export_id = %export.id,
                active_replicas = %export_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating export"
            );
        } else {
            let export_state = ActiveExport {
                active_replicas: BTreeSet::new(),
            };
            self.active_exports.insert(export.id, export_state);

            // New exports need to be scheduled onto replicas.
            self.update_scheduling(false);
        }
    }

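    /// Updates scheduling decisions for all active objects: single-replica
    /// objects are pinned to the replica with the smallest ID, while
    /// multi-replica objects are scheduled on every replica. If
    /// `send_commands` is true, commands for newly scheduled objects are sent
    /// to their target replicas.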
    fn update_scheduling(&mut self, send_commands: bool) {
        #[derive(Debug)]
        enum ObjectId {
            Ingestion(GlobalId),
            Export(GlobalId),
        }

        // Collect each active object together with its scheduling preference:
        // `true` means the object prefers to run on a single replica.
        let mut scheduling_preferences: Vec<(ObjectId, bool)> = Vec::new();

        for ingestion_id in self.active_ingestions.keys() {
            let ingestion_description = self
                .get_ingestion_description(ingestion_id)
                .expect("missing ingestion description");

            let prefers_single_replica = ingestion_description
                .desc
                .connection
                .prefers_single_replica();

            scheduling_preferences
                .push((ObjectId::Ingestion(*ingestion_id), prefers_single_replica));
        }

        for export_id in self.active_exports.keys() {
            // All exports prefer to run on a single replica.
            scheduling_preferences.push((ObjectId::Export(*export_id), true));
        }

        // Objects newly scheduled on a replica, in case we need to send
        // commands below.
        let mut commands_by_replica: BTreeMap<ReplicaId, Vec<ObjectId>> = BTreeMap::new();

        for (object_id, prefers_single_replica) in scheduling_preferences {
            let active_replicas = match object_id {
                ObjectId::Ingestion(ingestion_id) => {
                    &mut self
                        .active_ingestions
                        .get_mut(&ingestion_id)
                        .expect("missing ingestion state")
                        .active_replicas
                }
                ObjectId::Export(export_id) => {
                    &mut self
                        .active_exports
                        .get_mut(&export_id)
                        .expect("missing export state")
                        .active_replicas
                }
            };

            if prefers_single_replica {
                // Single-replica objects are scheduled on the replica with the
                // smallest ID, but only if they aren't running anywhere yet.
                if active_replicas.is_empty() {
                    let target_replica = self.replicas.keys().min().copied();
                    if let Some(first_replica_id) = target_replica {
                        tracing::info!(
                            ?object_id,
                            replica_id = %first_replica_id,
                            "scheduling single-replica object"
                        );
                        active_replicas.insert(first_replica_id);

                        commands_by_replica
                            .entry(first_replica_id)
                            .or_default()
                            .push(object_id);
                    }
                } else {
                    tracing::info!(
                        ?object_id,
                        active_replicas = %active_replicas.iter().map(|id| id.to_string()).join(", "),
                        "single-replica object already running, not scheduling again",
                    );
                }
            } else {
                // Multi-replica objects run on all replicas.
                let current_replica_ids: BTreeSet<_> = self.replicas.keys().copied().collect();
                let unscheduled_replicas: Vec<_> = current_replica_ids
                    .difference(active_replicas)
                    .copied()
                    .collect();
                for replica_id in unscheduled_replicas {
                    tracing::info!(
                        ?object_id,
                        %replica_id,
                        "scheduling multi-replica object"
                    );
                    active_replicas.insert(replica_id);
                }
            }
        }

        if send_commands {
            for (replica_id, object_ids) in commands_by_replica {
                let mut ingestion_commands = vec![];
                let mut export_commands = vec![];
                for object_id in object_ids {
                    match object_id {
                        ObjectId::Ingestion(id) => {
                            ingestion_commands.push(RunIngestionCommand {
                                id,
                                description: self
                                    .get_ingestion_description(&id)
                                    .expect("missing ingestion description"),
                            });
                        }
                        ObjectId::Export(id) => {
                            export_commands.push(RunSinkCommand {
                                id,
                                description: self
                                    .get_export_description(&id)
                                    .expect("missing export description"),
                            });
                        }
                    }
                }

                let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
                for ingestion in ingestion_commands {
                    replica.send(StorageCommand::RunIngestion(Box::new(ingestion)));
                }
                for export in export_commands {
                    replica.send(StorageCommand::RunSink(Box::new(export)));
                }
            }
        }
    }

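    /// Returns the ingestion description for the given ingestion ID, if the
    /// ingestion is active, by looking up its most recent `RunIngestion`
    /// command in the history.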
    pub fn get_ingestion_description(
        &self,
        id: &GlobalId,
    ) -> Option<IngestionDescription<CollectionMetadata>> {
        if !self.active_ingestions.contains_key(id) {
            return None;
        }

        self.history.iter().rev().find_map(|command| {
            if let StorageCommand::RunIngestion(ingestion) = command {
                if &ingestion.id == id {
                    Some(ingestion.description.clone())
                } else {
                    None
                }
            } else {
                None
            }
        })
    }

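    /// Returns the export description for the given export ID, if the export
    /// is active, by looking up its most recent `RunSink` command in the
    /// history.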
    pub fn get_export_description(
        &self,
        id: &GlobalId,
    ) -> Option<StorageSinkDesc<CollectionMetadata, T>> {
        if !self.active_exports.contains_key(id) {
            return None;
        }

        self.history.iter().rev().find_map(|command| {
            if let StorageCommand::RunSink(sink) = command {
                if &sink.id == id {
                    Some(sink.description.clone())
                } else {
                    None
                }
            } else {
                None
            }
        })
    }

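    /// Updates the instance state to reflect an `AllowCompaction` command. An
    /// empty frontier finalizes the object and drops its scheduling state.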
    fn absorb_compaction(&mut self, id: GlobalId, frontier: Antichain<T>) {
        tracing::debug!(?self.active_ingestions, ?id, ?frontier, "allow_compaction");

        if frontier.is_empty() {
            // The object has been dropped: remove all state for it.
            self.active_ingestions.remove(&id);
            self.ingestion_exports.remove(&id);
            self.active_exports.remove(&id);
        }
    }

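    /// Returns the replicas that the identified object is scheduled on.
    /// Objects without explicit scheduling state are considered scheduled on
    /// all replicas.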
    fn active_replicas(&mut self, id: &GlobalId) -> Box<dyn Iterator<Item = &mut Replica<T>> + '_> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => Box::new(self.replicas.iter_mut().filter_map(
                    move |(replica_id, replica)| {
                        if ingestion.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    },
                )),
                None => {
                    // The parent ingestion is not active anymore, so there are
                    // no active replicas for this object.
                    Box::new(std::iter::empty())
                }
            }
        } else if let Some(export) = self.active_exports.get(id) {
            Box::new(
                self.replicas
                    .iter_mut()
                    .filter_map(move |(replica_id, replica)| {
                        if export.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    }),
            )
        } else {
            Box::new(self.replicas.values_mut())
        }
    }

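    /// Reports whether the identified object is scheduled on the given
    /// replica. Objects without explicit scheduling state are considered
    /// scheduled on all replicas.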
    fn is_active_replica(&self, id: &GlobalId, replica_id: &ReplicaId) -> bool {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => ingestion.active_replicas.contains(replica_id),
                None => {
                    // The parent ingestion is not active anymore.
                    false
                }
            }
        } else if let Some(export) = self.active_exports.get(id) {
            export.active_replicas.contains(replica_id)
        } else {
            // The object is not an ingestion or export, so it is implicitly
            // scheduled on all replicas.
            true
        }
    }

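    /// Refreshes the instance state metrics.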
    pub(super) fn refresh_state_metrics(&self) {
        let connected_replica_count = self.replicas.values().filter(|r| r.is_connected()).count();

        self.metrics
            .connected_replica_count
            .set(u64::cast_from(connected_replica_count));
    }

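    /// Returns the set of replica IDs on which the identified object is
    /// scheduled.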
    pub fn get_active_replicas_for_object(&self, id: &GlobalId) -> BTreeSet<ReplicaId> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => ingestion.active_replicas.clone(),
                None => {
                    // The parent ingestion is not active anymore.
                    BTreeSet::new()
                }
            }
        } else {
            // The object is not an ingestion export, so it is implicitly
            // scheduled on all replicas.
            self.replicas.keys().copied().collect()
        }
    }
}

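/// Configuration for a storage replica.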
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    pub build_info: &'static BuildInfo,
    pub location: ClusterReplicaLocation,
    pub grpc_client: GrpcClientParameters,
}

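/// The controller's view of a storage replica: a handle to the background
/// task that maintains the connection to the replica process.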
#[derive(Debug)]
pub struct Replica<T> {
    /// The config with which the replica was spawned.
    config: ReplicaConfig,
    /// A sender for commands to the replica task.
    command_tx: mpsc::UnboundedSender<StorageCommand<T>>,
    /// A handle to the replica task; the task is aborted when the `Replica`
    /// is dropped.
    task: AbortOnDropHandle<()>,
    /// Flag reporting whether the replica connection has been established.
    connected: Arc<AtomicBool>,
}

impl<T> Replica<T>
where
    T: Timestamp + Lattice + Sync,
{
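    /// Spawns a background task that maintains the connection to the replica
    /// and returns a handle to it.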
    fn new(
        id: ReplicaId,
        config: ReplicaConfig,
        metrics: ReplicaMetrics,
        response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    ) -> Self {
        let (command_tx, command_rx) = mpsc::unbounded_channel();
        let connected = Arc::new(AtomicBool::new(false));

        let task = mz_ore::task::spawn(
            || format!("storage-replica-{id}"),
            ReplicaTask {
                replica_id: id,
                config: config.clone(),
                metrics: metrics.clone(),
                connected: Arc::clone(&connected),
                command_rx,
                response_tx,
            }
            .run(),
        );

        Self {
            config,
            command_tx,
            task: task.abort_on_drop(),
            connected,
        }
    }

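    /// Sends the given command to the replica.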
    fn send(&self, command: StorageCommand<T>) {
        // Send failures are ignored: they mean the replica task has stopped,
        // which `failed` reports separately.
        let _ = self.command_tx.send(command);
    }

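    /// Reports whether the replica has failed, i.e. its task has stopped.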
    fn failed(&self) -> bool {
        self.task.is_finished()
    }

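    /// Reports whether the replica connection has been established.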
    pub(super) fn is_connected(&self) -> bool {
        self.connected.load(atomic::Ordering::Relaxed)
    }
}

type StorageCtpClient<T> = transport::Client<StorageCommand<T>, StorageResponse<T>>;
type ReplicaClient<T> = Partitioned<StorageCtpClient<T>, StorageCommand<T>, StorageResponse<T>>;

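/// A task handling the connection to, and message exchange with, a single
/// storage replica.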
struct ReplicaTask<T> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// The replica's configuration.
    config: ReplicaConfig,
    /// Metrics tracked for this replica.
    metrics: ReplicaMetrics,
    /// Flag used to report the successful replica connection.
    connected: Arc<AtomicBool>,
    /// A receiver for commands from the controller.
    command_rx: mpsc::UnboundedReceiver<StorageCommand<T>>,
    /// A sender for responses to the controller.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}

impl<T> ReplicaTask<T>
where
    T: Timestamp + Lattice + Sync,
{
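    /// Runs the replica task: connects to the replica, then exchanges
    /// commands and responses until either side disconnects.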
    async fn run(self) {
        let replica_id = self.replica_id;
        info!(%replica_id, "starting replica task");

        let client = self.connect().await;
        match self.run_message_loop(client).await {
            Ok(()) => info!(%replica_id, "stopped replica task"),
            Err(error) => warn!(%replica_id, %error, "replica task failed"),
        }
    }

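    /// Connects to the replica, retrying forever with backoff until the
    /// connection is established.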
    async fn connect(&self) -> ReplicaClient<T> {
        let try_connect = async move |retry: RetryState| {
            let version = self.config.build_info.semver_version();
            let client_params = &self.config.grpc_client;

            let connect_start = Instant::now();
            let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
            let keepalive_timeout = client_params
                .http2_keep_alive_timeout
                .unwrap_or(Duration::MAX);

            let connect_result = StorageCtpClient::<T>::connect_partitioned(
                self.config.location.ctl_addrs.clone(),
                version,
                connect_timeout,
                keepalive_timeout,
                self.metrics.clone(),
            )
            .await;

            self.metrics.observe_connect_time(connect_start.elapsed());

            connect_result.inspect_err(|error| {
                let next_backoff = retry.next_backoff.unwrap();
                if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
                    info!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                } else {
                    debug!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                }
            })
        };

        let client = Retry::default()
            .clamp_backoff(Duration::from_secs(1))
            .retry_async(try_connect)
            .await
            .expect("retries forever");

        self.metrics.observe_connect();
        self.connected.store(true, atomic::Ordering::Relaxed);

        client
    }

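    /// Runs the message loop, forwarding commands from the controller to the
    /// replica and responses from the replica back to the controller.
    ///
    /// Returns an error if the connection to the replica fails unexpectedly.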
    async fn run_message_loop(mut self, mut client: ReplicaClient<T>) -> Result<(), anyhow::Error> {
        loop {
            select! {
                // Command from the controller, to be forwarded to the replica.
                command = self.command_rx.recv() => {
                    let Some(mut command) = command else {
                        tracing::debug!(%self.replica_id, "controller is no longer interested in this replica, shutting down message loop");
                        break;
                    };

                    self.specialize_command(&mut command);
                    client.send(command).await?;
                },
                // Response from the replica, to be forwarded to the controller.
                response = client.recv() => {
                    let Some(response) = response? else {
                        bail!("replica unexpectedly gracefully terminated connection");
                    };

                    if self.response_tx.send((Some(self.replica_id), response)).is_err() {
                        tracing::debug!(%self.replica_id, "controller (receiver) is no longer interested in this replica, shutting down message loop");
                        break;
                    }
                }
            }
        }

        Ok(())
    }

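    /// Specializes a command for this replica.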
    fn specialize_command(&self, command: &mut StorageCommand<T>) {
        // Replace the placeholder nonce with a fresh one for this connection.
        if let StorageCommand::Hello { nonce } = command {
            *nonce = Uuid::new_v4();
        }
    }
}