mz_debug/
kubectl_port_forwarder.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Port forwards k8s service via Kubectl
17
18use anyhow::Result;
19use k8s_openapi::api::core::v1::Service;
20use kube::api::ListParams;
21use kube::{Api, Client};
22
23use std::time::Duration;
24
25use mz_ore::retry::{self, RetryResult};
26use tracing::{error, info};
27
28use crate::SelfManagedDebugMode;
/// Settings for one `kubectl port-forward` invocation against a k8s service.
#[derive(Clone, Debug)]
pub struct KubectlPortForwarder {
    /// Namespace of the service; passed to kubectl via `-n`.
    pub namespace: String,
    /// Name of the service; forwarded as `services/<service_name>`.
    pub service_name: String,
    /// Port on the service that traffic is forwarded to.
    pub target_port: i32,
    /// Local address to bind; passed to kubectl via `--address`.
    pub local_address: String,
    /// Local port that is forwarded to `target_port`.
    pub local_port: i32,
    /// Optional kubeconfig context; passed via `--context` when set.
    pub context: Option<String>,
}
38
39impl KubectlPortForwarder {
40    /// Port forwards a given k8s service via Kubectl.
41    /// The process will retry if the port-forwarding fails and
42    /// will terminate once the port forwarding reaches the max number of retries.
43    /// We retry since kubectl port-forward is flaky.
44    pub async fn port_forward(&self) {
45        if let Err(err) = retry::Retry::default()
46            .max_duration(Duration::from_secs(60))
47            .retry_async(|retry_state| {
48                let k8s_context = self.context.clone();
49                let namespace = self.namespace.clone();
50                let service_name = self.service_name.clone();
51                let local_address = self.local_address.clone();
52                let local_port = self.local_port;
53                let target_port = self.target_port;
54
55                info!(
56                    "Spawning port forwarding process for {} from ports {}:{} -> {}",
57                    service_name, local_address, local_port, target_port
58                );
59
60                async move {
61                    let port_arg_str = format!("{}:{}", &local_port, &target_port);
62                    let service_name_arg_str = format!("services/{}", &service_name);
63                    let mut args = vec![
64                        "port-forward",
65                        &service_name_arg_str,
66                        &port_arg_str,
67                        "-n",
68                        &namespace,
69                        "--address",
70                        &local_address,
71                    ];
72
73                    if let Some(k8s_context) = &k8s_context {
74                        args.extend(["--context", k8s_context]);
75                    }
76
77                    match tokio::process::Command::new("kubectl")
78                        .args(args)
79                        // Silence stdout/stderr
80                        .stdout(std::process::Stdio::null())
81                        .stderr(std::process::Stdio::null())
82                        .kill_on_drop(true)
83                        .output()
84                        .await
85                    {
86                        Ok(output) => {
87                            if !output.status.success() {
88                                let retry_err_msg = format!(
89                                    "Failed to port-forward{}: {}",
90                                    retry_state.next_backoff.map_or_else(
91                                        || "".to_string(),
92                                        |d| format!(", retrying in {:?}", d)
93                                    ),
94                                    String::from_utf8_lossy(&output.stderr)
95                                );
96                                error!("{}", retry_err_msg);
97
98                                return RetryResult::RetryableErr(anyhow::anyhow!(retry_err_msg));
99                            }
100                        }
101                        Err(err) => {
102                            return RetryResult::RetryableErr(anyhow::anyhow!(
103                                "Failed to port-forward: {}",
104                                err
105                            ));
106                        }
107                    }
108                    // The kubectl subprocess's future will only resolve on error, thus the
109                    // code here is unreachable. We return RetryResult::Ok to satisfy
110                    // the type checker.
111                    RetryResult::Ok(())
112                }
113            })
114            .await
115        {
116            error!("{}", err);
117        }
118    }
119}
120
121pub async fn create_kubectl_port_forwarder(
122    client: &Client,
123    args: &SelfManagedDebugMode,
124) -> Result<KubectlPortForwarder, anyhow::Error> {
125    for namespace in &args.k8s_namespaces {
126        let services: Api<Service> = Api::namespaced(client.clone(), namespace);
127        let services = services
128            .list(&ListParams::default().labels("materialize.cloud/mz-resource-id"))
129            .await?;
130        // Finds the sql service that contains a port with name "sql"
131        let maybe_port_info = services
132            .iter()
133            .filter_map(|service| {
134                let spec = service.spec.as_ref()?;
135                let service_name = service.metadata.name.as_ref()?;
136                if !service_name.to_lowercase().contains("balancerd") {
137                    return None;
138                }
139                Some((spec, service_name))
140            })
141            .flat_map(|(spec, service_name)| {
142                spec.ports
143                    .iter()
144                    .flatten()
145                    .map(move |port| (port, service_name))
146            })
147            .find_map(|(port_info, service_name)| {
148                if let Some(port_name) = &port_info.name {
149                    // We want to find the external SQL port and not the internal one
150                    if port_name.to_lowercase().contains("pgwire") {
151                        return Some(KubectlPortForwarder {
152                            context: args.k8s_context.clone(),
153                            namespace: namespace.clone(),
154                            service_name: service_name.to_owned(),
155                            target_port: port_info.port,
156                            local_address: args.port_forward_local_address.clone(),
157                            local_port: args.port_forward_local_port,
158                        });
159                    }
160                }
161
162                None
163            });
164
165        if let Some(port_info) = maybe_port_info {
166            return Ok(port_info);
167        }
168    }
169
170    Err(anyhow::anyhow!(
171        "No SQL port forwarding info found. Set --auto-port-forward to false and point --mz-connection-url to a Materialize instance."
172    ))
173}
174
/// Builds the Postgres connection URL used to reach Materialize.
///
/// Returns `connection_url_override` unchanged when it is set. Otherwise the
/// URL is assembled from `local_address` and `local_port` with
/// `sslmode=prefer`. A bare IPv6 literal (e.g. `::1`) is wrapped in square
/// brackets so its colons are not mistaken for the host/port separator.
pub fn create_mz_connection_url(
    local_address: String,
    local_port: i32,
    connection_url_override: Option<String>,
) -> String {
    connection_url_override.unwrap_or_else(|| {
        // Only unbracketed IPv6 literals parse here; hostnames, IPv4 addresses
        // and already-bracketed hosts are passed through untouched.
        let host = if local_address.parse::<std::net::Ipv6Addr>().is_ok() {
            format!("[{}]", local_address)
        } else {
            local_address
        };
        format!("postgres://{}:{}?sslmode=prefer", host, local_port)
    })
}