mz_deploy/
docker_runtime.rs1use crate::client::{Client, ConnectionError, Profile};
17use crate::config::default_docker_image;
18use crate::verbose;
19use thiserror::Error;
20use tokio::process::Command;
21use tokio::time::{Duration, sleep};
22
23#[derive(Debug, Error)]
25pub(crate) enum DockerRuntimeError {
26 #[error("failed to start Materialize container: {0}")]
27 ContainerStartFailed(#[source] Box<dyn std::error::Error + Send + Sync>),
28
29 #[error("failed to connect to Materialize: {0}")]
30 ConnectionFailed(#[from] ConnectionError),
31}
32
33pub(crate) enum DockerStatus {
35 Running,
36 NotRunning,
37 NotInstalled,
38}
39
40const CONTAINER_NAME: &str = "mz-deploy-sandbox";
42
43const CONTAINER_PORT: u16 = 16875;
45
46pub(crate) struct DockerRuntime {
48 image: String,
49}
50
51impl DockerRuntime {
52 pub(crate) async fn check_availability() -> DockerStatus {
53 let result = Command::new("docker")
54 .arg("info")
55 .stdout(std::process::Stdio::null())
56 .stderr(std::process::Stdio::null())
57 .status()
58 .await;
59
60 match result {
61 Ok(status) if status.success() => DockerStatus::Running,
62 Ok(_) => DockerStatus::NotRunning,
63 Err(_) => DockerStatus::NotInstalled,
64 }
65 }
66
67 pub(crate) fn new() -> Self {
68 Self {
69 image: default_docker_image(),
70 }
71 }
72
73 pub(crate) fn with_image(mut self, image: impl Into<String>) -> Self {
74 self.image = image.into();
75 self
76 }
77
78 pub(crate) async fn get_client(&self) -> Result<Client, DockerRuntimeError> {
79 let profile = Self::make_profile();
80 let client = match Client::connect_with_profile_no_pin(profile).await {
83 Ok(client) => {
84 verbose!("Fast-path connect succeeded");
85 client
86 }
87 Err(_) => {
88 verbose!("Fast-path connect failed, falling back to Docker CLI");
89 self.ensure_container().await?;
90 let profile = Self::make_profile();
91 verbose!("Connecting to Materialize...");
92 let client = Client::connect_with_profile_no_pin(profile).await?;
93 verbose!("Connected");
94 client
95 }
96 };
97
98 Ok(client)
99 }
100
101 fn make_profile() -> Profile {
102 Profile {
103 name: "docker-sandbox".to_string(),
104 host: Some("localhost".to_string()),
105 port: CONTAINER_PORT,
106 username: "materialize".to_string(),
107 password: None,
108 options: Default::default(),
109 sslmode: None,
110 sslrootcert: None,
111 http_host: None,
112 }
113 }
114
115 async fn container_exists(&self) -> Result<bool, DockerRuntimeError> {
116 let output = Command::new("docker")
117 .args([
118 "ps",
119 "-a",
120 "--filter",
121 &format!("name=^{}$", CONTAINER_NAME),
122 "--format",
123 "{{.Names}}",
124 ])
125 .output()
126 .await
127 .map_err(|e| DockerRuntimeError::ContainerStartFailed(Box::new(e)))?;
128
129 Ok(output.status.success() && !output.stdout.is_empty())
130 }
131
132 async fn container_is_running(&self) -> Result<bool, DockerRuntimeError> {
133 let output = Command::new("docker")
134 .args([
135 "ps",
136 "--filter",
137 &format!("name=^{}$", CONTAINER_NAME),
138 "--format",
139 "{{.Names}}",
140 ])
141 .output()
142 .await
143 .map_err(|e| DockerRuntimeError::ContainerStartFailed(Box::new(e)))?;
144
145 Ok(output.status.success() && !output.stdout.is_empty())
146 }
147
148 async fn container_is_healthy(&self) -> bool {
149 Client::connect_with_profile_no_pin(Self::make_profile())
150 .await
151 .is_ok()
152 }
153
154 async fn remove_container(&self) -> Result<(), DockerRuntimeError> {
155 verbose!("Removing existing container: {}", CONTAINER_NAME);
156 let output = Command::new("docker")
157 .args(["rm", "-f", CONTAINER_NAME])
158 .output()
159 .await
160 .map_err(|e| DockerRuntimeError::ContainerStartFailed(Box::new(e)))?;
161
162 if !output.status.success() {
163 let stderr = String::from_utf8_lossy(&output.stderr);
164 return Err(DockerRuntimeError::ContainerStartFailed(
165 format!("Failed to remove container: {}", stderr).into(),
166 ));
167 }
168 Ok(())
169 }
170
171 async fn create_container(&self) -> Result<(), DockerRuntimeError> {
172 verbose!(
173 "Creating persistent container: {} (image: {})",
174 CONTAINER_NAME,
175 self.image
176 );
177
178 let output = Command::new("docker")
179 .args([
180 "run",
181 "-d",
182 "--name",
183 CONTAINER_NAME,
184 "-e",
185 "MZ_EAT_MY_DATA=1",
186 "-p",
187 &format!("{}:6875", CONTAINER_PORT),
188 &self.image,
189 "--system-parameter-default=max_tables=10000",
190 "--system-parameter-default=max_objects_per_schema=10000",
191 ])
192 .output()
193 .await
194 .map_err(|e| DockerRuntimeError::ContainerStartFailed(Box::new(e)))?;
195
196 if !output.status.success() {
197 let stderr = String::from_utf8_lossy(&output.stderr);
198 return Err(DockerRuntimeError::ContainerStartFailed(
199 format!("Failed to create container: {}", stderr).into(),
200 ));
201 }
202 Ok(())
203 }
204
205 async fn start_existing_container(&self) -> Result<(), DockerRuntimeError> {
206 verbose!("Starting existing container: {}", CONTAINER_NAME);
207
208 let output = Command::new("docker")
209 .args(["start", CONTAINER_NAME])
210 .output()
211 .await
212 .map_err(|e| DockerRuntimeError::ContainerStartFailed(Box::new(e)))?;
213
214 if !output.status.success() {
215 let stderr = String::from_utf8_lossy(&output.stderr);
216 return Err(DockerRuntimeError::ContainerStartFailed(
217 format!("Failed to start container: {}", stderr).into(),
218 ));
219 }
220 Ok(())
221 }
222
223 async fn wait_for_container(&self) -> Result<(), DockerRuntimeError> {
224 verbose!("Waiting for container to be ready...");
225 for i in 0..30 {
226 if self.container_is_healthy().await {
227 verbose!("Container is ready!");
228 return Ok(());
229 }
230 if i < 29 {
231 sleep(Duration::from_secs(1)).await;
232 }
233 }
234 Err(DockerRuntimeError::ContainerStartFailed(
235 "Container failed to become healthy within 30 seconds".into(),
236 ))
237 }
238
239 async fn ensure_container(&self) -> Result<(), DockerRuntimeError> {
240 let exists = self.container_exists().await?;
241 let is_running = if exists {
242 self.container_is_running().await?
243 } else {
244 false
245 };
246
247 if is_running {
248 verbose!("Found running container: {}", CONTAINER_NAME);
249 if self.container_is_healthy().await {
250 verbose!("Container is healthy, reusing it");
251 return Ok(());
252 } else {
253 verbose!("Container is unhealthy, recreating it");
254 self.remove_container().await?;
255 }
256 } else if exists {
257 verbose!("Found stopped container: {}", CONTAINER_NAME);
258 match self.start_existing_container().await {
259 Ok(_) => {
260 self.wait_for_container().await?;
261 return Ok(());
262 }
263 Err(_) => {
264 verbose!("Failed to start stopped container, recreating it");
265 self.remove_container().await?;
266 }
267 }
268 }
269
270 self.create_container().await?;
271 self.wait_for_container().await?;
272 Ok(())
273 }
274}
275
276impl Default for DockerRuntime {
277 fn default() -> Self {
278 Self::new()
279 }
280}