mz_storage_controller/instance.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A controller for a storage instance.

use crate::CollectionMetadata;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, atomic};
use std::time::{Duration, Instant};

use anyhow::bail;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_ore::cast::CastFrom;
use mz_ore::now::NowFn;
use mz_ore::retry::{Retry, RetryState};
use mz_ore::task::AbortOnDropHandle;
use mz_repr::GlobalId;
use mz_service::client::{GenericClient, Partitioned};
use mz_service::params::GrpcClientParameters;
use mz_service::transport;
use mz_storage_client::client::{
    RunIngestionCommand, RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse,
};
use mz_storage_client::metrics::{InstanceMetrics, ReplicaMetrics};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{IngestionDescription, SourceConnection};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::select;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::history::CommandHistory;
/// A controller for a storage instance.
///
/// Encapsulates communication with replicas in this instance, and their rehydration.
///
/// Note that storage objects (sources and sinks) don't currently support replication
/// (database-issues#5051). An instance can have multiple replicas connected, but only if it has no
/// storage objects installed. Attempting to install storage objects on multi-replica instances, or
/// attempting to add more than one replica to instances that have storage objects installed, is
/// illegal and will lead to panics.
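///
/// # Example
///
/// A minimal sketch of how a storage controller might drive an `Instance`; the metrics,
/// now function, response channel, and IDs below are hypothetical stand-ins rather than
/// this crate's actual wiring.
///
/// ```ignore
/// // Assumed: `metrics`, `replica_id`, and `replica_config` are constructed elsewhere.
/// let (response_tx, mut response_rx) = tokio::sync::mpsc::unbounded_channel();
/// let mut instance = Instance::new(None, metrics, mz_ore::now::SYSTEM_TIME.clone(), response_tx);
///
/// // Connect a replica; recorded commands are replayed to it.
/// instance.add_replica(replica_id, replica_config);
/// assert!(instance.replica_ids().any(|id| id == replica_id));
/// ```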
#[derive(Debug)]
pub(crate) struct Instance<T> {
    /// The workload class of this instance.
    ///
    /// This is currently only used to annotate metrics.
    pub workload_class: Option<String>,
    /// The replicas connected to this storage instance.
    replicas: BTreeMap<ReplicaId, Replica<T>>,
    /// The ingestions currently running on this instance.
    ///
    /// While this is derivable from `history` on demand, keeping a denormalized
    /// list of running ingestions is quite a bit more convenient in the
    /// implementation of `StorageController::active_ingestions`.
    active_ingestions: BTreeMap<GlobalId, ActiveIngestion>,
    /// A map from ingestion export ID to the ingestion that is producing it.
    ingestion_exports: BTreeMap<GlobalId, GlobalId>,
    /// The exports currently running on this instance.
    ///
    /// While this is derivable from `history` on demand, keeping a denormalized
    /// list of running exports is quite a bit more convenient for the
    /// controller.
    active_exports: BTreeMap<GlobalId, ActiveExport>,
    /// The command history, used to replay past commands when introducing new replicas or
    /// reconnecting to existing replicas.
    history: CommandHistory<T>,
    /// Metrics tracked for this storage instance.
    metrics: InstanceMetrics,
    /// A function that returns the current time.
    now: NowFn,
    /// A sender for responses from replicas.
    ///
    /// Responses are tagged with the [`ReplicaId`] of the replica that sent the
    /// response. Responses that don't originate from a replica (e.g. a "paused"
    /// status update, when no replicas are connected) are tagged with `None`.
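    ///
    /// A hypothetical receiver-side sketch (the `handle_*` functions are illustrative
    /// stand-ins, not part of this crate):
    ///
    /// ```ignore
    /// while let Some((replica_id, response)) = response_rx.recv().await {
    ///     match replica_id {
    ///         // The response was produced by a concrete replica.
    ///         Some(id) => handle_replica_response(id, response),
    ///         // The response was synthesized by the instance itself, e.g. a
    ///         // "paused" status update while no replicas are connected.
    ///         None => handle_synthesized_response(response),
    ///     }
    /// }
    /// ```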
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}

#[derive(Debug)]
struct ActiveIngestion {
    /// The set of replicas that this ingestion is currently running on.
    active_replicas: BTreeSet<ReplicaId>,
}

#[derive(Debug)]
struct ActiveExport {
    /// The set of replicas that this export is currently running on.
    active_replicas: BTreeSet<ReplicaId>,
}

impl<T> Instance<T>
where
    T: Timestamp + Lattice + TotalOrder + Sync,
{
    /// Creates a new [`Instance`].
    pub fn new(
        workload_class: Option<String>,
        metrics: InstanceMetrics,
        now: NowFn,
        instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    ) -> Self {
        let history = CommandHistory::new(metrics.for_history());

        let mut instance = Self {
            workload_class,
            replicas: Default::default(),
            active_ingestions: Default::default(),
            ingestion_exports: Default::default(),
            active_exports: BTreeMap::new(),
            history,
            metrics,
            now,
            response_tx: instance_response_tx,
        };

        instance.send(StorageCommand::Hello {
            // The nonce is protocol iteration-specific and will be set in
            // `ReplicaTask::specialize_command`.
            nonce: Default::default(),
        });

        instance
    }

    /// Returns the IDs of all replicas connected to this storage instance.
    pub fn replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_ {
        self.replicas.keys().copied()
    }

    /// Adds a new replica to this storage instance.
    pub fn add_replica(&mut self, id: ReplicaId, config: ReplicaConfig) {
        // Reduce the history to limit the number of commands sent to the new replica.
        self.history.reduce();

        let metrics = self.metrics.for_replica(id);
        let replica = Replica::new(id, config, metrics, self.response_tx.clone());

        self.replicas.insert(id, replica);

        self.update_scheduling(false);

        self.replay_commands(id);
    }

    /// Replays commands to the specified replica.
    pub fn replay_commands(&mut self, replica_id: ReplicaId) {
        let commands = self.history.iter().cloned();

        let filtered_commands = commands
            .filter_map(|command| match command {
                StorageCommand::RunIngestion(ingestion) => {
                    if self.is_active_replica(&ingestion.id, &replica_id) {
                        Some(StorageCommand::RunIngestion(ingestion))
                    } else {
                        None
                    }
                }
                StorageCommand::RunSink(sink) => {
                    if self.is_active_replica(&sink.id, &replica_id) {
                        Some(StorageCommand::RunSink(sink))
                    } else {
                        None
                    }
                }
                StorageCommand::AllowCompaction(id, upper) => {
                    if self.is_active_replica(&id, &replica_id) {
                        Some(StorageCommand::AllowCompaction(id, upper))
                    } else {
                        None
                    }
                }
                command => Some(command),
            })
            .collect::<Vec<_>>();

        let replica = self
            .replicas
            .get_mut(&replica_id)
            .expect("replica must exist");

        // Replay the commands at the new replica.
        for command in filtered_commands {
            replica.send(command);
        }
    }

    /// Removes the identified replica from this storage instance.
    pub fn drop_replica(&mut self, id: ReplicaId) {
        let replica = self.replicas.remove(&id);

        let mut needs_rescheduling = false;
        for (ingestion_id, ingestion) in self.active_ingestions.iter_mut() {
            let was_running = ingestion.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %ingestion_id,
                    replica_id = %id,
                    "ingestion was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }
        for (export_id, export) in self.active_exports.iter_mut() {
            let was_running = export.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %export_id,
                    replica_id = %id,
                    "export was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }

        tracing::info!(%id, %needs_rescheduling, "dropped replica");

        if needs_rescheduling {
            self.update_scheduling(true);
        }

        if replica.is_some() && self.replicas.is_empty() {
            self.update_paused_statuses();
        }
    }

    /// Rehydrates any failed replicas of this storage instance.
    pub fn rehydrate_failed_replicas(&mut self) {
        let replicas = self.replicas.iter();
        let failed_replicas: Vec<_> = replicas
            .filter_map(|(id, replica)| replica.failed().then_some(*id))
            .collect();

        for id in failed_replicas {
            let replica = self.replicas.remove(&id).expect("must exist");
            self.add_replica(id, replica.config);
        }
    }

    /// Returns ingestions running on this instance. This _only_ includes the
    /// top-level ingestions, not any of their source tables (aka. subsources).
    pub fn active_ingestions(&self) -> impl Iterator<Item = &GlobalId> {
        self.active_ingestions.keys()
    }

    /// Returns ingestion exports running on this instance. This includes the
    /// ingestion itself, if any, and running source tables (aka. subsources).
    ///
    /// This does _not_ filter out exports whose write frontier is the empty
    /// frontier, which some might no longer consider active. For the purposes
    /// of the instance controller, these are still considered active because
    /// it doesn't know about frontiers.
    pub fn active_ingestion_exports(&self) -> impl Iterator<Item = &GlobalId> {
        let ingestion_exports = self.ingestion_exports.keys();
        self.active_ingestions.keys().chain(ingestion_exports)
    }

    /// Returns the exports running on this instance.
    pub fn active_exports(&self) -> impl Iterator<Item = &GlobalId> {
        self.active_exports.keys()
    }

    /// Sets the status to paused for all sources/sinks in the history.
    fn update_paused_statuses(&mut self) {
        let now = mz_ore::now::to_datetime((self.now)());
        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: now,
            error: None,
            hints: BTreeSet::from([format!(
                "There is currently no replica running this {object_type}"
            )]),
            namespaced_errors: Default::default(),
            replica_id: None,
        };

        self.history.reduce();

        let mut status_updates = Vec::new();
        for command in self.history.iter() {
            match command {
                StorageCommand::RunIngestion(ingestion) => {
                    // NOTE(aljoscha): We filter out the remap collection because we
                    // don't get any status updates about it from the replica side. So
                    // we don't want to synthesize a 'paused' status here.
                    //
                    // TODO(aljoscha): I think we want to fix this eventually, and make
                    // sure we get status updates for the remap shard as well. Currently
                    // its handling in the source status collection is a bit difficult
                    // because we don't have updates for it in the status history
                    // collection.
                    let subsource_ids = ingestion
                        .description
                        .collection_ids()
                        .filter(|id| id != &ingestion.description.remap_collection_id);
                    for id in subsource_ids {
                        status_updates.push(make_update(id, "source"));
                    }
                }
                StorageCommand::RunSink(sink) => {
                    status_updates.push(make_update(sink.id, "sink"));
                }
                _ => (),
            }
        }

        for update in status_updates {
            // NOTE: If we lift this "inject paused status" logic into the
            // controller, we could return a `ReplicaId` here instead of an
            // `Option<ReplicaId>`.
            let _ = self
                .response_tx
                .send((None, StorageResponse::StatusUpdate(update)));
        }
    }

    /// Sends a command to this storage instance.
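    ///
    /// A hedged sketch of typical usage (the `object_id` below is a hypothetical
    /// `GlobalId`): dropping an object is expressed by allowing compaction to the
    /// empty frontier, which is recorded in the history, forwarded to the object's
    /// active replicas, and absorbed into the instance's scheduling state.
    ///
    /// ```ignore
    /// instance.send(StorageCommand::AllowCompaction(object_id, Antichain::new()));
    /// ```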
    pub fn send(&mut self, command: StorageCommand<T>) {
        // Record the command so that new replicas can be brought up to speed.
        self.history.push(command.clone());

        match command.clone() {
            StorageCommand::RunIngestion(ingestion) => {
                // First absorb into our state, because this might change
                // scheduling decisions, which need to be respected just below
                // when sending commands.
                self.absorb_ingestion(*ingestion.clone());

                for replica in self.active_replicas(&ingestion.id) {
                    replica.send(StorageCommand::RunIngestion(ingestion.clone()));
                }
            }
            StorageCommand::RunSink(sink) => {
                // First absorb into our state, because this might change
                // scheduling decisions, which need to be respected just below
                // when sending commands.
                self.absorb_export(*sink.clone());

                for replica in self.active_replicas(&sink.id) {
                    replica.send(StorageCommand::RunSink(sink.clone()));
                }
            }
            StorageCommand::AllowCompaction(id, frontier) => {
                // First send out commands and then absorb into our state since
                // absorbing them might remove entries from active_ingestions.
                for replica in self.active_replicas(&id) {
                    replica.send(StorageCommand::AllowCompaction(
                        id.clone(),
                        frontier.clone(),
                    ));
                }

                self.absorb_compaction(id, frontier);
            }
            command => {
                for replica in self.replicas.values_mut() {
                    replica.send(command.clone());
                }
            }
        }

        if command.installs_objects() && self.replicas.is_empty() {
            self.update_paused_statuses();
        }
    }

    /// Updates internal state based on incoming ingestion commands.
    ///
    /// This does _not_ send commands to replicas; we only record the ingestion
    /// in state and potentially update scheduling decisions.
    fn absorb_ingestion(&mut self, ingestion: RunIngestionCommand) {
        let existing_ingestion_state = self.active_ingestions.get_mut(&ingestion.id);

        // Always update our mapping from exports to their ingestion.
        for id in ingestion.description.source_exports.keys() {
            self.ingestion_exports.insert(id.clone(), ingestion.id);
        }

        if let Some(ingestion_state) = existing_ingestion_state {
            // It's an update for an existing ingestion. We don't need to
            // change anything about our scheduling decisions, so there is no
            // need to update active_ingestions.

            tracing::debug!(
                ingestion_id = %ingestion.id,
                active_replicas = %ingestion_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating ingestion"
            );
        } else {
            // We create a new ingestion state for this ingestion.
            let ingestion_state = ActiveIngestion {
                active_replicas: BTreeSet::new(),
            };
            self.active_ingestions.insert(ingestion.id, ingestion_state);

            // Maybe update scheduling decisions.
            self.update_scheduling(false);
        }
    }

    /// Updates internal state based on incoming export commands.
    ///
    /// This does _not_ send commands to replicas; we only record the export
    /// in state and potentially update scheduling decisions.
    fn absorb_export(&mut self, export: RunSinkCommand<T>) {
        let existing_export_state = self.active_exports.get_mut(&export.id);

        if let Some(export_state) = existing_export_state {
            // It's an update for an existing export. We don't need to
            // change anything about our scheduling decisions, so there is no
            // need to update active_exports.

            tracing::debug!(
                export_id = %export.id,
                active_replicas = %export_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating export"
            );
        } else {
            // We create a new export state for this export.
            let export_state = ActiveExport {
                active_replicas: BTreeSet::new(),
            };
            self.active_exports.insert(export.id, export_state);

            // Maybe update scheduling decisions.
            self.update_scheduling(false);
        }
    }

    /// Updates scheduling decisions, that is, which replicas should run a
    /// given object, if needed.
    ///
    /// An important property of this scheduling algorithm is that we never
    /// change the scheduling decision for single-replica objects unless we
    /// have to, that is, unless the replica that they are running on goes
    /// away. We do this so that we don't send a mix of "run"/"allow
    /// compaction"/"run" messages to replicas, which they wouldn't deal well
    /// with. When we _do_ have to make a scheduling decision, we schedule a
    /// single-replica ingestion on the first replica, according to the sort
    /// order of `ReplicaId`. We do the latter so that the scheduling decision
    /// is stable across restarts of `environmentd`/the controller.
    ///
    /// For multi-replica objects (e.g. Kafka ingestions), each active object is
    /// scheduled on all replicas.
    ///
    /// If `send_commands` is true, commands are sent for newly scheduled
    /// single-replica objects.
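    ///
    /// A minimal sketch of the single-replica placement rule, using hypothetical
    /// numeric replica IDs instead of real `ReplicaId`s; the actual code picks
    /// `self.replicas.keys().min()`:
    ///
    /// ```ignore
    /// use std::collections::BTreeSet;
    ///
    /// let connected: BTreeSet<u64> = BTreeSet::from([5, 2, 9]);
    /// // A single-replica object with no active replica is pinned to the
    /// // smallest connected replica ID and stays there until that replica
    /// // goes away.
    /// assert_eq!(connected.iter().min().copied(), Some(2));
    /// // A multi-replica object is instead scheduled on every connected replica.
    /// ```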
    fn update_scheduling(&mut self, send_commands: bool) {
        #[derive(Debug)]
        enum ObjectId {
            Ingestion(GlobalId),
            Export(GlobalId),
        }
        // We first collect scheduling preferences and then schedule below.
        // Applying the decision needs a mutable borrow but we also need to
        // borrow for determining `prefers_single_replica`, so we split this
        // into two loops.
        let mut scheduling_preferences: Vec<(ObjectId, bool)> = Vec::new();

        for ingestion_id in self.active_ingestions.keys() {
            let ingestion_description = self
                .get_ingestion_description(ingestion_id)
                .expect("missing ingestion description");

            let prefers_single_replica = ingestion_description
                .desc
                .connection
                .prefers_single_replica();

            scheduling_preferences
                .push((ObjectId::Ingestion(*ingestion_id), prefers_single_replica));
        }

        for export_id in self.active_exports.keys() {
            // All sinks prefer a single replica.
            scheduling_preferences.push((ObjectId::Export(*export_id), true));
        }

        // Collect all commands per replica and send them in one go.
        let mut commands_by_replica: BTreeMap<ReplicaId, Vec<ObjectId>> = BTreeMap::new();

        for (object_id, prefers_single_replica) in scheduling_preferences {
            let active_replicas = match object_id {
                ObjectId::Ingestion(ingestion_id) => {
                    &mut self
                        .active_ingestions
                        .get_mut(&ingestion_id)
                        .expect("missing ingestion state")
                        .active_replicas
                }
                ObjectId::Export(export_id) => {
                    &mut self
                        .active_exports
                        .get_mut(&export_id)
                        .expect("missing export state")
                        .active_replicas
                }
            };

            if prefers_single_replica {
                // For single-replica objects, schedule only if not already running.
                if active_replicas.is_empty() {
                    let target_replica = self.replicas.keys().min().copied();
                    if let Some(first_replica_id) = target_replica {
                        tracing::info!(
                            object_id = ?object_id,
                            replica_id = %first_replica_id,
                            "scheduling single-replica object");
                        active_replicas.insert(first_replica_id);

                        commands_by_replica
                            .entry(first_replica_id)
                            .or_default()
                            .push(object_id);
                    }
                } else {
                    tracing::info!(
                        ?object_id,
                        active_replicas = %active_replicas.iter().map(|id| id.to_string()).join(", "),
                        "single-replica object already running, not scheduling again",
                    );
                }
            } else {
                let current_replica_ids: BTreeSet<_> = self.replicas.keys().copied().collect();
                let unscheduled_replicas: Vec<_> = current_replica_ids
                    .difference(active_replicas)
                    .copied()
                    .collect();
                for replica_id in unscheduled_replicas {
                    tracing::info!(
                        ?object_id,
                        %replica_id,
                        "scheduling multi-replica object"
                    );
                    active_replicas.insert(replica_id);
                }
            }
        }

        if send_commands {
            for (replica_id, object_ids) in commands_by_replica {
                let mut ingestion_commands = vec![];
                let mut export_commands = vec![];
                for object_id in object_ids {
                    match object_id {
                        ObjectId::Ingestion(id) => {
                            ingestion_commands.push(RunIngestionCommand {
                                id,
                                description: self
                                    .get_ingestion_description(&id)
                                    .expect("missing ingestion description")
                                    .clone(),
                            });
                        }
                        ObjectId::Export(id) => {
                            export_commands.push(RunSinkCommand {
                                id,
                                description: self
                                    .get_export_description(&id)
                                    .expect("missing export description")
                                    .clone(),
                            });
                        }
                    }
                }
                for ingestion in ingestion_commands {
                    let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
                    let ingestion = Box::new(ingestion);
                    replica.send(StorageCommand::RunIngestion(ingestion));
                }
                for export in export_commands {
                    let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
                    let export = Box::new(export);
                    replica.send(StorageCommand::RunSink(export));
                }
            }
        }
    }

    /// Returns the ingestion description for the given ingestion ID, if it
    /// exists.
    ///
    /// This function searches through the command history to find the most
    /// recent `RunIngestionCommand` for the specified ingestion ID and returns
    /// its description. Returns `None` if no ingestion with the given ID is
    /// found.
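    ///
    /// A hypothetical caller-side sketch (`ingestion_id` is an assumed stand-in):
    ///
    /// ```ignore
    /// if let Some(description) = instance.get_ingestion_description(&ingestion_id) {
    ///     // `description` reflects the most recent `RunIngestionCommand` in the history.
    /// }
    /// ```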
    pub fn get_ingestion_description(
        &self,
        id: &GlobalId,
    ) -> Option<IngestionDescription<CollectionMetadata>> {
        if !self.active_ingestions.contains_key(id) {
            return None;
        }

        self.history.iter().rev().find_map(|command| {
            if let StorageCommand::RunIngestion(ingestion) = command {
                if &ingestion.id == id {
                    Some(ingestion.description.clone())
                } else {
                    None
                }
            } else {
                None
            }
        })
    }

    /// Returns the export description for the given export ID, if it
    /// exists.
    ///
    /// This function searches through the command history to find the most
    /// recent `RunSinkCommand` for the specified export ID and returns
    /// its description. Returns `None` if no export with the given ID is
    /// found.
    pub fn get_export_description(
        &self,
        id: &GlobalId,
    ) -> Option<StorageSinkDesc<CollectionMetadata, T>> {
        if !self.active_exports.contains_key(id) {
            return None;
        }

        self.history.iter().rev().find_map(|command| {
            if let StorageCommand::RunSink(sink) = command {
                if &sink.id == id {
                    Some(sink.description.clone())
                } else {
                    None
                }
            } else {
                None
            }
        })
    }

    /// Updates internal state based on incoming compaction commands.
    fn absorb_compaction(&mut self, id: GlobalId, frontier: Antichain<T>) {
        tracing::debug!(?self.active_ingestions, ?id, ?frontier, "allow_compaction");

        if frontier.is_empty() {
            self.active_ingestions.remove(&id);
            self.ingestion_exports.remove(&id);
            self.active_exports.remove(&id);
        }
    }

    /// Returns the replicas that are actively running the given object (ingestion or export).
    fn active_replicas(&mut self, id: &GlobalId) -> Box<dyn Iterator<Item = &mut Replica<T>> + '_> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => Box::new(self.replicas.iter_mut().filter_map(
                    move |(replica_id, replica)| {
                        if ingestion.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    },
                )),
                None => {
                    // The ingestion has already been compacted away (aka. stopped).
                    Box::new(std::iter::empty())
                }
            }
        } else if let Some(export) = self.active_exports.get(id) {
            Box::new(
                self.replicas
                    .iter_mut()
                    .filter_map(move |(replica_id, replica)| {
                        if export.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    }),
            )
        } else {
            Box::new(self.replicas.values_mut())
        }
    }

    /// Returns whether the given replica is actively running the given object (ingestion or export).
    fn is_active_replica(&self, id: &GlobalId, replica_id: &ReplicaId) -> bool {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => ingestion.active_replicas.contains(replica_id),
                None => {
                    // The ingestion has already been compacted away (aka. stopped).
                    false
                }
            }
        } else if let Some(export) = self.active_exports.get(id) {
            export.active_replicas.contains(replica_id)
        } else {
            // For objects that are neither ingestions nor exports, all replicas are active.
            true
        }
    }

    /// Refresh the controller state metrics for this instance.
    ///
    /// We could also do state metric updates directly in response to state changes, but that would
    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
    /// a single method is less noisy.
    ///
    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
    /// second during normal operation.
    pub(super) fn refresh_state_metrics(&self) {
        let connected_replica_count = self.replicas.values().filter(|r| r.is_connected()).count();

        self.metrics
            .connected_replica_count
            .set(u64::cast_from(connected_replica_count));
    }

    /// Returns the set of replica IDs that are actively running the given
    /// object (ingestion, ingestion export (aka. subsource), or export).
    pub fn get_active_replicas_for_object(&self, id: &GlobalId) -> BTreeSet<ReplicaId> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            // Right now, only ingestions can have per-replica scheduling decisions.
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => ingestion.active_replicas.clone(),
                None => {
                    // The ingestion has already been compacted away (aka. stopped).
                    BTreeSet::new()
                }
            }
        } else {
            // For non-ingestion objects, all replicas are active.
            self.replicas.keys().copied().collect()
        }
    }
}

/// Replica-specific configuration.
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    pub build_info: &'static BuildInfo,
    pub location: ClusterReplicaLocation,
    pub grpc_client: GrpcClientParameters,
}

/// State maintained about individual replicas.
#[derive(Debug)]
pub struct Replica<T> {
    /// Replica configuration.
    config: ReplicaConfig,
    /// A sender for commands for the replica.
    ///
    /// If sending to this channel fails, the replica has failed and requires
    /// rehydration.
    command_tx: mpsc::UnboundedSender<StorageCommand<T>>,
    /// A handle to the replica task; the task is aborted when this handle is dropped.
    task: AbortOnDropHandle<()>,
    /// Flag reporting whether the replica connection has been established.
    connected: Arc<AtomicBool>,
}

impl<T> Replica<T>
where
    T: Timestamp + Lattice + Sync,
{
    /// Creates a new [`Replica`].
    fn new(
        id: ReplicaId,
        config: ReplicaConfig,
        metrics: ReplicaMetrics,
        response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    ) -> Self {
        let (command_tx, command_rx) = mpsc::unbounded_channel();
        let connected = Arc::new(AtomicBool::new(false));

        let task = mz_ore::task::spawn(
            || format!("storage-replica-{id}"),
            ReplicaTask {
                replica_id: id,
                config: config.clone(),
                metrics: metrics.clone(),
                connected: Arc::clone(&connected),
                command_rx,
                response_tx,
            }
            .run(),
        );

        Self {
            config,
            command_tx,
            task: task.abort_on_drop(),
            connected,
        }
    }

    /// Sends a command to the replica.
    fn send(&self, command: StorageCommand<T>) {
        // Send failures are ignored; we check for failed replicas separately.
        let _ = self.command_tx.send(command);
    }

    /// Determines whether this replica has failed, which is the case if the
    /// replica task has terminated.
    fn failed(&self) -> bool {
        self.task.is_finished()
    }

    /// Determines whether the replica connection has been established.
    pub(super) fn is_connected(&self) -> bool {
        self.connected.load(atomic::Ordering::Relaxed)
    }
}

type StorageCtpClient<T> = transport::Client<StorageCommand<T>, StorageResponse<T>>;
type ReplicaClient<T> = Partitioned<StorageCtpClient<T>, StorageCommand<T>, StorageResponse<T>>;

/// A task handling communication with a replica.
struct ReplicaTask<T> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// Replica configuration.
    config: ReplicaConfig,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// Flag to report successful replica connection.
    connected: Arc<AtomicBool>,
    /// A channel upon which commands intended for the replica are delivered.
    command_rx: mpsc::UnboundedReceiver<StorageCommand<T>>,
    /// A channel upon which responses from the replica are delivered.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}

impl<T> ReplicaTask<T>
where
    T: Timestamp + Lattice + Sync,
{
    /// Runs the replica task.
    async fn run(self) {
        let replica_id = self.replica_id;
        info!(%replica_id, "starting replica task");

        let client = self.connect().await;
        match self.run_message_loop(client).await {
            Ok(()) => info!(%replica_id, "stopped replica task"),
            Err(error) => warn!(%replica_id, %error, "replica task failed"),
        }
    }

    /// Connects to the replica.
    ///
    /// The connection is retried forever (with backoff); this method returns only after
    /// a connection has been successfully established.
    async fn connect(&self) -> ReplicaClient<T> {
        let try_connect = async move |retry: RetryState| {
            let version = self.config.build_info.semver_version();
            let client_params = &self.config.grpc_client;

            let connect_start = Instant::now();
            let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
            let keepalive_timeout = client_params
                .http2_keep_alive_timeout
                .unwrap_or(Duration::MAX);

            let connect_result = StorageCtpClient::<T>::connect_partitioned(
                self.config.location.ctl_addrs.clone(),
                version,
                connect_timeout,
                keepalive_timeout,
                self.metrics.clone(),
            )
            .await;

            self.metrics.observe_connect_time(connect_start.elapsed());

            connect_result.inspect_err(|error| {
                let next_backoff = retry.next_backoff.unwrap();
                if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
                    info!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                } else {
                    debug!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                }
            })
        };

        let client = Retry::default()
            .clamp_backoff(Duration::from_secs(1))
            .retry_async(try_connect)
            .await
            .expect("retries forever");

        self.metrics.observe_connect();
        self.connected.store(true, atomic::Ordering::Relaxed);

        client
    }

    /// Runs the message loop.
    ///
    /// Returns (with an `Err`) if it encounters an error condition (e.g. the replica disconnects).
    /// If no error condition is encountered, the task runs until the controller disconnects from
    /// the command channel, or the task is dropped.
    async fn run_message_loop(mut self, mut client: ReplicaClient<T>) -> Result<(), anyhow::Error> {
        loop {
            select! {
                // Command from controller to forward to replica.
                // `tokio::sync::mpsc::UnboundedReceiver::recv` is documented as cancel safe.
                command = self.command_rx.recv() => {
                    let Some(mut command) = command else {
                        tracing::debug!(%self.replica_id, "controller is no longer interested in this replica, shutting down message loop");
                        break;
                    };

                    self.specialize_command(&mut command);
                    client.send(command).await?;
                },
                // Response from replica to forward to controller.
                // `GenericClient::recv` implementations are required to be cancel safe.
                response = client.recv() => {
                    let Some(response) = response? else {
                        bail!("replica unexpectedly gracefully terminated connection");
                    };

                    if self.response_tx.send((Some(self.replica_id), response)).is_err() {
                        tracing::debug!(%self.replica_id, "controller (receiver) is no longer interested in this replica, shutting down message loop");
                        break;
                    }
                }
            }
        }

        Ok(())
    }

    /// Specialize a command for the given replica configuration.
    ///
    /// Most [`StorageCommand`]s are independent of the target replica, but some contain
    /// replica-specific fields that must be adjusted before sending.
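    ///
    /// A hedged illustration of the one specialization performed today (the `task`
    /// binding is a hypothetical `ReplicaTask`): the default nonce recorded in the
    /// command history is replaced with a fresh, connection-specific one.
    ///
    /// ```ignore
    /// let mut command = StorageCommand::Hello { nonce: Uuid::nil() };
    /// task.specialize_command(&mut command);
    /// // `command` now carries a random nonce unique to this replica connection.
    /// ```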
    fn specialize_command(&self, command: &mut StorageCommand<T>) {
        if let StorageCommand::Hello { nonce } = command {
            *nonce = Uuid::new_v4();
        }
    }
}