mz_storage_controller/instance.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A controller for a storage instance.

use crate::CollectionMetadata;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, atomic};
use std::time::{Duration, Instant};

use anyhow::bail;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_ore::cast::CastFrom;
use mz_ore::now::NowFn;
use mz_ore::retry::{Retry, RetryState};
use mz_ore::task::AbortOnDropHandle;
use mz_repr::GlobalId;
use mz_service::client::{GenericClient, Partitioned};
use mz_service::params::GrpcClientParameters;
use mz_service::transport;
use mz_storage_client::client::{
    RunIngestionCommand, RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse,
};
use mz_storage_client::metrics::{InstanceMetrics, ReplicaMetrics};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{IngestionDescription, SourceConnection};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::select;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::history::CommandHistory;

/// A controller for a storage instance.
///
/// Encapsulates communication with replicas in this instance, and their rehydration.
///
/// Note that storage objects (sources and sinks) don't currently support replication (database-issues#5051).
/// An instance can have multiple replicas connected, but only if it has no storage objects
/// installed. Attempting to install storage objects on multi-replica instances, or attempting to
/// add more than one replica to instances that have storage objects installed, is illegal and will
/// lead to panics.
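///
/// As a rough sketch (not a compiling example; it assumes an `InstanceMetrics`,
/// a `NowFn`, a response channel, and a `ReplicaConfig` are already at hand),
/// a typical lifecycle looks like this:
///
/// ```ignore
/// let mut instance = Instance::new(None, metrics, now, response_tx);
/// instance.add_replica(replica_id, replica_config);
/// instance.send(StorageCommand::RunIngestion(Box::new(ingestion)));
/// // ... later ...
/// instance.drop_replica(replica_id);
/// ```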
#[derive(Debug)]
pub(crate) struct Instance<T> {
    /// The workload class of this instance.
    ///
    /// This is currently only used to annotate metrics.
    pub workload_class: Option<String>,
    /// The replicas connected to this storage instance.
    replicas: BTreeMap<ReplicaId, Replica<T>>,
    /// The ingestions currently running on this instance.
    ///
    /// While this is derivable from `history` on demand, keeping a denormalized
    /// list of running ingestions is quite a bit more convenient in the
    /// implementation of `StorageController::active_ingestions`.
    active_ingestions: BTreeMap<GlobalId, ActiveIngestion>,
    /// A map from ingestion export ID to the ingestion that is producing it.
    ingestion_exports: BTreeMap<GlobalId, GlobalId>,
    /// The exports currently running on this instance.
    ///
    /// While this is derivable from `history` on demand, keeping a denormalized
    /// list of running exports is quite a bit more convenient for the
    /// controller.
    active_exports: BTreeMap<GlobalId, ActiveExport>,
    /// The command history, used to replay past commands when introducing new replicas or
    /// reconnecting to existing replicas.
    history: CommandHistory<T>,
    /// Metrics tracked for this storage instance.
    metrics: InstanceMetrics,
    /// A function that returns the current time.
    now: NowFn,
    /// A sender for responses from replicas.
    ///
    /// Responses are tagged with the [`ReplicaId`] of the replica that sent the
    /// response. Responses that don't originate from a replica (e.g. a "paused"
    /// status update, when no replicas are connected) are tagged with `None`.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}

#[derive(Debug)]
struct ActiveIngestion {
    /// The set of replicas that this ingestion is currently running on.
    active_replicas: BTreeSet<ReplicaId>,
}

#[derive(Debug)]
struct ActiveExport {
    /// The set of replicas that this export is currently running on.
    active_replicas: BTreeSet<ReplicaId>,
}

impl<T> Instance<T>
where
    T: Timestamp + Lattice + TotalOrder + Sync,
{
    /// Creates a new [`Instance`].
    pub fn new(
        workload_class: Option<String>,
        metrics: InstanceMetrics,
        now: NowFn,
        instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    ) -> Self {
        let history = CommandHistory::new(metrics.for_history());

        let mut instance = Self {
            workload_class,
            replicas: Default::default(),
            active_ingestions: Default::default(),
            ingestion_exports: Default::default(),
            active_exports: BTreeMap::new(),
            history,
            metrics,
            now,
            response_tx: instance_response_tx,
        };

        instance.send(StorageCommand::Hello {
            // The nonce is protocol iteration-specific and will be set in
            // `ReplicaTask::specialize_command`.
            nonce: Default::default(),
        });

        instance
    }

    /// Returns the IDs of all replicas connected to this storage instance.
    pub fn replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_ {
        self.replicas.keys().copied()
    }

    /// Adds a new replica to this storage instance.
    pub fn add_replica(&mut self, id: ReplicaId, config: ReplicaConfig) {
        // Reduce the history to limit the number of commands sent to the new replica, and to
        // enable the `objects_installed` assert below.
        self.history.reduce();

        let metrics = self.metrics.for_replica(id);
        let replica = Replica::new(id, config, metrics, self.response_tx.clone());

        self.replicas.insert(id, replica);

        self.update_scheduling(false);

        self.replay_commands(id);
    }

    /// Replays commands to the specified replica.
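    ///
    /// Object-specific commands (`RunIngestion`, `RunSink`, `AllowCompaction`)
    /// are only replayed if the given replica is scheduled to run the object
    /// in question; all other commands are replayed unconditionally.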
    pub fn replay_commands(&mut self, replica_id: ReplicaId) {
        let commands = self.history.iter().cloned();

        let filtered_commands = commands
            .filter_map(|command| match command {
                StorageCommand::RunIngestion(ingestion) => {
                    if self.is_active_replica(&ingestion.id, &replica_id) {
                        Some(StorageCommand::RunIngestion(ingestion))
                    } else {
                        None
                    }
                }
                StorageCommand::RunSink(sink) => {
                    if self.is_active_replica(&sink.id, &replica_id) {
                        Some(StorageCommand::RunSink(sink))
                    } else {
                        None
                    }
                }
                StorageCommand::AllowCompaction(id, upper) => {
                    if self.is_active_replica(&id, &replica_id) {
                        Some(StorageCommand::AllowCompaction(id, upper))
                    } else {
                        None
                    }
                }
                command => Some(command),
            })
            .collect::<Vec<_>>();

        let replica = self
            .replicas
            .get_mut(&replica_id)
            .expect("replica must exist");

        // Replay the commands at the new replica.
        for command in filtered_commands {
            replica.send(command);
        }
    }

    /// Removes the identified replica from this storage instance.
    pub fn drop_replica(&mut self, id: ReplicaId) {
        let replica = self.replicas.remove(&id);

        let mut needs_rescheduling = false;
        for (ingestion_id, ingestion) in self.active_ingestions.iter_mut() {
            let was_running = ingestion.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %ingestion_id,
                    replica_id = %id,
                    "ingestion was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }
        for (export_id, export) in self.active_exports.iter_mut() {
            let was_running = export.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %export_id,
                    replica_id = %id,
                    "export was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }

        tracing::info!(%id, %needs_rescheduling, "dropped replica");

        if needs_rescheduling {
            self.update_scheduling(true);
        }

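        // If we just dropped the last replica, the objects installed on this
        // instance are no longer running anywhere, so report them as paused.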
        if replica.is_some() && self.replicas.is_empty() {
            self.update_paused_statuses();
        }
    }

    /// Rehydrates any failed replicas of this storage instance.
    pub fn rehydrate_failed_replicas(&mut self) {
        let replicas = self.replicas.iter();
        let failed_replicas: Vec<_> = replicas
            .filter_map(|(id, replica)| replica.failed().then_some(*id))
            .collect();

        for id in failed_replicas {
            let replica = self.replicas.remove(&id).expect("must exist");
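            // Re-adding the replica replays the (reduced) command history to
            // it, which is what rehydrates it.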
            self.add_replica(id, replica.config);
        }
    }

    /// Returns ingestions running on this instance. This _only_ includes the
    /// "toplevel" ingestions, not any of their source tables (aka. subsources).
    pub fn active_ingestions(&self) -> impl Iterator<Item = &GlobalId> {
        self.active_ingestions.keys()
    }

    /// Returns ingestion exports running on this instance. This includes the
    /// ingestion itself, if any, and running source tables (aka. subsources).
    ///
    /// This does _not_ filter out exports whose write frontier is the empty
    /// frontier, which some might consider not active anymore. But for the
    /// purposes of the instance controller, these are still considered active
    /// because we don't know about frontiers.
    pub fn active_ingestion_exports(&self) -> impl Iterator<Item = &GlobalId> {
        let ingestion_exports = self.ingestion_exports.keys();
        self.active_ingestions.keys().chain(ingestion_exports)
    }

    /// Returns the exports running on this instance.
    pub fn active_exports(&self) -> impl Iterator<Item = &GlobalId> {
        self.active_exports.keys()
    }

    /// Sets the status to paused for all sources/sinks in the history.
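    ///
    /// This is used when an instance has objects installed but no replicas
    /// connected (because the last replica was dropped, or because an object
    /// was installed while no replica is connected), so that these objects
    /// show up as paused rather than appearing to be silently stalled.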
    fn update_paused_statuses(&mut self) {
        let now = mz_ore::now::to_datetime((self.now)());
        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: now,
            error: None,
            hints: BTreeSet::from([format!(
                "There is currently no replica running this {object_type}"
            )]),
            namespaced_errors: Default::default(),
            replica_id: None,
        };

        self.history.reduce();

        let mut status_updates = Vec::new();
        for command in self.history.iter() {
            match command {
                StorageCommand::RunIngestion(ingestion) => {
                    let old_style_ingestion =
                        ingestion.id != ingestion.description.remap_collection_id;
                    let subsource_ids = ingestion.description.collection_ids().filter(|id| {
                        // NOTE(aljoscha): We filter out the remap collection for old-style
                        // ingestions because the replica side doesn't send any status updates
                        // for it, so we don't want to synthesize a 'paused' status here.
                        // New-style ingestions do get updates, since the source itself contains
                        // the remap data.
                        let should_discard =
                            old_style_ingestion && id == &ingestion.description.remap_collection_id;
                        !should_discard
                    });
                    for id in subsource_ids {
                        status_updates.push(make_update(id, "source"));
                    }
                }
                StorageCommand::RunSink(sink) => {
                    status_updates.push(make_update(sink.id, "sink"));
                }
                _ => (),
            }
        }

        for update in status_updates {
            // NOTE: If we lift this "inject paused status" logic to the
            // controller, we could return a `ReplicaId` here instead of an
            // `Option<ReplicaId>`.
            let _ = self
                .response_tx
                .send((None, StorageResponse::StatusUpdate(update)));
        }
    }

    /// Sends a command to this storage instance.
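    ///
    /// The command is recorded in the command history and then forwarded to
    /// replicas: object-specific commands (`RunIngestion`, `RunSink`,
    /// `AllowCompaction`) go only to the replicas scheduled to run the object
    /// in question; all other commands are broadcast to every replica.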
    pub fn send(&mut self, command: StorageCommand<T>) {
        // Record the command so that new replicas can be brought up to speed.
        self.history.push(command.clone());

        match command.clone() {
            StorageCommand::RunIngestion(ingestion) => {
                // First absorb into our state, because this might change
                // scheduling decisions, which need to be respected just below
                // when sending commands.
                self.absorb_ingestion(*ingestion.clone());

                for replica in self.active_replicas(&ingestion.id) {
                    replica.send(StorageCommand::RunIngestion(ingestion.clone()));
                }
            }
            StorageCommand::RunSink(sink) => {
                // First absorb into our state, because this might change
                // scheduling decisions, which need to be respected just below
                // when sending commands.
                self.absorb_export(*sink.clone());

                for replica in self.active_replicas(&sink.id) {
                    replica.send(StorageCommand::RunSink(sink.clone()));
                }
            }
            StorageCommand::AllowCompaction(id, frontier) => {
                // First send out commands and then absorb into our state since
                // absorbing them might remove entries from active_ingestions.
                for replica in self.active_replicas(&id) {
                    replica.send(StorageCommand::AllowCompaction(
                        id.clone(),
                        frontier.clone(),
                    ));
                }

                self.absorb_compaction(id, frontier);
            }
            command => {
                for replica in self.replicas.values_mut() {
                    replica.send(command.clone());
                }
            }
        }

        if command.installs_objects() && self.replicas.is_empty() {
            self.update_paused_statuses();
        }
    }

    /// Updates internal state based on incoming ingestion commands.
    ///
    /// This does _not_ send commands to replicas; we only record the ingestion
    /// in our state and potentially update scheduling decisions.
    fn absorb_ingestion(&mut self, ingestion: RunIngestionCommand) {
        let existing_ingestion_state = self.active_ingestions.get_mut(&ingestion.id);

        // Always update our mapping from exports to their ingestion.
        for id in ingestion.description.source_exports.keys() {
            self.ingestion_exports.insert(id.clone(), ingestion.id);
        }

        if let Some(ingestion_state) = existing_ingestion_state {
            // It's an update for an existing ingestion. We don't need to
            // change anything about our scheduling decisions, so there is no
            // need to update `active_ingestions`.

            tracing::debug!(
                ingestion_id = %ingestion.id,
                active_replicas = %ingestion_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating ingestion"
            );
        } else {
            // We create a new ingestion state for this ingestion.
            let ingestion_state = ActiveIngestion {
                active_replicas: BTreeSet::new(),
            };
            self.active_ingestions.insert(ingestion.id, ingestion_state);

            // Maybe update scheduling decisions.
            self.update_scheduling(false);
        }
    }

    /// Updates internal state based on incoming export commands.
    ///
    /// This does _not_ send commands to replicas; we only record the export
    /// in our state and potentially update scheduling decisions.
    fn absorb_export(&mut self, export: RunSinkCommand<T>) {
        let existing_export_state = self.active_exports.get_mut(&export.id);

        if let Some(export_state) = existing_export_state {
            // It's an update for an existing export. We don't need to
            // change anything about our scheduling decisions, so there is no
            // need to update `active_exports`.

            tracing::debug!(
                export_id = %export.id,
                active_replicas = %export_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating export"
            );
        } else {
            // We create a new export state for this export.
            let export_state = ActiveExport {
                active_replicas: BTreeSet::new(),
            };
            self.active_exports.insert(export.id, export_state);

            // Maybe update scheduling decisions.
            self.update_scheduling(false);
        }
    }

    /// Updates scheduling decisions, that is, which replicas should be running
    /// a given object, if needed.
    ///
    /// An important property of this scheduling algorithm is that we never
    /// change the scheduling decision for single-replica objects unless we
    /// have to, that is, unless the replica they are running on goes away. We
    /// do this so that we don't send a mix of "run"/"allow compaction"/"run"
    /// messages to replicas, which they wouldn't handle well. When we _do_
    /// have to make a scheduling decision, we schedule a single-replica
    /// ingestion on the first replica, according to the sort order of
    /// `ReplicaId`. We do the latter so that the scheduling decision is stable
    /// across restarts of `environmentd`/the controller.
    ///
    /// For multi-replica objects (e.g. Kafka ingestions), each active object is
    /// scheduled on all replicas.
    ///
    /// If `send_commands` is true, commands are sent for newly scheduled
    /// single-replica objects.
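    ///
    /// As a concrete sketch: with replicas `r1 < r2` connected, a newly
    /// installed object whose connection prefers a single replica is scheduled
    /// on `r1` only (the minimum `ReplicaId`), while an object that doesn't is
    /// scheduled on both `r1` and `r2`.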
    fn update_scheduling(&mut self, send_commands: bool) {
        #[derive(Debug)]
        enum ObjectId {
            Ingestion(GlobalId),
            Export(GlobalId),
        }
        // We first collect scheduling preferences and then schedule below.
        // Applying the decisions needs a mutable borrow, but we also need a
        // borrow for determining `prefers_single_replica`, so we split this
        // into two loops.
        let mut scheduling_preferences: Vec<(ObjectId, bool)> = Vec::new();

        for ingestion_id in self.active_ingestions.keys() {
            let ingestion_description = self
                .get_ingestion_description(ingestion_id)
                .expect("missing ingestion description");

            let prefers_single_replica = ingestion_description
                .desc
                .connection
                .prefers_single_replica();

            scheduling_preferences
                .push((ObjectId::Ingestion(*ingestion_id), prefers_single_replica));
        }

        for export_id in self.active_exports.keys() {
            // All sinks prefer a single replica.
            scheduling_preferences.push((ObjectId::Export(*export_id), true));
        }

        // Collect all commands per replica and send them in one go.
        let mut commands_by_replica: BTreeMap<ReplicaId, Vec<ObjectId>> = BTreeMap::new();

        for (object_id, prefers_single_replica) in scheduling_preferences {
            let active_replicas = match object_id {
                ObjectId::Ingestion(ingestion_id) => {
                    &mut self
                        .active_ingestions
                        .get_mut(&ingestion_id)
                        .expect("missing ingestion state")
                        .active_replicas
                }
                ObjectId::Export(export_id) => {
                    &mut self
                        .active_exports
                        .get_mut(&export_id)
                        .expect("missing export state")
                        .active_replicas
                }
            };

            if prefers_single_replica {
                // For a single-replica object, schedule it only if it's not already running.
                if active_replicas.is_empty() {
                    let target_replica = self.replicas.keys().min().copied();
                    if let Some(first_replica_id) = target_replica {
                        tracing::info!(
                            ?object_id,
                            replica_id = %first_replica_id,
                            "scheduling single-replica object"
                        );
                        active_replicas.insert(first_replica_id);

                        commands_by_replica
                            .entry(first_replica_id)
                            .or_default()
                            .push(object_id);
                    }
                } else {
                    tracing::info!(
                        ?object_id,
                        active_replicas = %active_replicas.iter().map(|id| id.to_string()).join(", "),
                        "single-replica object already running, not scheduling again",
                    );
                }
            } else {
                let current_replica_ids: BTreeSet<_> = self.replicas.keys().copied().collect();
                let unscheduled_replicas: Vec<_> = current_replica_ids
                    .difference(active_replicas)
                    .copied()
                    .collect();
                for replica_id in unscheduled_replicas {
                    tracing::info!(
                        ?object_id,
                        %replica_id,
                        "scheduling multi-replica object"
                    );
                    active_replicas.insert(replica_id);
                }
            }
        }

        if send_commands {
            for (replica_id, object_ids) in commands_by_replica {
                let mut ingestion_commands = vec![];
                let mut export_commands = vec![];
                for object_id in object_ids {
                    match object_id {
                        ObjectId::Ingestion(id) => {
                            ingestion_commands.push(RunIngestionCommand {
                                id,
                                description: self
                                    .get_ingestion_description(&id)
                                    .expect("missing ingestion description")
                                    .clone(),
                            });
                        }
                        ObjectId::Export(id) => {
                            export_commands.push(RunSinkCommand {
                                id,
                                description: self
                                    .get_export_description(&id)
                                    .expect("missing export description")
                                    .clone(),
                            });
                        }
                    }
                }
                for ingestion in ingestion_commands {
                    let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
                    let ingestion = Box::new(ingestion);
                    replica.send(StorageCommand::RunIngestion(ingestion));
                }
                for export in export_commands {
                    let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
                    let export = Box::new(export);
                    replica.send(StorageCommand::RunSink(export));
                }
            }
        }
    }

    /// Returns the ingestion description for the given ingestion ID, if it
    /// exists.
    ///
    /// This function searches through the command history to find the most
    /// recent `RunIngestionCommand` for the specified ingestion ID and returns
    /// its description. Returns `None` if no ingestion with the given ID is
    /// found.
    pub fn get_ingestion_description(
        &self,
        id: &GlobalId,
    ) -> Option<IngestionDescription<CollectionMetadata>> {
        if !self.active_ingestions.contains_key(id) {
            return None;
        }

        self.history.iter().rev().find_map(|command| {
            if let StorageCommand::RunIngestion(ingestion) = command {
                if &ingestion.id == id {
                    Some(ingestion.description.clone())
                } else {
                    None
                }
            } else {
                None
            }
        })
    }

    /// Returns the export description for the given export ID, if it
    /// exists.
    ///
    /// This function searches through the command history to find the most
    /// recent `RunSinkCommand` for the specified export ID and returns
    /// its description. Returns `None` if no export with the given ID is
    /// found.
    pub fn get_export_description(
        &self,
        id: &GlobalId,
    ) -> Option<StorageSinkDesc<CollectionMetadata, T>> {
        if !self.active_exports.contains_key(id) {
            return None;
        }

        self.history.iter().rev().find_map(|command| {
            if let StorageCommand::RunSink(sink) = command {
                if &sink.id == id {
                    Some(sink.description.clone())
                } else {
                    None
                }
            } else {
                None
            }
        })
    }

    /// Updates internal state based on incoming compaction commands.
    fn absorb_compaction(&mut self, id: GlobalId, frontier: Antichain<T>) {
        tracing::debug!(?self.active_ingestions, ?id, ?frontier, "allow_compaction");

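        // Compaction to the empty frontier means the object is being dropped,
        // so we can forget any scheduling state we hold for it.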
        if frontier.is_empty() {
            self.active_ingestions.remove(&id);
            self.ingestion_exports.remove(&id);
            self.active_exports.remove(&id);
        }
    }

    /// Returns the replicas that are actively running the given object (ingestion or export).
    fn active_replicas(&mut self, id: &GlobalId) -> Box<dyn Iterator<Item = &mut Replica<T>> + '_> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => Box::new(self.replicas.iter_mut().filter_map(
                    move |(replica_id, replica)| {
                        if ingestion.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    },
                )),
                None => {
                    // The ingestion has already been compacted away (aka. stopped).
                    Box::new(std::iter::empty())
                }
            }
        } else if let Some(ingestion) = self.active_ingestions.get(id) {
            Box::new(
                self.replicas
                    .iter_mut()
                    .filter_map(move |(replica_id, replica)| {
                        if ingestion.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    }),
            )
        } else if let Some(export) = self.active_exports.get(id) {
            Box::new(
                self.replicas
                    .iter_mut()
                    .filter_map(move |(replica_id, replica)| {
                        if export.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    }),
            )
        } else {
            Box::new(self.replicas.values_mut())
        }
    }

    /// Returns whether the given replica is actively running the given object (ingestion or export).
    fn is_active_replica(&self, id: &GlobalId, replica_id: &ReplicaId) -> bool {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => ingestion.active_replicas.contains(replica_id),
                None => {
                    // The ingestion has already been compacted away (aka. stopped).
                    false
                }
            }
        } else if let Some(ingestion) = self.active_ingestions.get(id) {
            ingestion.active_replicas.contains(replica_id)
        } else if let Some(export) = self.active_exports.get(id) {
            export.active_replicas.contains(replica_id)
        } else {
            // For all other objects, all replicas are active.
            true
        }
    }

    /// Refresh the controller state metrics for this instance.
    ///
    /// We could also do state metric updates directly in response to state changes, but that would
    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
    /// a single method is less noisy.
    ///
    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
    /// second during normal operation.
    pub(super) fn refresh_state_metrics(&self) {
        let connected_replica_count = self.replicas.values().filter(|r| r.is_connected()).count();

        self.metrics
            .connected_replica_count
            .set(u64::cast_from(connected_replica_count));
    }

    /// Returns the set of replica IDs that are actively running the given
    /// object (ingestion, ingestion export (aka. subsource), or export).
    pub fn get_active_replicas_for_object(&self, id: &GlobalId) -> BTreeSet<ReplicaId> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            // Right now, only ingestions can have per-replica scheduling decisions.
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => ingestion.active_replicas.clone(),
                None => {
                    // The ingestion has already been compacted away (aka. stopped).
                    BTreeSet::new()
                }
            }
        } else {
            // For non-ingestion objects, all replicas are active
            self.replicas.keys().copied().collect()
        }
    }
}

/// Replica-specific configuration.
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    pub build_info: &'static BuildInfo,
    pub location: ClusterReplicaLocation,
    pub grpc_client: GrpcClientParameters,
}

/// State maintained about individual replicas.
#[derive(Debug)]
pub struct Replica<T> {
    /// Replica configuration.
    config: ReplicaConfig,
    /// A sender for commands for the replica.
    ///
    /// If sending to this channel fails, the replica has failed and requires
    /// rehydration.
    command_tx: mpsc::UnboundedSender<StorageCommand<T>>,
    /// A handle to the replica task. Dropping this handle aborts the task,
    /// which happens when this `Replica` is dropped.
    task: AbortOnDropHandle<()>,
    /// Flag reporting whether the replica connection has been established.
    connected: Arc<AtomicBool>,
}

impl<T> Replica<T>
where
    T: Timestamp + Lattice + Sync,
{
    /// Creates a new [`Replica`].
    fn new(
        id: ReplicaId,
        config: ReplicaConfig,
        metrics: ReplicaMetrics,
        response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    ) -> Self {
        let (command_tx, command_rx) = mpsc::unbounded_channel();
        let connected = Arc::new(AtomicBool::new(false));

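        // Spawn a task that connects to the replica and shuttles commands and
        // responses between the controller and the replica.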
        let task = mz_ore::task::spawn(
            || format!("storage-replica-{id}"),
            ReplicaTask {
                replica_id: id,
                config: config.clone(),
                metrics: metrics.clone(),
                connected: Arc::clone(&connected),
                command_rx,
                response_tx,
            }
            .run(),
        );

        Self {
            config,
            command_tx,
            task: task.abort_on_drop(),
            connected,
        }
    }

    /// Sends a command to the replica.
    fn send(&self, command: StorageCommand<T>) {
        // Send failures are ignored; we'll check for failed replicas separately.
        let _ = self.command_tx.send(command);
    }

    /// Determine if this replica has failed. This is true if the replica
    /// task has terminated.
    fn failed(&self) -> bool {
        self.task.is_finished()
    }

    /// Determine if the replica connection has been established.
    pub(super) fn is_connected(&self) -> bool {
        self.connected.load(atomic::Ordering::Relaxed)
    }
}

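// `StorageCtpClient` is the transport client for a single replica process;
// `ReplicaClient` wraps it in `Partitioned` to fan commands out to all
// processes of a replica and merge their responses back into one stream.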
type StorageCtpClient<T> = transport::Client<StorageCommand<T>, StorageResponse<T>>;
type ReplicaClient<T> = Partitioned<StorageCtpClient<T>, StorageCommand<T>, StorageResponse<T>>;

/// A task handling communication with a replica.
struct ReplicaTask<T> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// Replica configuration.
    config: ReplicaConfig,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// Flag to report successful replica connection.
    connected: Arc<AtomicBool>,
    /// A channel upon which commands intended for the replica are delivered.
    command_rx: mpsc::UnboundedReceiver<StorageCommand<T>>,
    /// A channel upon which responses from the replica are delivered.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}

impl<T> ReplicaTask<T>
where
    T: Timestamp + Lattice + Sync,
{
    /// Runs the replica task.
    async fn run(self) {
        let replica_id = self.replica_id;
        info!(%replica_id, "starting replica task");

        let client = self.connect().await;
        match self.run_message_loop(client).await {
            Ok(()) => info!(%replica_id, "stopped replica task"),
            Err(error) => warn!(%replica_id, %error, "replica task failed"),
        }
    }

    /// Connects to the replica.
    ///
    /// The connection is retried forever (with backoff) and this method returns only after
    /// a connection has been successfully established.
    async fn connect(&self) -> ReplicaClient<T> {
        let try_connect = async move |retry: RetryState| {
            let version = self.config.build_info.semver_version();
            let client_params = &self.config.grpc_client;

            let connect_start = Instant::now();
            let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
            let keepalive_timeout = client_params
                .http2_keep_alive_timeout
                .unwrap_or(Duration::MAX);

            let connect_result = StorageCtpClient::<T>::connect_partitioned(
                self.config.location.ctl_addrs.clone(),
                version,
                connect_timeout,
                keepalive_timeout,
                self.metrics.clone(),
            )
            .await;

            self.metrics.observe_connect_time(connect_start.elapsed());

            connect_result.inspect_err(|error| {
                let next_backoff = retry.next_backoff.unwrap();
                if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
                    info!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                } else {
                    debug!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                }
            })
        };

        let client = Retry::default()
            .clamp_backoff(Duration::from_secs(1))
            .retry_async(try_connect)
            .await
            .expect("retries forever");

        self.metrics.observe_connect();
        self.connected.store(true, atomic::Ordering::Relaxed);

        client
    }

    /// Runs the message loop.
    ///
    /// Returns (with an `Err`) if it encounters an error condition (e.g. the replica disconnects).
    /// If no error condition is encountered, the task runs until the controller disconnects from
    /// the command channel, or the task is dropped.
    async fn run_message_loop(mut self, mut client: ReplicaClient<T>) -> Result<(), anyhow::Error> {
        loop {
            select! {
                // Command from controller to forward to replica.
                // `tokio::sync::mpsc::UnboundedReceiver::recv` is documented as cancel safe.
                command = self.command_rx.recv() => {
                    let Some(mut command) = command else {
                        tracing::debug!(%self.replica_id, "controller is no longer interested in this replica, shutting down message loop");
                        break;
                    };

                    self.specialize_command(&mut command);
                    client.send(command).await?;
                },
                // Response from replica to forward to controller.
                // `GenericClient::recv` implementations are required to be cancel safe.
                response = client.recv() => {
                    let Some(response) = response? else {
                        bail!("replica unexpectedly gracefully terminated connection");
                    };

                    if self.response_tx.send((Some(self.replica_id), response)).is_err() {
                        tracing::debug!(%self.replica_id, "controller (receiver) is no longer interested in this replica, shutting down message loop");
                        break;
                    }
                }
            }
        }

        Ok(())
    }

    /// Specialize a command for the given replica configuration.
    ///
    /// Most [`StorageCommand`]s are independent of the target replica, but some contain
    /// replica-specific fields that must be adjusted before sending.
    fn specialize_command(&self, command: &mut StorageCommand<T>) {
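        // Each (re)connection to a replica gets a fresh nonce, so the replica
        // can tell protocol iterations apart.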
        if let StorageCommand::Hello { nonce } = command {
            *nonce = Uuid::new_v4();
        }
    }
}