mz_storage_controller/instance.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A controller for a storage instance.
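//!
//! The [`Instance`] owns the per-instance command history and one task per replica. It forwards
//! storage commands to the replicas scheduled to run each object and forwards replica responses
//! (tagged with the originating [`ReplicaId`]) back to the storage controller.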

use crate::CollectionMetadata;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, atomic};
use std::time::{Duration, Instant};

use anyhow::bail;
use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_ore::cast::CastFrom;
use mz_ore::now::NowFn;
use mz_ore::retry::{Retry, RetryState};
use mz_ore::task::AbortOnDropHandle;
use mz_repr::GlobalId;
use mz_service::client::{GenericClient, Partitioned};
use mz_service::params::GrpcClientParameters;
use mz_service::transport;
use mz_storage_client::client::{
    RunIngestionCommand, RunSinkCommand, Status, StatusUpdate, StorageClient, StorageCommand,
    StorageGrpcClient, StorageResponse,
};
use mz_storage_client::metrics::{InstanceMetrics, ReplicaMetrics};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{IngestionDescription, SourceConnection};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::select;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::history::CommandHistory;

/// A controller for a storage instance.
///
/// Encapsulates communication with replicas in this instance, and their rehydration.
///
/// Note that storage objects (sources and sinks) don't currently support replication (database-issues#5051).
/// An instance can have multiple replicas connected, but only if it has no storage objects
/// installed. Attempting to install storage objects on multi-replica instances, or attempting to
/// add more than one replica to instances that have storage objects installed, is illegal and will
/// lead to panics.
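///
/// A minimal usage sketch (the setup values `metrics`, `now_fn`, `response_tx`, `replica_id`,
/// `replica_config`, and `ingestion` are hypothetical and not constructed here):
///
/// ```ignore
/// let mut instance = Instance::new(None, metrics, now_fn, response_tx);
/// // Connecting a replica replays the relevant command history to it.
/// instance.add_replica(replica_id, replica_config);
/// // Commands are recorded in the history and forwarded to the scheduled replicas.
/// instance.send(StorageCommand::RunIngestion(Box::new(ingestion)));
/// ```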
#[derive(Debug)]
pub(crate) struct Instance<T> {
    /// The workload class of this instance.
    ///
    /// This is currently only used to annotate metrics.
    pub workload_class: Option<String>,
    /// The replicas connected to this storage instance.
    replicas: BTreeMap<ReplicaId, Replica<T>>,
    /// The ingestions currently running on this instance.
    ///
    /// While this is derivable from `history` on demand, keeping a denormalized
    /// list of running ingestions is quite a bit more convenient in the
    /// implementation of `StorageController::active_ingestions`.
    active_ingestions: BTreeMap<GlobalId, ActiveIngestion>,
    /// A map from ingestion export ID to the ingestion that is producing it.
    ingestion_exports: BTreeMap<GlobalId, GlobalId>,
    /// The exports currently running on this instance.
    ///
    /// While this is derivable from `history` on demand, keeping a denormalized
    /// list of running exports is quite a bit more convenient for the
    /// controller.
    active_exports: BTreeMap<GlobalId, ActiveExport>,
    /// The command history, used to replay past commands when introducing new replicas or
    /// reconnecting to existing replicas.
    history: CommandHistory<T>,
    /// Metrics tracked for this storage instance.
    metrics: InstanceMetrics,
    /// A function that returns the current time.
    now: NowFn,
    /// A sender for responses from replicas.
    ///
    /// Responses are tagged with the [`ReplicaId`] of the replica that sent the
    /// response. Responses that don't originate from a replica (e.g. a "paused"
    /// status update, when no replicas are connected) are tagged with `None`.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}

#[derive(Debug)]
struct ActiveIngestion {
    /// The set of replicas that this ingestion is currently running on.
    active_replicas: BTreeSet<ReplicaId>,
}

#[derive(Debug)]
struct ActiveExport {
    /// The set of replicas that this export is currently running on.
    active_replicas: BTreeSet<ReplicaId>,
}

impl<T> Instance<T>
where
    T: Timestamp + Lattice + TotalOrder + Sync,
    StorageGrpcClient: StorageClient<T>,
{
    /// Creates a new [`Instance`].
    pub fn new(
        workload_class: Option<String>,
        metrics: InstanceMetrics,
        now: NowFn,
        instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    ) -> Self {
        let history = CommandHistory::new(metrics.for_history());

        let mut instance = Self {
            workload_class,
            replicas: Default::default(),
            active_ingestions: Default::default(),
            ingestion_exports: Default::default(),
            active_exports: BTreeMap::new(),
            history,
            metrics,
            now,
            response_tx: instance_response_tx,
        };

        instance.send(StorageCommand::Hello {
            // The nonce is protocol iteration-specific and will be set in
            // `ReplicaTask::specialize_command`.
            nonce: Default::default(),
        });

        instance
    }

    /// Returns the IDs of all replicas connected to this storage instance.
    pub fn replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_ {
        self.replicas.keys().copied()
    }

    /// Adds a new replica to this storage instance.
    pub fn add_replica(&mut self, id: ReplicaId, config: ReplicaConfig) {
        // Reduce the history to limit the number of commands sent to the new replica, and to
        // enable the `objects_installed` assert below.
        self.history.reduce();

        let metrics = self.metrics.for_replica(id);
        let replica = Replica::new(id, config, metrics, self.response_tx.clone());

        self.replicas.insert(id, replica);

        self.update_scheduling(false);

        self.replay_commands(id);
    }

    /// Replays commands to the specified replica.
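    ///
    /// Only object-specific commands (`RunIngestion`, `RunSink`, `AllowCompaction`) for objects
    /// scheduled on the given replica are replayed; commands for objects scheduled elsewhere are
    /// filtered out. All other commands are replayed unconditionally.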
    pub fn replay_commands(&mut self, replica_id: ReplicaId) {
        let commands = self.history.iter().cloned();

        let filtered_commands = commands
            .filter_map(|command| match command {
                StorageCommand::RunIngestion(ingestion) => {
                    if self.is_active_replica(&ingestion.id, &replica_id) {
                        Some(StorageCommand::RunIngestion(ingestion))
                    } else {
                        None
                    }
                }
                StorageCommand::RunSink(sink) => {
                    if self.is_active_replica(&sink.id, &replica_id) {
                        Some(StorageCommand::RunSink(sink))
                    } else {
                        None
                    }
                }
                StorageCommand::AllowCompaction(id, upper) => {
                    if self.is_active_replica(&id, &replica_id) {
                        Some(StorageCommand::AllowCompaction(id, upper))
                    } else {
                        None
                    }
                }
                command => Some(command),
            })
            .collect::<Vec<_>>();

        let replica = self
            .replicas
            .get_mut(&replica_id)
            .expect("replica must exist");

        // Replay the commands at the new replica.
        for command in filtered_commands {
            replica.send(command);
        }
    }

    /// Removes the identified replica from this storage instance.
    pub fn drop_replica(&mut self, id: ReplicaId) {
        let replica = self.replicas.remove(&id);

        let mut needs_rescheduling = false;
        for (ingestion_id, ingestion) in self.active_ingestions.iter_mut() {
            let was_running = ingestion.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %ingestion_id,
                    replica_id = %id,
                    "ingestion was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }
        for (export_id, export) in self.active_exports.iter_mut() {
            let was_running = export.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %export_id,
                    replica_id = %id,
                    "export was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }

        tracing::info!(%id, %needs_rescheduling, "dropped replica");

        if needs_rescheduling {
            self.update_scheduling(true);
        }

        if replica.is_some() && self.replicas.is_empty() {
            self.update_paused_statuses();
        }
    }

    /// Rehydrates any failed replicas of this storage instance.
    pub fn rehydrate_failed_replicas(&mut self) {
        let replicas = self.replicas.iter();
        let failed_replicas: Vec<_> = replicas
            .filter_map(|(id, replica)| replica.failed().then_some(*id))
            .collect();

        for id in failed_replicas {
            let replica = self.replicas.remove(&id).expect("must exist");
            self.add_replica(id, replica.config);
        }
    }

    /// Returns the ingestions running on this instance.
    pub fn active_ingestions(&self) -> impl Iterator<Item = &GlobalId> {
        self.active_ingestions.keys()
    }

    /// Returns the exports running on this instance.
    pub fn active_exports(&self) -> impl Iterator<Item = &GlobalId> {
        self.active_exports.keys()
    }

    /// Sets the status to paused for all sources/sinks in the history.
    fn update_paused_statuses(&mut self) {
        let now = mz_ore::now::to_datetime((self.now)());
        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: now,
            error: None,
            hints: BTreeSet::from([format!(
                "There is currently no replica running this {object_type}"
            )]),
            namespaced_errors: Default::default(),
            replica_id: None,
        };

        self.history.reduce();

        let mut status_updates = Vec::new();
        for command in self.history.iter() {
            match command {
                StorageCommand::RunIngestion(ingestion) => {
                    // NOTE(aljoscha): We filter out the remap collection because we
                    // don't get any status updates about it from the replica side. So
                    // we don't want to synthesize a 'paused' status here.
                    //
                    // TODO(aljoscha): I think we want to fix this eventually, and make
                    // sure we get status updates for the remap shard as well. Currently
                    // its handling in the source status collection is a bit difficult
                    // because we don't have updates for it in the status history
                    // collection.
                    let subsource_ids = ingestion
                        .description
                        .collection_ids()
                        .filter(|id| id != &ingestion.description.remap_collection_id);
                    for id in subsource_ids {
                        status_updates.push(make_update(id, "source"));
                    }
                }
                StorageCommand::RunSink(sink) => {
                    status_updates.push(make_update(sink.id, "sink"));
                }
                _ => (),
            }
        }

        for update in status_updates {
            // NOTE: If we lift this "inject paused status" logic to the
            // controller, we could return `ReplicaId` instead of an
            // `Option<ReplicaId>`.
            let _ = self
                .response_tx
                .send((None, StorageResponse::StatusUpdate(update)));
        }
    }

    /// Sends a command to this storage instance.
    pub fn send(&mut self, command: StorageCommand<T>) {
        // Record the command so that new replicas can be brought up to speed.
        self.history.push(command.clone());

        match command.clone() {
            StorageCommand::RunIngestion(ingestion) => {
                // First absorb into our state, because this might change
                // scheduling decisions, which need to be respected just below
                // when sending commands.
                self.absorb_ingestion(*ingestion.clone());

                for replica in self.active_replicas(&ingestion.id) {
                    replica.send(StorageCommand::RunIngestion(ingestion.clone()));
                }
            }
            StorageCommand::RunSink(sink) => {
                // First absorb into our state, because this might change
                // scheduling decisions, which need to be respected just below
                // when sending commands.
                self.absorb_export(*sink.clone());

                for replica in self.active_replicas(&sink.id) {
                    replica.send(StorageCommand::RunSink(sink.clone()));
                }
            }
            StorageCommand::AllowCompaction(id, frontier) => {
                // First send out commands and then absorb into our state since
                // absorbing them might remove entries from active_ingestions.
                for replica in self.active_replicas(&id) {
                    replica.send(StorageCommand::AllowCompaction(
                        id.clone(),
                        frontier.clone(),
                    ));
                }

                self.absorb_compaction(id, frontier);
            }
            command => {
                for replica in self.replicas.values_mut() {
                    replica.send(command.clone());
                }
            }
        }

        if command.installs_objects() && self.replicas.is_empty() {
            self.update_paused_statuses();
        }
    }

    /// Updates internal state based on incoming ingestion commands.
    ///
    /// This does _not_ send commands to replicas; we only record the ingestion
    /// in state and potentially update scheduling decisions.
    fn absorb_ingestion(&mut self, ingestion: RunIngestionCommand) {
        let existing_ingestion_state = self.active_ingestions.get_mut(&ingestion.id);

        // Always update our mapping from export to their ingestion.
        for id in ingestion.description.source_exports.keys() {
            self.ingestion_exports.insert(id.clone(), ingestion.id);
        }

        if let Some(ingestion_state) = existing_ingestion_state {
            // It's an update for an existing ingestion. We don't need to
            // change anything about our scheduling decisions, no need to
            // update active_ingestions.

            tracing::debug!(
                ingestion_id = %ingestion.id,
                active_replicas = %ingestion_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating ingestion"
            );
        } else {
            // We create a new ingestion state for this ingestion.
            let ingestion_state = ActiveIngestion {
                active_replicas: BTreeSet::new(),
            };
            self.active_ingestions.insert(ingestion.id, ingestion_state);

            // Maybe update scheduling decisions.
            self.update_scheduling(false);
        }
    }

    /// Updates internal state based on incoming export commands.
    ///
    /// This does _not_ send commands to replicas; we only record the export
    /// in state and potentially update scheduling decisions.
    fn absorb_export(&mut self, export: RunSinkCommand<T>) {
        let existing_export_state = self.active_exports.get_mut(&export.id);

        if let Some(export_state) = existing_export_state {
            // It's an update for an existing export. We don't need to
            // change anything about our scheduling decisions, no need to
            // update active_exports.

            tracing::debug!(
                export_id = %export.id,
                active_replicas = %export_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating export"
            );
        } else {
            // We create a new export state for this export.
            let export_state = ActiveExport {
                active_replicas: BTreeSet::new(),
            };
            self.active_exports.insert(export.id, export_state);

            // Maybe update scheduling decisions.
            self.update_scheduling(false);
        }
    }

    /// Updates scheduling decisions, that is, which replicas should be running a
    /// given object, if needed.
    ///
    /// An important property of this scheduling algorithm is that we never
    /// change the scheduling decision for single-replica objects unless we
    /// have to, that is, unless the replica that they are running on goes away.
    /// We do this so that we don't send replicas a mix of "run"/"allow
    /// compaction"/"run" messages, which they wouldn't handle well. When we
    /// _do_ have to make a scheduling decision, we schedule a single-replica
    /// object on the first replica, according to the sort order of
    /// `ReplicaId`. We do the latter so that the scheduling decision is stable
    /// across restarts of `environmentd`/the controller.
    ///
    /// For multi-replica objects (e.g. Kafka ingestions), each active object is
    /// scheduled on all replicas.
    ///
    /// If `send_commands` is true, commands are sent for newly-scheduled
    /// single-replica objects.
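    ///
    /// A sketch of the resulting decisions, assuming two connected replicas `u1 < u2`
    /// (the IDs are hypothetical and for illustration only):
    ///
    /// ```text
    /// single-replica object, not yet scheduled anywhere  -> runs on u1 (the minimum ReplicaId)
    /// single-replica object, already running on u2       -> stays on u2
    /// multi-replica object                               -> runs on {u1, u2}
    /// ```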
    fn update_scheduling(&mut self, send_commands: bool) {
        #[derive(Debug)]
        enum ObjectId {
            Ingestion(GlobalId),
            Export(GlobalId),
        }
        // We first collect scheduling preferences and then schedule below.
        // Applying the decision needs a mutable borrow but we also need to
        // borrow for determining `prefers_single_replica`, so we split this
        // into two loops.
        let mut scheduling_preferences: Vec<(ObjectId, bool)> = Vec::new();

        for ingestion_id in self.active_ingestions.keys() {
            let ingestion_description = self
                .get_ingestion_description(ingestion_id)
                .expect("missing ingestion description");

            let prefers_single_replica = ingestion_description
                .desc
                .connection
                .prefers_single_replica();

            scheduling_preferences
                .push((ObjectId::Ingestion(*ingestion_id), prefers_single_replica));
        }

        for export_id in self.active_exports.keys() {
            // All sinks prefer single replica
            scheduling_preferences.push((ObjectId::Export(*export_id), true));
        }

        // Collect all commands per replica and send them in one go.
        let mut commands_by_replica: BTreeMap<ReplicaId, Vec<ObjectId>> = BTreeMap::new();

        for (object_id, prefers_single_replica) in scheduling_preferences {
            let active_replicas = match object_id {
                ObjectId::Ingestion(ingestion_id) => {
                    &mut self
                        .active_ingestions
                        .get_mut(&ingestion_id)
                        .expect("missing ingestion state")
                        .active_replicas
                }
                ObjectId::Export(export_id) => {
                    &mut self
                        .active_exports
                        .get_mut(&export_id)
                        .expect("missing export state")
                        .active_replicas
                }
            };

            if prefers_single_replica {
                // For single-replica objects, schedule only if it's not already running.
                if active_replicas.is_empty() {
                    let target_replica = self.replicas.keys().min().copied();
                    if let Some(first_replica_id) = target_replica {
                        tracing::info!(
                            object_id = ?object_id,
                            replica_id = %first_replica_id,
                            "scheduling single-replica object");
                        active_replicas.insert(first_replica_id);

                        commands_by_replica
                            .entry(first_replica_id)
                            .or_default()
                            .push(object_id);
                    }
                } else {
                    tracing::info!(
                        ?object_id,
                        active_replicas = %active_replicas.iter().map(|id| id.to_string()).join(", "),
                        "single-replica object already running, not scheduling again",
                    );
                }
            } else {
                let current_replica_ids: BTreeSet<_> = self.replicas.keys().copied().collect();
                let unscheduled_replicas: Vec<_> = current_replica_ids
                    .difference(active_replicas)
                    .copied()
                    .collect();
                for replica_id in unscheduled_replicas {
                    tracing::info!(
                        ?object_id,
                        %replica_id,
                        "scheduling multi-replica object"
                    );
                    active_replicas.insert(replica_id);
                }
            }
        }

        if send_commands {
            for (replica_id, object_ids) in commands_by_replica {
                let mut ingestion_commands = vec![];
                let mut export_commands = vec![];
                for object_id in object_ids {
                    match object_id {
                        ObjectId::Ingestion(id) => {
                            ingestion_commands.push(RunIngestionCommand {
                                id,
                                description: self
                                    .get_ingestion_description(&id)
                                    .expect("missing ingestion description")
                                    .clone(),
                            });
                        }
                        ObjectId::Export(id) => {
                            export_commands.push(RunSinkCommand {
                                id,
                                description: self
                                    .get_export_description(&id)
                                    .expect("missing export description")
                                    .clone(),
                            });
                        }
                    }
                }
                for ingestion in ingestion_commands {
                    let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
                    let ingestion = Box::new(ingestion);
                    replica.send(StorageCommand::RunIngestion(ingestion));
                }
                for export in export_commands {
                    let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
                    let export = Box::new(export);
                    replica.send(StorageCommand::RunSink(export));
                }
            }
        }
    }

    /// Returns the ingestion description for the given ingestion ID, if it
    /// exists.
    ///
    /// This function searches through the command history to find the most
    /// recent [`RunIngestionCommand`] for the specified ingestion ID and returns
    /// its description. Returns `None` if no ingestion with the given ID is
    /// found.
    pub fn get_ingestion_description(
        &self,
        id: &GlobalId,
    ) -> Option<IngestionDescription<CollectionMetadata>> {
        if !self.active_ingestions.contains_key(id) {
            return None;
        }

        self.history.iter().rev().find_map(|command| {
            if let StorageCommand::RunIngestion(ingestion) = command {
                if &ingestion.id == id {
                    Some(ingestion.description.clone())
                } else {
                    None
                }
            } else {
                None
            }
        })
    }

    /// Returns the export description for the given export ID, if it
    /// exists.
    ///
    /// This function searches through the command history to find the most
    /// recent [`RunSinkCommand`] for the specified export ID and returns
    /// its description. Returns `None` if no export with the given ID is
    /// found.
    pub fn get_export_description(
        &self,
        id: &GlobalId,
    ) -> Option<StorageSinkDesc<CollectionMetadata, T>> {
        if !self.active_exports.contains_key(id) {
            return None;
        }

        self.history.iter().rev().find_map(|command| {
            if let StorageCommand::RunSink(sink) = command {
                if &sink.id == id {
                    Some(sink.description.clone())
                } else {
                    None
                }
            } else {
                None
            }
        })
    }

    /// Updates internal state based on incoming compaction commands.
    fn absorb_compaction(&mut self, id: GlobalId, frontier: Antichain<T>) {
        tracing::debug!(?self.active_ingestions, ?id, ?frontier, "allow_compaction");

        if frontier.is_empty() {
            self.active_ingestions.remove(&id);
            self.ingestion_exports.remove(&id);
            self.active_exports.remove(&id);
        }
    }

    /// Returns the replicas that are actively running the given object (ingestion or export).
    fn active_replicas(&mut self, id: &GlobalId) -> Box<dyn Iterator<Item = &mut Replica<T>> + '_> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => Box::new(self.replicas.iter_mut().filter_map(
                    move |(replica_id, replica)| {
                        if ingestion.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    },
                )),
                None => {
                    // The ingestion has already been compacted away (aka. stopped).
                    Box::new(std::iter::empty())
                }
            }
        } else if let Some(export) = self.active_exports.get(id) {
            Box::new(
                self.replicas
                    .iter_mut()
                    .filter_map(move |(replica_id, replica)| {
                        if export.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    }),
            )
        } else {
            Box::new(self.replicas.values_mut())
        }
    }

    /// Returns whether the given replica is actively running the given object (ingestion or export).
    fn is_active_replica(&self, id: &GlobalId, replica_id: &ReplicaId) -> bool {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => ingestion.active_replicas.contains(replica_id),
                None => {
                    // The ingestion has already been compacted away (aka. stopped).
                    false
                }
            }
        } else if let Some(export) = self.active_exports.get(id) {
            export.active_replicas.contains(replica_id)
        } else {
            // For non-ingestion objects, all replicas are active
            true
        }
    }

    /// Refresh the controller state metrics for this instance.
    ///
    /// We could also do state metric updates directly in response to state changes, but that would
    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
    /// a single method is less noisy.
    ///
    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
    /// second during normal operation.
    pub(super) fn refresh_state_metrics(&self) {
        let connected_replica_count = self.replicas.values().filter(|r| r.is_connected()).count();

        self.metrics
            .connected_replica_count
            .set(u64::cast_from(connected_replica_count));
    }

    /// Returns the set of replica IDs that are actively running the given
    /// object (ingestion, ingestion export (aka. subsource), or export).
    pub fn get_active_replicas_for_object(&self, id: &GlobalId) -> BTreeSet<ReplicaId> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            // Right now, only ingestions can have per-replica scheduling decisions.
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => ingestion.active_replicas.clone(),
                None => {
                    // The ingestion has already been compacted away (aka. stopped).
                    BTreeSet::new()
                }
            }
        } else {
            // For non-ingestion objects, all replicas are active
            self.replicas.keys().copied().collect()
        }
    }
}

/// Replica-specific configuration.
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    pub build_info: &'static BuildInfo,
    pub location: ClusterReplicaLocation,
    pub grpc_client: GrpcClientParameters,
    pub enable_ctp: bool,
}

/// State maintained about individual replicas.
#[derive(Debug)]
pub struct Replica<T> {
    /// Replica configuration.
    config: ReplicaConfig,
    /// A sender for commands for the replica.
    ///
    /// If sending to this channel fails, the replica has failed and requires
    /// rehydration.
    command_tx: mpsc::UnboundedSender<StorageCommand<T>>,
    /// A handle to the replica task, which is aborted when the `Replica` is dropped.
    task: AbortOnDropHandle<()>,
    /// Flag reporting whether the replica connection has been established.
    connected: Arc<AtomicBool>,
}

impl<T> Replica<T>
where
    T: Timestamp + Lattice + Sync,
    StorageGrpcClient: StorageClient<T>,
{
    /// Creates a new [`Replica`].
    fn new(
        id: ReplicaId,
        config: ReplicaConfig,
        metrics: ReplicaMetrics,
        response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    ) -> Self {
        let (command_tx, command_rx) = mpsc::unbounded_channel();
        let connected = Arc::new(AtomicBool::new(false));

        let task = mz_ore::task::spawn(
            || format!("storage-replica-{id}"),
            ReplicaTask {
                replica_id: id,
                config: config.clone(),
                metrics: metrics.clone(),
                connected: Arc::clone(&connected),
                command_rx,
                response_tx,
            }
            .run(),
        );

        Self {
            config,
            command_tx,
            task: task.abort_on_drop(),
            connected,
        }
    }

    /// Sends a command to the replica.
    fn send(&self, command: StorageCommand<T>) {
        // Send failures ignored, we'll check for failed replicas separately.
        let _ = self.command_tx.send(command);
    }

    /// Determine if this replica has failed. This is true if the replica
    /// task has terminated.
    fn failed(&self) -> bool {
        self.task.is_finished()
    }

    /// Determine if the replica connection has been established.
    pub(super) fn is_connected(&self) -> bool {
        self.connected.load(atomic::Ordering::Relaxed)
    }
}

type StorageCtpClient<T> = transport::Client<StorageCommand<T>, StorageResponse<T>>;

#[derive(Debug)]
enum ReplicaClient<T: Timestamp + Lattice> {
    Grpc(Partitioned<StorageGrpcClient, StorageCommand<T>, StorageResponse<T>>),
    Ctp(Partitioned<StorageCtpClient<T>, StorageCommand<T>, StorageResponse<T>>),
}

#[async_trait]
impl<T> GenericClient<StorageCommand<T>, StorageResponse<T>> for ReplicaClient<T>
where
    T: Timestamp + Lattice + Sync,
    StorageGrpcClient: StorageClient<T>,
{
    async fn send(&mut self, cmd: StorageCommand<T>) -> anyhow::Result<()> {
        match self {
            Self::Grpc(client) => client.send(cmd).await,
            Self::Ctp(client) => client.send(cmd).await,
        }
    }

    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
    /// statement and some other branch completes first, it is guaranteed that no messages were
    /// received by this client.
    async fn recv(&mut self) -> anyhow::Result<Option<StorageResponse<T>>> {
        match self {
            // `Partitioned::recv` is documented as cancel safe.
            Self::Grpc(client) => client.recv().await,
            // `Partitioned::recv` is documented as cancel safe.
            Self::Ctp(client) => client.recv().await,
        }
    }
}

/// A task handling communication with a replica.
struct ReplicaTask<T> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// Replica configuration.
    config: ReplicaConfig,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// Flag to report successful replica connection.
    connected: Arc<AtomicBool>,
    /// A channel upon which commands intended for the replica are delivered.
    command_rx: mpsc::UnboundedReceiver<StorageCommand<T>>,
    /// A channel upon which responses from the replica are delivered.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}

impl<T> ReplicaTask<T>
where
    T: Timestamp + Lattice + Sync,
    StorageGrpcClient: StorageClient<T>,
{
    /// Runs the replica task.
    async fn run(self) {
        let replica_id = self.replica_id;
        info!(%replica_id, "starting replica task");

        let client = self.connect().await;
        match self.run_message_loop(client).await {
            Ok(()) => info!(%replica_id, "stopped replica task"),
            Err(error) => warn!(%replica_id, %error, "replica task failed"),
        }
    }

    /// Connects to the replica.
    ///
    /// The connection is retried forever (with backoff), and this method returns only after
    /// a connection has been successfully established.
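    /// The backoff between attempts is clamped (to one second; see the `clamp_backoff` call
    /// below), so reconnection attempts keep happening at a steady cadence.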
    async fn connect(&self) -> ReplicaClient<T> {
        let try_connect = async move |retry: RetryState| {
            let version = self.config.build_info.semver_version();
            let client_params = &self.config.grpc_client;

            let connect_start = Instant::now();
            let connect_result = if self.config.enable_ctp {
                let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
                let keepalive_timeout = client_params
                    .http2_keep_alive_timeout
                    .unwrap_or(Duration::MAX);

                StorageCtpClient::<T>::connect_partitioned(
                    self.config.location.ctl_addrs.clone(),
                    version,
                    connect_timeout,
                    keepalive_timeout,
                    self.metrics.clone(),
                )
                .await
                .map(ReplicaClient::Ctp)
            } else {
                let addrs = &self.config.location.ctl_addrs;
                let dests = addrs
                    .iter()
                    .map(|addr| (addr.clone(), self.metrics.clone()))
                    .collect();
                StorageGrpcClient::connect_partitioned(dests, version, client_params)
                    .await
                    .map(ReplicaClient::Grpc)
            };

            self.metrics.observe_connect_time(connect_start.elapsed());

            connect_result.inspect_err(|error| {
                let next_backoff = retry.next_backoff.unwrap();
                if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
                    info!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                } else {
                    debug!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                }
            })
        };

        let client = Retry::default()
            .clamp_backoff(Duration::from_secs(1))
            .retry_async(try_connect)
            .await
            .expect("retries forever");

        self.metrics.observe_connect();
        self.connected.store(true, atomic::Ordering::Relaxed);

        client
    }

    /// Runs the message loop.
    ///
    /// Returns (with an `Err`) if it encounters an error condition (e.g. the replica disconnects).
    /// If no error condition is encountered, the task runs until the controller disconnects from
    /// the command channel, or the task is dropped.
    async fn run_message_loop(mut self, mut client: ReplicaClient<T>) -> Result<(), anyhow::Error> {
        loop {
            select! {
                // Command from controller to forward to replica.
                // `tokio::sync::mpsc::UnboundedReceiver::recv` is documented as cancel safe.
                command = self.command_rx.recv() => {
                    let Some(mut command) = command else {
                        tracing::debug!(%self.replica_id, "controller is no longer interested in this replica, shutting down message loop");
                        break;
                    };

                    self.specialize_command(&mut command);
                    client.send(command).await?;
                },
                // Response from replica to forward to controller.
                // `GenericClient::recv` implementations are required to be cancel safe.
                response = client.recv() => {
                    let Some(response) = response? else {
                        bail!("replica unexpectedly gracefully terminated connection");
                    };

                    if self.response_tx.send((Some(self.replica_id), response)).is_err() {
                        tracing::debug!(%self.replica_id, "controller (receiver) is no longer interested in this replica, shutting down message loop");
                        break;
                    }
                }
            }
        }

        Ok(())
    }

    /// Specialize a command for the given replica configuration.
    ///
    /// Most [`StorageCommand`]s are independent of the target replica, but some contain
    /// replica-specific fields that must be adjusted before sending.
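    ///
    /// Currently only [`StorageCommand::Hello`] is specialized: its nonce is replaced with a
    /// fresh [`Uuid`] for this connection attempt.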
    fn specialize_command(&self, command: &mut StorageCommand<T>) {
        if let StorageCommand::Hello { nonce } = command {
            *nonce = Uuid::new_v4();
        }
    }
}