mz_debug/
kubectl_port_forwarder.rs
1use anyhow::Result;
19use k8s_openapi::api::core::v1::Service;
20use kube::api::ListParams;
21use kube::{Api, Client};
22
23use std::time::Duration;
24
25use mz_ore::retry::{self, RetryResult};
26use tracing::{error, info};
27
28use crate::SelfManagedDebugMode;
/// Parameters for one `kubectl port-forward` invocation against a Kubernetes service.
///
/// [`KubectlPortForwarder::port_forward`] turns these fields into the command
/// line `kubectl port-forward services/<service_name> <local_port>:<target_port>
/// -n <namespace> --address <local_address> [--context <context>]`.
#[derive(Clone, Debug)]
pub struct KubectlPortForwarder {
    /// Namespace handed to `kubectl` through `-n`.
    pub namespace: String,
    /// Name of the service to forward to; passed as `services/<service_name>`.
    pub service_name: String,
    /// Port on the service side of the `<local_port>:<target_port>` pair.
    pub target_port: i32,
    /// Local address handed to `kubectl` through `--address`.
    pub local_address: String,
    /// Port on the local side of the `<local_port>:<target_port>` pair.
    pub local_port: i32,
    /// Optional value for `--context`; the flag is omitted when this is `None`.
    pub context: Option<String>,
}
38
39impl KubectlPortForwarder {
40 pub async fn port_forward(&self) {
45 if let Err(err) = retry::Retry::default()
46 .max_duration(Duration::from_secs(60))
47 .retry_async(|retry_state| {
48 let k8s_context = self.context.clone();
49 let namespace = self.namespace.clone();
50 let service_name = self.service_name.clone();
51 let local_address = self.local_address.clone();
52 let local_port = self.local_port;
53 let target_port = self.target_port;
54
55 info!(
56 "Spawning port forwarding process for {} from ports {}:{} -> {}",
57 service_name, local_address, local_port, target_port
58 );
59
60 async move {
61 let port_arg_str = format!("{}:{}", &local_port, &target_port);
62 let service_name_arg_str = format!("services/{}", &service_name);
63 let mut args = vec![
64 "port-forward",
65 &service_name_arg_str,
66 &port_arg_str,
67 "-n",
68 &namespace,
69 "--address",
70 &local_address,
71 ];
72
73 if let Some(k8s_context) = &k8s_context {
74 args.extend(["--context", k8s_context]);
75 }
76
77 match tokio::process::Command::new("kubectl")
78 .args(args)
79 .stdout(std::process::Stdio::null())
81 .stderr(std::process::Stdio::null())
82 .kill_on_drop(true)
83 .output()
84 .await
85 {
86 Ok(output) => {
87 if !output.status.success() {
88 let retry_err_msg = format!(
89 "Failed to port-forward{}: {}",
90 retry_state.next_backoff.map_or_else(
91 || "".to_string(),
92 |d| format!(", retrying in {:?}", d)
93 ),
94 String::from_utf8_lossy(&output.stderr)
95 );
96 error!("{}", retry_err_msg);
97
98 return RetryResult::RetryableErr(anyhow::anyhow!(retry_err_msg));
99 }
100 }
101 Err(err) => {
102 return RetryResult::RetryableErr(anyhow::anyhow!(
103 "Failed to port-forward: {}",
104 err
105 ));
106 }
107 }
108 RetryResult::Ok(())
112 }
113 })
114 .await
115 {
116 error!("{}", err);
117 }
118 }
119}
120
121pub async fn create_kubectl_port_forwarder(
122 client: &Client,
123 args: &SelfManagedDebugMode,
124) -> Result<KubectlPortForwarder, anyhow::Error> {
125 for namespace in &args.k8s_namespaces {
126 let services: Api<Service> = Api::namespaced(client.clone(), namespace);
127 let services = services
128 .list(&ListParams::default().labels("materialize.cloud/mz-resource-id"))
129 .await?;
130 let maybe_port_info = services
132 .iter()
133 .filter_map(|service| {
134 let spec = service.spec.as_ref()?;
135 let service_name = service.metadata.name.as_ref()?;
136 if !service_name.to_lowercase().contains("balancerd") {
137 return None;
138 }
139 Some((spec, service_name))
140 })
141 .flat_map(|(spec, service_name)| {
142 spec.ports
143 .iter()
144 .flatten()
145 .map(move |port| (port, service_name))
146 })
147 .find_map(|(port_info, service_name)| {
148 if let Some(port_name) = &port_info.name {
149 if port_name.to_lowercase().contains("pgwire") {
151 return Some(KubectlPortForwarder {
152 context: args.k8s_context.clone(),
153 namespace: namespace.clone(),
154 service_name: service_name.to_owned(),
155 target_port: port_info.port,
156 local_address: args.port_forward_local_address.clone(),
157 local_port: args.port_forward_local_port,
158 });
159 }
160 }
161
162 None
163 });
164
165 if let Some(port_info) = maybe_port_info {
166 return Ok(port_info);
167 }
168 }
169
170 Err(anyhow::anyhow!(
171 "No SQL port forwarding info found. Set --auto-port-forward to false and point --mz-connection-url to a Materialize instance."
172 ))
173}
174
/// Builds the connection URL used to reach Materialize.
///
/// When `connection_url_override` is `Some`, it is returned unchanged and the
/// other arguments are ignored. Otherwise the result is
/// `postgres://<local_address>:<local_port>?sslmode=prefer`.
///
/// A `local_address` that parses as an IPv6 literal (for example `::1`) is
/// wrapped in square brackets, because an unbracketed IPv6 address cannot be
/// told apart from the `:<port>` suffix in a URL authority. Any other value,
/// including host names, IPv4 addresses and already-bracketed addresses, is
/// inserted as given.
pub fn create_mz_connection_url(
    local_address: String,
    local_port: i32,
    connection_url_override: Option<String>,
) -> String {
    if let Some(connection_url_override) = connection_url_override {
        return connection_url_override;
    }

    let host = if local_address.parse::<std::net::Ipv6Addr>().is_ok() {
        format!("[{}]", local_address)
    } else {
        local_address
    };

    format!("postgres://{}:{}?sslmode=prefer", host, local_port)
}