Skip to main content

mz_controller/
replica_http_locator.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Tracks HTTP addresses for cluster replica processes.
11//!
12//! This module provides the [`ReplicaHttpLocator`] which maintains an in-memory
13//! mapping of cluster replica HTTP addresses. This is used by environmentd to
14//! proxy HTTP requests to clusterd internal endpoints without requiring
15//! direct network access to the clusterd pods.
16
17use std::collections::BTreeMap;
18use std::sync::RwLock;
19
20use mz_controller_types::{ClusterId, ReplicaId};
21
22/// Tracks HTTP addresses for cluster replica processes.
23#[derive(Debug, Default)]
24pub struct ReplicaHttpLocator {
25    /// Maps (cluster_id, replica_id) to a list of process HTTP addresses.
26    replica_addresses: RwLock<BTreeMap<(ClusterId, ReplicaId), Vec<String>>>,
27}
28
29impl ReplicaHttpLocator {
30    /// Returns the HTTP address for a specific process of a replica.
31    ///
32    /// Returns `None` if the replica is not found, the process index is out of
33    /// bounds, or the addresses are not yet available.
34    pub fn get_http_addr(
35        &self,
36        cluster_id: ClusterId,
37        replica_id: ReplicaId,
38        process: usize,
39    ) -> Option<String> {
40        let guard = self.replica_addresses.read().expect("lock poisoned");
41        let addrs = guard.get(&(cluster_id, replica_id))?;
42        addrs.get(process).cloned()
43    }
44
45    /// Registers a service for a replica.
46    ///
47    /// Called by the controller when a managed replica is provisioned.
48    pub fn register_replica(
49        &self,
50        cluster_id: ClusterId,
51        replica_id: ReplicaId,
52        addresses: Vec<String>,
53    ) {
54        let mut guard = self.replica_addresses.write().expect("lock poisoned");
55        guard.insert((cluster_id, replica_id), addresses);
56    }
57
58    /// Removes a replica from the locator.
59    ///
60    /// Called by the controller when a replica is dropped.
61    pub(crate) fn remove_replica(&self, cluster_id: ClusterId, replica_id: ReplicaId) {
62        let mut guard = self.replica_addresses.write().expect("lock poisoned");
63        guard.remove(&(cluster_id, replica_id));
64    }
65}