Skip to main content

mz_storage_controller/
instance.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller for a storage instance.
11
12use crate::CollectionMetadata;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::atomic::AtomicBool;
15use std::sync::{Arc, atomic};
16use std::time::{Duration, Instant};
17
18use anyhow::bail;
19use itertools::Itertools;
20use mz_build_info::BuildInfo;
21use mz_cluster_client::ReplicaId;
22use mz_cluster_client::client::ClusterReplicaLocation;
23use mz_ore::cast::CastFrom;
24use mz_ore::now::NowFn;
25use mz_ore::retry::{Retry, RetryState};
26use mz_ore::task::AbortOnDropHandle;
27use mz_repr::{GlobalId, Timestamp};
28use mz_service::client::{GenericClient, Partitioned};
29use mz_service::params::GrpcClientParameters;
30use mz_service::transport;
31use mz_storage_client::client::{
32    RunIngestionCommand, RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse,
33};
34use mz_storage_client::metrics::{InstanceMetrics, ReplicaMetrics};
35use mz_storage_types::sinks::StorageSinkDesc;
36use mz_storage_types::sources::{IngestionDescription, SourceConnection};
37use timely::progress::Antichain;
38use tokio::select;
39use tokio::sync::mpsc;
40use tracing::{debug, info, warn};
41use uuid::Uuid;
42
43use crate::history::CommandHistory;
44
/// A controller for a storage instance.
///
/// Encapsulates communication with replicas in this instance, and their rehydration.
///
/// Note that storage objects (sources and sinks) don't currently support replication (database-issues#5051).
/// An instance can have multiple replicas connected, but only if it has no storage objects
/// installed. Attempting to install storage objects on multi-replica instances, or attempting to
/// add more than one replica to instances that have storage objects installed, is illegal and will
/// lead to panics.
#[derive(Debug)]
pub(crate) struct Instance {
    /// The workload class of this instance.
    ///
    /// This is currently only used to annotate metrics.
    pub workload_class: Option<String>,
    /// The replicas connected to this storage instance, keyed by their ID.
    replicas: BTreeMap<ReplicaId, Replica>,
    /// The ingestions currently running on this instance.
    ///
    /// While this is derivable from `history` on demand, keeping a denormalized
    /// list of running ingestions is quite a bit more convenient in the
    /// implementation of `StorageController::active_ingestions`.
    active_ingestions: BTreeMap<GlobalId, ActiveIngestion>,
    /// A map from ingestion export ID (e.g. a subsource) to the ID of the
    /// ingestion that is producing it.
    ingestion_exports: BTreeMap<GlobalId, GlobalId>,
    /// The exports currently running on this instance.
    ///
    /// While this is derivable from `history` on demand, keeping a denormalized
    /// list of running exports is quite a bit more convenient for the
    /// controller.
    active_exports: BTreeMap<GlobalId, ActiveExport>,
    /// The command history, used to replay past commands when introducing new replicas or
    /// reconnecting to existing replicas.
    history: CommandHistory,
    /// Metrics tracked for this storage instance.
    metrics: InstanceMetrics,
    /// A function that returns the current time.
    ///
    /// Used to timestamp the synthesized "paused" status updates.
    now: NowFn,
    /// A sender for responses from replicas.
    ///
    /// Responses are tagged with the [`ReplicaId`] of the replica that sent the
    /// response. Responses that don't originate from a replica (e.g. a "paused"
    /// status update, when no replicas are connected) are tagged with `None`.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse)>,
}
90
/// Per-ingestion scheduling state tracked by the [`Instance`].
#[derive(Debug)]
struct ActiveIngestion {
    /// The set of replicas that this ingestion is currently running on.
    ///
    /// Populated by `Instance::update_scheduling`: single-replica ingestions
    /// get at most one entry, multi-replica ingestions get all connected
    /// replicas.
    active_replicas: BTreeSet<ReplicaId>,
}
96
/// Per-export scheduling state tracked by the [`Instance`].
#[derive(Debug)]
struct ActiveExport {
    /// The set of replicas that this export is currently running on.
    ///
    /// Since all sinks prefer single-replica scheduling (see
    /// `Instance::update_scheduling`), this should contain at most one
    /// replica in practice.
    active_replicas: BTreeSet<ReplicaId>,
}
102
103impl Instance {
104    /// Creates a new [`Instance`].
105    pub fn new(
106        workload_class: Option<String>,
107        metrics: InstanceMetrics,
108        now: NowFn,
109        instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse)>,
110    ) -> Self {
111        let history = CommandHistory::new(metrics.for_history());
112
113        let mut instance = Self {
114            workload_class,
115            replicas: Default::default(),
116            active_ingestions: Default::default(),
117            ingestion_exports: Default::default(),
118            active_exports: BTreeMap::new(),
119            history,
120            metrics,
121            now,
122            response_tx: instance_response_tx,
123        };
124
125        instance.send(StorageCommand::Hello {
126            // The nonce is protocol iteration-specific and will be set in
127            // `ReplicaTask::specialize_command`.
128            nonce: Default::default(),
129        });
130
131        instance
132    }
133
134    /// Returns the IDs of all replicas connected to this storage instance.
135    pub fn replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_ {
136        self.replicas.keys().copied()
137    }
138
    /// Adds a new replica to this storage instance.
    ///
    /// The new replica is brought up to speed by replaying the (reduced)
    /// command history to it, respecting scheduling decisions.
    pub fn add_replica(&mut self, id: ReplicaId, config: ReplicaConfig) {
        // Reduce the history to limit the amount of commands sent to the new replica, and to
        // enable the `objects_installed` assert below.
        // NOTE(review): no `objects_installed` assert is visible in this
        // method anymore — this part of the comment looks stale; confirm.
        self.history.reduce();

        let metrics = self.metrics.for_replica(id);
        let replica = Replica::new(id, config, metrics, self.response_tx.clone());

        self.replicas.insert(id, replica);

        // Revisit scheduling now that a new replica is available. Commands
        // for anything newly scheduled on this replica are delivered by
        // `replay_commands` below, so we don't send them here.
        self.update_scheduling(false);

        self.replay_commands(id);
    }
154
155    /// Replays commands to the specified replica.
156    pub fn replay_commands(&mut self, replica_id: ReplicaId) {
157        let commands = self.history.iter().cloned();
158
159        let filtered_commands = commands
160            .filter_map(|command| match command {
161                StorageCommand::RunIngestion(ingestion) => {
162                    if self.is_active_replica(&ingestion.id, &replica_id) {
163                        Some(StorageCommand::RunIngestion(ingestion))
164                    } else {
165                        None
166                    }
167                }
168                StorageCommand::RunSink(sink) => {
169                    if self.is_active_replica(&sink.id, &replica_id) {
170                        Some(StorageCommand::RunSink(sink))
171                    } else {
172                        None
173                    }
174                }
175                StorageCommand::AllowCompaction(id, upper) => {
176                    if self.is_active_replica(&id, &replica_id) {
177                        Some(StorageCommand::AllowCompaction(id, upper))
178                    } else {
179                        None
180                    }
181                }
182                command => Some(command),
183            })
184            .collect::<Vec<_>>();
185
186        let replica = self
187            .replicas
188            .get_mut(&replica_id)
189            .expect("replica must exist");
190
191        // Replay the commands at the new replica.
192        for command in filtered_commands {
193            replica.send(command);
194        }
195    }
196
197    /// Removes the identified replica from this storage instance.
198    pub fn drop_replica(&mut self, id: ReplicaId) {
199        let replica = self.replicas.remove(&id);
200
201        let mut needs_rescheduling = false;
202        for (ingestion_id, ingestion) in self.active_ingestions.iter_mut() {
203            let was_running = ingestion.active_replicas.remove(&id);
204            if was_running {
205                tracing::debug!(
206                    %ingestion_id,
207                    replica_id = %id,
208                    "ingestion was running on dropped replica, updating scheduling decisions"
209                );
210                needs_rescheduling = true;
211            }
212        }
213        for (export_id, export) in self.active_exports.iter_mut() {
214            let was_running = export.active_replicas.remove(&id);
215            if was_running {
216                tracing::debug!(
217                    %export_id,
218                    replica_id = %id,
219                    "export was running on dropped replica, updating scheduling decisions"
220                );
221                needs_rescheduling = true;
222            }
223        }
224
225        tracing::info!(%id, %needs_rescheduling, "dropped replica");
226
227        if needs_rescheduling {
228            self.update_scheduling(true);
229        }
230
231        if replica.is_some() && self.replicas.is_empty() {
232            self.update_paused_statuses();
233        }
234    }
235
236    /// Rehydrates any failed replicas of this storage instance.
237    pub fn rehydrate_failed_replicas(&mut self) {
238        let replicas = self.replicas.iter();
239        let failed_replicas: Vec<_> = replicas
240            .filter_map(|(id, replica)| replica.failed().then_some(*id))
241            .collect();
242
243        for id in failed_replicas {
244            let replica = self.replicas.remove(&id).expect("must exist");
245            self.add_replica(id, replica.config);
246        }
247    }
248
249    /// Returns ingestions running on this instance. This _only_ includes the
250    /// "toplevel" ingestions, not any of their source tables (aka. subsources).
251    pub fn active_ingestions(&self) -> impl Iterator<Item = &GlobalId> {
252        self.active_ingestions.keys()
253    }
254
255    /// Returns ingestion exports running on this instance. This includes the
256    /// ingestion itself, if any, and running source tables (aka. subsources).
257    ///
258    /// This does _not_ filter out exports whose write frontier is the empty
259    /// frontier, which some might consider not active anymore. But for the
260    /// purposes of the instance controller, these are still considered active
261    /// because we don't know about frontiers.
262    pub fn active_ingestion_exports(&self) -> impl Iterator<Item = &GlobalId> {
263        let ingestion_exports = self.ingestion_exports.keys();
264        self.active_ingestions.keys().chain(ingestion_exports)
265    }
266
267    /// Returns the exports running on this instance.
268    pub fn active_exports(&self) -> impl Iterator<Item = &GlobalId> {
269        self.active_exports.keys()
270    }
271
    /// Sets the status to paused for all sources/sinks in the history.
    ///
    /// Called when the instance has no replicas connected (see `send` and
    /// `drop_replica`), so that installed objects report why they aren't
    /// making progress.
    fn update_paused_statuses(&mut self) {
        // Timestamp all synthesized updates with a single `now` reading.
        let now = mz_ore::now::to_datetime((self.now)());
        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: now,
            error: None,
            hints: BTreeSet::from([format!(
                "There is currently no replica running this {object_type}"
            )]),
            namespaced_errors: Default::default(),
            // `None`: this update is synthesized by the controller, not sent
            // by any replica.
            replica_id: None,
        };

        // Reduce first so we only consider objects that are still installed.
        self.history.reduce();

        let mut status_updates = Vec::new();
        for command in self.history.iter() {
            match command {
                StorageCommand::RunIngestion(ingestion) => {
                    // An "old style" ingestion has a remap collection that is
                    // distinct from the ingestion itself.
                    let old_style_ingestion =
                        ingestion.id != ingestion.description.remap_collection_id;
                    let subsource_ids = ingestion.description.collection_ids().filter(|id| {
                        // NOTE(aljoscha): We filter out the remap collection for old style
                        // ingestions because it doesn't get any status updates about it from the
                        // replica side. So we don't want to synthesize a 'paused' status here.
                        // New style ingestion do, since the source itself contains the remap data.
                        let should_discard =
                            old_style_ingestion && id == &ingestion.description.remap_collection_id;
                        !should_discard
                    });
                    for id in subsource_ids {
                        status_updates.push(make_update(id, "source"));
                    }
                }
                StorageCommand::RunSink(sink) => {
                    status_updates.push(make_update(sink.id, "sink"));
                }
                _ => (),
            }
        }

        for update in status_updates {
            // NOTE: If we lift this "inject paused status" logic to the
            // controller, we could instead return ReplicaId instead of an
            // Option<ReplicaId>.
            let _ = self
                .response_tx
                .send((None, StorageResponse::StatusUpdate(update)));
        }
    }
324
325    /// Sends a command to this storage instance.
326    pub fn send(&mut self, command: StorageCommand) {
327        // Record the command so that new replicas can be brought up to speed.
328        self.history.push(command.clone());
329
330        match command.clone() {
331            StorageCommand::RunIngestion(ingestion) => {
332                // First absorb into our state, because this might change
333                // scheduling decisions, which need to be respected just below
334                // when sending commands.
335                self.absorb_ingestion(*ingestion.clone());
336
337                for replica in self.active_replicas(&ingestion.id) {
338                    replica.send(StorageCommand::RunIngestion(ingestion.clone()));
339                }
340            }
341            StorageCommand::RunSink(sink) => {
342                // First absorb into our state, because this might change
343                // scheduling decisions, which need to be respected just below
344                // when sending commands.
345                self.absorb_export(*sink.clone());
346
347                for replica in self.active_replicas(&sink.id) {
348                    replica.send(StorageCommand::RunSink(sink.clone()));
349                }
350            }
351            StorageCommand::AllowCompaction(id, frontier) => {
352                // First send out commands and then absorb into our state since
353                // absorbing them might remove entries from active_ingestions.
354                for replica in self.active_replicas(&id) {
355                    replica.send(StorageCommand::AllowCompaction(
356                        id.clone(),
357                        frontier.clone(),
358                    ));
359                }
360
361                self.absorb_compaction(id, frontier);
362            }
363            command => {
364                for replica in self.replicas.values_mut() {
365                    replica.send(command.clone());
366                }
367            }
368        }
369
370        if command.installs_objects() && self.replicas.is_empty() {
371            self.update_paused_statuses();
372        }
373    }
374
375    /// Updates internal state based on incoming ingestion commands.
376    ///
377    /// This does _not_ send commands to replicas, we only record the ingestion
378    /// in state and potentially update scheduling decisions.
379    fn absorb_ingestion(&mut self, ingestion: RunIngestionCommand) {
380        let existing_ingestion_state = self.active_ingestions.get_mut(&ingestion.id);
381
382        // Always update our mapping from export to their ingestion.
383        for id in ingestion.description.source_exports.keys() {
384            self.ingestion_exports.insert(id.clone(), ingestion.id);
385        }
386
387        if let Some(ingestion_state) = existing_ingestion_state {
388            // It's an update for an existing ingestion. We don't need to
389            // change anything about our scheduling decisions, no need to
390            // update active_ingestions.
391
392            tracing::debug!(
393                ingestion_id = %ingestion.id,
394                active_replicas = %ingestion_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
395                "updating ingestion"
396            );
397        } else {
398            // We create a new ingestion state for this ingestion.
399            let ingestion_state = ActiveIngestion {
400                active_replicas: BTreeSet::new(),
401            };
402            self.active_ingestions.insert(ingestion.id, ingestion_state);
403
404            // Maybe update scheduling decisions.
405            self.update_scheduling(false);
406        }
407    }
408
409    /// Updates internal state based on incoming export commands.
410    ///
411    /// This does _not_ send commands to replicas, we only record the export
412    /// in state and potentially update scheduling decisions.
413    fn absorb_export(&mut self, export: RunSinkCommand) {
414        let existing_export_state = self.active_exports.get_mut(&export.id);
415
416        if let Some(export_state) = existing_export_state {
417            // It's an update for an existing export. We don't need to
418            // change anything about our scheduling decisions, no need to
419            // update active_exports.
420
421            tracing::debug!(
422                export_id = %export.id,
423                active_replicas = %export_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
424                "updating export"
425            );
426        } else {
427            // We create a new export state for this export.
428            let export_state = ActiveExport {
429                active_replicas: BTreeSet::new(),
430            };
431            self.active_exports.insert(export.id, export_state);
432
433            // Maybe update scheduling decisions.
434            self.update_scheduling(false);
435        }
436    }
437
438    /// Update scheduling decisions, that is what replicas should be running a
439    /// given object, if needed.
440    ///
441    /// An important property of this scheduling algorithm is that we never
442    /// change the scheduling decision for single-replica objects unless we
443    /// have to, that is unless the replica that they are running on goes away.
444    /// We do this, so that we don't send a mix of "run"/"allow
445    /// compaction"/"run" messages to replicas, which wouldn't deal well with
446    /// this. When we _do_ have to make a scheduling decision we schedule a
447    /// single-replica ingestion on the first replica, according to the sort
448    /// order of `ReplicaId`. We do this latter so that the scheduling decision
449    /// is stable across restarts of `environmentd`/the controller.
450    ///
451    /// For multi-replica objects (e.g. Kafka ingestions), each active object is
452    /// scheduled on all replicas.
453    ///
454    /// If `send_commands` is true, will send commands for newly-scheduled
455    /// single-replica objects.
456    fn update_scheduling(&mut self, send_commands: bool) {
457        #[derive(Debug)]
458        enum ObjectId {
459            Ingestion(GlobalId),
460            Export(GlobalId),
461        }
462        // We first collect scheduling preferences and then schedule below.
463        // Applying the decision needs a mutable borrow but we also need to
464        // borrow for determining `prefers_single_replica`, so we split this
465        // into two loops.
466        let mut scheduling_preferences: Vec<(ObjectId, bool)> = Vec::new();
467
468        for ingestion_id in self.active_ingestions.keys() {
469            let ingestion_description = self
470                .get_ingestion_description(ingestion_id)
471                .expect("missing ingestion description");
472
473            let prefers_single_replica = ingestion_description
474                .desc
475                .connection
476                .prefers_single_replica();
477
478            scheduling_preferences
479                .push((ObjectId::Ingestion(*ingestion_id), prefers_single_replica));
480        }
481
482        for export_id in self.active_exports.keys() {
483            // All sinks prefer single replica
484            scheduling_preferences.push((ObjectId::Export(*export_id), true));
485        }
486
487        // Collect all commands per replica and send them in one go.
488        let mut commands_by_replica: BTreeMap<ReplicaId, Vec<ObjectId>> = BTreeMap::new();
489
490        for (object_id, prefers_single_replica) in scheduling_preferences {
491            let active_replicas = match object_id {
492                ObjectId::Ingestion(ingestion_id) => {
493                    &mut self
494                        .active_ingestions
495                        .get_mut(&ingestion_id)
496                        .expect("missing ingestion state")
497                        .active_replicas
498                }
499                ObjectId::Export(export_id) => {
500                    &mut self
501                        .active_exports
502                        .get_mut(&export_id)
503                        .expect("missing ingestion state")
504                        .active_replicas
505                }
506            };
507
508            if prefers_single_replica {
509                // For single-replica ingestion, schedule only if it's not already running.
510                if active_replicas.is_empty() {
511                    let target_replica = self.replicas.keys().min().copied();
512                    if let Some(first_replica_id) = target_replica {
513                        tracing::info!(
514                            object_id = ?object_id,
515                            replica_id = %first_replica_id,
516                            "scheduling single-replica object");
517                        active_replicas.insert(first_replica_id);
518
519                        commands_by_replica
520                            .entry(first_replica_id)
521                            .or_default()
522                            .push(object_id);
523                    }
524                } else {
525                    tracing::info!(
526                        ?object_id,
527                        active_replicas = %active_replicas.iter().map(|id| id.to_string()).join(", "),
528                        "single-replica object already running, not scheduling again",
529                    );
530                }
531            } else {
532                let current_replica_ids: BTreeSet<_> = self.replicas.keys().copied().collect();
533                let unscheduled_replicas: Vec<_> = current_replica_ids
534                    .difference(active_replicas)
535                    .copied()
536                    .collect();
537                for replica_id in unscheduled_replicas {
538                    tracing::info!(
539                        ?object_id,
540                        %replica_id,
541                        "scheduling multi-replica object"
542                    );
543                    active_replicas.insert(replica_id);
544                }
545            }
546        }
547
548        if send_commands {
549            for (replica_id, object_ids) in commands_by_replica {
550                let mut ingestion_commands = vec![];
551                let mut export_commands = vec![];
552                for object_id in object_ids {
553                    match object_id {
554                        ObjectId::Ingestion(id) => {
555                            ingestion_commands.push(RunIngestionCommand {
556                                id,
557                                description: self
558                                    .get_ingestion_description(&id)
559                                    .expect("missing ingestion description")
560                                    .clone(),
561                            });
562                        }
563                        ObjectId::Export(id) => {
564                            export_commands.push(RunSinkCommand {
565                                id,
566                                description: self
567                                    .get_export_description(&id)
568                                    .expect("missing export description")
569                                    .clone(),
570                            });
571                        }
572                    }
573                }
574                for ingestion in ingestion_commands {
575                    let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
576                    let ingestion = Box::new(ingestion);
577                    replica.send(StorageCommand::RunIngestion(ingestion));
578                }
579                for export in export_commands {
580                    let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
581                    let export = Box::new(export);
582                    replica.send(StorageCommand::RunSink(export));
583                }
584            }
585        }
586    }
587
588    /// Returns the ingestion description for the given ingestion ID, if it
589    /// exists.
590    ///
591    /// This function searches through the command history to find the most
592    /// recent RunIngestionCommand for the specified ingestion ID and returns
593    /// its description.  Returns None if no ingestion with the given ID is
594    /// found.
595    pub fn get_ingestion_description(
596        &self,
597        id: &GlobalId,
598    ) -> Option<IngestionDescription<CollectionMetadata>> {
599        if !self.active_ingestions.contains_key(id) {
600            return None;
601        }
602
603        self.history.iter().rev().find_map(|command| {
604            if let StorageCommand::RunIngestion(ingestion) = command {
605                if &ingestion.id == id {
606                    Some(ingestion.description.clone())
607                } else {
608                    None
609                }
610            } else {
611                None
612            }
613        })
614    }
615
616    /// Returns the export description for the given export ID, if it
617    /// exists.
618    ///
619    /// This function searches through the command history to find the most
620    /// recent RunSinkCommand for the specified export ID and returns
621    /// its description.  Returns None if no ingestion with the given ID is
622    /// found.
623    pub fn get_export_description(
624        &self,
625        id: &GlobalId,
626    ) -> Option<StorageSinkDesc<CollectionMetadata>> {
627        if !self.active_exports.contains_key(id) {
628            return None;
629        }
630
631        self.history.iter().rev().find_map(|command| {
632            if let StorageCommand::RunSink(sink) = command {
633                if &sink.id == id {
634                    Some(sink.description.clone())
635                } else {
636                    None
637                }
638            } else {
639                None
640            }
641        })
642    }
643
    /// Updates internal state based on incoming compaction commands.
    ///
    /// Compaction to the empty frontier signals that an object has been
    /// dropped, so we forget any scheduling state we hold for it.
    fn absorb_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
        tracing::debug!(?self.active_ingestions, ?id, ?frontier, "allow_compaction");

        if frontier.is_empty() {
            // At most one of these maps contains `id`; removing from all
            // three is a harmless way to avoid distinguishing the cases.
            self.active_ingestions.remove(&id);
            self.ingestion_exports.remove(&id);
            self.active_exports.remove(&id);
        }
    }
654
655    /// Returns the replicas that are actively running the given object (ingestion or export).
656    fn active_replicas(&mut self, id: &GlobalId) -> Box<dyn Iterator<Item = &mut Replica> + '_> {
657        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
658            match self.active_ingestions.get(ingestion_id) {
659                Some(ingestion) => Box::new(self.replicas.iter_mut().filter_map(
660                    move |(replica_id, replica)| {
661                        if ingestion.active_replicas.contains(replica_id) {
662                            Some(replica)
663                        } else {
664                            None
665                        }
666                    },
667                )),
668                None => {
669                    // The ingestion has already been compacted away (aka. stopped).
670                    Box::new(std::iter::empty())
671                }
672            }
673        } else if let Some(ingestion) = self.active_ingestions.get(id) {
674            Box::new(
675                self.replicas
676                    .iter_mut()
677                    .filter_map(move |(replica_id, replica)| {
678                        if ingestion.active_replicas.contains(replica_id) {
679                            Some(replica)
680                        } else {
681                            None
682                        }
683                    }),
684            )
685        } else if let Some(export) = self.active_exports.get(id) {
686            Box::new(
687                self.replicas
688                    .iter_mut()
689                    .filter_map(move |(replica_id, replica)| {
690                        if export.active_replicas.contains(replica_id) {
691                            Some(replica)
692                        } else {
693                            None
694                        }
695                    }),
696            )
697        } else {
698            Box::new(self.replicas.values_mut())
699        }
700    }
701
702    /// Returns whether the given replica is actively running the given object (ingestion or export).
703    fn is_active_replica(&self, id: &GlobalId, replica_id: &ReplicaId) -> bool {
704        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
705            match self.active_ingestions.get(ingestion_id) {
706                Some(ingestion) => ingestion.active_replicas.contains(replica_id),
707                None => {
708                    // The ingestion has already been compacted away (aka. stopped).
709                    false
710                }
711            }
712        } else if let Some(ingestion) = self.active_ingestions.get(id) {
713            ingestion.active_replicas.contains(replica_id)
714        } else if let Some(export) = self.active_exports.get(id) {
715            export.active_replicas.contains(replica_id)
716        } else {
717            // For non-ingestion objects, all replicas are active
718            true
719        }
720    }
721
722    /// Refresh the controller state metrics for this instance.
723    ///
724    /// We could also do state metric updates directly in response to state changes, but that would
725    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
726    /// a single method is less noisy.
727    ///
728    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
729    /// second during normal operation.
730    pub(super) fn refresh_state_metrics(&self) {
731        let connected_replica_count = self.replicas.values().filter(|r| r.is_connected()).count();
732
733        self.metrics
734            .connected_replica_count
735            .set(u64::cast_from(connected_replica_count));
736    }
737
738    /// Returns the set of replica IDs that are actively running the given
739    /// object (ingestion, ingestion export (aka. subsource), or export).
740    pub fn get_active_replicas_for_object(&self, id: &GlobalId) -> BTreeSet<ReplicaId> {
741        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
742            // Right now, only ingestions can have per-replica scheduling decisions.
743            match self.active_ingestions.get(ingestion_id) {
744                Some(ingestion) => ingestion.active_replicas.clone(),
745                None => {
746                    // The ingestion has already been compacted away (aka. stopped).
747                    BTreeSet::new()
748                }
749            }
750        } else {
751            // For non-ingestion objects, all replicas are active
752            self.replicas.keys().copied().collect()
753        }
754    }
755}
756
/// Replica-specific configuration.
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    /// Build information; its semver version is passed along when connecting
    /// to the replica.
    pub build_info: &'static BuildInfo,
    /// The location of the replica; its `ctl_addrs` are dialed when
    /// establishing the connection.
    pub location: ClusterReplicaLocation,
    /// Client connection parameters (e.g. connect and keepalive timeouts).
    pub grpc_client: GrpcClientParameters,
}
764
/// State maintained about individual replicas.
#[derive(Debug)]
pub struct Replica {
    /// Replica configuration.
    config: ReplicaConfig,
    /// A sender for commands for the replica.
    ///
    /// If sending to this channel fails, the replica has failed and requires
    /// rehydration.
    command_tx: mpsc::UnboundedSender<StorageCommand>,
    /// A handle to the replica task. Dropping the handle aborts the task, so
    /// dropping the `Replica` tears down the connection to the replica.
    task: AbortOnDropHandle<()>,
    /// Flag reporting whether the replica connection has been established.
    ///
    /// Set by the replica task after a successful connect; read through
    /// `is_connected`.
    connected: Arc<AtomicBool>,
}
780
781impl Replica {
782    /// Creates a new [`Replica`].
783    fn new(
784        id: ReplicaId,
785        config: ReplicaConfig,
786        metrics: ReplicaMetrics,
787        response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse)>,
788    ) -> Self {
789        let (command_tx, command_rx) = mpsc::unbounded_channel();
790        let connected = Arc::new(AtomicBool::new(false));
791
792        let task = mz_ore::task::spawn(
793            || "storage-replica-{id}",
794            ReplicaTask {
795                replica_id: id,
796                config: config.clone(),
797                metrics: metrics.clone(),
798                connected: Arc::clone(&connected),
799                command_rx,
800                response_tx,
801            }
802            .run(),
803        );
804
805        Self {
806            config,
807            command_tx,
808            task: task.abort_on_drop(),
809            connected,
810        }
811    }
812
813    /// Sends a command to the replica.
814    fn send(&self, command: StorageCommand) {
815        // Send failures ignored, we'll check for failed replicas separately.
816        let _ = self.command_tx.send(command);
817    }
818
819    /// Determine if this replica has failed. This is true if the replica
820    /// task has terminated.
821    fn failed(&self) -> bool {
822        self.task.is_finished()
823    }
824
825    /// Determine if the replica connection has been established.
826    pub(super) fn is_connected(&self) -> bool {
827        self.connected.load(atomic::Ordering::Relaxed)
828    }
829}
830
/// The transport client used to talk to a single storage replica process.
type StorageCtpClient = transport::Client<StorageCommand, StorageResponse>;
/// A client that fans commands out to, and merges responses from, the
/// processes of a replica (connected via `connect_partitioned` over the
/// replica's `ctl_addrs`).
type ReplicaClient = Partitioned<StorageCtpClient, StorageCommand, StorageResponse>;
833
/// A task handling communication with a replica.
///
/// Spawned by `Replica::new`. Establishes the connection (retrying with
/// backoff until it succeeds) and then forwards commands from the controller
/// to the replica and responses back.
struct ReplicaTask {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// Replica configuration.
    config: ReplicaConfig,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// Flag to report successful replica connection.
    ///
    /// Shared with the owning `Replica`, which reads it in `is_connected`.
    connected: Arc<AtomicBool>,
    /// A channel upon which commands intended for the replica are delivered.
    command_rx: mpsc::UnboundedReceiver<StorageCommand>,
    /// A channel upon which responses from the replica are delivered.
    ///
    /// Responses are tagged with `Some(replica_id)` to identify their origin.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse)>,
}
849
impl ReplicaTask {
    /// Runs the replica task.
    ///
    /// First connects to the replica (retrying forever), then shuttles
    /// commands and responses between the controller and the replica until
    /// either side disconnects or an error occurs.
    async fn run(self) {
        let replica_id = self.replica_id;
        info!(%replica_id, "starting replica task");

        let client = self.connect().await;
        match self.run_message_loop(client).await {
            Ok(()) => info!(%replica_id, "stopped replica task"),
            Err(error) => warn!(%replica_id, %error, "replica task failed"),
        }
    }

    /// Connects to the replica.
    ///
    /// The connection is retried forever (with backoff) and this method returns only after
    /// a connection was successfully established.
    async fn connect(&self) -> ReplicaClient {
        let try_connect = async move |retry: RetryState| {
            let version = self.config.build_info.semver_version();
            let client_params = &self.config.grpc_client;

            // Unset timeouts are treated as effectively infinite.
            let connect_start = Instant::now();
            let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
            let keepalive_timeout = client_params
                .http2_keep_alive_timeout
                .unwrap_or(Duration::MAX);

            let connect_result = StorageCtpClient::connect_partitioned(
                self.config.location.ctl_addrs.clone(),
                version,
                connect_timeout,
                keepalive_timeout,
                self.metrics.clone(),
            )
            .await;

            // Record the duration of the attempt, successful or not.
            self.metrics.observe_connect_time(connect_start.elapsed());

            connect_result.inspect_err(|error| {
                // `next_backoff` is always `Some` here because the retry
                // policy below never gives up.
                let next_backoff = retry.next_backoff.unwrap();
                // Log at `debug` for the first few attempts to avoid noise
                // during ordinary replica startup; escalate to `info` after.
                if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
                    info!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                } else {
                    debug!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                }
            })
        };

        let client = Retry::default()
            .clamp_backoff(Duration::from_secs(1))
            .retry_async(try_connect)
            .await
            .expect("retries forever");

        self.metrics.observe_connect();
        // Publish the connection status for `Replica::is_connected`.
        self.connected.store(true, atomic::Ordering::Relaxed);

        client
    }

    /// Runs the message loop.
    ///
    /// Returns (with an `Err`) if it encounters an error condition (e.g. the replica disconnects).
    /// If no error condition is encountered, the task runs until the controller disconnects from
    /// the command channel, or the task is dropped.
    ///
    /// Both `select!` arms rely on the polled futures being cancel safe, so
    /// that a message in the non-chosen arm is never lost.
    async fn run_message_loop(mut self, mut client: ReplicaClient) -> Result<(), anyhow::Error> {
        loop {
            select! {
                // Command from controller to forward to replica.
                // `tokio::sync::mpsc::UnboundedReceiver::recv` is documented as cancel safe.
                command = self.command_rx.recv() => {
                    let Some(mut command) = command else {
                        // `None` means the controller dropped its sender.
                        tracing::debug!(%self.replica_id, "controller is no longer interested in this replica, shutting down message loop");
                        break;
                    };

                    self.specialize_command(&mut command);
                    client.send(command).await?;
                },
                // Response from replica to forward to controller.
                // `GenericClient::recv` implementations are required to be cancel safe.
                response = client.recv() => {
                    let Some(response) = response? else {
                        bail!("replica unexpectedly gracefully terminated connection");
                    };

                    if self.response_tx.send((Some(self.replica_id), response)).is_err() {
                        tracing::debug!(%self.replica_id, "controller (receiver) is no longer interested in this replica, shutting down message loop");
                        break;
                    }
                }
            }
        }

        Ok(())
    }

    /// Specialize a command for the given replica configuration.
    ///
    /// Most [`StorageCommand`]s are independent of the target replica, but some contain
    /// replica-specific fields that must be adjusted before sending.
    fn specialize_command(&self, command: &mut StorageCommand) {
        if let StorageCommand::Hello { nonce } = command {
            // Each replica gets a freshly generated nonce in its `Hello`.
            *nonce = Uuid::new_v4();
        }
    }
}
963}