mz_controller/replica_http_locator.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Tracks HTTP addresses for cluster replica processes.
11//!
12//! This module provides the [`ReplicaHttpLocator`] which maintains an in-memory
13//! mapping of cluster replica HTTP addresses. This is used by environmentd to
14//! proxy HTTP requests to clusterd internal endpoints without requiring
15//! direct network access to the clusterd pods.
16
17use std::collections::BTreeMap;
18use std::sync::RwLock;
19
20use mz_controller_types::{ClusterId, ReplicaId};
21
22/// Tracks HTTP addresses for cluster replica processes.
23#[derive(Debug, Default)]
24pub struct ReplicaHttpLocator {
25 /// Maps (cluster_id, replica_id) to a list of process HTTP addresses.
26 replica_addresses: RwLock<BTreeMap<(ClusterId, ReplicaId), Vec<String>>>,
27}
28
29impl ReplicaHttpLocator {
30 /// Returns the HTTP address for a specific process of a replica.
31 ///
32 /// Returns `None` if the replica is not found, the process index is out of
33 /// bounds, or the addresses are not yet available.
34 pub fn get_http_addr(
35 &self,
36 cluster_id: ClusterId,
37 replica_id: ReplicaId,
38 process: usize,
39 ) -> Option<String> {
40 let guard = self.replica_addresses.read().expect("lock poisoned");
41 let addrs = guard.get(&(cluster_id, replica_id))?;
42 addrs.get(process).cloned()
43 }
44
45 /// Registers a service for a replica.
46 ///
47 /// Called by the controller when a managed replica is provisioned.
48 pub fn register_replica(
49 &self,
50 cluster_id: ClusterId,
51 replica_id: ReplicaId,
52 addresses: Vec<String>,
53 ) {
54 let mut guard = self.replica_addresses.write().expect("lock poisoned");
55 guard.insert((cluster_id, replica_id), addresses);
56 }
57
58 /// Removes a replica from the locator.
59 ///
60 /// Called by the controller when a replica is dropped.
61 pub(crate) fn remove_replica(&self, cluster_id: ClusterId, replica_id: ReplicaId) {
62 let mut guard = self.replica_addresses.write().expect("lock poisoned");
63 guard.remove(&(cluster_id, replica_id));
64 }
65}