Skip to main content

mz_controller/
replica_http_locator.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Tracks HTTP addresses for cluster replica processes.
//!
//! This module provides the [`ReplicaHttpLocator`], which maintains an in-memory
//! mapping of cluster replica HTTP addresses. environmentd uses it to proxy
//! HTTP requests to clusterd internal endpoints without requiring direct
//! network access to the clusterd pods.
//!
//! This exists because the process orchestrator, which uses HTTP-to-domain-socket
//! proxies, starts those proxies only lazily, so the ports to forward to are
//! not known until after the replica has started.
20
21use std::collections::BTreeMap;
22use std::sync::{Arc, RwLock};
23
24use mz_controller_types::{ClusterId, ReplicaId};
25use mz_orchestrator::Service;
26
27/// Tracks HTTP addresses for cluster replica processes.
28#[derive(Default)]
29pub struct ReplicaHttpLocator {
30    /// Maps (cluster_id, replica_id) -> Service reference.
31    /// We store the Service and call tcp_addresses() lazily.
32    services: RwLock<BTreeMap<(ClusterId, ReplicaId), Arc<dyn Service>>>,
33}
34
35impl std::fmt::Debug for ReplicaHttpLocator {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("ReplicaHttpLocator")
38            .field("services", &"<services>")
39            .finish()
40    }
41}
42
43impl ReplicaHttpLocator {
44    /// Returns the HTTP address for a specific process of a replica.
45    ///
46    /// Returns `None` if the replica is not found, the process index is out of
47    /// bounds, or the addresses are not yet available.
48    pub fn get_http_addr(
49        &self,
50        cluster_id: ClusterId,
51        replica_id: ReplicaId,
52        process: usize,
53    ) -> Option<String> {
54        let guard = self.services.read().expect("lock poisoned");
55        let service = guard.get(&(cluster_id, replica_id))?;
56        let addrs = service.tcp_addresses("internal-http");
57        addrs.get(process).cloned()
58    }
59
60    /// Registers a service for a replica.
61    ///
62    /// Called by the controller when a managed replica is provisioned.
63    pub fn register_replica(
64        &self,
65        cluster_id: ClusterId,
66        replica_id: ReplicaId,
67        service: Arc<dyn Service>,
68    ) {
69        let mut guard = self.services.write().expect("lock poisoned");
70        guard.insert((cluster_id, replica_id), service);
71    }
72
73    /// Removes a replica from the locator.
74    ///
75    /// Called by the controller when a replica is dropped.
76    pub(crate) fn remove_replica(&self, cluster_id: ClusterId, replica_id: ReplicaId) {
77        let mut guard = self.services.write().expect("lock poisoned");
78        guard.remove(&(cluster_id, replica_id));
79    }
80}