Skip to main content

mz_deploy/
docker_runtime.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Docker runtime for managing a persistent Materialize container.
//!
//! Provides a connected client backed by a long-lived Materialize container
//! (a `materialize/materialized` image unless overridden). The `test` command
//! uses it to execute unit tests against a real Materialize instance, and the
//! `explain` command shares the same container.

16use crate::client::{Client, ConnectionError, Profile};
17use crate::config::default_docker_image;
18use crate::verbose;
19use thiserror::Error;
20use tokio::process::Command;
21use tokio::time::{Duration, sleep};
22
23/// Errors raised while starting or connecting to the Materialize container.
24#[derive(Debug, Error)]
25pub(crate) enum DockerRuntimeError {
26    #[error("failed to start Materialize container: {0}")]
27    ContainerStartFailed(#[source] Box<dyn std::error::Error + Send + Sync>),
28
29    #[error("failed to connect to Materialize: {0}")]
30    ConnectionFailed(#[from] ConnectionError),
31}
32
/// Outcome of probing the host's Docker installation with `docker info`.
///
/// Derives `Debug`/`PartialEq` so callers can log and compare probe results.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DockerStatus {
    /// `docker info` exited successfully, so the daemon is reachable.
    Running,
    /// The `docker` binary ran but `docker info` exited with a failure status
    /// (presumably the daemon is stopped or unreachable).
    NotRunning,
    /// The `docker` binary could not be launched at all (typically because it
    /// is not installed or not on `PATH`).
    NotInstalled,
}

/// Fixed container name shared by the `test` and `explain` commands, so that
/// successive runs can find and reuse the same sandbox.
const CONTAINER_NAME: &str = "mz-deploy-sandbox";

/// Host-side port published to the container's port 6875; the client profile
/// connects here.
const CONTAINER_PORT: u16 = 16_875;

/// Manages the Materialize container used for runtime validation.
///
/// The handle itself only records which image to launch; all container state
/// lives in Docker and is looked up by name on demand.
#[derive(Debug, Clone)]
pub(crate) struct DockerRuntime {
    /// Image reference passed to `docker run` when a new container is created.
    image: String,
}

51impl DockerRuntime {
52    pub(crate) async fn check_availability() -> DockerStatus {
53        let result = Command::new("docker")
54            .arg("info")
55            .stdout(std::process::Stdio::null())
56            .stderr(std::process::Stdio::null())
57            .status()
58            .await;
59
60        match result {
61            Ok(status) if status.success() => DockerStatus::Running,
62            Ok(_) => DockerStatus::NotRunning,
63            Err(_) => DockerStatus::NotInstalled,
64        }
65    }
66
67    pub(crate) fn new() -> Self {
68        Self {
69            image: default_docker_image(),
70        }
71    }
72
73    pub(crate) fn with_image(mut self, image: impl Into<String>) -> Self {
74        self.image = image.into();
75        self
76    }
77
78    pub(crate) async fn get_client(&self) -> Result<Client, DockerRuntimeError> {
79        let profile = Self::make_profile();
80        // The ephemeral Docker container has no `_mz_deploy_server` cluster,
81        // so bypass the usual session-cluster pin and use the server default.
82        let client = match Client::connect_with_profile_no_pin(profile).await {
83            Ok(client) => {
84                verbose!("Fast-path connect succeeded");
85                client
86            }
87            Err(_) => {
88                verbose!("Fast-path connect failed, falling back to Docker CLI");
89                self.ensure_container().await?;
90                let profile = Self::make_profile();
91                verbose!("Connecting to Materialize...");
92                let client = Client::connect_with_profile_no_pin(profile).await?;
93                verbose!("Connected");
94                client
95            }
96        };
97
98        Ok(client)
99    }
100
101    fn make_profile() -> Profile {
102        Profile {
103            name: "docker-sandbox".to_string(),
104            host: Some("localhost".to_string()),
105            port: CONTAINER_PORT,
106            username: "materialize".to_string(),
107            password: None,
108            options: Default::default(),
109            sslmode: None,
110            sslrootcert: None,
111            http_host: None,
112        }
113    }
114
115    async fn container_exists(&self) -> Result<bool, DockerRuntimeError> {
116        let output = Command::new("docker")
117            .args([
118                "ps",
119                "-a",
120                "--filter",
121                &format!("name=^{}$", CONTAINER_NAME),
122                "--format",
123                "{{.Names}}",
124            ])
125            .output()
126            .await
127            .map_err(|e| DockerRuntimeError::ContainerStartFailed(Box::new(e)))?;
128
129        Ok(output.status.success() && !output.stdout.is_empty())
130    }
131
132    async fn container_is_running(&self) -> Result<bool, DockerRuntimeError> {
133        let output = Command::new("docker")
134            .args([
135                "ps",
136                "--filter",
137                &format!("name=^{}$", CONTAINER_NAME),
138                "--format",
139                "{{.Names}}",
140            ])
141            .output()
142            .await
143            .map_err(|e| DockerRuntimeError::ContainerStartFailed(Box::new(e)))?;
144
145        Ok(output.status.success() && !output.stdout.is_empty())
146    }
147
148    async fn container_is_healthy(&self) -> bool {
149        Client::connect_with_profile_no_pin(Self::make_profile())
150            .await
151            .is_ok()
152    }
153
154    async fn remove_container(&self) -> Result<(), DockerRuntimeError> {
155        verbose!("Removing existing container: {}", CONTAINER_NAME);
156        let output = Command::new("docker")
157            .args(["rm", "-f", CONTAINER_NAME])
158            .output()
159            .await
160            .map_err(|e| DockerRuntimeError::ContainerStartFailed(Box::new(e)))?;
161
162        if !output.status.success() {
163            let stderr = String::from_utf8_lossy(&output.stderr);
164            return Err(DockerRuntimeError::ContainerStartFailed(
165                format!("Failed to remove container: {}", stderr).into(),
166            ));
167        }
168        Ok(())
169    }
170
171    async fn create_container(&self) -> Result<(), DockerRuntimeError> {
172        verbose!(
173            "Creating persistent container: {} (image: {})",
174            CONTAINER_NAME,
175            self.image
176        );
177
178        let output = Command::new("docker")
179            .args([
180                "run",
181                "-d",
182                "--name",
183                CONTAINER_NAME,
184                "-e",
185                "MZ_EAT_MY_DATA=1",
186                "-p",
187                &format!("{}:6875", CONTAINER_PORT),
188                &self.image,
189                "--system-parameter-default=max_tables=10000",
190                "--system-parameter-default=max_objects_per_schema=10000",
191            ])
192            .output()
193            .await
194            .map_err(|e| DockerRuntimeError::ContainerStartFailed(Box::new(e)))?;
195
196        if !output.status.success() {
197            let stderr = String::from_utf8_lossy(&output.stderr);
198            return Err(DockerRuntimeError::ContainerStartFailed(
199                format!("Failed to create container: {}", stderr).into(),
200            ));
201        }
202        Ok(())
203    }
204
205    async fn start_existing_container(&self) -> Result<(), DockerRuntimeError> {
206        verbose!("Starting existing container: {}", CONTAINER_NAME);
207
208        let output = Command::new("docker")
209            .args(["start", CONTAINER_NAME])
210            .output()
211            .await
212            .map_err(|e| DockerRuntimeError::ContainerStartFailed(Box::new(e)))?;
213
214        if !output.status.success() {
215            let stderr = String::from_utf8_lossy(&output.stderr);
216            return Err(DockerRuntimeError::ContainerStartFailed(
217                format!("Failed to start container: {}", stderr).into(),
218            ));
219        }
220        Ok(())
221    }
222
223    async fn wait_for_container(&self) -> Result<(), DockerRuntimeError> {
224        verbose!("Waiting for container to be ready...");
225        for i in 0..30 {
226            if self.container_is_healthy().await {
227                verbose!("Container is ready!");
228                return Ok(());
229            }
230            if i < 29 {
231                sleep(Duration::from_secs(1)).await;
232            }
233        }
234        Err(DockerRuntimeError::ContainerStartFailed(
235            "Container failed to become healthy within 30 seconds".into(),
236        ))
237    }
238
239    async fn ensure_container(&self) -> Result<(), DockerRuntimeError> {
240        let exists = self.container_exists().await?;
241        let is_running = if exists {
242            self.container_is_running().await?
243        } else {
244            false
245        };
246
247        if is_running {
248            verbose!("Found running container: {}", CONTAINER_NAME);
249            if self.container_is_healthy().await {
250                verbose!("Container is healthy, reusing it");
251                return Ok(());
252            } else {
253                verbose!("Container is unhealthy, recreating it");
254                self.remove_container().await?;
255            }
256        } else if exists {
257            verbose!("Found stopped container: {}", CONTAINER_NAME);
258            match self.start_existing_container().await {
259                Ok(_) => {
260                    self.wait_for_container().await?;
261                    return Ok(());
262                }
263                Err(_) => {
264                    verbose!("Failed to start stopped container, recreating it");
265                    self.remove_container().await?;
266                }
267            }
268        }
269
270        self.create_container().await?;
271        self.wait_for_container().await?;
272        Ok(())
273    }
274}
275
276impl Default for DockerRuntime {
277    fn default() -> Self {
278        Self::new()
279    }
280}