mz_debug/
kubectl_port_forwarder.rs1use anyhow::{Context, Result};
19use k8s_openapi::api::apps::v1::StatefulSet;
20use k8s_openapi::api::core::v1::{Service, ServicePort};
21use kube::api::ListParams;
22use kube::{Api, Client};
23use tokio::io::AsyncBufReadExt;
24
25use tracing::info;
26
27#[derive(Debug)]
28pub struct KubectlPortForwarder {
29 pub namespace: String,
30 pub service_name: String,
31 pub target_port: i32,
32 pub context: Option<String>,
33}
34
35pub struct PortForwardConnection {
36 pub _port_forward_process: tokio::process::Child,
38 pub _lines: tokio::io::Lines<tokio::io::BufReader<tokio::process::ChildStdout>>,
41 pub local_address: String,
43 pub local_port: i32,
44}
45
46impl KubectlPortForwarder {
47 pub async fn spawn_port_forward(&self) -> Result<PortForwardConnection, anyhow::Error> {
50 let port_arg_str = format!(":{}", &self.target_port);
51 let service_name_arg_str = format!("services/{}", &self.service_name);
52 let mut args = vec![
53 "port-forward",
54 &service_name_arg_str,
55 &port_arg_str,
56 "-n",
57 &self.namespace,
58 ];
59
60 if let Some(k8s_context) = &self.context {
61 args.extend(["--context", k8s_context]);
62 }
63
64 let child = tokio::process::Command::new("kubectl")
65 .args(args)
66 .stdout(std::process::Stdio::piped())
68 .stderr(std::process::Stdio::null())
69 .kill_on_drop(true)
70 .spawn();
71
72 if let Ok(mut child) = child {
73 if let Some(stdout) = child.stdout.take() {
74 let stdout_reader = tokio::io::BufReader::new(stdout);
75 let mut lines = stdout_reader.lines();
76 let mut local_address = None;
77 let mut local_port = None;
78 let local_address_and_port_regex =
79 regex::Regex::new(r"Forwarding from ([^:]+):(\d+)")?;
80
81 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
83 while let Ok(Some(line)) = lines.next_line().await {
90 if let Some(captures) = local_address_and_port_regex.captures(&line) {
91 local_address = Some(captures[1].to_string());
92 local_port = captures[2].parse::<i32>().ok();
93 break;
94 }
95 }
96 })
97 .await;
98
99 if timeout.is_err() {
100 return Err(anyhow::anyhow!("Port forwarding timed out after 5 seconds"));
101 }
102
103 if let (Some(local_address), Some(local_port)) = (local_address, local_port) {
104 info!(
105 "Port forwarding established for {} from ports {}:{} -> {}",
106 &self.service_name, local_address, local_port, &self.target_port
107 );
108 return Ok(PortForwardConnection {
109 _lines: lines,
110 _port_forward_process: child,
111 local_address,
112 local_port,
113 });
114 } else {
115 return Err(anyhow::anyhow!(
116 "Failed to extract local address and port from kubectl-port-forward output"
117 ));
118 }
119 }
120 }
121 Err(anyhow::anyhow!("Failed to spawn port forwarding process"))
122 }
123}
124
125#[derive(Debug)]
126pub struct ServiceInfo {
127 pub service_name: String,
128 pub service_ports: Vec<ServicePort>,
129 pub namespace: String,
130}
131
132pub async fn find_environmentd_service(
134 client: &Client,
135 k8s_namespace: &String,
136 mz_instance_name: &String,
137) -> Result<ServiceInfo> {
138 let services_api: Api<Service> = Api::namespaced(client.clone(), k8s_namespace);
139
140 let label_filter = format!(
141 "materialize.cloud/mz-resource-id,materialize.cloud/organization-name={}",
143 mz_instance_name
144 );
145
146 let services = services_api
147 .list(&ListParams::default().labels(&label_filter))
148 .await
149 .with_context(|| format!("Failed to list services in namespace {}", k8s_namespace))?;
150
151 let maybe_service =
153 services
154 .iter()
155 .find_map(|service| match (&service.metadata.name, &service.spec) {
156 (Some(service_name), Some(spec)) => {
157 if !service_name.to_lowercase().contains("environmentd") {
159 return None;
160 }
161
162 if let Some(ports) = &spec.ports {
163 Some(ServiceInfo {
164 service_name: service_name.clone(),
165 service_ports: ports.clone(),
166 namespace: k8s_namespace.clone(),
167 })
168 } else {
169 None
170 }
171 }
172 _ => None,
173 });
174
175 if let Some(service) = maybe_service {
176 return Ok(service);
177 }
178
179 Err(anyhow::anyhow!("Could not find environmentd service"))
180}
181
182pub async fn find_cluster_services(
184 client: &Client,
185 k8s_namespace: &String,
186 mz_instance_name: &String,
187) -> Result<Vec<ServiceInfo>> {
188 let services: Api<Service> = Api::namespaced(client.clone(), k8s_namespace);
189 let services = services
190 .list(&ListParams::default())
191 .await
192 .with_context(|| format!("Failed to list services in namespace {}", k8s_namespace))?;
193
194 let statefulsets_api: Api<StatefulSet> = Api::namespaced(client.clone(), k8s_namespace);
195
196 let organization_name_filter =
197 format!("materialize.cloud/organization-name={}", mz_instance_name);
198
199 let statefulsets = statefulsets_api
200 .list(&ListParams::default().labels(&organization_name_filter))
201 .await
202 .with_context(|| format!("Failed to list services in namespace {}", k8s_namespace))?;
203
204 let cluster_services: Vec<ServiceInfo> = services
205 .iter()
206 .filter_map(|service| {
207 let name = service.metadata.name.clone()?;
208 let spec = service.spec.clone()?;
209 let selector = spec.selector?;
210 let ports = spec.ports?;
211
212 if selector.get("environmentd.materialize.cloud/namespace")? != "cluster" {
214 return None;
215 }
216
217 let envd_statefulset_reference_name = service
219 .metadata
220 .owner_references
221 .as_ref()?
222 .iter()
223 .find(|owner_reference| owner_reference.kind == "StatefulSet")?
225 .name
226 .clone();
227
228 if !statefulsets
229 .iter()
230 .filter_map(|statefulset| statefulset.metadata.name.clone())
231 .any(|name| name == envd_statefulset_reference_name)
232 {
233 return None;
234 }
235
236 Some(ServiceInfo {
237 service_name: name,
238 service_ports: ports,
239 namespace: k8s_namespace.clone(),
240 })
241 })
242 .collect();
243
244 if !cluster_services.is_empty() {
245 return Ok(cluster_services);
246 }
247
248 Err(anyhow::anyhow!("Could not find cluster services"))
249}
250
251pub async fn create_pg_wire_port_forwarder(
253 client: &Client,
254 k8s_context: &Option<String>,
255 k8s_namespace: &String,
256 mz_instance_name: &String,
257) -> Result<KubectlPortForwarder> {
258 let service_info = find_environmentd_service(client, k8s_namespace, mz_instance_name)
259 .await
260 .with_context(|| "Cannot find ports for environmentd service")?;
261
262 let maybe_external_sql_port = service_info.service_ports.iter().find_map(|port_info| {
263 if let Some(port_name) = &port_info.name {
264 let port_name = port_name.to_lowercase();
265 if port_name == "sql" {
266 return Some(port_info);
267 }
268 }
269 None
270 });
271
272 if let Some(external_sql_port) = maybe_external_sql_port {
273 Ok(KubectlPortForwarder {
274 context: k8s_context.clone(),
275 namespace: service_info.namespace,
276 service_name: service_info.service_name,
277 target_port: external_sql_port.port,
278 })
279 } else {
280 Err(anyhow::anyhow!(
281 "No SQL port forwarding info found. Set --mz-connection-url to a Materialize instance."
282 ))
283 }
284}