1use std::fmt::Debug;
19use std::fs::{File, create_dir_all};
20use std::future::Future;
21use std::io::Write;
22use std::path::PathBuf;
23use std::pin::Pin;
24
25use futures::future::join_all;
26use k8s_openapi::NamespaceResourceScope;
27use k8s_openapi::api::admissionregistration::v1::{
28 MutatingWebhookConfiguration, ValidatingWebhookConfiguration,
29};
30use k8s_openapi::api::apps::v1::{DaemonSet, Deployment, ReplicaSet, StatefulSet};
31use k8s_openapi::api::core::v1::{
32 ConfigMap, Event, Node, PersistentVolume, PersistentVolumeClaim, Pod, Service, ServiceAccount,
33};
34use k8s_openapi::api::networking::v1::NetworkPolicy;
35use k8s_openapi::api::rbac::v1::{Role, RoleBinding};
36use k8s_openapi::api::storage::v1::StorageClass;
37use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
38use kube::api::{ListParams, LogParams};
39use kube::{Api, Client};
40use mz_cloud_resources::crd::generated::cert_manager::certificates::Certificate;
41use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
42
43use serde::{Serialize, de::DeserializeOwned};
44use tracing::{info, warn};
45
46use crate::{ContainerDumper, Context};
47
/// Dumps every object of one Kubernetes resource kind `K` to YAML files.
///
/// One instance is bound to a single `Api<K>` handle, which is either
/// cluster-wide or restricted to one namespace.
struct K8sResourceDumper<'n, K> {
    /// Shared dump context; its `base_path` is the root of the output tree.
    context: &'n Context,
    /// API handle used to list the objects of kind `K`.
    api: Api<K>,
    /// Namespace the handle is restricted to, or `None` for a cluster-wide handle.
    namespace: Option<String>,
    /// Plural resource name of `K`; used as the output directory name and in log messages.
    resource_type: String,
}
54
55impl<'n, K> K8sResourceDumper<'n, K>
56where
57 K: kube::Resource<DynamicType = ()> + Clone + Debug + Serialize + DeserializeOwned,
58{
59 fn cluster(context: &'n Context, client: Client) -> Self {
60 Self {
61 context,
62 api: Api::<K>::all(client),
63 namespace: None,
64 resource_type: K::plural(&()).into_owned(),
65 }
66 }
67
68 fn namespaced(context: &'n Context, client: Client, namespace: String) -> Self
69 where
70 K: kube::Resource<Scope = NamespaceResourceScope>,
71 {
72 Self {
73 context,
74 api: Api::<K>::namespaced(client, namespace.as_str()),
75 namespace: Some(namespace),
76 resource_type: K::plural(&()).into_owned(),
77 }
78 }
79
80 async fn _dump(&self) -> Result<(), anyhow::Error> {
81 let object_list = self.api.list(&ListParams::default()).await?;
82
83 if object_list.items.is_empty() {
84 let mut err_msg = format!("No {} found", self.resource_type);
85 if let Some(namespace) = &self.namespace {
86 err_msg = format!("{} for namespace {}", err_msg, namespace);
87 }
88 warn!("{}", err_msg);
89 return Ok(());
90 }
91 let file_path = format_resource_path(
92 self.context.base_path.clone(),
93 self.resource_type.as_str(),
94 self.namespace.as_ref(),
95 );
96 create_dir_all(&file_path)?;
97
98 for (i, item) in object_list.items.iter().enumerate() {
99 let file_name = file_path.join(format!(
100 "{}.yaml",
101 &item
102 .meta()
103 .name
104 .clone()
105 .unwrap_or_else(|| format!("unknown_{}", i))
106 ));
107 let mut file = File::create(&file_name)?;
108
109 serde_yaml::to_writer(&mut file, &item)?;
110
111 info!("Exported {}", file_name.display());
112 }
113
114 Ok(())
115 }
116
117 async fn dump(&self) {
118 if let Err(e) = self._dump().await {
119 warn!("Failed to write k8s {}: {}", self.resource_type, e);
120 }
121 }
122}
123
/// Collects Kubernetes diagnostics (resource YAML, `kubectl describe` output
/// and pod logs) under the context's base path.
pub struct K8sDumper<'n> {
    /// Shared dump context; its `base_path` is the root of the output tree.
    context: &'n Context,
    /// Kubernetes client that every API handle is created from.
    client: Client,
    /// The primary namespace to dump; it is processed before any additional ones.
    k8s_namespace: String,
    /// Optional extra namespaces that are dumped in addition to `k8s_namespace`.
    k8s_additional_namespaces: Option<Vec<String>>,
    /// Optional kubeconfig context, forwarded to `kubectl` as `--context`.
    k8s_context: Option<String>,
}
135
136impl<'n> K8sDumper<'n> {
137 pub fn new(
138 context: &'n Context,
139 client: Client,
140 k8s_namespace: String,
141 k8s_additional_namespaces: Option<Vec<String>>,
142 k8s_context: Option<String>,
143 ) -> Self {
144 Self {
145 context,
146 client,
147 k8s_namespace,
148 k8s_additional_namespaces,
149 k8s_context,
150 }
151 }
152
153 async fn _dump_kubectl_describe<K>(
154 &self,
155 namespace: Option<&String>,
156 ) -> Result<(), anyhow::Error>
157 where
158 K: kube::Resource<DynamicType = ()>,
159 {
160 let resource_type = K::plural(&()).into_owned();
161 let mut args = vec!["describe", &resource_type];
162 if let Some(namespace) = namespace {
163 args.extend(["-n", namespace]);
164 } else {
165 args.push("--all-namespaces");
166 }
167
168 if let Some(k8s_context) = &self.k8s_context {
169 args.extend(["--context", k8s_context]);
170 }
171
172 let output = tokio::process::Command::new("kubectl")
173 .args(args)
174 .stderr(std::process::Stdio::null()) .output()
176 .await?;
177
178 if !output.status.success() {
179 return Err(anyhow::anyhow!(
180 "{}",
181 String::from_utf8_lossy(&output.stderr)
182 ));
183 }
184
185 if output.stdout.is_empty() {
186 let mut err_msg = format!("Describe: No {} found", resource_type);
187 if let Some(namespace) = namespace {
188 err_msg = format!("{} for namespace {}", err_msg, namespace);
189 }
190 warn!("{}", err_msg);
191 return Ok(());
192 }
193
194 let file_path = format_resource_path(
195 self.context.base_path.clone(),
196 resource_type.as_str(),
197 namespace,
198 );
199 let file_name = file_path.join("describe.txt");
200 create_dir_all(&file_path)?;
201 let mut file = File::create(&file_name)?;
202 file.write_all(&output.stdout)?;
203
204 info!("Exported {}", file_name.display());
205
206 Ok(())
207 }
208
209 async fn dump_kubectl_describe<K>(&self, namespace: Option<&String>)
210 where
211 K: kube::Resource<DynamicType = ()>,
212 {
213 if let Err(e) = self._dump_kubectl_describe::<K>(namespace).await {
214 warn!(
215 "Failed to dump kubectl describe for {}: {}",
216 K::plural(&()).into_owned(),
217 e
218 );
219 }
220 }
221
222 async fn dump_cluster_resources(&self) {
224 K8sResourceDumper::<Node>::cluster(self.context, self.client.clone())
225 .dump()
226 .await;
227
228 K8sResourceDumper::<StorageClass>::cluster(self.context, self.client.clone())
229 .dump()
230 .await;
231
232 K8sResourceDumper::<PersistentVolume>::cluster(self.context, self.client.clone())
233 .dump()
234 .await;
235
236 K8sResourceDumper::<MutatingWebhookConfiguration>::cluster(
237 self.context,
238 self.client.clone(),
239 )
240 .dump()
241 .await;
242
243 K8sResourceDumper::<ValidatingWebhookConfiguration>::cluster(
244 self.context,
245 self.client.clone(),
246 )
247 .dump()
248 .await;
249 K8sResourceDumper::<DaemonSet>::cluster(self.context, self.client.clone())
250 .dump()
251 .await;
252 K8sResourceDumper::<CustomResourceDefinition>::cluster(self.context, self.client.clone())
253 .dump()
254 .await;
255 }
256
257 async fn _dump_k8s_pod_logs(&self, namespace: &String) -> Result<(), anyhow::Error> {
258 let file_path =
259 format_resource_path(self.context.base_path.clone(), "logs", Some(namespace));
260 create_dir_all(&file_path)?;
261
262 let pods: Api<Pod> = Api::<Pod>::namespaced(self.client.clone(), namespace);
263 let pod_list = pods.list(&ListParams::default()).await?;
264
265 for (i, pod) in pod_list.items.iter().enumerate() {
266 let pod_name = pod
267 .metadata
268 .name
269 .clone()
270 .unwrap_or_else(|| format!("unknown_{}", i));
271 async fn export_pod_logs(
272 pods: &Api<Pod>,
273 pod_name: &str,
274 file_path: &PathBuf,
275 is_previous: bool,
276 ) -> Result<(), anyhow::Error> {
277 let suffix = if is_previous { "previous" } else { "current" };
278 let file_name = file_path.join(format!("{}.{}.log", pod_name, suffix));
279
280 let logs = pods
281 .logs(
282 pod_name,
283 &LogParams {
284 previous: is_previous,
285 timestamps: true,
286 ..Default::default()
287 },
288 )
289 .await?;
290
291 if logs.is_empty() {
292 warn!("No {} logs found for pod {}", suffix, pod_name);
293 return Ok(());
294 }
295
296 let mut file = File::create(&file_name)?;
297 file.write_all(logs.as_bytes())?;
298 info!("Exported {}", file_name.display());
299
300 Ok(())
301 }
302
303 if let Err(e) = export_pod_logs(&pods, &pod_name, &file_path, true).await {
304 match e.downcast_ref::<kube::Error>() {
305 Some(kube::Error::Api(e)) if e.code == 400 => {
306 warn!("No previous logs available for pod {}", pod_name);
307 }
308 _ => {
309 warn!(
310 "Failed to export previous logs for pod {}: {}",
311 &pod_name, e
312 );
313 }
314 }
315 }
316
317 if let Err(e) = export_pod_logs(&pods, &pod_name, &file_path, false).await {
318 warn!("Failed to export current logs for pod {}: {}", &pod_name, e);
319 }
320 }
321 Ok(())
322 }
323
324 async fn dump_k8s_pod_logs(&self, namespace: &String) {
326 if let Err(e) = self._dump_k8s_pod_logs(namespace).await {
327 warn!("Failed to dump k8s pod logs: {}", e);
328 }
329 }
330
331 pub async fn dump_namespaced_resources(&self, namespace: String) {
333 K8sResourceDumper::<Pod>::namespaced(self.context, self.client.clone(), namespace.clone())
334 .dump()
335 .await;
336 K8sResourceDumper::<Service>::namespaced(
337 self.context,
338 self.client.clone(),
339 namespace.clone(),
340 )
341 .dump()
342 .await;
343 K8sResourceDumper::<Deployment>::namespaced(
344 self.context,
345 self.client.clone(),
346 namespace.clone(),
347 )
348 .dump()
349 .await;
350 K8sResourceDumper::<StatefulSet>::namespaced(
351 self.context,
352 self.client.clone(),
353 namespace.clone(),
354 )
355 .dump()
356 .await;
357 K8sResourceDumper::<ReplicaSet>::namespaced(
358 self.context,
359 self.client.clone(),
360 namespace.clone(),
361 )
362 .dump()
363 .await;
364 K8sResourceDumper::<NetworkPolicy>::namespaced(
365 self.context,
366 self.client.clone(),
367 namespace.clone(),
368 )
369 .dump()
370 .await;
371 K8sResourceDumper::<Event>::namespaced(
372 self.context,
373 self.client.clone(),
374 namespace.clone(),
375 )
376 .dump()
377 .await;
378 K8sResourceDumper::<Materialize>::namespaced(
379 self.context,
380 self.client.clone(),
381 namespace.clone(),
382 )
383 .dump()
384 .await;
385 K8sResourceDumper::<Role>::namespaced(self.context, self.client.clone(), namespace.clone())
386 .dump()
387 .await;
388 K8sResourceDumper::<RoleBinding>::namespaced(
389 self.context,
390 self.client.clone(),
391 namespace.clone(),
392 )
393 .dump()
394 .await;
395 K8sResourceDumper::<ConfigMap>::namespaced(
396 self.context,
397 self.client.clone(),
398 namespace.clone(),
399 )
400 .dump()
401 .await;
402 K8sResourceDumper::<PersistentVolumeClaim>::namespaced(
403 self.context,
404 self.client.clone(),
405 namespace.clone(),
406 )
407 .dump()
408 .await;
409 K8sResourceDumper::<ServiceAccount>::namespaced(
410 self.context,
411 self.client.clone(),
412 namespace.clone(),
413 )
414 .dump()
415 .await;
416
417 K8sResourceDumper::<Certificate>::namespaced(
418 self.context,
419 self.client.clone(),
420 namespace.clone(),
421 )
422 .dump()
423 .await;
424
425 self.dump_k8s_pod_logs(&namespace).await;
426 }
427}
428
429impl<'n> ContainerDumper for K8sDumper<'n> {
430 async fn dump_container_resources(&self) {
431 let mut futs: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![];
432
433 let k8s_namespaces_iter = std::iter::once(&self.k8s_namespace)
434 .chain(self.k8s_additional_namespaces.iter().flatten());
435
436 for namespace in k8s_namespaces_iter.clone() {
437 futs.push(Box::pin(self.dump_kubectl_describe::<Pod>(Some(namespace))));
438 futs.push(Box::pin(
439 self.dump_kubectl_describe::<Service>(Some(namespace)),
440 ));
441 futs.push(Box::pin(
442 self.dump_kubectl_describe::<Deployment>(Some(namespace)),
443 ));
444 futs.push(Box::pin(
445 self.dump_kubectl_describe::<StatefulSet>(Some(namespace)),
446 ));
447 futs.push(Box::pin(
448 self.dump_kubectl_describe::<ReplicaSet>(Some(namespace)),
449 ));
450 futs.push(Box::pin(
451 self.dump_kubectl_describe::<NetworkPolicy>(Some(namespace)),
452 ));
453 futs.push(Box::pin(
454 self.dump_kubectl_describe::<Event>(Some(namespace)),
455 ));
456 futs.push(Box::pin(
457 self.dump_kubectl_describe::<Materialize>(Some(namespace)),
458 ));
459 futs.push(Box::pin(
460 self.dump_kubectl_describe::<Role>(Some(namespace)),
461 ));
462 futs.push(Box::pin(
463 self.dump_kubectl_describe::<RoleBinding>(Some(namespace)),
464 ));
465 futs.push(Box::pin(
466 self.dump_kubectl_describe::<ConfigMap>(Some(namespace)),
467 ));
468 futs.push(Box::pin(
469 self.dump_kubectl_describe::<PersistentVolumeClaim>(Some(namespace)),
470 ));
471 futs.push(Box::pin(
472 self.dump_kubectl_describe::<ServiceAccount>(Some(namespace)),
473 ));
474 futs.push(Box::pin(
475 self.dump_kubectl_describe::<Certificate>(Some(namespace)),
476 ));
477 }
478
479 futs.push(Box::pin(self.dump_kubectl_describe::<Node>(None)));
480 futs.push(Box::pin(self.dump_kubectl_describe::<DaemonSet>(None)));
481 futs.push(Box::pin(self.dump_kubectl_describe::<StorageClass>(None)));
482 futs.push(Box::pin(
483 self.dump_kubectl_describe::<PersistentVolume>(None),
484 ));
485 futs.push(Box::pin(
486 self.dump_kubectl_describe::<MutatingWebhookConfiguration>(None),
487 ));
488 futs.push(Box::pin(
489 self.dump_kubectl_describe::<ValidatingWebhookConfiguration>(None),
490 ));
491 futs.push(Box::pin(
492 self.dump_kubectl_describe::<CustomResourceDefinition>(None),
493 ));
494
495 for namespace in k8s_namespaces_iter.clone() {
496 futs.push(Box::pin(self.dump_namespaced_resources(namespace.clone())));
497 }
498 futs.push(Box::pin(self.dump_cluster_resources()));
499
500 join_all(futs).await;
501 }
502}
503
/// Returns the output directory for a resource kind:
/// `<base_path>/<resource_type>`, with `/<namespace>` appended when a
/// namespace is given.
fn format_resource_path(
    base_path: PathBuf,
    resource_type: &str,
    namespace: Option<&String>,
) -> PathBuf {
    let resource_dir = base_path.join(resource_type);

    match namespace {
        Some(ns) => resource_dir.join(ns),
        None => resource_dir,
    }
}