mz_debug/
k8s_dumper.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Dumps k8s resources to files.
17
18use std::fmt::Debug;
19use std::fs::{File, create_dir_all};
20use std::future::Future;
21use std::io::Write;
22use std::path::PathBuf;
23use std::pin::Pin;
24
25use chrono::{DateTime, Utc};
26use futures::future::join_all;
27use k8s_openapi::NamespaceResourceScope;
28use k8s_openapi::api::admissionregistration::v1::{
29    MutatingWebhookConfiguration, ValidatingWebhookConfiguration,
30};
31use k8s_openapi::api::apps::v1::{DaemonSet, Deployment, ReplicaSet, StatefulSet};
32use k8s_openapi::api::core::v1::{
33    ConfigMap, Event, Node, PersistentVolume, PersistentVolumeClaim, Pod, Secret, Service,
34    ServiceAccount,
35};
36use k8s_openapi::api::networking::v1::NetworkPolicy;
37use k8s_openapi::api::rbac::v1::{Role, RoleBinding};
38use k8s_openapi::api::storage::v1::StorageClass;
39use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
40use kube::api::{ListParams, LogParams};
41use kube::{Api, Client};
42use mz_cloud_resources::crd::generated::cert_manager::certificates::Certificate;
43use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
44
45use serde::{Serialize, de::DeserializeOwned};
46use tracing::{info, warn};
47
48use crate::utils::format_base_path;
49use crate::{ContainerDumper, Context};
50
/// Dumps every object of one Kubernetes resource kind `K` to YAML files, one
/// file per object.
struct K8sResourceDumper<'n, K> {
    /// Shared run context; its `start_time` is used to build the output directory.
    context: &'n Context,
    /// API handle used to list the objects of kind `K`.
    api: Api<K>,
    /// Namespace the API handle is scoped to, or `None` for a cluster-wide handle.
    namespace: Option<String>,
    /// Plural resource name of `K`; used as the output directory name and in
    /// log messages.
    resource_type: String,
    /// When `false`, objects of the `secrets` resource type are written as
    /// metadata only instead of in full.
    dump_secret_values: bool,
}
58
59impl<'n, K> K8sResourceDumper<'n, K>
60where
61    K: kube::Resource<DynamicType = ()> + Clone + Debug + Serialize + DeserializeOwned,
62{
63    fn cluster(context: &'n Context, client: Client, dump_secret_values: bool) -> Self {
64        Self {
65            context,
66            api: Api::<K>::all(client),
67            namespace: None,
68            resource_type: K::plural(&()).into_owned(),
69            dump_secret_values,
70        }
71    }
72
73    fn namespaced(
74        context: &'n Context,
75        client: Client,
76        namespace: String,
77        dump_secret_values: bool,
78    ) -> Self
79    where
80        K: kube::Resource<Scope = NamespaceResourceScope>,
81    {
82        Self {
83            context,
84            api: Api::<K>::namespaced(client, namespace.as_str()),
85            namespace: Some(namespace),
86            resource_type: K::plural(&()).into_owned(),
87            dump_secret_values,
88        }
89    }
90
91    async fn _dump(&self) -> Result<(), anyhow::Error> {
92        let object_list = self.api.list(&ListParams::default()).await?;
93
94        if object_list.items.is_empty() {
95            let mut err_msg = format!("No {} found", self.resource_type);
96            if let Some(namespace) = &self.namespace {
97                err_msg = format!("{} for namespace {}", err_msg, namespace);
98            }
99            warn!("{}", err_msg);
100            return Ok(());
101        }
102        let file_path = format_resource_path(
103            self.context.start_time,
104            self.resource_type.as_str(),
105            self.namespace.as_ref(),
106        );
107        create_dir_all(&file_path)?;
108
109        for (i, item) in object_list.items.iter().enumerate() {
110            let file_name = file_path.join(format!(
111                "{}.yaml",
112                &item
113                    .meta()
114                    .name
115                    .clone()
116                    .unwrap_or_else(|| format!("unknown_{}", i))
117            ));
118            let mut file = File::create(&file_name)?;
119
120            // If the resource is a secret, we hide its values by default.
121            if self.resource_type == "secrets" && !self.dump_secret_values {
122                serde_yaml::to_writer(&mut file, item.meta())?;
123            } else {
124                serde_yaml::to_writer(&mut file, &item)?;
125            }
126
127            info!("Exported {}", file_name.display());
128        }
129
130        Ok(())
131    }
132
133    async fn dump(&self) {
134        if let Err(e) = self._dump().await {
135            warn!("Failed to write k8s {}: {}", self.resource_type, e);
136        }
137    }
138}
139
/// Dumps Kubernetes state to files: resource objects as YAML,
/// `kubectl describe` output as text, and pod logs.
pub struct K8sDumper<'n> {
    /// Shared run context; its `start_time` is used to build output paths.
    context: &'n Context,
    /// The kubernetes client to use.
    client: Client,
    /// A list of namespaces to dump.
    k8s_namespaces: Vec<String>,
    /// The kubernetes context to use; when set, it is passed to `kubectl` as
    /// `--context`.
    k8s_context: Option<String>,
    /// If true, the tool will dump the values of secrets in the Kubernetes cluster.
    k8s_dump_secret_values: bool,
}
151
152impl<'n> K8sDumper<'n> {
153    pub fn new(
154        context: &'n Context,
155        client: Client,
156        k8s_namespaces: Vec<String>,
157        k8s_context: Option<String>,
158        k8s_dump_secret_values: bool,
159    ) -> Self {
160        Self {
161            context,
162            client,
163            k8s_namespaces,
164            k8s_context,
165            k8s_dump_secret_values,
166        }
167    }
168
169    async fn _dump_kubectl_describe<K>(
170        &self,
171        namespace: Option<&String>,
172    ) -> Result<(), anyhow::Error>
173    where
174        K: kube::Resource<DynamicType = ()>,
175    {
176        let resource_type = K::plural(&()).into_owned();
177        let mut args = vec!["describe", &resource_type];
178        if let Some(namespace) = namespace {
179            args.extend(["-n", namespace]);
180        } else {
181            args.push("--all-namespaces");
182        }
183
184        if let Some(k8s_context) = &self.k8s_context {
185            args.extend(["--context", k8s_context]);
186        }
187
188        let output = tokio::process::Command::new("kubectl")
189            .args(args)
190            .stderr(std::process::Stdio::null()) // Silence stderr
191            .output()
192            .await?;
193
194        if !output.status.success() {
195            return Err(anyhow::anyhow!(
196                "{}",
197                String::from_utf8_lossy(&output.stderr)
198            ));
199        }
200
201        if output.stdout.is_empty() {
202            let mut err_msg = format!("Describe: No {} found", resource_type);
203            if let Some(namespace) = namespace {
204                err_msg = format!("{} for namespace {}", err_msg, namespace);
205            }
206            warn!("{}", err_msg);
207            return Ok(());
208        }
209
210        let file_path =
211            format_resource_path(self.context.start_time, resource_type.as_str(), namespace);
212        let file_name = file_path.join("describe.txt");
213        create_dir_all(&file_path)?;
214        let mut file = File::create(&file_name)?;
215        file.write_all(&output.stdout)?;
216
217        info!("Exported {}", file_name.display());
218
219        Ok(())
220    }
221
222    async fn dump_kubectl_describe<K>(&self, namespace: Option<&String>)
223    where
224        K: kube::Resource<DynamicType = ()>,
225    {
226        if let Err(e) = self._dump_kubectl_describe::<K>(namespace).await {
227            warn!(
228                "Failed to dump kubectl describe for {}: {}",
229                K::plural(&()).into_owned(),
230                e
231            );
232        }
233    }
234
235    /// Write cluster-level k8s resources to a yaml file per resource.
236    async fn dump_cluster_resources(&self) {
237        K8sResourceDumper::<Node>::cluster(
238            self.context,
239            self.client.clone(),
240            self.k8s_dump_secret_values,
241        )
242        .dump()
243        .await;
244
245        K8sResourceDumper::<StorageClass>::cluster(
246            self.context,
247            self.client.clone(),
248            self.k8s_dump_secret_values,
249        )
250        .dump()
251        .await;
252
253        K8sResourceDumper::<PersistentVolume>::cluster(
254            self.context,
255            self.client.clone(),
256            self.k8s_dump_secret_values,
257        )
258        .dump()
259        .await;
260
261        K8sResourceDumper::<MutatingWebhookConfiguration>::cluster(
262            self.context,
263            self.client.clone(),
264            self.k8s_dump_secret_values,
265        )
266        .dump()
267        .await;
268
269        K8sResourceDumper::<ValidatingWebhookConfiguration>::cluster(
270            self.context,
271            self.client.clone(),
272            self.k8s_dump_secret_values,
273        )
274        .dump()
275        .await;
276        K8sResourceDumper::<DaemonSet>::cluster(
277            self.context,
278            self.client.clone(),
279            self.k8s_dump_secret_values,
280        )
281        .dump()
282        .await;
283        K8sResourceDumper::<CustomResourceDefinition>::cluster(
284            self.context,
285            self.client.clone(),
286            self.k8s_dump_secret_values,
287        )
288        .dump()
289        .await;
290    }
291
292    async fn _dump_k8s_pod_logs(&self, namespace: &String) -> Result<(), anyhow::Error> {
293        let file_path = format_resource_path(self.context.start_time, "logs", Some(namespace));
294        create_dir_all(&file_path)?;
295
296        let pods: Api<Pod> = Api::<Pod>::namespaced(self.client.clone(), namespace);
297        let pod_list = pods.list(&ListParams::default()).await?;
298
299        for (i, pod) in pod_list.items.iter().enumerate() {
300            let pod_name = pod
301                .metadata
302                .name
303                .clone()
304                .unwrap_or_else(|| format!("unknown_{}", i));
305            async fn export_pod_logs(
306                pods: &Api<Pod>,
307                pod_name: &str,
308                file_path: &PathBuf,
309                is_previous: bool,
310            ) -> Result<(), anyhow::Error> {
311                let suffix = if is_previous { "previous" } else { "current" };
312                let file_name = file_path.join(format!("{}.{}.log", pod_name, suffix));
313
314                let logs = pods
315                    .logs(
316                        pod_name,
317                        &LogParams {
318                            previous: is_previous,
319                            timestamps: true,
320                            ..Default::default()
321                        },
322                    )
323                    .await?;
324
325                if logs.is_empty() {
326                    warn!("No {} logs found for pod {}", suffix, pod_name);
327                    return Ok(());
328                }
329
330                let mut file = File::create(&file_name)?;
331                file.write_all(logs.as_bytes())?;
332                info!("Exported {}", file_name.display());
333
334                Ok(())
335            }
336
337            if let Err(e) = export_pod_logs(&pods, &pod_name, &file_path, true).await {
338                match e.downcast_ref::<kube::Error>() {
339                    Some(kube::Error::Api(e)) if e.code == 400 => {
340                        warn!("No previous logs available for pod {}", pod_name);
341                    }
342                    _ => {
343                        warn!(
344                            "Failed to export previous logs for pod {}: {}",
345                            &pod_name, e
346                        );
347                    }
348                }
349            }
350
351            if let Err(e) = export_pod_logs(&pods, &pod_name, &file_path, false).await {
352                warn!("Failed to export current logs for pod {}: {}", &pod_name, e);
353            }
354        }
355        Ok(())
356    }
357
358    /// Write k8s pod logs to a yaml file per pod.
359    async fn dump_k8s_pod_logs(&self, namespace: &String) {
360        if let Err(e) = self._dump_k8s_pod_logs(namespace).await {
361            warn!("Failed to dump k8s pod logs: {}", e);
362        }
363    }
364
365    /// Write namespace-level k8s resources to a yaml file per resource.
366    pub async fn dump_namespaced_resources(&self, namespace: String) {
367        K8sResourceDumper::<Pod>::namespaced(
368            self.context,
369            self.client.clone(),
370            namespace.clone(),
371            self.k8s_dump_secret_values,
372        )
373        .dump()
374        .await;
375        K8sResourceDumper::<Service>::namespaced(
376            self.context,
377            self.client.clone(),
378            namespace.clone(),
379            self.k8s_dump_secret_values,
380        )
381        .dump()
382        .await;
383        K8sResourceDumper::<Deployment>::namespaced(
384            self.context,
385            self.client.clone(),
386            namespace.clone(),
387            self.k8s_dump_secret_values,
388        )
389        .dump()
390        .await;
391        K8sResourceDumper::<StatefulSet>::namespaced(
392            self.context,
393            self.client.clone(),
394            namespace.clone(),
395            self.k8s_dump_secret_values,
396        )
397        .dump()
398        .await;
399        K8sResourceDumper::<ReplicaSet>::namespaced(
400            self.context,
401            self.client.clone(),
402            namespace.clone(),
403            self.k8s_dump_secret_values,
404        )
405        .dump()
406        .await;
407        K8sResourceDumper::<NetworkPolicy>::namespaced(
408            self.context,
409            self.client.clone(),
410            namespace.clone(),
411            self.k8s_dump_secret_values,
412        )
413        .dump()
414        .await;
415        K8sResourceDumper::<Event>::namespaced(
416            self.context,
417            self.client.clone(),
418            namespace.clone(),
419            self.k8s_dump_secret_values,
420        )
421        .dump()
422        .await;
423        K8sResourceDumper::<Materialize>::namespaced(
424            self.context,
425            self.client.clone(),
426            namespace.clone(),
427            self.k8s_dump_secret_values,
428        )
429        .dump()
430        .await;
431        K8sResourceDumper::<Role>::namespaced(
432            self.context,
433            self.client.clone(),
434            namespace.clone(),
435            self.k8s_dump_secret_values,
436        )
437        .dump()
438        .await;
439        K8sResourceDumper::<RoleBinding>::namespaced(
440            self.context,
441            self.client.clone(),
442            namespace.clone(),
443            self.k8s_dump_secret_values,
444        )
445        .dump()
446        .await;
447        K8sResourceDumper::<ConfigMap>::namespaced(
448            self.context,
449            self.client.clone(),
450            namespace.clone(),
451            self.k8s_dump_secret_values,
452        )
453        .dump()
454        .await;
455        K8sResourceDumper::<Secret>::namespaced(
456            self.context,
457            self.client.clone(),
458            namespace.clone(),
459            self.k8s_dump_secret_values,
460        )
461        .dump()
462        .await;
463        K8sResourceDumper::<PersistentVolumeClaim>::namespaced(
464            self.context,
465            self.client.clone(),
466            namespace.clone(),
467            self.k8s_dump_secret_values,
468        )
469        .dump()
470        .await;
471        K8sResourceDumper::<ServiceAccount>::namespaced(
472            self.context,
473            self.client.clone(),
474            namespace.clone(),
475            self.k8s_dump_secret_values,
476        )
477        .dump()
478        .await;
479
480        K8sResourceDumper::<Certificate>::namespaced(
481            self.context,
482            self.client.clone(),
483            namespace.clone(),
484            self.k8s_dump_secret_values,
485        )
486        .dump()
487        .await;
488
489        self.dump_k8s_pod_logs(&namespace).await;
490    }
491}
492
493impl<'n> ContainerDumper for K8sDumper<'n> {
494    async fn dump_container_resources(&self) {
495        let mut futs: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![];
496
497        for namespace in &self.k8s_namespaces {
498            futs.push(Box::pin(self.dump_kubectl_describe::<Pod>(Some(namespace))));
499            futs.push(Box::pin(
500                self.dump_kubectl_describe::<Service>(Some(namespace)),
501            ));
502            futs.push(Box::pin(
503                self.dump_kubectl_describe::<Deployment>(Some(namespace)),
504            ));
505            futs.push(Box::pin(
506                self.dump_kubectl_describe::<StatefulSet>(Some(namespace)),
507            ));
508            futs.push(Box::pin(
509                self.dump_kubectl_describe::<ReplicaSet>(Some(namespace)),
510            ));
511            futs.push(Box::pin(
512                self.dump_kubectl_describe::<NetworkPolicy>(Some(namespace)),
513            ));
514            futs.push(Box::pin(
515                self.dump_kubectl_describe::<Event>(Some(namespace)),
516            ));
517            futs.push(Box::pin(
518                self.dump_kubectl_describe::<Materialize>(Some(namespace)),
519            ));
520            futs.push(Box::pin(
521                self.dump_kubectl_describe::<Role>(Some(namespace)),
522            ));
523            futs.push(Box::pin(
524                self.dump_kubectl_describe::<RoleBinding>(Some(namespace)),
525            ));
526            futs.push(Box::pin(
527                self.dump_kubectl_describe::<ConfigMap>(Some(namespace)),
528            ));
529            futs.push(Box::pin(
530                self.dump_kubectl_describe::<Secret>(Some(namespace)),
531            ));
532            futs.push(Box::pin(
533                self.dump_kubectl_describe::<PersistentVolumeClaim>(Some(namespace)),
534            ));
535            futs.push(Box::pin(
536                self.dump_kubectl_describe::<ServiceAccount>(Some(namespace)),
537            ));
538            futs.push(Box::pin(
539                self.dump_kubectl_describe::<Certificate>(Some(namespace)),
540            ));
541        }
542
543        futs.push(Box::pin(self.dump_kubectl_describe::<Node>(None)));
544        futs.push(Box::pin(self.dump_kubectl_describe::<DaemonSet>(None)));
545        futs.push(Box::pin(self.dump_kubectl_describe::<StorageClass>(None)));
546        futs.push(Box::pin(
547            self.dump_kubectl_describe::<PersistentVolume>(None),
548        ));
549        futs.push(Box::pin(
550            self.dump_kubectl_describe::<MutatingWebhookConfiguration>(None),
551        ));
552        futs.push(Box::pin(
553            self.dump_kubectl_describe::<ValidatingWebhookConfiguration>(None),
554        ));
555        futs.push(Box::pin(
556            self.dump_kubectl_describe::<CustomResourceDefinition>(None),
557        ));
558
559        for namespace in self.k8s_namespaces.clone() {
560            futs.push(Box::pin(self.dump_namespaced_resources(namespace)));
561        }
562        futs.push(Box::pin(self.dump_cluster_resources()));
563
564        join_all(futs).await;
565    }
566}
567
568fn format_resource_path(
569    date_time: DateTime<Utc>,
570    resource_type: &str,
571    namespace: Option<&String>,
572) -> PathBuf {
573    let mut path = format_base_path(date_time).join(resource_type);
574
575    if let Some(namespace) = namespace {
576        path = path.join(namespace);
577    }
578    path
579}