//! An [`Orchestrator`] implementation backed by processes running on the
//! local machine, intended for local development and testing rather than
//! production deployments.

use std::collections::BTreeMap;
use std::env;
use std::ffi::OsStr;
use std::fmt::Debug;
use std::fs::Permissions;
use std::future::Future;
use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener};
use std::os::unix::fs::PermissionsExt;
use std::os::unix::process::ExitStatusExt;
use std::path::{Path, PathBuf};
use std::process::{ExitStatus, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{Context, anyhow, bail};
use async_stream::stream;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use futures::stream::{BoxStream, FuturesUnordered};
use itertools::Itertools;
use libc::{SIGABRT, SIGBUS, SIGILL, SIGSEGV, SIGTRAP};
use maplit::btreemap;
use mz_orchestrator::scheduling_config::ServiceSchedulingConfig;
use mz_orchestrator::{
    CpuLimit, DiskLimit, MemoryLimit, NamespacedOrchestrator, Orchestrator, Service, ServiceConfig,
    ServiceEvent, ServicePort, ServiceProcessMetrics, ServiceStatus,
};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::error::ErrorExt;
use mz_ore::netio::UnixSocketAddr;
use mz_ore::result::ResultExt;
use mz_ore::task::AbortOnDropHandle;
use scopeguard::defer;
use serde::Serialize;
use sha1::{Digest, Sha1};
use sysinfo::{Pid, PidExt, Process, ProcessExt, ProcessRefreshKind, System, SystemExt};
use tokio::fs::remove_dir_all;
use tokio::net::{TcpListener, UnixStream};
use tokio::process::{Child, Command};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::time::{self, Duration};
use tokio::{fs, io, select};
use tracing::{debug, error, info, warn};

pub mod secrets;

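/// Configuration for a [`ProcessOrchestrator`].
///
/// A minimal construction sketch (all values below are illustrative, not
/// defaults):
///
/// ```ignore
/// let config = ProcessOrchestratorConfig {
///     image_dir: "target/release".into(),
///     suppress_output: false,
///     environment_id: "local-dev".into(),
///     secrets_dir: "/tmp/mz-secrets".into(),
///     command_wrapper: vec![],
///     propagate_crashes: false,
///     tcp_proxy: None,
///     scratch_directory: "/tmp/mz-scratch".into(),
/// };
/// ```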
#[derive(Debug, Clone)]
pub struct ProcessOrchestratorConfig {
    /// The directory in which the orchestrator looks for executable images.
    pub image_dir: PathBuf,
    /// Whether to suppress the stdout and stderr of spawned processes.
    pub suppress_output: bool,
    /// The ID of the environment under orchestration.
    pub environment_id: String,
    /// The directory in which to store secrets.
    pub secrets_dir: PathBuf,
    /// An optional command to wrap each service invocation in. `%N` in a
    /// wrapper part is interpolated with the service's full ID and
    /// `%P:<port>` with the listen address for the named port.
    pub command_wrapper: Vec<String>,
    /// Whether to abort this process if a service process crashes.
    pub propagate_crashes: bool,
    /// Optional configuration for proxying TCP connections to the services'
    /// Unix domain sockets.
    pub tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    /// A scratch directory that services may use for ephemeral storage.
    pub scratch_directory: PathBuf,
}

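/// Configuration for the optional TCP proxy run by a [`ProcessOrchestrator`].
///
/// When enabled, the orchestrator binds a TCP listener for every named port
/// of every service process and forwards each incoming connection to the
/// corresponding Unix domain socket.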
#[derive(Debug, Clone)]
pub struct ProcessOrchestratorTcpProxyConfig {
    /// The IP address on which to bind the TCP listeners.
    pub listen_addr: IpAddr,
    /// If set, the directory into which Prometheus HTTP service discovery
    /// files describing the proxied ports are written.
    pub prometheus_service_discovery_dir: Option<PathBuf>,
}

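/// An [`Orchestrator`] backed by processes on the local machine.
///
/// A hedged usage sketch (names are illustrative):
///
/// ```ignore
/// let orchestrator = ProcessOrchestrator::new(config).await?;
/// let namespaced = orchestrator.namespace("cluster");
/// ```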
#[derive(Debug)]
pub struct ProcessOrchestrator {
    image_dir: PathBuf,
    suppress_output: bool,
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    metadata_dir: PathBuf,
    secrets_dir: PathBuf,
    command_wrapper: Vec<String>,
    propagate_crashes: bool,
    tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    scratch_directory: PathBuf,
    launch_spec: LaunchSpec,
}

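/// The strategy used to launch service processes.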
#[derive(Debug, Clone, Copy)]
enum LaunchSpec {
    /// Execute the service process directly.
    Direct,
    /// Execute the service process through `systemd-run`, so that systemd
    /// can enforce memory and CPU limits.
    Systemd,
}

impl LaunchSpec {
    fn determine_implementation() -> Result<Self, anyhow::Error> {
        // Checking for the existence of `/run/systemd/system/` is the
        // documented way to determine whether the system was booted with
        // systemd; see sd_booted(3).
        match Path::new("/run/systemd/system/").try_exists()? {
            true => Ok(Self::Systemd),
            false => Ok(Self::Direct),
        }
    }

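    /// Builds the [`Command`] that launches a single service process: the
    /// image and its arguments, preceded by the command wrapper (if any,
    /// with `%N` and `%P:<port>` placeholders interpolated) and, for the
    /// systemd launch spec, a `systemd-run` invocation that applies the
    /// memory and CPU limits.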
    fn refine_command(
        &self,
        image: impl AsRef<OsStr>,
        args: &[impl AsRef<OsStr>],
        wrapper: &[String],
        full_id: &str,
        listen_addrs: &BTreeMap<String, String>,
        memory_limit: Option<&MemoryLimit>,
        cpu_limit: Option<&CpuLimit>,
    ) -> Command {
        let wrapper_parts = || {
            (
                &wrapper[0],
                wrapper[1..]
                    .iter()
                    .map(|part| interpolate_command(part, full_id, listen_addrs)),
            )
        };

        let mut cmd = match self {
            Self::Direct => {
                if wrapper.is_empty() {
                    Command::new(image)
                } else {
                    let (program, wrapper_args) = wrapper_parts();
                    let mut cmd = Command::new(program);
                    cmd.args(wrapper_args);
                    cmd.arg(image);
                    cmd
                }
            }
            Self::Systemd => {
                let mut cmd = Command::new("systemd-run");
                cmd.args(["--user", "--scope", "--quiet"]);
                if let Some(memory_limit) = memory_limit {
                    let memory_limit = memory_limit.0.as_u64();
                    cmd.args(["-p", &format!("MemoryMax={memory_limit}")]);
                }
                if let Some(cpu_limit) = cpu_limit {
                    // `CPUQuota` is expressed as a percentage of a single
                    // core, so 1000 millicpus = 100%; round up to the
                    // nearest percent.
                    let cpu_limit = (cpu_limit.as_millicpus() + 9) / 10;
                    cmd.args(["-p", &format!("CPUQuota={cpu_limit}%")]);
                }

                if !wrapper.is_empty() {
                    let (program, wrapper_args) = wrapper_parts();
                    cmd.arg(program);
                    cmd.args(wrapper_args);
                };
                cmd.arg(image);
                cmd
            }
        };
        cmd.args(args);
        cmd
    }
}

impl ProcessOrchestrator {
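    /// Creates a new process orchestrator from the provided configuration.
    ///
    /// As a side effect, creates the metadata directory, the secrets
    /// directory (with mode `0o700`), and, if configured, the Prometheus
    /// service discovery directory.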
    pub async fn new(
        ProcessOrchestratorConfig {
            image_dir,
            suppress_output,
            environment_id,
            secrets_dir,
            command_wrapper,
            propagate_crashes,
            tcp_proxy,
            scratch_directory,
        }: ProcessOrchestratorConfig,
    ) -> Result<ProcessOrchestrator, anyhow::Error> {
        let metadata_dir = env::temp_dir().join(format!("environmentd-{environment_id}"));
        fs::create_dir_all(&metadata_dir)
            .await
            .context("creating metadata directory")?;
        fs::create_dir_all(&secrets_dir)
            .await
            .context("creating secrets directory")?;
        fs::set_permissions(&secrets_dir, Permissions::from_mode(0o700))
            .await
            .context("setting secrets directory permissions")?;
        if let Some(prometheus_dir) = tcp_proxy
            .as_ref()
            .and_then(|p| p.prometheus_service_discovery_dir.as_ref())
        {
            fs::create_dir_all(&prometheus_dir)
                .await
                .context("creating prometheus directory")?;
        }

        let launch_spec = LaunchSpec::determine_implementation()?;
        info!(driver = ?launch_spec, "Process orchestrator launch spec");

        Ok(ProcessOrchestrator {
            image_dir: fs::canonicalize(image_dir).await?,
            suppress_output,
            namespaces: Mutex::new(BTreeMap::new()),
            metadata_dir: fs::canonicalize(metadata_dir).await?,
            secrets_dir: fs::canonicalize(secrets_dir).await?,
            command_wrapper,
            propagate_crashes,
            tcp_proxy,
            scratch_directory,
            launch_spec,
        })
    }
}

impl Orchestrator for ProcessOrchestrator {
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
            let config = Arc::new(NamespacedProcessOrchestratorConfig {
                namespace: namespace.into(),
                image_dir: self.image_dir.clone(),
                suppress_output: self.suppress_output,
                metadata_dir: self.metadata_dir.clone(),
                command_wrapper: self.command_wrapper.clone(),
                propagate_crashes: self.propagate_crashes,
                tcp_proxy: self.tcp_proxy.clone(),
                scratch_directory: self.scratch_directory.clone(),
                launch_spec: self.launch_spec,
            });

            let services = Arc::new(Mutex::new(BTreeMap::new()));
            let (service_event_tx, service_event_rx) = broadcast::channel(16384);
            let (command_tx, command_rx) = mpsc::unbounded_channel();

            let worker = OrchestratorWorker {
                config: Arc::clone(&config),
                services: Arc::clone(&services),
                service_event_tx,
                system: System::new(),
                command_rx,
            }
            .spawn();

            Arc::new(NamespacedProcessOrchestrator {
                config,
                services,
                service_event_rx,
                command_tx,
                scheduling_config: Default::default(),
                _worker: worker,
            })
        }))
    }
}

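/// Configuration shared by a [`NamespacedProcessOrchestrator`] and its
/// [`OrchestratorWorker`].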
#[derive(Debug)]
struct NamespacedProcessOrchestratorConfig {
    namespace: String,
    image_dir: PathBuf,
    suppress_output: bool,
    metadata_dir: PathBuf,
    command_wrapper: Vec<String>,
    propagate_crashes: bool,
    tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    scratch_directory: PathBuf,
    launch_spec: LaunchSpec,
}

impl NamespacedProcessOrchestratorConfig {
    /// Returns the given service ID qualified by this namespace.
    fn full_id(&self, id: &str) -> String {
        format!("{}-{}", self.namespace, id)
    }

    /// Returns the run directory for the given service, which holds its PID
    /// files and Unix domain sockets.
    fn service_run_dir(&self, id: &str) -> PathBuf {
        self.metadata_dir.join(&self.full_id(id))
    }

    /// Returns the scratch directory for the given service.
    fn service_scratch_dir(&self, id: &str) -> PathBuf {
        self.scratch_directory.join(&self.full_id(id))
    }
}

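/// A [`NamespacedOrchestrator`] backed by processes on the local machine.
///
/// Process management is delegated to a background [`OrchestratorWorker`]
/// task over `command_tx`, which keeps the methods of this type from
/// blocking.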
#[derive(Debug)]
struct NamespacedProcessOrchestrator {
    config: Arc<NamespacedProcessOrchestratorConfig>,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_rx: broadcast::Receiver<ServiceEvent>,
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    _worker: AbortOnDropHandle<()>,
}

impl NamespacedProcessOrchestrator {
    /// Sends a command to the worker task, which `_worker` keeps alive for
    /// as long as `self` exists.
    fn send_command(&self, cmd: WorkerCommand) {
        self.command_tx.send(cmd).expect("worker task not dropped");
    }
}

#[async_trait]
impl NamespacedOrchestrator for NamespacedProcessOrchestrator {
    fn ensure_service(
        &self,
        id: &str,
        config: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error> {
        let always_use_disk = self
            .scheduling_config
            .read()
            .expect("poisoned")
            .always_use_disk;

        let service = ProcessService {
            run_dir: self.config.service_run_dir(id),
            scale: config.scale,
        };
        let disk = {
            // Disk is provisioned if requested by the service config or
            // mandated by the scheduling policy...
            let user_requested_disk = config.disk || always_use_disk;
            // ...unless an explicit disk limit of zero disables it.
            let size_disables_disk = config.disk_limit == Some(DiskLimit::ZERO);
            user_requested_disk && !size_disables_disk
        };

        let config = EnsureServiceConfig {
            image: config.image,
            args: config.args,
            ports: config.ports,
            memory_limit: config.memory_limit,
            cpu_limit: config.cpu_limit,
            scale: config.scale,
            labels: config.labels,
            disk,
        };

        self.send_command(WorkerCommand::EnsureService {
            id: id.to_string(),
            config,
        });

        Ok(Box::new(service))
    }

    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        self.send_command(WorkerCommand::DropService { id: id.to_string() });
        Ok(())
    }

    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::ListServices { result_tx });

        result_rx.await.expect("worker task not dropped")
    }

    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        // Replay the current status of every known process before switching
        // over to live events.
        let mut initial_events = vec![];
        let mut service_event_rx = {
            let services = self.services.lock().expect("lock poisoned");
            for (service_id, process_states) in &*services {
                for (process_id, process_state) in process_states.iter().enumerate() {
                    initial_events.push(ServiceEvent {
                        service_id: service_id.clone(),
                        process_id: u64::cast_from(process_id),
                        status: process_state.status.into(),
                        time: process_state.status_time,
                    });
                }
            }
            self.service_event_rx.resubscribe()
        };
        Box::pin(stream! {
            for event in initial_events {
                yield Ok(event);
            }
            loop {
                yield service_event_rx.recv().await.err_into();
            }
        })
    }

    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::FetchServiceMetrics {
            id: id.to_string(),
            result_tx,
        });

        result_rx.await.expect("worker task not dropped")
    }

    fn update_scheduling_config(
        &self,
        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
    ) {
        *self.scheduling_config.write().expect("poisoned") = config;
    }
}

/// Commands sent from a [`NamespacedProcessOrchestrator`] to its
/// [`OrchestratorWorker`].
///
/// Commands for which the caller expects an answer carry a `result_tx` on
/// which the worker delivers the result.
enum WorkerCommand {
    EnsureService {
        id: String,
        config: EnsureServiceConfig,
    },
    DropService {
        id: String,
    },
    ListServices {
        result_tx: oneshot::Sender<Result<Vec<String>, anyhow::Error>>,
    },
    FetchServiceMetrics {
        id: String,
        result_tx: oneshot::Sender<Result<Vec<ServiceProcessMetrics>, anyhow::Error>>,
    },
}

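/// Describes the desired state of a service, as conveyed by
/// [`WorkerCommand::EnsureService`].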
struct EnsureServiceConfig {
    /// The name of the image to run, relative to the orchestrator's image
    /// directory.
    pub image: String,
    /// A function that generates the arguments for each process of the
    /// service, given the listen addresses assigned to each named port.
    pub args: Box<dyn Fn(&BTreeMap<String, String>) -> Vec<String> + Send + Sync>,
    /// Ports to expose.
    pub ports: Vec<ServicePort>,
    /// An optional limit on the memory that the service can use.
    pub memory_limit: Option<MemoryLimit>,
    /// An optional limit on the CPU that the service can use.
    pub cpu_limit: Option<CpuLimit>,
    /// The number of copies of this service to run.
    pub scale: u16,
    /// Arbitrary key-value pairs to attach to the service, surfaced as
    /// Prometheus service discovery labels.
    pub labels: BTreeMap<String, String>,
    /// Whether scratch disk space should be allocated for the service.
    pub disk: bool,
}

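/// A background task that owns all blocking work for a
/// [`NamespacedProcessOrchestrator`], processing the [`WorkerCommand`]s
/// received over `command_rx` one at a time.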
struct OrchestratorWorker {
    config: Arc<NamespacedProcessOrchestratorConfig>,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_tx: broadcast::Sender<ServiceEvent>,
    system: System,
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
}

impl OrchestratorWorker {
    fn spawn(self) -> AbortOnDropHandle<()> {
        let name = format!("process-orchestrator:{}", self.config.namespace);
        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
    }

    async fn run(mut self) {
        while let Some(cmd) = self.command_rx.recv().await {
            use WorkerCommand::*;
            let result = match cmd {
                EnsureService { id, config } => self.ensure_service(id, config).await,
                DropService { id } => self.drop_service(&id).await,
                ListServices { result_tx } => {
                    let _ = result_tx.send(self.list_services().await);
                    Ok(())
                }
                FetchServiceMetrics { id, result_tx } => {
                    let _ = result_tx.send(self.fetch_service_metrics(&id));
                    Ok(())
                }
            };

            if let Err(error) = result {
                panic!("process orchestrator worker failed: {error}");
            }
        }
    }

    fn fetch_service_metrics(
        &mut self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let pids: Vec<_> = {
            let services = self.services.lock().expect("lock poisoned");
            let Some(service) = services.get(id) else {
                bail!("unknown service {id}")
            };
            service.iter().map(|p| p.pid()).collect()
        };

        let mut metrics = vec![];
        for pid in pids {
            let (cpu_nano_cores, memory_bytes) = match pid {
                None => (None, None),
                Some(pid) => {
                    self.system
                        .refresh_process_specifics(pid, ProcessRefreshKind::new().with_cpu());
                    match self.system.process(pid) {
                        None => (None, None),
                        Some(process) => {
                            // `cpu_usage` is a percentage of a single core,
                            // so 100.0 corresponds to one full core, i.e.,
                            // 1_000_000_000 nanocores; multiplying the
                            // percentage by 10^7 converts to nanocores.
                            let cpu = u64::try_cast_from(
                                (f64::from(process.cpu_usage()) * 10_000_000.0).trunc(),
                            )
                            .expect("sane value of process.cpu_usage()");
                            let memory = process.memory();
                            (Some(cpu), Some(memory))
                        }
                    }
                }
            };
            metrics.push(ServiceProcessMetrics {
                cpu_nano_cores,
                memory_bytes,
                disk_usage_bytes: None,
            });
        }
        Ok(metrics)
    }

    async fn ensure_service(
        &self,
        id: String,
        EnsureServiceConfig {
            image,
            args,
            ports: ports_in,
            memory_limit,
            cpu_limit,
            scale,
            labels,
            disk,
        }: EnsureServiceConfig,
    ) -> Result<(), anyhow::Error> {
        let full_id = self.config.full_id(&id);

        let run_dir = self.config.service_run_dir(&id);
        fs::create_dir_all(&run_dir)
            .await
            .context("creating run directory")?;
        let scratch_dir = if disk {
            let scratch_dir = self.config.service_scratch_dir(&id);
            fs::create_dir_all(&scratch_dir)
                .await
                .context("creating scratch directory")?;
            Some(fs::canonicalize(&scratch_dir).await?)
        } else {
            None
        };

        {
            let mut services = self.services.lock().expect("lock poisoned");
            let process_states = services.entry(id.clone()).or_default();

            // Create supervisors for any processes the service is missing.
            let mut new_process_states = vec![];
            for i in process_states.len()..scale.into() {
                // Allocate TCP proxy listeners for the process's ports, if
                // proxying is configured.
                let mut ports = vec![];
                let mut tcp_proxy_addrs = BTreeMap::new();
                for port in &ports_in {
                    let tcp_proxy_listener = match &self.config.tcp_proxy {
                        None => None,
                        Some(tcp_proxy) => {
                            let listener = StdTcpListener::bind((tcp_proxy.listen_addr, 0))
                                .with_context(|| format!("binding to {}", tcp_proxy.listen_addr))?;
                            listener.set_nonblocking(true)?;
                            let listener = TcpListener::from_std(listener)?;
                            let local_addr = listener.local_addr()?;
                            tcp_proxy_addrs.insert(port.name.clone(), local_addr);
                            Some(AddressedTcpListener {
                                listener,
                                local_addr,
                            })
                        }
                    };
                    ports.push(ServiceProcessPort {
                        name: port.name.clone(),
                        tcp_proxy_listener,
                    });
                }

                // Spawn a supervisor for the process.
                let handle = mz_ore::task::spawn(
                    || format!("process-orchestrator:{full_id}-{i}"),
                    self.supervise_service_process(ServiceProcessConfig {
                        id: id.to_string(),
                        run_dir: run_dir.clone(),
                        scratch_dir: scratch_dir.clone(),
                        i,
                        image: image.clone(),
                        args: &args,
                        ports,
                        memory_limit,
                        cpu_limit,
                        disk,
                        launch_spec: self.config.launch_spec,
                    }),
                );

                new_process_states.push(ProcessState {
                    _handle: handle.abort_on_drop(),
                    status: ProcessStatus::NotReady,
                    status_time: Utc::now(),
                    labels: labels.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
                    tcp_proxy_addrs,
                });
            }

            // Dropping any process states in excess of the requested scale
            // aborts their supervisors, which kills the processes; then the
            // newly created states are appended.
            process_states.truncate(scale.into());
            process_states.extend(new_process_states);
        }

        self.maybe_write_prometheus_service_discovery_file().await;

        Ok(())
    }

    async fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        let full_id = self.config.full_id(id);
        let run_dir = self.config.service_run_dir(id);
        let scratch_dir = self.config.service_scratch_dir(id);

        // Drop the supervisors for the service's processes; aborting the
        // supervision tasks kills the processes themselves.
        {
            let mut supervisors = self.services.lock().expect("lock poisoned");
            supervisors.remove(id);
        }

        // Terminate any orphaned processes recorded in leftover PID files,
        // e.g. from a previous incarnation of the orchestrator.
        if let Ok(mut entries) = fs::read_dir(&run_dir).await {
            while let Some(entry) = entries.next_entry().await? {
                let path = entry.path();
                if path.extension() == Some(OsStr::new("pid")) {
                    let mut system = System::new();
                    let Some(process) = find_process_from_pid_file(&mut system, &path).await else {
                        continue;
                    };
                    let pid = process.pid();
                    info!("terminating orphaned process for {full_id} with PID {pid}");
                    process.kill();
                }
            }
        }

        // Clean up the on-disk state, tolerating directories that were
        // never created.
        if let Err(e) = remove_dir_all(run_dir).await {
            if e.kind() != io::ErrorKind::NotFound {
                warn!(
                    "error cleaning up run directory for {full_id}: {}",
                    e.display_with_causes()
                );
            }
        }
        if let Err(e) = remove_dir_all(scratch_dir).await {
            if e.kind() != io::ErrorKind::NotFound {
                warn!(
                    "error cleaning up scratch directory for {full_id}: {}",
                    e.display_with_causes()
                );
            }
        }

        self.maybe_write_prometheus_service_discovery_file().await;
        Ok(())
    }

    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let mut services = vec![];
        let namespace_prefix = format!("{}-", self.config.namespace);
        let mut entries = fs::read_dir(&self.config.metadata_dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let filename = entry
                .file_name()
                .into_string()
                .map_err(|_| anyhow!("unable to convert filename to string"))?;
            if let Some(id) = filename.strip_prefix(&namespace_prefix) {
                services.push(id.to_string());
            }
        }
        Ok(services)
    }

    fn supervise_service_process(
        &self,
        ServiceProcessConfig {
            id,
            run_dir,
            scratch_dir,
            i,
            image,
            args,
            ports,
            memory_limit,
            cpu_limit,
            disk,
            launch_spec,
        }: ServiceProcessConfig,
    ) -> impl Future<Output = ()> + use<> {
        let suppress_output = self.config.suppress_output;
        let propagate_crashes = self.config.propagate_crashes;
        let command_wrapper = self.config.command_wrapper.clone();
        let image = self.config.image_dir.join(image);
        let pid_file = run_dir.join(format!("{i}.pid"));
        let full_id = self.config.full_id(&id);

        let state_updater = ProcessStateUpdater {
            namespace: self.config.namespace.clone(),
            id,
            i,
            services: Arc::clone(&self.services),
            service_event_tx: self.service_event_tx.clone(),
        };

        let listen_addrs = ports
            .iter()
            .map(|p| {
                let addr = socket_path(&run_dir, &p.name, i);
                (p.name.clone(), addr)
            })
            .collect();
        let mut args = args(&listen_addrs);

        if disk {
            if let Some(scratch) = &scratch_dir {
                args.push(format!("--scratch-directory={}", scratch.display()));
            } else {
                panic!(
                    "internal error: service requested disk but no scratch directory was configured"
                );
            }
        }

        async move {
            // Start the TCP proxies for the process's ports, if configured.
            let mut proxy_handles = vec![];
            for port in ports {
                if let Some(tcp_listener) = port.tcp_proxy_listener {
                    info!(
                        "{full_id}-{i}: {} tcp proxy listening on {}",
                        port.name, tcp_listener.local_addr,
                    );
                    let uds_path = &listen_addrs[&port.name];
                    let handle = mz_ore::task::spawn(
                        || format!("{full_id}-{i}-proxy-{}", port.name),
                        tcp_proxy(TcpProxyConfig {
                            name: format!("{full_id}-{i}-{}", port.name),
                            tcp_listener,
                            uds_path: uds_path.clone(),
                        }),
                    );
                    proxy_handles.push(handle.abort_on_drop());
                }
            }

            // If a process from a previous incarnation of the orchestrator
            // is still running, supervise it until it exits before launching
            // our own.
            supervise_existing_process(&state_updater, &pid_file).await;

            // Launch the process, relaunching it whenever it exits.
            loop {
                let mut cmd = launch_spec.refine_command(
                    &image,
                    &args,
                    &command_wrapper,
                    &full_id,
                    &listen_addrs,
                    memory_limit.as_ref(),
                    cpu_limit.as_ref(),
                );
                info!(
                    "launching {full_id}-{i} via {} {}...",
                    cmd.as_std().get_program().to_string_lossy(),
                    cmd.as_std()
                        .get_args()
                        .map(|arg| arg.to_string_lossy())
                        .join(" ")
                );
                if suppress_output {
                    cmd.stdout(Stdio::null());
                    cmd.stderr(Stdio::null());
                }
                match spawn_process(&state_updater, cmd, &pid_file, !command_wrapper.is_empty())
                    .await
                {
                    Ok(status) => {
                        if propagate_crashes && did_process_crash(status) {
                            panic!(
                                "{full_id}-{i} crashed; aborting because propagate_crashes is enabled"
                            );
                        }
                        error!("{full_id}-{i} exited: {:?}; relaunching in 5s", status);
                    }
                    Err(e) => {
                        error!("{full_id}-{i} failed to spawn: {}; relaunching in 5s", e);
                    }
                };
                state_updater.update_state(ProcessStatus::NotReady);
                time::sleep(Duration::from_secs(5)).await;
            }
        }
    }

    /// Writes a Prometheus HTTP service discovery file describing the TCP
    /// proxy listeners in this namespace, if such a file was requested via
    /// the orchestrator's configuration.
    async fn maybe_write_prometheus_service_discovery_file(&self) {
        #[derive(Serialize)]
        struct StaticConfig {
            labels: BTreeMap<String, String>,
            targets: Vec<String>,
        }

        let Some(tcp_proxy) = &self.config.tcp_proxy else {
            return;
        };
        let Some(dir) = &tcp_proxy.prometheus_service_discovery_dir else {
            return;
        };

        let mut static_configs = vec![];
        {
            let services = self.services.lock().expect("lock poisoned");
            for (id, states) in &*services {
                for (i, state) in states.iter().enumerate() {
                    for (name, addr) in &state.tcp_proxy_addrs {
                        let mut labels = btreemap! {
                            "mz_orchestrator_namespace".into() => self.config.namespace.clone(),
                            "mz_orchestrator_service_id".into() => id.clone(),
                            "mz_orchestrator_port".into() => name.clone(),
                            "mz_orchestrator_ordinal".into() => i.to_string(),
                        };
                        for (k, v) in &state.labels {
                            let k = format!("mz_orchestrator_{}", k.replace('-', "_"));
                            labels.insert(k, v.clone());
                        }
                        static_configs.push(StaticConfig {
                            labels,
                            targets: vec![addr.to_string()],
                        })
                    }
                }
            }
        }

        let path = dir.join(Path::new(&self.config.namespace).with_extension("json"));
        let contents = serde_json::to_vec_pretty(&static_configs).expect("valid json");
        if let Err(e) = fs::write(&path, &contents).await {
            warn!(
                "{}: failed to write prometheus service discovery file: {}",
                self.config.namespace,
                e.display_with_causes()
            );
        }
    }
}

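/// Describes how to launch and supervise a single process of a service.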
struct ServiceProcessConfig<'a> {
    id: String,
    run_dir: PathBuf,
    scratch_dir: Option<PathBuf>,
    i: usize,
    image: String,
    args: &'a (dyn Fn(&BTreeMap<String, String>) -> Vec<String> + Send + Sync),
    ports: Vec<ServiceProcessPort>,
    disk: bool,
    memory_limit: Option<MemoryLimit>,
    cpu_limit: Option<CpuLimit>,
    launch_spec: LaunchSpec,
}

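/// A named port of a service process, with its TCP proxy listener, if any.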
struct ServiceProcessPort {
    name: String,
    tcp_proxy_listener: Option<AddressedTcpListener>,
}

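/// Supervises an existing process, if the PID file identifies one that is
/// still running, and returns once that process exits. If this future is
/// dropped before then, the process is killed.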
async fn supervise_existing_process(state_updater: &ProcessStateUpdater, pid_file: &Path) {
    let name = format!(
        "{}-{}-{}",
        state_updater.namespace, state_updater.id, state_updater.i
    );

    let mut system = System::new();
    let Some(process) = find_process_from_pid_file(&mut system, pid_file).await else {
        return;
    };
    let pid = process.pid();

    info!(%pid, "discovered existing process for {name}");
    state_updater.update_state(ProcessStatus::Ready { pid });

    // When this future resolves or is dropped, mark the process as not
    // ready, and kill it unless it has already exited of its own accord.
    let need_kill = AtomicBool::new(true);
    defer! {
        state_updater.update_state(ProcessStatus::NotReady);
        if need_kill.load(Ordering::SeqCst) {
            info!(%pid, "terminating existing process for {name}");
            process.kill();
        }
    }

    // Poll every 5s until the process disappears from the process table.
    let mut system = System::new();
    while system.refresh_process_specifics(pid, ProcessRefreshKind::new()) {
        time::sleep(Duration::from_secs(5)).await;
    }

    warn!(%pid, "process for {name} has exited; will relaunch");
    // The process exited on its own, so the `defer` above must not kill it.
    need_kill.store(false, Ordering::SeqCst)
}

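/// Interpolates the given full ID and port addresses into the placeholders
/// of a command wrapper part: `%N` expands to `full_id` and `%P:<name>` to
/// the listen address of the named port.
///
/// An illustrative sketch:
///
/// ```ignore
/// let ports = BTreeMap::from([("sql".to_string(), "/tmp/sql.sock".to_string())]);
/// assert_eq!(
///     interpolate_command("--name=%N --sql=%P:sql", "default-u1", &ports),
///     "--name=default-u1 --sql=/tmp/sql.sock",
/// );
/// ```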
fn interpolate_command(
    command_part: &str,
    full_id: &str,
    ports: &BTreeMap<String, String>,
) -> String {
    let mut command_part = command_part.replace("%N", full_id);
    for (endpoint, port) in ports {
        command_part = command_part.replace(&format!("%P:{endpoint}"), port);
    }
    command_part
}

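/// Spawns the configured process, records its PID and start time in
/// `pid_file`, and waits for it to exit. If the returned future is dropped,
/// the child is killed, preceded by a SIGTERM and a short grace period when
/// `send_sigterm` is set.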
async fn spawn_process(
    state_updater: &ProcessStateUpdater,
    mut cmd: Command,
    pid_file: &Path,
    send_sigterm: bool,
) -> Result<ExitStatus, anyhow::Error> {
    struct KillOnDropChild(Child, bool);

    impl Drop for KillOnDropChild {
        fn drop(&mut self) {
            if let (Some(pid), true) = (self.0.id().and_then(|id| i32::try_from(id).ok()), self.1) {
                let _ = nix::sys::signal::kill(
                    nix::unistd::Pid::from_raw(pid),
                    nix::sys::signal::Signal::SIGTERM,
                );
                // Give the process a brief chance to exit gracefully before
                // it is forcibly killed below.
                tokio::task::block_in_place(|| std::thread::sleep(Duration::from_millis(500)));
            }
            let _ = self.0.start_kill();
        }
    }

    let mut child = KillOnDropChild(cmd.spawn()?, send_sigterm);

    // The child was just spawned and has not yet been waited on, so it is
    // guaranteed to have a PID.
    let pid = Pid::from_u32(child.0.id().unwrap());
    write_pid_file(pid_file, pid).await?;
    state_updater.update_state(ProcessStatus::Ready { pid });
    Ok(child.0.wait().await?)
}

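/// Reports whether the exit status indicates a crash: termination by a
/// signal associated with a program fault, rather than a graceful exit or an
/// externally delivered SIGTERM or SIGKILL.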
fn did_process_crash(status: ExitStatus) -> bool {
    matches!(
        status.signal(),
        Some(SIGABRT | SIGBUS | SIGSEGV | SIGTRAP | SIGILL)
    )
}

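/// Records the PID and start time of the given process in `pid_file`, so
/// that a later incarnation of the orchestrator can distinguish the process
/// from an unrelated one that reuses its PID.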
async fn write_pid_file(pid_file: &Path, pid: Pid) -> Result<(), anyhow::Error> {
    let mut system = System::new();
    system.refresh_process_specifics(pid, ProcessRefreshKind::new());
    let start_time = system.process(pid).map_or(0, |p| p.start_time());
    fs::write(pid_file, format!("{pid}\n{start_time}\n")).await?;
    Ok(())
}

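/// Looks up the process identified by the given PID file, returning `None`
/// if the file is missing or malformed, or if no process with the recorded
/// PID and start time is running.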
async fn find_process_from_pid_file<'a>(
    system: &'a mut System,
    pid_file: &Path,
) -> Option<&'a Process> {
    let Ok(contents) = fs::read_to_string(pid_file).await else {
        return None;
    };
    let lines = contents.trim().split('\n').collect::<Vec<_>>();
    let [pid, start_time] = lines.as_slice() else {
        return None;
    };
    let Ok(pid) = Pid::from_str(pid) else {
        return None;
    };
    let Ok(start_time) = u64::from_str(start_time) else {
        return None;
    };
    system.refresh_process_specifics(pid, ProcessRefreshKind::new());
    let process = system.process(pid)?;
    // A mismatched start time means the PID was reused by an unrelated
    // process after the original exited.
    if process.start_time() != start_time {
        return None;
    }
    Some(process)
}

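/// Configuration for a [`tcp_proxy`] task.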
struct TcpProxyConfig {
    name: String,
    tcp_listener: AddressedTcpListener,
    uds_path: String,
}

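/// Accepts TCP connections on the given listener and proxies the bytes in
/// both directions between each connection and the Unix domain socket at
/// `uds_path`.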
async fn tcp_proxy(
    TcpProxyConfig {
        name,
        tcp_listener,
        uds_path,
    }: TcpProxyConfig,
) {
    let mut conns = FuturesUnordered::new();
    loop {
        select! {
            res = tcp_listener.listener.accept() => {
                debug!("{name}: accepting tcp proxy connection");
                let uds_path = uds_path.clone();
                conns.push(Box::pin(async move {
                    let (mut tcp_conn, _) = res.context("accepting tcp connection")?;
                    let mut uds_conn = UnixStream::connect(uds_path)
                        .await
                        .context("making uds connection")?;
                    io::copy_bidirectional(&mut tcp_conn, &mut uds_conn)
                        .await
                        .context("proxying")
                }));
            }
            Some(Err(e)) = conns.next() => {
                warn!("{name}: tcp proxy connection failed: {}", e.display_with_causes());
            }
        }
    }
}

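/// A handle that records status changes for a single service process and
/// broadcasts each change as a [`ServiceEvent`].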
struct ProcessStateUpdater {
    namespace: String,
    id: String,
    i: usize,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_tx: broadcast::Sender<ServiceEvent>,
}

impl ProcessStateUpdater {
    /// Records a new status for the process, if it is still known, and
    /// broadcasts a corresponding service event.
    fn update_state(&self, status: ProcessStatus) {
        let mut services = self.services.lock().expect("lock poisoned");
        let Some(process_states) = services.get_mut(&self.id) else {
            return;
        };
        let Some(process_state) = process_states.get_mut(self.i) else {
            return;
        };
        let status_time = Utc::now();
        process_state.status = status;
        process_state.status_time = status_time;
        let _ = self.service_event_tx.send(ServiceEvent {
            service_id: self.id.to_string(),
            process_id: u64::cast_from(self.i),
            status: status.into(),
            time: status_time,
        });
    }
}

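/// Supervision state for a single process of a service.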
#[derive(Debug)]
struct ProcessState {
    /// The handle to the process's supervision task; dropping it aborts the
    /// supervisor and thereby kills the process.
    _handle: AbortOnDropHandle<()>,
    /// The last known status of the process.
    status: ProcessStatus,
    /// The time at which the status last changed.
    status_time: DateTime<Utc>,
    /// The service's labels, surfaced via Prometheus service discovery.
    labels: BTreeMap<String, String>,
    /// The local addresses of the process's TCP proxy listeners, by port
    /// name.
    tcp_proxy_addrs: BTreeMap<String, SocketAddr>,
}

impl ProcessState {
    fn pid(&self) -> Option<Pid> {
        match &self.status {
            ProcessStatus::NotReady => None,
            ProcessStatus::Ready { pid } => Some(*pid),
        }
    }
}

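/// The liveness status of a single service process.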
#[derive(Debug, Clone, Copy)]
enum ProcessStatus {
    NotReady,
    Ready { pid: Pid },
}

impl From<ProcessStatus> for ServiceStatus {
    fn from(status: ProcessStatus) -> ServiceStatus {
        match status {
            ProcessStatus::NotReady => ServiceStatus::Offline(None),
            ProcessStatus::Ready { .. } => ServiceStatus::Online,
        }
    }
}

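/// Returns the path of the Unix domain socket for the given port of the
/// given process, normally `<run_dir>/<port>-<process>`. If that path is too
/// long to be a valid socket address, a SHA-1-derived name in the system
/// temporary directory is used instead.
///
/// An illustrative sketch (the fallback path depends on the hash):
///
/// ```ignore
/// let path = socket_path(Path::new("/tmp/environmentd-x/default-u1"), "sql", 0);
/// assert_eq!(path, "/tmp/environmentd-x/default-u1/sql-0");
/// ```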
fn socket_path(run_dir: &Path, port: &str, process: usize) -> String {
    let desired = run_dir
        .join(format!("{port}-{process}"))
        .to_string_lossy()
        .into_owned();
    if UnixSocketAddr::from_pathname(&desired).is_err() {
        // Unix domain socket addresses have a short length limit (roughly
        // 100 bytes, depending on the platform), so fall back to a compact
        // hashed name in the temporary directory.
        env::temp_dir()
            .join(hex::encode(Sha1::digest(desired)))
            .display()
            .to_string()
    } else {
        desired
    }
}

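/// A TCP listener bundled with the local address it is bound to.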
struct AddressedTcpListener {
    listener: TcpListener,
    local_addr: SocketAddr,
}

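/// A handle to a service managed by a [`NamespacedProcessOrchestrator`],
/// addressable via per-process Unix domain sockets.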
#[derive(Debug, Clone)]
struct ProcessService {
    run_dir: PathBuf,
    scale: u16,
}

impl Service for ProcessService {
    fn addresses(&self, port: &str) -> Vec<String> {
        (0..self.scale)
            .map(|i| socket_path(&self.run_dir, port, i.into()))
            .collect()
    }
}