mz_storage_controller/instance.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A controller for a storage instance.

use crate::CollectionMetadata;
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroI64;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, atomic};
use std::time::{Duration, Instant};

use anyhow::bail;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::{ClusterReplicaLocation, ClusterStartupEpoch, TimelyConfig};
use mz_controller_types::dyncfgs::ENABLE_CREATE_SOCKETS_V2;
use mz_dyncfg::ConfigSet;
use mz_ore::cast::CastFrom;
use mz_ore::now::NowFn;
use mz_ore::retry::{Retry, RetryState};
use mz_ore::task::AbortOnDropHandle;
use mz_repr::GlobalId;
use mz_service::client::{GenericClient, Partitioned};
use mz_service::params::GrpcClientParameters;
use mz_storage_client::client::{
    RunIngestionCommand, RunSinkCommand, Status, StatusUpdate, StorageClient, StorageCommand,
    StorageGrpcClient, StorageResponse,
};
use mz_storage_client::metrics::{InstanceMetrics, ReplicaMetrics};
use mz_storage_types::dyncfgs::STORAGE_SINK_SNAPSHOT_FRONTIER;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{IngestionDescription, SourceConnection};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::select;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};

use crate::history::CommandHistory;

/// A controller for a storage instance.
///
/// Encapsulates communication with replicas in this instance, and their rehydration.
///
/// Note that storage objects (sources and sinks) don't currently support replication (database-issues#5051).
/// An instance can have multiple replicas connected, but only if it has no storage objects
/// installed. Attempting to install storage objects on multi-replica instances, or attempting to
/// add more than one replica to instances that have storage objects installed, is illegal and will
/// lead to panics.
#[derive(Debug)]
pub(crate) struct Instance<T> {
    /// The workload class of this instance.
    ///
    /// This is currently only used to annotate metrics.
    pub workload_class: Option<String>,
    /// The replicas connected to this storage instance.
    replicas: BTreeMap<ReplicaId, Replica<T>>,
    /// The ingestions currently running on this instance.
    ///
    /// While this is derivable from `history` on demand, keeping a denormalized
    /// list of running ingestions is quite a bit more convenient in the
    /// implementation of `StorageController::active_ingestions`.
    active_ingestions: BTreeMap<GlobalId, ActiveIngestion>,
    /// A map from ingestion export ID to the ingestion that is producing it.
    ingestion_exports: BTreeMap<GlobalId, GlobalId>,
    /// The exports currently running on this instance.
    ///
    /// While this is derivable from `history` on demand, keeping a denormalized
    /// list of running exports is quite a bit more convenient for the
    /// controller.
    active_exports: BTreeMap<GlobalId, ActiveExport>,
    /// The command history, used to replay past commands when introducing new replicas or
    /// reconnecting to existing replicas.
    history: CommandHistory<T>,
    /// The current cluster startup epoch.
    ///
    /// The `replica` value of the epoch is increased every time a replica is (re)connected,
    /// allowing the distinction of different replica incarnations.
    epoch: ClusterStartupEpoch,
    /// Metrics tracked for this storage instance.
    metrics: InstanceMetrics,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,
    /// A function that returns the current time.
    now: NowFn,
    /// A sender for responses from replicas.
    ///
    /// Responses are tagged with the [`ReplicaId`] of the replica that sent the
    /// response. Responses that don't originate from a replica (e.g. a "paused"
    /// status update, when no replicas are connected) are tagged with `None`.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}

#[derive(Debug)]
struct ActiveIngestion {
    /// The set of replicas that this ingestion is currently running on.
    active_replicas: BTreeSet<ReplicaId>,
}

#[derive(Debug)]
struct ActiveExport {
    /// The set of replicas that this export is currently running on.
    active_replicas: BTreeSet<ReplicaId>,
}

impl<T> Instance<T>
where
    T: Timestamp + Lattice + TotalOrder,
    StorageGrpcClient: StorageClient<T>,
{
    /// Creates a new [`Instance`].
    pub fn new(
        envd_epoch: NonZeroI64,
        metrics: InstanceMetrics,
        dyncfg: Arc<ConfigSet>,
        now: NowFn,
        instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    ) -> Self {
        let enable_snapshot_frontier = STORAGE_SINK_SNAPSHOT_FRONTIER.handle(&dyncfg);
        let history = CommandHistory::new(metrics.for_history(), enable_snapshot_frontier);
        let epoch = ClusterStartupEpoch::new(envd_epoch, 0);

        let mut instance = Self {
            workload_class: None,
            replicas: Default::default(),
            active_ingestions: Default::default(),
            ingestion_exports: Default::default(),
            active_exports: BTreeMap::new(),
            history,
            epoch,
            metrics,
            dyncfg,
            now,
            response_tx: instance_response_tx,
        };

        instance.send(StorageCommand::CreateTimely {
            config: Default::default(),
            epoch,
        });

        instance
    }

    /// Returns the IDs of all replicas connected to this storage instance.
    pub fn replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_ {
        self.replicas.keys().copied()
    }

    /// Adds a new replica to this storage instance.
    pub fn add_replica(&mut self, id: ReplicaId, config: ReplicaConfig) {
        // Reduce the history to limit the number of commands sent to the new replica, and to
        // enable the `objects_installed` assert below.
        self.history.reduce();

        self.epoch.bump_replica();
        let metrics = self.metrics.for_replica(id);
        let replica = Replica::new(
            id,
            config,
            self.epoch,
            metrics,
            Arc::clone(&self.dyncfg),
            self.response_tx.clone(),
        );

        self.replicas.insert(id, replica);

        self.update_scheduling(false);

        self.replay_commands(id);
    }

    /// Replays commands to the specified replica.
    pub fn replay_commands(&mut self, replica_id: ReplicaId) {
        let commands = self.history.iter().cloned();

        let filtered_commands = commands
            .filter_map(|command| match command {
                StorageCommand::RunIngestion(ingestion) => {
                    if self.is_active_replica(&ingestion.id, &replica_id) {
                        Some(StorageCommand::RunIngestion(ingestion))
                    } else {
                        None
                    }
                }
                StorageCommand::RunSink(sink) => {
                    if self.is_active_replica(&sink.id, &replica_id) {
                        Some(StorageCommand::RunSink(sink))
                    } else {
                        None
                    }
                }
                StorageCommand::AllowCompaction(id, upper) => {
                    if self.is_active_replica(&id, &replica_id) {
                        Some(StorageCommand::AllowCompaction(id, upper))
                    } else {
                        None
                    }
                }
                command => Some(command),
            })
            .collect::<Vec<_>>();

        let replica = self
            .replicas
            .get_mut(&replica_id)
            .expect("replica must exist");

        // Replay the commands at the new replica.
        for command in filtered_commands {
            replica.send(command);
        }
    }

    /// Removes the identified replica from this storage instance.
    pub fn drop_replica(&mut self, id: ReplicaId) {
        let replica = self.replicas.remove(&id);

        let mut needs_rescheduling = false;
        for (ingestion_id, ingestion) in self.active_ingestions.iter_mut() {
            let was_running = ingestion.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %ingestion_id,
                    replica_id = %id,
                    "ingestion was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }
        for (export_id, export) in self.active_exports.iter_mut() {
            let was_running = export.active_replicas.remove(&id);
            if was_running {
                tracing::debug!(
                    %export_id,
                    replica_id = %id,
                    "export was running on dropped replica, updating scheduling decisions"
                );
                needs_rescheduling = true;
            }
        }

        tracing::info!(%id, %needs_rescheduling, "dropped replica");

        if needs_rescheduling {
            self.update_scheduling(true);
        }

        if replica.is_some() && self.replicas.is_empty() {
            self.update_paused_statuses();
        }
    }

    /// Rehydrates any failed replicas of this storage instance.
    pub fn rehydrate_failed_replicas(&mut self) {
        let replicas = self.replicas.iter();
        let failed_replicas: Vec<_> = replicas
            .filter_map(|(id, replica)| replica.failed().then_some(*id))
            .collect();

        for id in failed_replicas {
            let replica = self.replicas.remove(&id).expect("must exist");
            self.add_replica(id, replica.config);
        }
    }

    /// Returns the ingestions running on this instance.
    pub fn active_ingestions(&self) -> impl Iterator<Item = &GlobalId> {
        self.active_ingestions.keys()
    }

    /// Returns the exports running on this instance.
    pub fn active_exports(&self) -> impl Iterator<Item = &GlobalId> {
        self.active_exports.keys()
    }

    /// Sets the status to paused for all sources/sinks in the history.
    fn update_paused_statuses(&mut self) {
        let now = mz_ore::now::to_datetime((self.now)());
        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: now,
            error: None,
            hints: BTreeSet::from([format!(
                "There is currently no replica running this {object_type}"
            )]),
            namespaced_errors: Default::default(),
            replica_id: None,
        };

        self.history.reduce();

        let mut status_updates = Vec::new();
        for command in self.history.iter() {
            match command {
                StorageCommand::RunIngestion(ingestion) => {
                    // NOTE(aljoscha): We filter out the remap collection because we
                    // don't get any status updates about it from the replica side. So
                    // we don't want to synthesize a 'paused' status here.
                    //
                    // TODO(aljoscha): I think we want to fix this eventually, and make
                    // sure we get status updates for the remap shard as well. Currently
                    // its handling in the source status collection is a bit difficult
                    // because we don't have updates for it in the status history
                    // collection.
                    let subsource_ids = ingestion
                        .description
                        .collection_ids()
                        .filter(|id| id != &ingestion.description.remap_collection_id);
                    for id in subsource_ids {
                        status_updates.push(make_update(id, "source"));
                    }
                }
                StorageCommand::RunSink(sink) => {
                    status_updates.push(make_update(sink.id, "sink"));
                }
                _ => (),
            }
        }

        for update in status_updates {
            // NOTE: If we lift this "inject paused status" logic to the
            // controller, we could return a ReplicaId instead of an
            // Option<ReplicaId>.
            let _ = self
                .response_tx
                .send((None, StorageResponse::StatusUpdate(update)));
        }
    }

    /// Sends a command to this storage instance.
    pub fn send(&mut self, command: StorageCommand<T>) {
        // Record the command so that new replicas can be brought up to speed.
        self.history.push(command.clone());

        match command.clone() {
            StorageCommand::RunIngestion(ingestion) => {
                // First absorb into our state, because this might change
                // scheduling decisions, which need to be respected just below
                // when sending commands.
                self.absorb_ingestion(*ingestion.clone());

                for replica in self.active_replicas(&ingestion.id) {
                    replica.send(StorageCommand::RunIngestion(ingestion.clone()));
                }
            }
            StorageCommand::RunSink(sink) => {
                // First absorb into our state, because this might change
                // scheduling decisions, which need to be respected just below
                // when sending commands.
                self.absorb_export(*sink.clone());

                for replica in self.active_replicas(&sink.id) {
                    replica.send(StorageCommand::RunSink(sink.clone()));
                }
            }
            StorageCommand::AllowCompaction(id, frontier) => {
                // First send out commands and then absorb into our state since
                // absorbing them might remove entries from active_ingestions.
                for replica in self.active_replicas(&id) {
                    replica.send(StorageCommand::AllowCompaction(
                        id.clone(),
                        frontier.clone(),
                    ));
                }

                self.absorb_compaction(id, frontier);
            }
            command => {
                for replica in self.replicas.values_mut() {
                    replica.send(command.clone());
                }
            }
        }

        if command.installs_objects() && self.replicas.is_empty() {
            self.update_paused_statuses();
        }
    }

    /// Updates internal state based on incoming ingestion commands.
    ///
    /// This does _not_ send commands to replicas; we only record the ingestion
    /// in state and potentially update scheduling decisions.
    fn absorb_ingestion(&mut self, ingestion: RunIngestionCommand) {
        let existing_ingestion_state = self.active_ingestions.get_mut(&ingestion.id);

        // Always update our mapping from exports to their ingestion.
        for id in ingestion.description.source_exports.keys() {
            self.ingestion_exports.insert(id.clone(), ingestion.id);
        }

        if let Some(ingestion_state) = existing_ingestion_state {
            // It's an update for an existing ingestion. We don't need to
            // change anything about our scheduling decisions, no need to
            // update active_ingestions.

            tracing::debug!(
                ingestion_id = %ingestion.id,
                active_replicas = %ingestion_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating ingestion"
            );
        } else {
            // We create a new ingestion state for this ingestion.
            let ingestion_state = ActiveIngestion {
                active_replicas: BTreeSet::new(),
            };
            self.active_ingestions.insert(ingestion.id, ingestion_state);

            // Maybe update scheduling decisions.
            self.update_scheduling(false);
        }
    }

    /// Updates internal state based on incoming export commands.
    ///
    /// This does _not_ send commands to replicas; we only record the export
    /// in state and potentially update scheduling decisions.
    fn absorb_export(&mut self, export: RunSinkCommand<T>) {
        let existing_export_state = self.active_exports.get_mut(&export.id);

        if let Some(export_state) = existing_export_state {
            // It's an update for an existing export. We don't need to
            // change anything about our scheduling decisions, no need to
            // update active_exports.

            tracing::debug!(
                export_id = %export.id,
                active_replicas = %export_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
                "updating export"
            );
        } else {
            // We create a new export state for this export.
            let export_state = ActiveExport {
                active_replicas: BTreeSet::new(),
            };
            self.active_exports.insert(export.id, export_state);

            // Maybe update scheduling decisions.
            self.update_scheduling(false);
        }
    }

    /// Update scheduling decisions, that is, which replicas should be running
    /// a given object, if needed.
    ///
    /// An important property of this scheduling algorithm is that we never
    /// change the scheduling decision for single-replica objects unless we
    /// have to, that is, unless the replica that they are running on goes away.
    /// We do this so that we don't send replicas a mix of "run"/"allow
    /// compaction"/"run" messages, which they wouldn't handle well. When we
    /// _do_ have to make a scheduling decision, we schedule a single-replica
    /// ingestion on the first replica according to the sort order of
    /// `ReplicaId`. We do the latter so that the scheduling decision is stable
    /// across restarts of `environmentd`/the controller.
    ///
    /// For multi-replica objects (e.g. Kafka ingestions), each active object is
    /// scheduled on all replicas.
    ///
    /// If `send_commands` is true, commands are sent for newly-scheduled
    /// single-replica objects.
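    ///
    /// # Example
    ///
    /// A minimal, self-contained sketch of the selection rule (using plain
    /// `u64` replica IDs rather than this module's types): single-replica
    /// objects go to the smallest replica ID, everything else goes to all
    /// replicas.
    ///
    /// ```
    /// use std::collections::BTreeSet;
    ///
    /// // Returns the set of replicas an object should run on.
    /// fn select_replicas(replicas: &BTreeSet<u64>, prefers_single_replica: bool) -> BTreeSet<u64> {
    ///     if prefers_single_replica {
    ///         // Pick the smallest replica ID, so the decision is stable across restarts.
    ///         replicas.iter().min().copied().into_iter().collect()
    ///     } else {
    ///         // Multi-replica objects run on every connected replica.
    ///         replicas.clone()
    ///     }
    /// }
    ///
    /// let replicas = BTreeSet::from([3_u64, 1, 2]);
    /// assert_eq!(select_replicas(&replicas, true), BTreeSet::from([1]));
    /// assert_eq!(select_replicas(&replicas, false), replicas);
    /// ```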
    fn update_scheduling(&mut self, send_commands: bool) {
        #[derive(Debug)]
        enum ObjectId {
            Ingestion(GlobalId),
            Export(GlobalId),
        }
        // We first collect scheduling preferences and then schedule below.
        // Applying the decision needs a mutable borrow but we also need to
        // borrow for determining `prefers_single_replica`, so we split this
        // into two loops.
        let mut scheduling_preferences: Vec<(ObjectId, bool)> = Vec::new();

        for ingestion_id in self.active_ingestions.keys() {
            let ingestion_description = self
                .get_ingestion_description(ingestion_id)
                .expect("missing ingestion description");

            let prefers_single_replica = ingestion_description
                .desc
                .connection
                .prefers_single_replica();

            scheduling_preferences
                .push((ObjectId::Ingestion(*ingestion_id), prefers_single_replica));
        }

        for export_id in self.active_exports.keys() {
            // All sinks prefer single replica
            scheduling_preferences.push((ObjectId::Export(*export_id), true));
        }

        // Collect all commands per replica and send them in one go.
        let mut commands_by_replica: BTreeMap<ReplicaId, Vec<ObjectId>> = BTreeMap::new();

        for (object_id, prefers_single_replica) in scheduling_preferences {
            let active_replicas = match object_id {
                ObjectId::Ingestion(ingestion_id) => {
                    &mut self
                        .active_ingestions
                        .get_mut(&ingestion_id)
                        .expect("missing ingestion state")
                        .active_replicas
                }
                ObjectId::Export(export_id) => {
                    &mut self
                        .active_exports
                        .get_mut(&export_id)
                        .expect("missing export state")
                        .active_replicas
                }
            };

            if prefers_single_replica {
                // For single-replica objects, schedule only if it's not already running.
                if active_replicas.is_empty() {
                    let target_replica = self.replicas.keys().min().copied();
                    if let Some(first_replica_id) = target_replica {
                        tracing::info!(
                            object_id = ?object_id,
                            replica_id = %first_replica_id,
                            "scheduling single-replica object");
                        active_replicas.insert(first_replica_id);

                        commands_by_replica
                            .entry(first_replica_id)
                            .or_default()
                            .push(object_id);
                    }
                } else {
                    tracing::info!(
                        ?object_id,
                        active_replicas = %active_replicas.iter().map(|id| id.to_string()).join(", "),
                        "single-replica object already running, not scheduling again",
                    );
                }
            } else {
                let current_replica_ids: BTreeSet<_> = self.replicas.keys().copied().collect();
                let unscheduled_replicas: Vec<_> = current_replica_ids
                    .difference(active_replicas)
                    .copied()
                    .collect();
                for replica_id in unscheduled_replicas {
                    tracing::info!(
                        ?object_id,
                        %replica_id,
                        "scheduling multi-replica object"
                    );
                    active_replicas.insert(replica_id);
                }
            }
        }

        if send_commands {
            for (replica_id, object_ids) in commands_by_replica {
                let mut ingestion_commands = vec![];
                let mut export_commands = vec![];
                for object_id in object_ids {
                    match object_id {
                        ObjectId::Ingestion(id) => {
                            ingestion_commands.push(RunIngestionCommand {
                                id,
                                description: self
                                    .get_ingestion_description(&id)
                                    .expect("missing ingestion description")
                                    .clone(),
                            });
                        }
                        ObjectId::Export(id) => {
                            export_commands.push(RunSinkCommand {
                                id,
                                description: self
                                    .get_export_description(&id)
                                    .expect("missing export description")
                                    .clone(),
                            });
                        }
                    }
                }
                for ingestion in ingestion_commands {
                    let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
                    let ingestion = Box::new(ingestion);
                    replica.send(StorageCommand::RunIngestion(ingestion));
                }
                for export in export_commands {
                    let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
                    let export = Box::new(export);
                    replica.send(StorageCommand::RunSink(export));
                }
            }
        }
    }

    /// Returns the ingestion description for the given ingestion ID, if it
    /// exists.
    ///
    /// This function searches through the command history to find the most
    /// recent RunIngestionCommand for the specified ingestion ID and returns
    /// its description. Returns None if no ingestion with the given ID is
    /// found.
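    ///
    /// # Example
    ///
    /// A simplified, self-contained sketch of the lookup: scan the recorded
    /// commands from newest to oldest and return the first (i.e. most recent)
    /// description for the requested ID. Plain tuples stand in for the real
    /// command types here.
    ///
    /// ```
    /// // (id, description) pairs, in the order they were recorded.
    /// let history = vec![(1, "v1"), (2, "v1"), (1, "v2")];
    ///
    /// let latest = |id: i32| {
    ///     history
    ///         .iter()
    ///         .rev()
    ///         .find_map(|(i, desc)| (*i == id).then_some(*desc))
    /// };
    ///
    /// assert_eq!(latest(1), Some("v2"));
    /// assert_eq!(latest(3), None);
    /// ```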
    pub fn get_ingestion_description(
        &self,
        id: &GlobalId,
    ) -> Option<IngestionDescription<CollectionMetadata>> {
        if !self.active_ingestions.contains_key(id) {
            return None;
        }

        self.history.iter().rev().find_map(|command| {
            if let StorageCommand::RunIngestion(ingestion) = command {
                if &ingestion.id == id {
                    Some(ingestion.description.clone())
                } else {
                    None
                }
            } else {
                None
            }
        })
    }

    /// Returns the export description for the given export ID, if it
    /// exists.
    ///
    /// This function searches through the command history to find the most
    /// recent RunSinkCommand for the specified export ID and returns
    /// its description. Returns None if no export with the given ID is
    /// found.
    pub fn get_export_description(
        &self,
        id: &GlobalId,
    ) -> Option<StorageSinkDesc<CollectionMetadata, T>> {
        if !self.active_exports.contains_key(id) {
            return None;
        }

        self.history.iter().rev().find_map(|command| {
            if let StorageCommand::RunSink(sink) = command {
                if &sink.id == id {
                    Some(sink.description.clone())
                } else {
                    None
                }
            } else {
                None
            }
        })
    }

    /// Updates internal state based on incoming compaction commands.
    fn absorb_compaction(&mut self, id: GlobalId, frontier: Antichain<T>) {
        tracing::debug!(?self.active_ingestions, ?id, ?frontier, "allow_compaction");

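        // Compaction to the empty frontier signals that the object is being
        // dropped, so forget about it entirely and stop scheduling it on
        // replicas.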
        if frontier.is_empty() {
            self.active_ingestions.remove(&id);
            self.ingestion_exports.remove(&id);
            self.active_exports.remove(&id);
        }
    }

    /// Returns the replicas that are actively running the given object (ingestion or export).
    fn active_replicas(&mut self, id: &GlobalId) -> Box<dyn Iterator<Item = &mut Replica<T>> + '_> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => Box::new(self.replicas.iter_mut().filter_map(
                    move |(replica_id, replica)| {
                        if ingestion.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    },
                )),
                None => {
                    // The ingestion has already been compacted away (aka. stopped).
                    Box::new(std::iter::empty())
                }
            }
        } else if let Some(export) = self.active_exports.get(id) {
            Box::new(
                self.replicas
                    .iter_mut()
                    .filter_map(move |(replica_id, replica)| {
                        if export.active_replicas.contains(replica_id) {
                            Some(replica)
                        } else {
                            None
                        }
                    }),
            )
        } else {
            Box::new(self.replicas.values_mut())
        }
    }

    /// Returns whether the given replica is actively running the given object (ingestion or export).
    fn is_active_replica(&self, id: &GlobalId, replica_id: &ReplicaId) -> bool {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => ingestion.active_replicas.contains(replica_id),
                None => {
                    // The ingestion has already been compacted away (aka. stopped).
                    false
                }
            }
        } else if let Some(export) = self.active_exports.get(id) {
            export.active_replicas.contains(replica_id)
        } else {
            // For non-ingestion objects, all replicas are active
            true
        }
    }

    /// Refresh the controller state metrics for this instance.
    ///
    /// We could also do state metric updates directly in response to state changes, but that would
    /// mean littering the code with metric update calls. Encapsulating state metric maintenance in
    /// a single method is less noisy.
    ///
    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
    /// second during normal operation.
    pub(super) fn refresh_state_metrics(&self) {
        let connected_replica_count = self.replicas.values().filter(|r| r.is_connected()).count();

        self.metrics
            .connected_replica_count
            .set(u64::cast_from(connected_replica_count));
    }

    /// Returns the set of replica IDs that are actively running the given
    /// object (ingestion, ingestion export (aka. subsource), or export).
    pub fn get_active_replicas_for_object(&self, id: &GlobalId) -> BTreeSet<ReplicaId> {
        if let Some(ingestion_id) = self.ingestion_exports.get(id) {
            // Right now, only ingestions can have per-replica scheduling decisions.
            match self.active_ingestions.get(ingestion_id) {
                Some(ingestion) => ingestion.active_replicas.clone(),
                None => {
                    // The ingestion has already been compacted away (aka. stopped).
                    BTreeSet::new()
                }
            }
        } else {
            // For non-ingestion objects, all replicas are active
            self.replicas.keys().copied().collect()
        }
    }
}

/// Replica-specific configuration.
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    pub build_info: &'static BuildInfo,
    pub location: ClusterReplicaLocation,
    pub grpc_client: GrpcClientParameters,
}

/// State maintained about individual replicas.
#[derive(Debug)]
pub struct Replica<T> {
    /// Replica configuration.
    config: ReplicaConfig,
    /// A sender for commands for the replica.
    ///
    /// If sending to this channel fails, the replica has failed and requires
    /// rehydration.
    command_tx: mpsc::UnboundedSender<StorageCommand<T>>,
    /// A handle to the replica task; dropping it (i.e. dropping this replica) aborts the task.
    task: AbortOnDropHandle<()>,
    /// Flag reporting whether the replica connection has been established.
    connected: Arc<AtomicBool>,
}

impl<T> Replica<T>
where
    T: Timestamp + Lattice,
    StorageGrpcClient: StorageClient<T>,
{
    /// Creates a new [`Replica`].
    fn new(
        id: ReplicaId,
        config: ReplicaConfig,
        epoch: ClusterStartupEpoch,
        metrics: ReplicaMetrics,
        dyncfg: Arc<ConfigSet>,
        response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    ) -> Self {
        let (command_tx, command_rx) = mpsc::unbounded_channel();
        let connected = Arc::new(AtomicBool::new(false));

        let task = mz_ore::task::spawn(
            || format!("storage-replica-{id}"),
            ReplicaTask {
                replica_id: id,
                config: config.clone(),
                epoch,
                metrics: metrics.clone(),
                connected: Arc::clone(&connected),
                command_rx,
                response_tx,
                dyncfg,
            }
            .run(),
        );

        Self {
            config,
            command_tx,
            task: task.abort_on_drop(),
            connected,
        }
    }

    /// Sends a command to the replica.
    fn send(&self, command: StorageCommand<T>) {
        // Send failures ignored, we'll check for failed replicas separately.
        let _ = self.command_tx.send(command);
    }

    /// Determine if this replica has failed. This is true if the replica
    /// task has terminated.
    fn failed(&self) -> bool {
        self.task.is_finished()
    }

    /// Determine if the replica connection has been established.
    pub(super) fn is_connected(&self) -> bool {
        self.connected.load(atomic::Ordering::Relaxed)
    }
}

type ReplicaClient<T> = Partitioned<StorageGrpcClient, StorageCommand<T>, StorageResponse<T>>;

/// A task handling communication with a replica.
struct ReplicaTask<T> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// Replica configuration.
    config: ReplicaConfig,
    /// The epoch identifying this incarnation of the replica.
    epoch: ClusterStartupEpoch,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// Flag to report successful replica connection.
    connected: Arc<AtomicBool>,
    /// A channel upon which commands intended for the replica are delivered.
    command_rx: mpsc::UnboundedReceiver<StorageCommand<T>>,
    /// A channel upon which responses from the replica are delivered.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,
}

impl<T> ReplicaTask<T>
where
    T: Timestamp + Lattice,
    StorageGrpcClient: StorageClient<T>,
{
    /// Runs the replica task.
    async fn run(self) {
        let replica_id = self.replica_id;
        info!(%replica_id, "starting replica task");

        let client = self.connect().await;
        match self.run_message_loop(client).await {
            Ok(()) => info!(%replica_id, "stopped replica task"),
            Err(error) => warn!(%replica_id, %error, "replica task failed"),
        }
    }

    /// Connects to the replica.
    ///
    /// The connection is retried forever (with backoff) and this method returns only after
    /// a connection was successfully established.
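    ///
    /// # Example
    ///
    /// A rough, self-contained sketch of the retry behavior (exponential
    /// backoff clamped to one second), independent of the `Retry` helper used
    /// below; the initial backoff here is an arbitrary placeholder.
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// fn connect_forever(mut try_connect: impl FnMut() -> Result<(), String>) {
    ///     // Placeholder initial backoff; the real helper picks its own.
    ///     let mut backoff = Duration::from_millis(10);
    ///     loop {
    ///         match try_connect() {
    ///             Ok(()) => return,
    ///             Err(_) => {
    ///                 std::thread::sleep(backoff);
    ///                 // Double the backoff, but never wait longer than a second.
    ///                 backoff = (backoff * 2).min(Duration::from_secs(1));
    ///             }
    ///         }
    ///     }
    /// }
    /// ```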
    async fn connect(&self) -> ReplicaClient<T> {
        let try_connect = |retry: RetryState| {
            let addrs = &self.config.location.ctl_addrs;
            let dests = addrs
                .iter()
                .map(|addr| (addr.clone(), self.metrics.clone()))
                .collect();
            let version = self.config.build_info.semver_version();
            let client_params = &self.config.grpc_client;

            async move {
                let connect_start = Instant::now();
                let connect_result =
                    StorageGrpcClient::connect_partitioned(dests, version, client_params).await;
                self.metrics.observe_connect_time(connect_start.elapsed());

                connect_result.inspect_err(|error| {
                    let next_backoff = retry.next_backoff.unwrap();
                    if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
                        info!(
                            replica_id = %self.replica_id, ?next_backoff,
                            "error connecting to replica: {error:#}",
                        );
                    } else {
                        debug!(
                            replica_id = %self.replica_id, ?next_backoff,
                            "error connecting to replica: {error:#}",
                        );
                    }
                })
            }
        };

        let client = Retry::default()
            .clamp_backoff(Duration::from_secs(1))
            .retry_async(try_connect)
            .await
            .expect("retries forever");

        self.metrics.observe_connect();
        self.connected.store(true, atomic::Ordering::Relaxed);

        client
    }

    /// Runs the message loop.
    ///
    /// Returns (with an `Err`) if it encounters an error condition (e.g. the replica disconnects).
    /// If no error condition is encountered, the task runs until the controller disconnects from
    /// the command channel, or the task is dropped.
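    ///
    /// # Example
    ///
    /// The shape of the loop, as a self-contained sketch that shuttles strings
    /// between channels instead of storage commands and responses (the real
    /// loop talks to the gRPC client instead of a second pair of channels):
    ///
    /// ```
    /// use tokio::select;
    /// use tokio::sync::mpsc;
    ///
    /// async fn forward(
    ///     mut command_rx: mpsc::UnboundedReceiver<String>,
    ///     mut replica_rx: mpsc::UnboundedReceiver<String>,
    ///     to_replica_tx: mpsc::UnboundedSender<String>,
    ///     to_controller_tx: mpsc::UnboundedSender<String>,
    /// ) {
    ///     loop {
    ///         select! {
    ///             // A command from the controller, to be forwarded to the replica.
    ///             cmd = command_rx.recv() => {
    ///                 let Some(cmd) = cmd else { break };
    ///                 let _ = to_replica_tx.send(cmd);
    ///             }
    ///             // A response from the replica, to be forwarded to the controller.
    ///             resp = replica_rx.recv() => {
    ///                 let Some(resp) = resp else { break };
    ///                 if to_controller_tx.send(resp).is_err() {
    ///                     break;
    ///                 }
    ///             }
    ///         }
    ///     }
    /// }
    /// ```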
    async fn run_message_loop(mut self, mut client: ReplicaClient<T>) -> Result<(), anyhow::Error> {
        loop {
            select! {
                // Command from controller to forward to replica.
                // `tokio::sync::mpsc::UnboundedReceiver::recv` is documented as cancel safe.
                command = self.command_rx.recv() => {
                    let Some(mut command) = command else {
                        tracing::debug!(%self.replica_id, "controller is no longer interested in this replica, shutting down message loop");
                        break;
                    };

                    self.specialize_command(&mut command);
                    client.send(command).await?;
                },
                // Response from replica to forward to controller.
                // `GenericClient::recv` implementations are required to be cancel safe.
                response = client.recv() => {
                    let Some(response) = response? else {
                        bail!("replica unexpectedly gracefully terminated connection");
                    };

                    if self.response_tx.send((Some(self.replica_id), response)).is_err() {
                        tracing::debug!(%self.replica_id, "controller (receiver) is no longer interested in this replica, shutting down message loop");
                        break;
                    }
                }
            }
        }

        Ok(())
    }

    /// Specialize a command for the given replica configuration.
    ///
    /// Most [`StorageCommand`]s are independent of the target replica, but some contain
    /// replica-specific fields that must be adjusted before sending.
    fn specialize_command(&self, command: &mut StorageCommand<T>) {
        if let StorageCommand::CreateTimely { config, epoch } = command {
            **config = TimelyConfig {
                workers: self.config.location.workers,
                // Overridden by the storage `PartitionedState` implementation.
                process: 0,
                addresses: self.config.location.dataflow_addrs.clone(),
                // This value is not currently used by storage, so we just choose
                // some identifiable value.
                arrangement_exert_proportionality: 1337,
                // Disable zero-copy by default.
                // TODO: Bring in line with compute.
                enable_zero_copy: false,
                // Do not use lgalloc to back zero-copy memory.
                enable_zero_copy_lgalloc: false,
                // No limit; zero-copy is disabled.
                zero_copy_limit: None,
                enable_create_sockets_v2: ENABLE_CREATE_SOCKETS_V2.get(&self.dyncfg),
            };
            *epoch = self.epoch;
        }
    }
}