use crate::CollectionMetadata;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, atomic};
use std::time::{Duration, Instant};

use anyhow::bail;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_ore::cast::CastFrom;
use mz_ore::now::NowFn;
use mz_ore::retry::{Retry, RetryState};
use mz_ore::task::AbortOnDropHandle;
use mz_repr::GlobalId;
use mz_service::client::{GenericClient, Partitioned};
use mz_service::params::GrpcClientParameters;
use mz_service::transport;
use mz_storage_client::client::{
    RunIngestionCommand, RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse,
};
use mz_storage_client::metrics::{InstanceMetrics, ReplicaMetrics};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{IngestionDescription, SourceConnection};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::select;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::history::CommandHistory;

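/// The state of a storage instance: the set of connected replicas and the
/// storage objects (ingestions and exports) scheduled on them.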
#[derive(Debug)]
pub(crate) struct Instance<T> {
    /// The workload class of this instance, if set.
    pub workload_class: Option<String>,
    /// The replicas connected to this storage instance.
    replicas: BTreeMap<ReplicaId, Replica<T>>,
    /// The ingestions currently running on this instance, and the replicas
    /// they are scheduled on.
    active_ingestions: BTreeMap<GlobalId, ActiveIngestion>,
    /// A mapping from ingestion export ID to the ID of its parent ingestion.
    ingestion_exports: BTreeMap<GlobalId, GlobalId>,
    /// The exports currently running on this instance, and the replicas they
    /// are scheduled on.
    active_exports: BTreeMap<GlobalId, ActiveExport>,
    /// The command history, used to replay commands to newly added or
    /// restarted replicas.
    history: CommandHistory<T>,
    /// Metrics tracked for this storage instance.
    metrics: InstanceMetrics,
    /// A function that returns the current time.
    now: NowFn,
    /// A sender for responses from replicas, tagged with the ID of the
    /// sending replica, or `None` for responses synthesized by the controller
    /// itself.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}

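/// State tracked for an ingestion that is currently running on this instance.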
#[derive(Debug)]
struct ActiveIngestion {
    /// The replicas that this ingestion is currently running on.
    active_replicas: BTreeSet<ReplicaId>,
}

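/// State tracked for an export that is currently running on this instance.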
#[derive(Debug)]
struct ActiveExport {
    /// The replicas that this export is currently running on.
    active_replicas: BTreeSet<ReplicaId>,
}

impl<T> Instance<T>
where
    T: Timestamp + Lattice + TotalOrder + Sync,
{
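    /// Creates a new [`Instance`].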
    pub fn new(
        workload_class: Option<String>,
        metrics: InstanceMetrics,
        now: NowFn,
        instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    ) -> Self {
        let history = CommandHistory::new(metrics.for_history());

        let mut instance = Self {
            workload_class,
            replicas: Default::default(),
            active_ingestions: Default::default(),
            ingestion_exports: Default::default(),
            active_exports: BTreeMap::new(),
            history,
            metrics,
            now,
            response_tx: instance_response_tx,
        };

        instance.send(StorageCommand::Hello {
            // The nonce is filled in by each replica task before the command
            // is forwarded to the replica (see
            // `ReplicaTask::specialize_command`).
            nonce: Default::default(),
        });

        instance
    }

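    /// Returns the IDs of all replicas of this instance.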
    pub fn replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_ {
        self.replicas.keys().copied()
    }

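    /// Adds a new replica to this instance, updates scheduling decisions, and
    /// replays the (filtered) command history to it.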
    pub fn add_replica(&mut self, id: ReplicaId, config: ReplicaConfig) {
        // Reduce the history, to minimize the amount of commands sent to the
        // new replica.
        self.history.reduce();

        let metrics = self.metrics.for_replica(id);
        let replica = Replica::new(id, config, metrics, self.response_tx.clone());

        self.replicas.insert(id, replica);

        self.update_scheduling(false);

        self.replay_commands(id);
    }

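    /// Replays the command history to the given replica, skipping commands
    /// for objects that are not scheduled on it.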
    pub fn replay_commands(&mut self, replica_id: ReplicaId) {
        let commands = self.history.iter().cloned();

        // Filter out commands for objects that are not scheduled on this
        // replica.
        let filtered_commands = commands
            .filter_map(|command| match command {
                StorageCommand::RunIngestion(ingestion) => {
                    if self.is_active_replica(&ingestion.id, &replica_id) {
                        Some(StorageCommand::RunIngestion(ingestion))
                    } else {
                        None
                    }
                }
                StorageCommand::RunSink(sink) => {
                    if self.is_active_replica(&sink.id, &replica_id) {
                        Some(StorageCommand::RunSink(sink))
                    } else {
                        None
                    }
                }
                StorageCommand::AllowCompaction(id, upper) => {
                    if self.is_active_replica(&id, &replica_id) {
                        Some(StorageCommand::AllowCompaction(id, upper))
                    } else {
                        None
                    }
                }
                command => Some(command),
            })
            .collect::<Vec<_>>();

        let replica = self
            .replicas
            .get_mut(&replica_id)
            .expect("replica must exist");

        for command in filtered_commands {
            replica.send(command);
        }
    }

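    /// Removes the identified replica from this instance, rescheduling any
    /// objects that were running on it.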
    pub fn drop_replica(&mut self, id: ReplicaId) {
        let replica = self.replicas.remove(&id);

        let mut needs_rescheduling = false;
        for (ingestion_id, ingestion) in self.active_ingestions.iter_mut() {
            let was_running = ingestion.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %ingestion_id,
                    replica_id = %id,
                    "ingestion was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }
        for (export_id, export) in self.active_exports.iter_mut() {
            let was_running = export.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %export_id,
                    replica_id = %id,
                    "export was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }

        tracing::info!(%id, %needs_rescheduling, "dropped replica");

        if needs_rescheduling {
            self.update_scheduling(true);
        }

        // With no replicas left, the instance's objects are effectively
        // paused; report them as such.
        if replica.is_some() && self.replicas.is_empty() {
            self.update_paused_statuses();
        }
    }

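    /// Rehydrates any replicas whose tasks have failed, by removing them and
    /// adding them back with their previous configuration.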
    pub fn rehydrate_failed_replicas(&mut self) {
        let replicas = self.replicas.iter();
        let failed_replicas: Vec<_> = replicas
            .filter_map(|(id, replica)| replica.failed().then_some(*id))
            .collect();

        for id in failed_replicas {
            let replica = self.replicas.remove(&id).expect("must exist");
            self.add_replica(id, replica.config);
        }
    }

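    /// Returns the IDs of all active ingestions.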
    pub fn active_ingestions(&self) -> impl Iterator<Item = &GlobalId> {
        self.active_ingestions.keys()
    }

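    /// Returns the IDs of all active ingestions and their ingestion exports
    /// (e.g. subsources).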
    pub fn active_ingestion_exports(&self) -> impl Iterator<Item = &GlobalId> {
        let ingestion_exports = self.ingestion_exports.keys();
        self.active_ingestions.keys().chain(ingestion_exports)
    }

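    /// Returns the IDs of all active exports.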
    pub fn active_exports(&self) -> impl Iterator<Item = &GlobalId> {
        self.active_exports.keys()
    }

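    /// Sends a `Paused` status update for every installed storage object,
    /// reflecting that no replica is currently running it.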
    fn update_paused_statuses(&mut self) {
        let now = mz_ore::now::to_datetime((self.now)());
        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: now,
            error: None,
            hints: BTreeSet::from([format!(
                "There is currently no replica running this {object_type}"
            )]),
            namespaced_errors: Default::default(),
            replica_id: None,
        };

        self.history.reduce();

        let mut status_updates = Vec::new();
        for command in self.history.iter() {
            match command {
                StorageCommand::RunIngestion(ingestion) => {
                    // "Old-style" ingestions have a remap collection that is
                    // distinct from the ingestion itself; don't report a
                    // status for that collection.
                    let old_style_ingestion =
                        ingestion.id != ingestion.description.remap_collection_id;
                    let subsource_ids = ingestion.description.collection_ids().filter(|id| {
                        let should_discard =
                            old_style_ingestion && id == &ingestion.description.remap_collection_id;
                        !should_discard
                    });
                    for id in subsource_ids {
                        status_updates.push(make_update(id, "source"));
                    }
                }
                StorageCommand::RunSink(sink) => {
                    status_updates.push(make_update(sink.id, "sink"));
                }
                _ => (),
            }
        }

        for update in status_updates {
            // We can ignore send errors here: they mean the receiver has shut
            // down, in which case nobody is interested in status updates
            // anymore.
            let _ = self
                .response_tx
                .send((None, StorageResponse::StatusUpdate(update)));
        }
    }

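    /// Sends a command to this instance: records it in the command history
    /// and forwards it to the replicas responsible for the affected objects.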
    pub fn send(&mut self, command: StorageCommand<T>) {
        // Record the command, so that new replicas can be brought up to date.
        self.history.push(command.clone());

        match command.clone() {
            StorageCommand::RunIngestion(ingestion) => {
                // Absorb the command into our state, updating scheduling
                // decisions if necessary.
                self.absorb_ingestion(*ingestion.clone());

                for replica in self.active_replicas(&ingestion.id) {
                    replica.send(StorageCommand::RunIngestion(ingestion.clone()));
                }
            }
            StorageCommand::RunSink(sink) => {
                self.absorb_export(*sink.clone());

                for replica in self.active_replicas(&sink.id) {
                    replica.send(StorageCommand::RunSink(sink.clone()));
                }
            }
            StorageCommand::AllowCompaction(id, frontier) => {
                for replica in self.active_replicas(&id) {
                    replica.send(StorageCommand::AllowCompaction(id, frontier.clone()));
                }

                self.absorb_compaction(id, frontier);
            }
            command => {
                for replica in self.replicas.values_mut() {
                    replica.send(command.clone());
                }
            }
        }

        // When there are no replicas, newly installed objects are immediately
        // paused; report them as such.
        if command.installs_objects() && self.replicas.is_empty() {
            self.update_paused_statuses();
        }
    }

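    /// Updates internal state to reflect a received [`RunIngestionCommand`].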
    fn absorb_ingestion(&mut self, ingestion: RunIngestionCommand) {
        let existing_ingestion_state = self.active_ingestions.get_mut(&ingestion.id);

        // Always update our mapping from ingestion exports to their parent
        // ingestion.
        for id in ingestion.description.source_exports.keys() {
            self.ingestion_exports.insert(*id, ingestion.id);
        }

        if let Some(ingestion_state) = existing_ingestion_state {
            // It's an update of an existing ingestion; scheduling decisions
            // stay unchanged.
            tracing::debug!(
                ingestion_id = %ingestion.id,
                active_replicas = %ingestion_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating ingestion"
            );
        } else {
            // Create new ingestion state and let `update_scheduling` decide
            // which replicas it should run on.
            let ingestion_state = ActiveIngestion {
                active_replicas: BTreeSet::new(),
            };
            self.active_ingestions.insert(ingestion.id, ingestion_state);

            self.update_scheduling(false);
        }
    }

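    /// Updates internal state to reflect a received [`RunSinkCommand`].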
    fn absorb_export(&mut self, export: RunSinkCommand<T>) {
        let existing_export_state = self.active_exports.get_mut(&export.id);

        if let Some(export_state) = existing_export_state {
            // It's an update of an existing export; scheduling decisions stay
            // unchanged.
            tracing::debug!(
                export_id = %export.id,
                active_replicas = %export_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating export"
            );
        } else {
            // Create new export state and let `update_scheduling` decide
            // which replicas it should run on.
            let export_state = ActiveExport {
                active_replicas: BTreeSet::new(),
            };
            self.active_exports.insert(export.id, export_state);

            self.update_scheduling(false);
        }
    }

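    /// Updates scheduling decisions for all active ingestions and exports.
    ///
    /// Objects that prefer to run on a single replica are scheduled on the
    /// replica with the smallest ID, if any; all other objects are scheduled
    /// on every replica. If `send_commands` is true, the commands needed to
    /// realize new scheduling decisions are sent to the affected replicas.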
    fn update_scheduling(&mut self, send_commands: bool) {
        #[derive(Debug)]
        enum ObjectId {
            Ingestion(GlobalId),
            Export(GlobalId),
        }
        // First, collect the scheduling preference of every active object.
        let mut scheduling_preferences: Vec<(ObjectId, bool)> = Vec::new();

        for ingestion_id in self.active_ingestions.keys() {
            let ingestion_description = self
                .get_ingestion_description(ingestion_id)
                .expect("missing ingestion description");

            let prefers_single_replica = ingestion_description
                .desc
                .connection
                .prefers_single_replica();

            scheduling_preferences
                .push((ObjectId::Ingestion(*ingestion_id), prefers_single_replica));
        }

        for export_id in self.active_exports.keys() {
            // All exports prefer to run on a single replica.
            scheduling_preferences.push((ObjectId::Export(*export_id), true));
        }

        // Newly scheduled objects, per replica, for which we might have to
        // send commands below.
        let mut commands_by_replica: BTreeMap<ReplicaId, Vec<ObjectId>> = BTreeMap::new();

        for (object_id, prefers_single_replica) in scheduling_preferences {
            let active_replicas = match object_id {
                ObjectId::Ingestion(ingestion_id) => {
                    &mut self
                        .active_ingestions
                        .get_mut(&ingestion_id)
                        .expect("missing ingestion state")
                        .active_replicas
                }
                ObjectId::Export(export_id) => {
                    &mut self
                        .active_exports
                        .get_mut(&export_id)
                        .expect("missing export state")
                        .active_replicas
                }
            };

            if prefers_single_replica {
                // Schedule single-replica objects only if they are not
                // already running somewhere.
                if active_replicas.is_empty() {
                    let target_replica = self.replicas.keys().min().copied();
                    if let Some(first_replica_id) = target_replica {
                        tracing::info!(
                            object_id = ?object_id,
                            replica_id = %first_replica_id,
                            "scheduling single-replica object"
                        );
                        active_replicas.insert(first_replica_id);

                        commands_by_replica
                            .entry(first_replica_id)
                            .or_default()
                            .push(object_id);
                    }
                } else {
                    tracing::info!(
                        ?object_id,
                        active_replicas = %active_replicas.iter().map(|id| id.to_string()).join(", "),
                        "single-replica object already running, not scheduling again",
                    );
                }
            } else {
                // Schedule multi-replica objects on all replicas they are not
                // yet running on.
                let current_replica_ids: BTreeSet<_> = self.replicas.keys().copied().collect();
                let unscheduled_replicas: Vec<_> = current_replica_ids
                    .difference(active_replicas)
                    .copied()
                    .collect();
                for replica_id in unscheduled_replicas {
                    tracing::info!(
                        ?object_id,
                        %replica_id,
                        "scheduling multi-replica object"
                    );
                    active_replicas.insert(replica_id);
                }
            }
        }

        if send_commands {
            for (replica_id, object_ids) in commands_by_replica {
                let mut ingestion_commands = vec![];
                let mut export_commands = vec![];
                for object_id in object_ids {
                    match object_id {
                        ObjectId::Ingestion(id) => {
                            ingestion_commands.push(RunIngestionCommand {
                                id,
                                description: self
                                    .get_ingestion_description(&id)
                                    .expect("missing ingestion description")
                                    .clone(),
                            });
                        }
                        ObjectId::Export(id) => {
                            export_commands.push(RunSinkCommand {
                                id,
                                description: self
                                    .get_export_description(&id)
                                    .expect("missing export description")
                                    .clone(),
                            });
                        }
                    }
                }
                for ingestion in ingestion_commands {
                    let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
                    let ingestion = Box::new(ingestion);
                    replica.send(StorageCommand::RunIngestion(ingestion));
                }
                for export in export_commands {
                    let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
                    let export = Box::new(export);
                    replica.send(StorageCommand::RunSink(export));
                }
            }
        }
    }

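    /// Returns the most recent ingestion description for the given ingestion,
    /// as recorded in the command history, if the ingestion is active.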
    pub fn get_ingestion_description(
        &self,
        id: &GlobalId,
    ) -> Option<IngestionDescription<CollectionMetadata>> {
        if !self.active_ingestions.contains_key(id) {
            return None;
        }

        self.history.iter().rev().find_map(|command| {
            if let StorageCommand::RunIngestion(ingestion) = command {
                if &ingestion.id == id {
                    Some(ingestion.description.clone())
                } else {
                    None
                }
            } else {
                None
            }
        })
    }

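    /// Returns the most recent export description for the given export, as
    /// recorded in the command history, if the export is active.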
    pub fn get_export_description(
        &self,
        id: &GlobalId,
    ) -> Option<StorageSinkDesc<CollectionMetadata, T>> {
        if !self.active_exports.contains_key(id) {
            return None;
        }

        self.history.iter().rev().find_map(|command| {
            if let StorageCommand::RunSink(sink) = command {
                if &sink.id == id {
                    Some(sink.description.clone())
                } else {
                    None
                }
            } else {
                None
            }
        })
    }

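    /// Updates internal state to reflect an `AllowCompaction` command.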
    fn absorb_compaction(&mut self, id: GlobalId, frontier: Antichain<T>) {
        tracing::debug!(?self.active_ingestions, ?id, ?frontier, "allow_compaction");

        // Compaction to the empty frontier signals that the object has been
        // dropped, so we stop tracking it.
        if frontier.is_empty() {
            self.active_ingestions.remove(&id);
            self.ingestion_exports.remove(&id);
            self.active_exports.remove(&id);
        }
    }

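    /// Returns the replicas that are actively running the given object
    /// (ingestion, ingestion export, or export). Objects without scheduling
    /// state are considered to run on all replicas.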
    fn active_replicas(&mut self, id: &GlobalId) -> Box<dyn Iterator<Item = &mut Replica<T>> + '_> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => Box::new(self.replicas.iter_mut().filter_map(
                    move |(replica_id, replica)| {
                        if ingestion.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    },
                )),
                None => {
                    // The parent ingestion is not active, so neither is this
                    // ingestion export.
                    Box::new(std::iter::empty())
                }
            }
        } else if let Some(ingestion) = self.active_ingestions.get(id) {
            Box::new(
                self.replicas
                    .iter_mut()
                    .filter_map(move |(replica_id, replica)| {
                        if ingestion.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    }),
            )
        } else if let Some(export) = self.active_exports.get(id) {
            Box::new(
                self.replicas
                    .iter_mut()
                    .filter_map(move |(replica_id, replica)| {
                        if export.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    }),
            )
        } else {
            Box::new(self.replicas.values_mut())
        }
    }

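    /// Reports whether the given replica is actively running the given
    /// object. Objects without scheduling state are considered active on all
    /// replicas.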
    fn is_active_replica(&self, id: &GlobalId, replica_id: &ReplicaId) -> bool {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => ingestion.active_replicas.contains(replica_id),
                None => {
                    // The parent ingestion is not active, so neither is this
                    // ingestion export.
                    false
                }
            }
        } else if let Some(ingestion) = self.active_ingestions.get(id) {
            ingestion.active_replicas.contains(replica_id)
        } else if let Some(export) = self.active_exports.get(id) {
            export.active_replicas.contains(replica_id)
        } else {
            // Objects that we don't track scheduling decisions for run on all
            // replicas.
            true
        }
    }

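    /// Refreshes the instance state metrics, e.g. the number of connected
    /// replicas.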
    pub(super) fn refresh_state_metrics(&self) {
        let connected_replica_count = self.replicas.values().filter(|r| r.is_connected()).count();

        self.metrics
            .connected_replica_count
            .set(u64::cast_from(connected_replica_count));
    }

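    /// Returns the set of replicas that the given ingestion export is
    /// scheduled on. For all other objects, all replica IDs are returned.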
    pub fn get_active_replicas_for_object(&self, id: &GlobalId) -> BTreeSet<ReplicaId> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => ingestion.active_replicas.clone(),
                None => {
                    // The parent ingestion is not active, so neither is this
                    // ingestion export.
                    BTreeSet::new()
                }
            }
        } else {
            // For all other objects, report all replicas.
            self.replicas.keys().copied().collect()
        }
    }
}

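/// Configuration for a [`Replica`].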
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    pub build_info: &'static BuildInfo,
    pub location: ClusterReplicaLocation,
    pub grpc_client: GrpcClientParameters,
}

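/// The controller's representation of a single replica, which communicates
/// with the replica through a background task.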
#[derive(Debug)]
pub struct Replica<T> {
    /// The configuration of the replica, kept around so it can be re-added
    /// after a failure.
    config: ReplicaConfig,
    /// A sender for commands to the replica task.
    command_tx: mpsc::UnboundedSender<StorageCommand<T>>,
    /// A handle to the task that manages communication with the replica. The
    /// task is aborted when this handle is dropped.
    task: AbortOnDropHandle<()>,
    /// Flag reporting whether a connection to the replica has been
    /// established.
    connected: Arc<AtomicBool>,
}

impl<T> Replica<T>
where
    T: Timestamp + Lattice + Sync,
{
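    /// Creates a new [`Replica`], spawning a task that manages communication
    /// with the remote replica.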
    fn new(
        id: ReplicaId,
        config: ReplicaConfig,
        metrics: ReplicaMetrics,
        response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    ) -> Self {
        let (command_tx, command_rx) = mpsc::unbounded_channel();
        let connected = Arc::new(AtomicBool::new(false));

        let task = mz_ore::task::spawn(
            || format!("storage-replica-{id}"),
            ReplicaTask {
                replica_id: id,
                config: config.clone(),
                metrics: metrics.clone(),
                connected: Arc::clone(&connected),
                command_rx,
                response_tx,
            }
            .run(),
        );

        Self {
            config,
            command_tx,
            task: task.abort_on_drop(),
            connected,
        }
    }

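    /// Sends a command to the replica task, to be forwarded to the replica.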
    fn send(&self, command: StorageCommand<T>) {
        // We ignore send errors here; a crashed replica task is detected
        // separately through `failed`.
        let _ = self.command_tx.send(command);
    }

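    /// Determines whether this replica has failed, i.e. its task has
    /// terminated.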
    fn failed(&self) -> bool {
        self.task.is_finished()
    }

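    /// Determines whether a connection to the replica has been established.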
    pub(super) fn is_connected(&self) -> bool {
        self.connected.load(atomic::Ordering::Relaxed)
    }
}

type StorageCtpClient<T> = transport::Client<StorageCommand<T>, StorageResponse<T>>;
type ReplicaClient<T> = Partitioned<StorageCtpClient<T>, StorageCommand<T>, StorageResponse<T>>;

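/// A task that manages the connection to a single replica: it connects to the
/// replica and shuttles commands and responses between it and the controller.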
struct ReplicaTask<T> {
    /// The ID of the replica this task manages.
    replica_id: ReplicaId,
    /// The configuration of the replica.
    config: ReplicaConfig,
    /// Metrics for the replica connection.
    metrics: ReplicaMetrics,
    /// Flag shared with the [`Replica`], used to report when a connection has
    /// been established.
    connected: Arc<AtomicBool>,
    /// A receiver for commands from the controller.
    command_rx: mpsc::UnboundedReceiver<StorageCommand<T>>,
    /// A sender for responses to the controller.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}

impl<T> ReplicaTask<T>
where
    T: Timestamp + Lattice + Sync,
{
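    /// Runs the replica task to completion.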
    async fn run(self) {
        let replica_id = self.replica_id;
        info!(%replica_id, "starting replica task");

        let client = self.connect().await;
        match self.run_message_loop(client).await {
            Ok(()) => info!(%replica_id, "stopped replica task"),
            Err(error) => warn!(%replica_id, %error, "replica task failed"),
        }
    }

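    /// Connects to the replica, retrying forever (with backoff) until the
    /// connection succeeds.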
    async fn connect(&self) -> ReplicaClient<T> {
        let try_connect = async move |retry: RetryState| {
            let version = self.config.build_info.semver_version();
            let client_params = &self.config.grpc_client;

            let connect_start = Instant::now();
            let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
            let keepalive_timeout = client_params
                .http2_keep_alive_timeout
                .unwrap_or(Duration::MAX);

            let connect_result = StorageCtpClient::<T>::connect_partitioned(
                self.config.location.ctl_addrs.clone(),
                version,
                connect_timeout,
                keepalive_timeout,
                self.metrics.clone(),
            )
            .await;

            self.metrics.observe_connect_time(connect_start.elapsed());

            connect_result.inspect_err(|error| {
                // We retry forever, so a next backoff always exists.
                let next_backoff = retry.next_backoff.unwrap();
                if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
                    info!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                } else {
                    debug!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                }
            })
        };

        let client = Retry::default()
            .clamp_backoff(Duration::from_secs(1))
            .retry_async(try_connect)
            .await
            .expect("retries forever");

        self.metrics.observe_connect();
        self.connected.store(true, atomic::Ordering::Relaxed);

        client
    }

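    /// Runs the message loop: forwards commands from the controller to the
    /// replica and responses from the replica to the controller, until either
    /// side disconnects.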
    async fn run_message_loop(mut self, mut client: ReplicaClient<T>) -> Result<(), anyhow::Error> {
        loop {
            select! {
                command = self.command_rx.recv() => {
                    let Some(mut command) = command else {
                        tracing::debug!(%self.replica_id, "controller is no longer interested in this replica, shutting down message loop");
                        break;
                    };

                    self.specialize_command(&mut command);
                    client.send(command).await?;
                },
                response = client.recv() => {
                    let Some(response) = response? else {
                        bail!("replica unexpectedly gracefully terminated connection");
                    };

                    if self.response_tx.send((Some(self.replica_id), response)).is_err() {
                        tracing::debug!(%self.replica_id, "controller (receiver) is no longer interested in this replica, shutting down message loop");
                        break;
                    }
                }
            }
        }

        Ok(())
    }

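    /// Specializes a command for this replica, filling in a fresh nonce for
    /// `Hello` commands.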
    fn specialize_command(&self, command: &mut StorageCommand<T>) {
        if let StorageCommand::Hello { nonce } = command {
            *nonce = Uuid::new_v4();
        }
    }
}