mz_controller/replica_http_locator.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Tracks HTTP addresses for cluster replica processes.
11//!
12//! This module provides the [`ReplicaHttpLocator`] which maintains an in-memory
13//! mapping of cluster replica HTTP addresses. This is used by environmentd to
14//! proxy HTTP requests to clusterd internal endpoints without requiring
15//! direct network access to the clusterd pods.
16//!
17//! The reason for this to exist is that the process orchestrator with
18//! HTTP-to-domain socket proxies only lazily starts its proxies, meaning
19//! we don't know the ports to forward to until after the replica is started.
20
21use std::collections::BTreeMap;
22use std::sync::{Arc, RwLock};
23
24use mz_controller_types::{ClusterId, ReplicaId};
25use mz_orchestrator::Service;
26
27/// Tracks HTTP addresses for cluster replica processes.
28#[derive(Default)]
29pub struct ReplicaHttpLocator {
30 /// Maps (cluster_id, replica_id) -> Service reference.
31 /// We store the Service and call tcp_addresses() lazily.
32 services: RwLock<BTreeMap<(ClusterId, ReplicaId), Arc<dyn Service>>>,
33}
34
35impl std::fmt::Debug for ReplicaHttpLocator {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("ReplicaHttpLocator")
38 .field("services", &"<services>")
39 .finish()
40 }
41}
42
43impl ReplicaHttpLocator {
44 /// Returns the HTTP address for a specific process of a replica.
45 ///
46 /// Returns `None` if the replica is not found, the process index is out of
47 /// bounds, or the addresses are not yet available.
48 pub fn get_http_addr(
49 &self,
50 cluster_id: ClusterId,
51 replica_id: ReplicaId,
52 process: usize,
53 ) -> Option<String> {
54 let guard = self.services.read().expect("lock poisoned");
55 let service = guard.get(&(cluster_id, replica_id))?;
56 let addrs = service.tcp_addresses("internal-http");
57 addrs.get(process).cloned()
58 }
59
60 /// Registers a service for a replica.
61 ///
62 /// Called by the controller when a managed replica is provisioned.
63 pub fn register_replica(
64 &self,
65 cluster_id: ClusterId,
66 replica_id: ReplicaId,
67 service: Arc<dyn Service>,
68 ) {
69 let mut guard = self.services.write().expect("lock poisoned");
70 guard.insert((cluster_id, replica_id), service);
71 }
72
73 /// Removes a replica from the locator.
74 ///
75 /// Called by the controller when a replica is dropped.
76 pub(crate) fn remove_replica(&self, cluster_id: ClusterId, replica_id: ReplicaId) {
77 let mut guard = self.services.write().expect("lock poisoned");
78 guard.remove(&(cluster_id, replica_id));
79 }
80}