mz_debug/
k8s_dumper.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Dumps k8s resources to files.
17
18use std::fmt::Debug;
19use std::fs::{File, create_dir_all};
20use std::future::Future;
21use std::io::Write;
22use std::path::PathBuf;
23use std::pin::Pin;
24
25use futures::future::join_all;
26use k8s_openapi::NamespaceResourceScope;
27use k8s_openapi::api::admissionregistration::v1::{
28    MutatingWebhookConfiguration, ValidatingWebhookConfiguration,
29};
30use k8s_openapi::api::apps::v1::{DaemonSet, Deployment, ReplicaSet, StatefulSet};
31use k8s_openapi::api::core::v1::{
32    ConfigMap, Event, Node, PersistentVolume, PersistentVolumeClaim, Pod, Service, ServiceAccount,
33};
34use k8s_openapi::api::networking::v1::NetworkPolicy;
35use k8s_openapi::api::rbac::v1::{Role, RoleBinding};
36use k8s_openapi::api::storage::v1::StorageClass;
37use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
38use kube::api::{ListParams, LogParams};
39use kube::{Api, Client};
40use mz_cloud_resources::crd::generated::cert_manager::certificates::Certificate;
41use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
42
43use serde::{Serialize, de::DeserializeOwned};
44use tracing::{info, warn};
45
46use crate::{ContainerDumper, Context};
47
48struct K8sResourceDumper<'n, K> {
49    context: &'n Context,
50    api: Api<K>,
51    namespace: Option<String>,
52    resource_type: String,
53}
54
55impl<'n, K> K8sResourceDumper<'n, K>
56where
57    K: kube::Resource<DynamicType = ()> + Clone + Debug + Serialize + DeserializeOwned,
58{
59    fn cluster(context: &'n Context, client: Client) -> Self {
60        Self {
61            context,
62            api: Api::<K>::all(client),
63            namespace: None,
64            resource_type: K::plural(&()).into_owned(),
65        }
66    }
67
68    fn namespaced(context: &'n Context, client: Client, namespace: String) -> Self
69    where
70        K: kube::Resource<Scope = NamespaceResourceScope>,
71    {
72        Self {
73            context,
74            api: Api::<K>::namespaced(client, namespace.as_str()),
75            namespace: Some(namespace),
76            resource_type: K::plural(&()).into_owned(),
77        }
78    }
79
80    async fn _dump(&self) -> Result<(), anyhow::Error> {
81        let object_list = self.api.list(&ListParams::default()).await?;
82
83        if object_list.items.is_empty() {
84            let mut err_msg = format!("No {} found", self.resource_type);
85            if let Some(namespace) = &self.namespace {
86                err_msg = format!("{} for namespace {}", err_msg, namespace);
87            }
88            warn!("{}", err_msg);
89            return Ok(());
90        }
91        let file_path = format_resource_path(
92            self.context.base_path.clone(),
93            self.resource_type.as_str(),
94            self.namespace.as_ref(),
95        );
96        create_dir_all(&file_path)?;
97
98        for (i, item) in object_list.items.iter().enumerate() {
99            let file_name = file_path.join(format!(
100                "{}.yaml",
101                &item
102                    .meta()
103                    .name
104                    .clone()
105                    .unwrap_or_else(|| format!("unknown_{}", i))
106            ));
107            let mut file = File::create(&file_name)?;
108
109            serde_yaml::to_writer(&mut file, &item)?;
110
111            info!("Exported {}", file_name.display());
112        }
113
114        Ok(())
115    }
116
117    async fn dump(&self) {
118        if let Err(e) = self._dump().await {
119            warn!("Failed to write k8s {}: {}", self.resource_type, e);
120        }
121    }
122}
123
124pub struct K8sDumper<'n> {
125    context: &'n Context,
126    /// The kubernetes client to use.
127    client: Client,
128    /// The k8s namespace to dump.
129    k8s_namespace: String,
130    /// A list of additional k8s namespaces to dump.
131    k8s_additional_namespaces: Option<Vec<String>>,
132    /// The kubernetes context to use.
133    k8s_context: Option<String>,
134}
135
136impl<'n> K8sDumper<'n> {
137    pub fn new(
138        context: &'n Context,
139        client: Client,
140        k8s_namespace: String,
141        k8s_additional_namespaces: Option<Vec<String>>,
142        k8s_context: Option<String>,
143    ) -> Self {
144        Self {
145            context,
146            client,
147            k8s_namespace,
148            k8s_additional_namespaces,
149            k8s_context,
150        }
151    }
152
153    async fn _dump_kubectl_describe<K>(
154        &self,
155        namespace: Option<&String>,
156    ) -> Result<(), anyhow::Error>
157    where
158        K: kube::Resource<DynamicType = ()>,
159    {
160        let resource_type = K::plural(&()).into_owned();
161        let mut args = vec!["describe", &resource_type];
162        if let Some(namespace) = namespace {
163            args.extend(["-n", namespace]);
164        } else {
165            args.push("--all-namespaces");
166        }
167
168        if let Some(k8s_context) = &self.k8s_context {
169            args.extend(["--context", k8s_context]);
170        }
171
172        let output = tokio::process::Command::new("kubectl")
173            .args(args)
174            .stderr(std::process::Stdio::null()) // Silence stderr
175            .output()
176            .await?;
177
178        if !output.status.success() {
179            return Err(anyhow::anyhow!(
180                "{}",
181                String::from_utf8_lossy(&output.stderr)
182            ));
183        }
184
185        if output.stdout.is_empty() {
186            let mut err_msg = format!("Describe: No {} found", resource_type);
187            if let Some(namespace) = namespace {
188                err_msg = format!("{} for namespace {}", err_msg, namespace);
189            }
190            warn!("{}", err_msg);
191            return Ok(());
192        }
193
194        let file_path = format_resource_path(
195            self.context.base_path.clone(),
196            resource_type.as_str(),
197            namespace,
198        );
199        let file_name = file_path.join("describe.txt");
200        create_dir_all(&file_path)?;
201        let mut file = File::create(&file_name)?;
202        file.write_all(&output.stdout)?;
203
204        info!("Exported {}", file_name.display());
205
206        Ok(())
207    }
208
209    async fn dump_kubectl_describe<K>(&self, namespace: Option<&String>)
210    where
211        K: kube::Resource<DynamicType = ()>,
212    {
213        if let Err(e) = self._dump_kubectl_describe::<K>(namespace).await {
214            warn!(
215                "Failed to dump kubectl describe for {}: {}",
216                K::plural(&()).into_owned(),
217                e
218            );
219        }
220    }
221
222    /// Write cluster-level k8s resources to a yaml file per resource.
223    async fn dump_cluster_resources(&self) {
224        K8sResourceDumper::<Node>::cluster(self.context, self.client.clone())
225            .dump()
226            .await;
227
228        K8sResourceDumper::<StorageClass>::cluster(self.context, self.client.clone())
229            .dump()
230            .await;
231
232        K8sResourceDumper::<PersistentVolume>::cluster(self.context, self.client.clone())
233            .dump()
234            .await;
235
236        K8sResourceDumper::<MutatingWebhookConfiguration>::cluster(
237            self.context,
238            self.client.clone(),
239        )
240        .dump()
241        .await;
242
243        K8sResourceDumper::<ValidatingWebhookConfiguration>::cluster(
244            self.context,
245            self.client.clone(),
246        )
247        .dump()
248        .await;
249        K8sResourceDumper::<DaemonSet>::cluster(self.context, self.client.clone())
250            .dump()
251            .await;
252        K8sResourceDumper::<CustomResourceDefinition>::cluster(self.context, self.client.clone())
253            .dump()
254            .await;
255    }
256
257    async fn _dump_k8s_pod_logs(&self, namespace: &String) -> Result<(), anyhow::Error> {
258        let file_path =
259            format_resource_path(self.context.base_path.clone(), "logs", Some(namespace));
260        create_dir_all(&file_path)?;
261
262        let pods: Api<Pod> = Api::<Pod>::namespaced(self.client.clone(), namespace);
263        let pod_list = pods.list(&ListParams::default()).await?;
264
265        for (i, pod) in pod_list.items.iter().enumerate() {
266            let pod_name = pod
267                .metadata
268                .name
269                .clone()
270                .unwrap_or_else(|| format!("unknown_{}", i));
271            async fn export_pod_logs(
272                pods: &Api<Pod>,
273                pod_name: &str,
274                file_path: &PathBuf,
275                is_previous: bool,
276            ) -> Result<(), anyhow::Error> {
277                let suffix = if is_previous { "previous" } else { "current" };
278                let file_name = file_path.join(format!("{}.{}.log", pod_name, suffix));
279
280                let logs = pods
281                    .logs(
282                        pod_name,
283                        &LogParams {
284                            previous: is_previous,
285                            timestamps: true,
286                            ..Default::default()
287                        },
288                    )
289                    .await?;
290
291                if logs.is_empty() {
292                    warn!("No {} logs found for pod {}", suffix, pod_name);
293                    return Ok(());
294                }
295
296                let mut file = File::create(&file_name)?;
297                file.write_all(logs.as_bytes())?;
298                info!("Exported {}", file_name.display());
299
300                Ok(())
301            }
302
303            if let Err(e) = export_pod_logs(&pods, &pod_name, &file_path, true).await {
304                match e.downcast_ref::<kube::Error>() {
305                    Some(kube::Error::Api(e)) if e.code == 400 => {
306                        warn!("No previous logs available for pod {}", pod_name);
307                    }
308                    _ => {
309                        warn!(
310                            "Failed to export previous logs for pod {}: {}",
311                            &pod_name, e
312                        );
313                    }
314                }
315            }
316
317            if let Err(e) = export_pod_logs(&pods, &pod_name, &file_path, false).await {
318                warn!("Failed to export current logs for pod {}: {}", &pod_name, e);
319            }
320        }
321        Ok(())
322    }
323
324    /// Write k8s pod logs to a yaml file per pod.
325    async fn dump_k8s_pod_logs(&self, namespace: &String) {
326        if let Err(e) = self._dump_k8s_pod_logs(namespace).await {
327            warn!("Failed to dump k8s pod logs: {}", e);
328        }
329    }
330
331    /// Write namespace-level k8s resources to a yaml file per resource.
332    pub async fn dump_namespaced_resources(&self, namespace: String) {
333        K8sResourceDumper::<Pod>::namespaced(self.context, self.client.clone(), namespace.clone())
334            .dump()
335            .await;
336        K8sResourceDumper::<Service>::namespaced(
337            self.context,
338            self.client.clone(),
339            namespace.clone(),
340        )
341        .dump()
342        .await;
343        K8sResourceDumper::<Deployment>::namespaced(
344            self.context,
345            self.client.clone(),
346            namespace.clone(),
347        )
348        .dump()
349        .await;
350        K8sResourceDumper::<StatefulSet>::namespaced(
351            self.context,
352            self.client.clone(),
353            namespace.clone(),
354        )
355        .dump()
356        .await;
357        K8sResourceDumper::<ReplicaSet>::namespaced(
358            self.context,
359            self.client.clone(),
360            namespace.clone(),
361        )
362        .dump()
363        .await;
364        K8sResourceDumper::<NetworkPolicy>::namespaced(
365            self.context,
366            self.client.clone(),
367            namespace.clone(),
368        )
369        .dump()
370        .await;
371        K8sResourceDumper::<Event>::namespaced(
372            self.context,
373            self.client.clone(),
374            namespace.clone(),
375        )
376        .dump()
377        .await;
378        K8sResourceDumper::<Materialize>::namespaced(
379            self.context,
380            self.client.clone(),
381            namespace.clone(),
382        )
383        .dump()
384        .await;
385        K8sResourceDumper::<Role>::namespaced(self.context, self.client.clone(), namespace.clone())
386            .dump()
387            .await;
388        K8sResourceDumper::<RoleBinding>::namespaced(
389            self.context,
390            self.client.clone(),
391            namespace.clone(),
392        )
393        .dump()
394        .await;
395        K8sResourceDumper::<ConfigMap>::namespaced(
396            self.context,
397            self.client.clone(),
398            namespace.clone(),
399        )
400        .dump()
401        .await;
402        K8sResourceDumper::<PersistentVolumeClaim>::namespaced(
403            self.context,
404            self.client.clone(),
405            namespace.clone(),
406        )
407        .dump()
408        .await;
409        K8sResourceDumper::<ServiceAccount>::namespaced(
410            self.context,
411            self.client.clone(),
412            namespace.clone(),
413        )
414        .dump()
415        .await;
416
417        K8sResourceDumper::<Certificate>::namespaced(
418            self.context,
419            self.client.clone(),
420            namespace.clone(),
421        )
422        .dump()
423        .await;
424
425        self.dump_k8s_pod_logs(&namespace).await;
426    }
427}
428
429impl<'n> ContainerDumper for K8sDumper<'n> {
430    async fn dump_container_resources(&self) {
431        let mut futs: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![];
432
433        let k8s_namespaces_iter = std::iter::once(&self.k8s_namespace)
434            .chain(self.k8s_additional_namespaces.iter().flatten());
435
436        for namespace in k8s_namespaces_iter.clone() {
437            futs.push(Box::pin(self.dump_kubectl_describe::<Pod>(Some(namespace))));
438            futs.push(Box::pin(
439                self.dump_kubectl_describe::<Service>(Some(namespace)),
440            ));
441            futs.push(Box::pin(
442                self.dump_kubectl_describe::<Deployment>(Some(namespace)),
443            ));
444            futs.push(Box::pin(
445                self.dump_kubectl_describe::<StatefulSet>(Some(namespace)),
446            ));
447            futs.push(Box::pin(
448                self.dump_kubectl_describe::<ReplicaSet>(Some(namespace)),
449            ));
450            futs.push(Box::pin(
451                self.dump_kubectl_describe::<NetworkPolicy>(Some(namespace)),
452            ));
453            futs.push(Box::pin(
454                self.dump_kubectl_describe::<Event>(Some(namespace)),
455            ));
456            futs.push(Box::pin(
457                self.dump_kubectl_describe::<Materialize>(Some(namespace)),
458            ));
459            futs.push(Box::pin(
460                self.dump_kubectl_describe::<Role>(Some(namespace)),
461            ));
462            futs.push(Box::pin(
463                self.dump_kubectl_describe::<RoleBinding>(Some(namespace)),
464            ));
465            futs.push(Box::pin(
466                self.dump_kubectl_describe::<ConfigMap>(Some(namespace)),
467            ));
468            futs.push(Box::pin(
469                self.dump_kubectl_describe::<PersistentVolumeClaim>(Some(namespace)),
470            ));
471            futs.push(Box::pin(
472                self.dump_kubectl_describe::<ServiceAccount>(Some(namespace)),
473            ));
474            futs.push(Box::pin(
475                self.dump_kubectl_describe::<Certificate>(Some(namespace)),
476            ));
477        }
478
479        futs.push(Box::pin(self.dump_kubectl_describe::<Node>(None)));
480        futs.push(Box::pin(self.dump_kubectl_describe::<DaemonSet>(None)));
481        futs.push(Box::pin(self.dump_kubectl_describe::<StorageClass>(None)));
482        futs.push(Box::pin(
483            self.dump_kubectl_describe::<PersistentVolume>(None),
484        ));
485        futs.push(Box::pin(
486            self.dump_kubectl_describe::<MutatingWebhookConfiguration>(None),
487        ));
488        futs.push(Box::pin(
489            self.dump_kubectl_describe::<ValidatingWebhookConfiguration>(None),
490        ));
491        futs.push(Box::pin(
492            self.dump_kubectl_describe::<CustomResourceDefinition>(None),
493        ));
494
495        for namespace in k8s_namespaces_iter.clone() {
496            futs.push(Box::pin(self.dump_namespaced_resources(namespace.clone())));
497        }
498        futs.push(Box::pin(self.dump_cluster_resources()));
499
500        join_all(futs).await;
501    }
502}
503
/// Builds the output directory for a resource type.
///
/// The result is `<base_path>/<resource_type>`, extended with a trailing `<namespace>`
/// component when `namespace` is `Some`.
fn format_resource_path(
    base_path: PathBuf,
    resource_type: &str,
    namespace: Option<&String>,
) -> PathBuf {
    let resource_dir = base_path.join(resource_type);

    match namespace {
        Some(ns) => resource_dir.join(ns),
        None => resource_dir,
    }
}