mz_debug/
kubectl_port_forwarder.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
//! Port-forwards Kubernetes services via `kubectl port-forward`.
17
18use anyhow::{Context, Result};
19use k8s_openapi::api::apps::v1::StatefulSet;
20use k8s_openapi::api::core::v1::{Service, ServicePort};
21use kube::api::ListParams;
22use kube::{Api, Client};
23use tokio::io::AsyncBufReadExt;
24
25use tracing::info;
26
/// Describes one `kubectl port-forward` invocation against a single service port.
///
/// The fields are turned into command-line arguments by `spawn_port_forward`.
#[derive(Debug)]
pub struct KubectlPortForwarder {
    /// Namespace of the service; passed to kubectl via `-n`.
    pub namespace: String,
    /// Name of the service; passed to kubectl as `services/<service_name>`.
    pub service_name: String,
    /// Remote port to forward to. Passed as `:<target_port>`, i.e. without
    /// requesting a specific local port.
    pub target_port: i32,
    /// Optional kubeconfig context; passed via `--context` when set.
    pub context: Option<String>,
}
34
35pub struct PortForwardConnection {
36    // tokio process that's killed on drop
37    pub _port_forward_process: tokio::process::Child,
38    // We need to keep the lines otherwise the process will be killed when new lines
39    // are added to the stdout.
40    pub _lines: tokio::io::Lines<tokio::io::BufReader<tokio::process::ChildStdout>>,
41    // The local address and port that the port forward is established on
42    pub local_address: String,
43    pub local_port: i32,
44}
45
46impl KubectlPortForwarder {
47    /// Spawns a port forwarding process that resolves when
48    /// the port forward is established.
49    pub async fn spawn_port_forward(&self) -> Result<PortForwardConnection, anyhow::Error> {
50        let port_arg_str = format!(":{}", &self.target_port);
51        let service_name_arg_str = format!("services/{}", &self.service_name);
52        let mut args = vec![
53            "port-forward",
54            &service_name_arg_str,
55            &port_arg_str,
56            "-n",
57            &self.namespace,
58        ];
59
60        if let Some(k8s_context) = &self.context {
61            args.extend(["--context", k8s_context]);
62        }
63
64        let child = tokio::process::Command::new("kubectl")
65            .args(args)
66            // Silence stderr
67            .stdout(std::process::Stdio::piped())
68            .stderr(std::process::Stdio::null())
69            .kill_on_drop(true)
70            .spawn();
71
72        if let Ok(mut child) = child {
73            if let Some(stdout) = child.stdout.take() {
74                let stdout_reader = tokio::io::BufReader::new(stdout);
75                let mut lines = stdout_reader.lines();
76                let mut local_address = None;
77                let mut local_port = None;
78                let local_address_and_port_regex =
79                    regex::Regex::new(r"Forwarding from ([^:]+):(\d+)")?;
80
81                // Wait until we know port forwarding is established
82                let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
83                    // kubectl-port-forward output looks like:
84                    // ```
85                    // Forwarding from 127.0.0.1:6875 -> 6875
86                    // Forwarding from [::1]:6875 -> 6875
87                    // ```
88                    // We want to extract the local address and port from the first line.
89                    while let Ok(Some(line)) = lines.next_line().await {
90                        if let Some(captures) = local_address_and_port_regex.captures(&line) {
91                            local_address = Some(captures[1].to_string());
92                            local_port = captures[2].parse::<i32>().ok();
93                            break;
94                        }
95                    }
96                })
97                .await;
98
99                if timeout.is_err() {
100                    return Err(anyhow::anyhow!("Port forwarding timed out after 5 seconds"));
101                }
102
103                if let (Some(local_address), Some(local_port)) = (local_address, local_port) {
104                    info!(
105                        "Port forwarding established for {} from ports {}:{} -> {}",
106                        &self.service_name, local_address, local_port, &self.target_port
107                    );
108                    return Ok(PortForwardConnection {
109                        _lines: lines,
110                        _port_forward_process: child,
111                        local_address,
112                        local_port,
113                    });
114                } else {
115                    return Err(anyhow::anyhow!(
116                        "Failed to extract local address and port from kubectl-port-forward output"
117                    ));
118                }
119            }
120        }
121        Err(anyhow::anyhow!("Failed to spawn port forwarding process"))
122    }
123}
124
/// Name, namespace, and declared ports of a Kubernetes service.
#[derive(Debug)]
pub struct ServiceInfo {
    /// The service's `metadata.name`.
    pub service_name: String,
    /// The ports listed in the service's `spec.ports`.
    pub service_ports: Vec<ServicePort>,
    /// The namespace the service was looked up in.
    pub namespace: String,
}
131
132/// Returns ServiceInfo for balancerd
133pub async fn find_environmentd_service(
134    client: &Client,
135    k8s_namespace: &String,
136    mz_instance_name: &String,
137) -> Result<ServiceInfo> {
138    let services_api: Api<Service> = Api::namespaced(client.clone(), k8s_namespace);
139
140    let label_filter = format!(
141        // mz-resource-id is used to identify environmentd services
142        "materialize.cloud/mz-resource-id,materialize.cloud/organization-name={}",
143        mz_instance_name
144    );
145
146    let services = services_api
147        .list(&ListParams::default().labels(&label_filter))
148        .await
149        .with_context(|| format!("Failed to list services in namespace {}", k8s_namespace))?;
150
151    // Find the first sql service that contains environmentd
152    let maybe_service =
153        services
154            .iter()
155            .find_map(|service| match (&service.metadata.name, &service.spec) {
156                (Some(service_name), Some(spec)) => {
157                    // TODO (debug_tool3): This could match both the generation service and the globally active one. We should use the active one.
158                    if !service_name.to_lowercase().contains("environmentd") {
159                        return None;
160                    }
161
162                    if let Some(ports) = &spec.ports {
163                        Some(ServiceInfo {
164                            service_name: service_name.clone(),
165                            service_ports: ports.clone(),
166                            namespace: k8s_namespace.clone(),
167                        })
168                    } else {
169                        None
170                    }
171                }
172                _ => None,
173            });
174
175    if let Some(service) = maybe_service {
176        return Ok(service);
177    }
178
179    Err(anyhow::anyhow!("Could not find environmentd service"))
180}
181
182/// Returns Vec<(service_name, ports)> for cluster services
183pub async fn find_cluster_services(
184    client: &Client,
185    k8s_namespace: &String,
186    mz_instance_name: &String,
187) -> Result<Vec<ServiceInfo>> {
188    let services: Api<Service> = Api::namespaced(client.clone(), k8s_namespace);
189    let services = services
190        .list(&ListParams::default())
191        .await
192        .with_context(|| format!("Failed to list services in namespace {}", k8s_namespace))?;
193
194    let statefulsets_api: Api<StatefulSet> = Api::namespaced(client.clone(), k8s_namespace);
195
196    let organization_name_filter =
197        format!("materialize.cloud/organization-name={}", mz_instance_name);
198
199    let statefulsets = statefulsets_api
200        .list(&ListParams::default().labels(&organization_name_filter))
201        .await
202        .with_context(|| format!("Failed to list services in namespace {}", k8s_namespace))?;
203
204    let cluster_services: Vec<ServiceInfo> = services
205        .iter()
206        .filter_map(|service| {
207            let name = service.metadata.name.clone()?;
208            let spec = service.spec.clone()?;
209            let selector = spec.selector?;
210            let ports = spec.ports?;
211
212            // Check if this is a cluster service
213            if selector.get("environmentd.materialize.cloud/namespace")? != "cluster" {
214                return None;
215            }
216
217            // Check if the owner reference points to environmentd StatefulSet in the same mz instance
218            let envd_statefulset_reference_name = service
219                .metadata
220                .owner_references
221                .as_ref()?
222                .iter()
223                //  There should only be one StatefulSet reference to environmentd
224                .find(|owner_reference| owner_reference.kind == "StatefulSet")?
225                .name
226                .clone();
227
228            if !statefulsets
229                .iter()
230                .filter_map(|statefulset| statefulset.metadata.name.clone())
231                .any(|name| name == envd_statefulset_reference_name)
232            {
233                return None;
234            }
235
236            Some(ServiceInfo {
237                service_name: name,
238                service_ports: ports,
239                namespace: k8s_namespace.clone(),
240            })
241        })
242        .collect();
243
244    if !cluster_services.is_empty() {
245        return Ok(cluster_services);
246    }
247
248    Err(anyhow::anyhow!("Could not find cluster services"))
249}
250
251/// Creates a port forwarder for the external pg wire port of environmentd.
252pub async fn create_pg_wire_port_forwarder(
253    client: &Client,
254    k8s_context: &Option<String>,
255    k8s_namespace: &String,
256    mz_instance_name: &String,
257) -> Result<KubectlPortForwarder> {
258    let service_info = find_environmentd_service(client, k8s_namespace, mz_instance_name)
259        .await
260        .with_context(|| "Cannot find ports for environmentd service")?;
261
262    let maybe_external_sql_port = service_info.service_ports.iter().find_map(|port_info| {
263        if let Some(port_name) = &port_info.name {
264            let port_name = port_name.to_lowercase();
265            if port_name == "sql" {
266                return Some(port_info);
267            }
268        }
269        None
270    });
271
272    if let Some(external_sql_port) = maybe_external_sql_port {
273        Ok(KubectlPortForwarder {
274            context: k8s_context.clone(),
275            namespace: service_info.namespace,
276            service_name: service_info.service_name,
277            target_port: external_sql_port.port,
278        })
279    } else {
280        Err(anyhow::anyhow!(
281            "No SQL port forwarding info found. Set --mz-connection-url to a Materialize instance."
282        ))
283    }
284}