1use std::fmt::Debug;
19use std::fs::{File, create_dir_all};
20use std::future::Future;
21use std::io::Write;
22use std::path::PathBuf;
23use std::pin::Pin;
24
25use chrono::{DateTime, Utc};
26use futures::future::join_all;
27use k8s_openapi::NamespaceResourceScope;
28use k8s_openapi::api::admissionregistration::v1::{
29 MutatingWebhookConfiguration, ValidatingWebhookConfiguration,
30};
31use k8s_openapi::api::apps::v1::{DaemonSet, Deployment, ReplicaSet, StatefulSet};
32use k8s_openapi::api::core::v1::{
33 ConfigMap, Event, Node, PersistentVolume, PersistentVolumeClaim, Pod, Secret, Service,
34 ServiceAccount,
35};
36use k8s_openapi::api::networking::v1::NetworkPolicy;
37use k8s_openapi::api::rbac::v1::{Role, RoleBinding};
38use k8s_openapi::api::storage::v1::StorageClass;
39use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
40use kube::api::{ListParams, LogParams};
41use kube::{Api, Client};
42use mz_cloud_resources::crd::generated::cert_manager::certificates::Certificate;
43use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
44
45use serde::{Serialize, de::DeserializeOwned};
46use tracing::{info, warn};
47
48use crate::utils::format_base_path;
49use crate::{ContainerDumper, Context};
50
51struct K8sResourceDumper<'n, K> {
52 context: &'n Context,
53 api: Api<K>,
54 namespace: Option<String>,
55 resource_type: String,
56 dump_secret_values: bool,
57}
58
59impl<'n, K> K8sResourceDumper<'n, K>
60where
61 K: kube::Resource<DynamicType = ()> + Clone + Debug + Serialize + DeserializeOwned,
62{
63 fn cluster(context: &'n Context, client: Client, dump_secret_values: bool) -> Self {
64 Self {
65 context,
66 api: Api::<K>::all(client),
67 namespace: None,
68 resource_type: K::plural(&()).into_owned(),
69 dump_secret_values,
70 }
71 }
72
73 fn namespaced(
74 context: &'n Context,
75 client: Client,
76 namespace: String,
77 dump_secret_values: bool,
78 ) -> Self
79 where
80 K: kube::Resource<Scope = NamespaceResourceScope>,
81 {
82 Self {
83 context,
84 api: Api::<K>::namespaced(client, namespace.as_str()),
85 namespace: Some(namespace),
86 resource_type: K::plural(&()).into_owned(),
87 dump_secret_values,
88 }
89 }
90
91 async fn _dump(&self) -> Result<(), anyhow::Error> {
92 let object_list = self.api.list(&ListParams::default()).await?;
93
94 if object_list.items.is_empty() {
95 let mut err_msg = format!("No {} found", self.resource_type);
96 if let Some(namespace) = &self.namespace {
97 err_msg = format!("{} for namespace {}", err_msg, namespace);
98 }
99 warn!("{}", err_msg);
100 return Ok(());
101 }
102 let file_path = format_resource_path(
103 self.context.start_time,
104 self.resource_type.as_str(),
105 self.namespace.as_ref(),
106 );
107 create_dir_all(&file_path)?;
108
109 for (i, item) in object_list.items.iter().enumerate() {
110 let file_name = file_path.join(format!(
111 "{}.yaml",
112 &item
113 .meta()
114 .name
115 .clone()
116 .unwrap_or_else(|| format!("unknown_{}", i))
117 ));
118 let mut file = File::create(&file_name)?;
119
120 if self.resource_type == "secrets" && !self.dump_secret_values {
122 serde_yaml::to_writer(&mut file, item.meta())?;
123 } else {
124 serde_yaml::to_writer(&mut file, &item)?;
125 }
126
127 info!("Exported {}", file_name.display());
128 }
129
130 Ok(())
131 }
132
133 async fn dump(&self) {
134 if let Err(e) = self._dump().await {
135 warn!("Failed to write k8s {}: {}", self.resource_type, e);
136 }
137 }
138}
139
140pub struct K8sDumper<'n> {
141 context: &'n Context,
142 client: Client,
144 k8s_namespaces: Vec<String>,
146 k8s_context: Option<String>,
148 k8s_dump_secret_values: bool,
150}
151
152impl<'n> K8sDumper<'n> {
153 pub fn new(
154 context: &'n Context,
155 client: Client,
156 k8s_namespaces: Vec<String>,
157 k8s_context: Option<String>,
158 k8s_dump_secret_values: bool,
159 ) -> Self {
160 Self {
161 context,
162 client,
163 k8s_namespaces,
164 k8s_context,
165 k8s_dump_secret_values,
166 }
167 }
168
169 async fn _dump_kubectl_describe<K>(
170 &self,
171 namespace: Option<&String>,
172 ) -> Result<(), anyhow::Error>
173 where
174 K: kube::Resource<DynamicType = ()>,
175 {
176 let resource_type = K::plural(&()).into_owned();
177 let mut args = vec!["describe", &resource_type];
178 if let Some(namespace) = namespace {
179 args.extend(["-n", namespace]);
180 } else {
181 args.push("--all-namespaces");
182 }
183
184 if let Some(k8s_context) = &self.k8s_context {
185 args.extend(["--context", k8s_context]);
186 }
187
188 let output = tokio::process::Command::new("kubectl")
189 .args(args)
190 .stderr(std::process::Stdio::null()) .output()
192 .await?;
193
194 if !output.status.success() {
195 return Err(anyhow::anyhow!(
196 "{}",
197 String::from_utf8_lossy(&output.stderr)
198 ));
199 }
200
201 if output.stdout.is_empty() {
202 let mut err_msg = format!("Describe: No {} found", resource_type);
203 if let Some(namespace) = namespace {
204 err_msg = format!("{} for namespace {}", err_msg, namespace);
205 }
206 warn!("{}", err_msg);
207 return Ok(());
208 }
209
210 let file_path =
211 format_resource_path(self.context.start_time, resource_type.as_str(), namespace);
212 let file_name = file_path.join("describe.txt");
213 create_dir_all(&file_path)?;
214 let mut file = File::create(&file_name)?;
215 file.write_all(&output.stdout)?;
216
217 info!("Exported {}", file_name.display());
218
219 Ok(())
220 }
221
222 async fn dump_kubectl_describe<K>(&self, namespace: Option<&String>)
223 where
224 K: kube::Resource<DynamicType = ()>,
225 {
226 if let Err(e) = self._dump_kubectl_describe::<K>(namespace).await {
227 warn!(
228 "Failed to dump kubectl describe for {}: {}",
229 K::plural(&()).into_owned(),
230 e
231 );
232 }
233 }
234
235 async fn dump_cluster_resources(&self) {
237 K8sResourceDumper::<Node>::cluster(
238 self.context,
239 self.client.clone(),
240 self.k8s_dump_secret_values,
241 )
242 .dump()
243 .await;
244
245 K8sResourceDumper::<StorageClass>::cluster(
246 self.context,
247 self.client.clone(),
248 self.k8s_dump_secret_values,
249 )
250 .dump()
251 .await;
252
253 K8sResourceDumper::<PersistentVolume>::cluster(
254 self.context,
255 self.client.clone(),
256 self.k8s_dump_secret_values,
257 )
258 .dump()
259 .await;
260
261 K8sResourceDumper::<MutatingWebhookConfiguration>::cluster(
262 self.context,
263 self.client.clone(),
264 self.k8s_dump_secret_values,
265 )
266 .dump()
267 .await;
268
269 K8sResourceDumper::<ValidatingWebhookConfiguration>::cluster(
270 self.context,
271 self.client.clone(),
272 self.k8s_dump_secret_values,
273 )
274 .dump()
275 .await;
276 K8sResourceDumper::<DaemonSet>::cluster(
277 self.context,
278 self.client.clone(),
279 self.k8s_dump_secret_values,
280 )
281 .dump()
282 .await;
283 K8sResourceDumper::<CustomResourceDefinition>::cluster(
284 self.context,
285 self.client.clone(),
286 self.k8s_dump_secret_values,
287 )
288 .dump()
289 .await;
290 }
291
292 async fn _dump_k8s_pod_logs(&self, namespace: &String) -> Result<(), anyhow::Error> {
293 let file_path = format_resource_path(self.context.start_time, "logs", Some(namespace));
294 create_dir_all(&file_path)?;
295
296 let pods: Api<Pod> = Api::<Pod>::namespaced(self.client.clone(), namespace);
297 let pod_list = pods.list(&ListParams::default()).await?;
298
299 for (i, pod) in pod_list.items.iter().enumerate() {
300 let pod_name = pod
301 .metadata
302 .name
303 .clone()
304 .unwrap_or_else(|| format!("unknown_{}", i));
305 async fn export_pod_logs(
306 pods: &Api<Pod>,
307 pod_name: &str,
308 file_path: &PathBuf,
309 is_previous: bool,
310 ) -> Result<(), anyhow::Error> {
311 let suffix = if is_previous { "previous" } else { "current" };
312 let file_name = file_path.join(format!("{}.{}.log", pod_name, suffix));
313
314 let logs = pods
315 .logs(
316 pod_name,
317 &LogParams {
318 previous: is_previous,
319 timestamps: true,
320 ..Default::default()
321 },
322 )
323 .await?;
324
325 if logs.is_empty() {
326 warn!("No {} logs found for pod {}", suffix, pod_name);
327 return Ok(());
328 }
329
330 let mut file = File::create(&file_name)?;
331 file.write_all(logs.as_bytes())?;
332 info!("Exported {}", file_name.display());
333
334 Ok(())
335 }
336
337 if let Err(e) = export_pod_logs(&pods, &pod_name, &file_path, true).await {
338 match e.downcast_ref::<kube::Error>() {
339 Some(kube::Error::Api(e)) if e.code == 400 => {
340 warn!("No previous logs available for pod {}", pod_name);
341 }
342 _ => {
343 warn!(
344 "Failed to export previous logs for pod {}: {}",
345 &pod_name, e
346 );
347 }
348 }
349 }
350
351 if let Err(e) = export_pod_logs(&pods, &pod_name, &file_path, false).await {
352 warn!("Failed to export current logs for pod {}: {}", &pod_name, e);
353 }
354 }
355 Ok(())
356 }
357
358 async fn dump_k8s_pod_logs(&self, namespace: &String) {
360 if let Err(e) = self._dump_k8s_pod_logs(namespace).await {
361 warn!("Failed to dump k8s pod logs: {}", e);
362 }
363 }
364
365 pub async fn dump_namespaced_resources(&self, namespace: String) {
367 K8sResourceDumper::<Pod>::namespaced(
368 self.context,
369 self.client.clone(),
370 namespace.clone(),
371 self.k8s_dump_secret_values,
372 )
373 .dump()
374 .await;
375 K8sResourceDumper::<Service>::namespaced(
376 self.context,
377 self.client.clone(),
378 namespace.clone(),
379 self.k8s_dump_secret_values,
380 )
381 .dump()
382 .await;
383 K8sResourceDumper::<Deployment>::namespaced(
384 self.context,
385 self.client.clone(),
386 namespace.clone(),
387 self.k8s_dump_secret_values,
388 )
389 .dump()
390 .await;
391 K8sResourceDumper::<StatefulSet>::namespaced(
392 self.context,
393 self.client.clone(),
394 namespace.clone(),
395 self.k8s_dump_secret_values,
396 )
397 .dump()
398 .await;
399 K8sResourceDumper::<ReplicaSet>::namespaced(
400 self.context,
401 self.client.clone(),
402 namespace.clone(),
403 self.k8s_dump_secret_values,
404 )
405 .dump()
406 .await;
407 K8sResourceDumper::<NetworkPolicy>::namespaced(
408 self.context,
409 self.client.clone(),
410 namespace.clone(),
411 self.k8s_dump_secret_values,
412 )
413 .dump()
414 .await;
415 K8sResourceDumper::<Event>::namespaced(
416 self.context,
417 self.client.clone(),
418 namespace.clone(),
419 self.k8s_dump_secret_values,
420 )
421 .dump()
422 .await;
423 K8sResourceDumper::<Materialize>::namespaced(
424 self.context,
425 self.client.clone(),
426 namespace.clone(),
427 self.k8s_dump_secret_values,
428 )
429 .dump()
430 .await;
431 K8sResourceDumper::<Role>::namespaced(
432 self.context,
433 self.client.clone(),
434 namespace.clone(),
435 self.k8s_dump_secret_values,
436 )
437 .dump()
438 .await;
439 K8sResourceDumper::<RoleBinding>::namespaced(
440 self.context,
441 self.client.clone(),
442 namespace.clone(),
443 self.k8s_dump_secret_values,
444 )
445 .dump()
446 .await;
447 K8sResourceDumper::<ConfigMap>::namespaced(
448 self.context,
449 self.client.clone(),
450 namespace.clone(),
451 self.k8s_dump_secret_values,
452 )
453 .dump()
454 .await;
455 K8sResourceDumper::<Secret>::namespaced(
456 self.context,
457 self.client.clone(),
458 namespace.clone(),
459 self.k8s_dump_secret_values,
460 )
461 .dump()
462 .await;
463 K8sResourceDumper::<PersistentVolumeClaim>::namespaced(
464 self.context,
465 self.client.clone(),
466 namespace.clone(),
467 self.k8s_dump_secret_values,
468 )
469 .dump()
470 .await;
471 K8sResourceDumper::<ServiceAccount>::namespaced(
472 self.context,
473 self.client.clone(),
474 namespace.clone(),
475 self.k8s_dump_secret_values,
476 )
477 .dump()
478 .await;
479
480 K8sResourceDumper::<Certificate>::namespaced(
481 self.context,
482 self.client.clone(),
483 namespace.clone(),
484 self.k8s_dump_secret_values,
485 )
486 .dump()
487 .await;
488
489 self.dump_k8s_pod_logs(&namespace).await;
490 }
491}
492
493impl<'n> ContainerDumper for K8sDumper<'n> {
494 async fn dump_container_resources(&self) {
495 let mut futs: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![];
496
497 for namespace in &self.k8s_namespaces {
498 futs.push(Box::pin(self.dump_kubectl_describe::<Pod>(Some(namespace))));
499 futs.push(Box::pin(
500 self.dump_kubectl_describe::<Service>(Some(namespace)),
501 ));
502 futs.push(Box::pin(
503 self.dump_kubectl_describe::<Deployment>(Some(namespace)),
504 ));
505 futs.push(Box::pin(
506 self.dump_kubectl_describe::<StatefulSet>(Some(namespace)),
507 ));
508 futs.push(Box::pin(
509 self.dump_kubectl_describe::<ReplicaSet>(Some(namespace)),
510 ));
511 futs.push(Box::pin(
512 self.dump_kubectl_describe::<NetworkPolicy>(Some(namespace)),
513 ));
514 futs.push(Box::pin(
515 self.dump_kubectl_describe::<Event>(Some(namespace)),
516 ));
517 futs.push(Box::pin(
518 self.dump_kubectl_describe::<Materialize>(Some(namespace)),
519 ));
520 futs.push(Box::pin(
521 self.dump_kubectl_describe::<Role>(Some(namespace)),
522 ));
523 futs.push(Box::pin(
524 self.dump_kubectl_describe::<RoleBinding>(Some(namespace)),
525 ));
526 futs.push(Box::pin(
527 self.dump_kubectl_describe::<ConfigMap>(Some(namespace)),
528 ));
529 futs.push(Box::pin(
530 self.dump_kubectl_describe::<Secret>(Some(namespace)),
531 ));
532 futs.push(Box::pin(
533 self.dump_kubectl_describe::<PersistentVolumeClaim>(Some(namespace)),
534 ));
535 futs.push(Box::pin(
536 self.dump_kubectl_describe::<ServiceAccount>(Some(namespace)),
537 ));
538 futs.push(Box::pin(
539 self.dump_kubectl_describe::<Certificate>(Some(namespace)),
540 ));
541 }
542
543 futs.push(Box::pin(self.dump_kubectl_describe::<Node>(None)));
544 futs.push(Box::pin(self.dump_kubectl_describe::<DaemonSet>(None)));
545 futs.push(Box::pin(self.dump_kubectl_describe::<StorageClass>(None)));
546 futs.push(Box::pin(
547 self.dump_kubectl_describe::<PersistentVolume>(None),
548 ));
549 futs.push(Box::pin(
550 self.dump_kubectl_describe::<MutatingWebhookConfiguration>(None),
551 ));
552 futs.push(Box::pin(
553 self.dump_kubectl_describe::<ValidatingWebhookConfiguration>(None),
554 ));
555 futs.push(Box::pin(
556 self.dump_kubectl_describe::<CustomResourceDefinition>(None),
557 ));
558
559 for namespace in self.k8s_namespaces.clone() {
560 futs.push(Box::pin(self.dump_namespaced_resources(namespace)));
561 }
562 futs.push(Box::pin(self.dump_cluster_resources()));
563
564 join_all(futs).await;
565 }
566}
567
568fn format_resource_path(
569 date_time: DateTime<Utc>,
570 resource_type: &str,
571 namespace: Option<&String>,
572) -> PathBuf {
573 let mut path = format_base_path(date_time).join(resource_type);
574
575 if let Some(namespace) = namespace {
576 path = path.join(namespace);
577 }
578 path
579}