use std::collections::BTreeMap;
use std::env;
use std::ffi::OsStr;
use std::fmt::Debug;
use std::fs::Permissions;
use std::future::Future;
use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener};
use std::num::NonZero;
use std::os::unix::fs::PermissionsExt;
use std::os::unix::process::ExitStatusExt;
use std::path::{Path, PathBuf};
use std::process::{ExitStatus, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{Context, anyhow, bail};
use async_stream::stream;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use futures::stream::{BoxStream, FuturesUnordered};
use itertools::Itertools;
use libc::{SIGABRT, SIGBUS, SIGILL, SIGSEGV, SIGTRAP};
use maplit::btreemap;
use mz_orchestrator::scheduling_config::ServiceSchedulingConfig;
use mz_orchestrator::{
    CpuLimit, DiskLimit, MemoryLimit, NamespacedOrchestrator, Orchestrator, Service,
    ServiceAssignments, ServiceConfig, ServiceEvent, ServicePort, ServiceProcessMetrics,
    ServiceStatus,
};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::error::ErrorExt;
use mz_ore::netio::UnixSocketAddr;
use mz_ore::result::ResultExt;
use mz_ore::task::AbortOnDropHandle;
use scopeguard::defer;
use serde::Serialize;
use sha1::{Digest, Sha1};
use sysinfo::{Pid, PidExt, Process, ProcessExt, ProcessRefreshKind, System, SystemExt};
use tokio::fs::remove_dir_all;
use tokio::net::{TcpListener, UnixStream};
use tokio::process::{Child, Command};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::time::{self, Duration};
use tokio::{fs, io, select};
use tracing::{debug, error, info, warn};

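/// Secrets support for the process orchestrator.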
pub mod secrets;

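/// Configures a [`ProcessOrchestrator`].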
#[derive(Debug, Clone)]
pub struct ProcessOrchestratorConfig {
    pub image_dir: PathBuf,
    pub suppress_output: bool,
    pub environment_id: String,
    pub secrets_dir: PathBuf,
    pub command_wrapper: Vec<String>,
    pub propagate_crashes: bool,
    pub tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    pub scratch_directory: PathBuf,
}

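/// Configures the TCP proxy for a [`ProcessOrchestrator`].
///
/// When configured, the orchestrator binds a TCP listener on `listen_addr` for
/// each service port and proxies connections to the service's Unix domain
/// socket. If `prometheus_service_discovery_dir` is set, a Prometheus file
/// service discovery target file for the proxied ports is maintained in that
/// directory.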
#[derive(Debug, Clone)]
pub struct ProcessOrchestratorTcpProxyConfig {
    pub listen_addr: IpAddr,
    pub prometheus_service_discovery_dir: Option<PathBuf>,
}

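/// An orchestrator backed by processes on the local machine.
///
/// Each service process runs directly on the host. Per-service metadata lives
/// in a run directory under a temporary directory scoped to the environment
/// ID, and processes listen on Unix domain sockets created in that directory.
///
/// A minimal construction sketch (the paths and environment ID below are
/// illustrative, not defaults taken from this crate):
///
/// ```ignore
/// let orchestrator = ProcessOrchestrator::new(ProcessOrchestratorConfig {
///     image_dir: "/usr/local/bin".into(),
///     suppress_output: false,
///     environment_id: "local".into(),
///     secrets_dir: "/tmp/mz-secrets".into(),
///     command_wrapper: vec![],
///     propagate_crashes: false,
///     tcp_proxy: None,
///     scratch_directory: "/tmp/mz-scratch".into(),
/// })
/// .await?;
/// let clusters = orchestrator.namespace("cluster");
/// ```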
#[derive(Debug)]
pub struct ProcessOrchestrator {
    image_dir: PathBuf,
    suppress_output: bool,
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    metadata_dir: PathBuf,
    secrets_dir: PathBuf,
    command_wrapper: Vec<String>,
    propagate_crashes: bool,
    tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    scratch_directory: PathBuf,
    launch_spec: LaunchSpec,
}

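/// How service processes are launched.
///
/// `Direct` spawns the binary (optionally through the configured command
/// wrapper) as a plain child process. `Systemd` wraps the invocation in a
/// transient `systemd-run --user --scope` unit so that memory and CPU limits
/// can be enforced via `MemoryMax` and `CPUQuota`. `determine_implementation`
/// selects `Systemd` when `/run/systemd/system/` exists and `Direct`
/// otherwise.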
#[derive(Debug, Clone, Copy)]
enum LaunchSpec {
    Direct,
    Systemd,
}

impl LaunchSpec {
    fn determine_implementation() -> Result<Self, anyhow::Error> {
        match Path::new("/run/systemd/system/").try_exists()? {
            true => Ok(Self::Systemd),
            false => Ok(Self::Direct),
        }
    }

    fn refine_command(
        &self,
        image: impl AsRef<OsStr>,
        args: &[impl AsRef<OsStr>],
        wrapper: &[String],
        memory_limit: Option<&MemoryLimit>,
        cpu_limit: Option<&CpuLimit>,
    ) -> Command {
        let mut cmd = match self {
            Self::Direct => {
                if let Some((program, wrapper_args)) = wrapper.split_first() {
                    let mut cmd = Command::new(program);
                    cmd.args(wrapper_args);
                    cmd.arg(image);
                    cmd
                } else {
                    Command::new(image)
                }
            }
            Self::Systemd => {
                let mut cmd = Command::new("systemd-run");
                cmd.args(["--user", "--scope", "--quiet"]);
                if let Some(memory_limit) = memory_limit {
                    let memory_limit = memory_limit.0.as_u64();
                    cmd.args(["-p", &format!("MemoryMax={memory_limit}")]);
                }
                if let Some(cpu_limit) = cpu_limit {
                    let cpu_limit = (cpu_limit.as_millicpus() + 9) / 10;
                    cmd.args(["-p", &format!("CPUQuota={cpu_limit}%")]);
                }

                cmd.args(wrapper);
                cmd.arg(image);
                cmd
            }
        };
        cmd.args(args);
        cmd
    }
}

impl ProcessOrchestrator {
    pub async fn new(
        ProcessOrchestratorConfig {
            image_dir,
            suppress_output,
            environment_id,
            secrets_dir,
            command_wrapper,
            propagate_crashes,
            tcp_proxy,
            scratch_directory,
        }: ProcessOrchestratorConfig,
    ) -> Result<ProcessOrchestrator, anyhow::Error> {
        let metadata_dir = env::temp_dir().join(format!("environmentd-{environment_id}"));
        fs::create_dir_all(&metadata_dir)
            .await
            .context("creating metadata directory")?;
        fs::create_dir_all(&secrets_dir)
            .await
            .context("creating secrets directory")?;
        fs::set_permissions(&secrets_dir, Permissions::from_mode(0o700))
            .await
            .context("setting secrets directory permissions")?;
        if let Some(prometheus_dir) = tcp_proxy
            .as_ref()
            .and_then(|p| p.prometheus_service_discovery_dir.as_ref())
        {
            fs::create_dir_all(&prometheus_dir)
                .await
                .context("creating prometheus directory")?;
        }

        let launch_spec = LaunchSpec::determine_implementation()?;
        info!(driver = ?launch_spec, "Process orchestrator launch spec");

        Ok(ProcessOrchestrator {
            image_dir: fs::canonicalize(image_dir).await?,
            suppress_output,
            namespaces: Mutex::new(BTreeMap::new()),
            metadata_dir: fs::canonicalize(metadata_dir).await?,
            secrets_dir: fs::canonicalize(secrets_dir).await?,
            command_wrapper,
            propagate_crashes,
            tcp_proxy,
            scratch_directory,
            launch_spec,
        })
    }
}

impl Orchestrator for ProcessOrchestrator {
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
            let config = Arc::new(NamespacedProcessOrchestratorConfig {
                namespace: namespace.into(),
                image_dir: self.image_dir.clone(),
                suppress_output: self.suppress_output,
                metadata_dir: self.metadata_dir.clone(),
                command_wrapper: self.command_wrapper.clone(),
                propagate_crashes: self.propagate_crashes,
                tcp_proxy: self.tcp_proxy.clone(),
                scratch_directory: self.scratch_directory.clone(),
                launch_spec: self.launch_spec,
            });

            let services = Arc::new(Mutex::new(BTreeMap::new()));
            let (service_event_tx, service_event_rx) = broadcast::channel(16384);
            let (command_tx, command_rx) = mpsc::unbounded_channel();

            let worker = OrchestratorWorker {
                config: Arc::clone(&config),
                services: Arc::clone(&services),
                service_event_tx,
                system: System::new(),
                command_rx,
            }
            .spawn();

            Arc::new(NamespacedProcessOrchestrator {
                config,
                services,
                service_event_rx,
                command_tx,
                scheduling_config: Default::default(),
                _worker: worker,
            })
        }))
    }
}

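/// Configuration shared by a [`NamespacedProcessOrchestrator`] and its
/// [`OrchestratorWorker`].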
#[derive(Debug)]
struct NamespacedProcessOrchestratorConfig {
    namespace: String,
    image_dir: PathBuf,
    suppress_output: bool,
    metadata_dir: PathBuf,
    command_wrapper: Vec<String>,
    propagate_crashes: bool,
    tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    scratch_directory: PathBuf,
    launch_spec: LaunchSpec,
}

impl NamespacedProcessOrchestratorConfig {
    fn full_id(&self, id: &str) -> String {
        format!("{}-{}", self.namespace, id)
    }

    fn service_run_dir(&self, id: &str) -> PathBuf {
        self.metadata_dir.join(&self.full_id(id))
    }

    fn service_scratch_dir(&self, id: &str) -> PathBuf {
        self.scratch_directory.join(&self.full_id(id))
    }
}

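/// A per-namespace handle to the process orchestrator.
///
/// Mutating operations are forwarded as [`WorkerCommand`]s to the background
/// [`OrchestratorWorker`] task; service state is observed through the shared
/// `services` map and the broadcast channel of [`ServiceEvent`]s.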
#[derive(Debug)]
struct NamespacedProcessOrchestrator {
    config: Arc<NamespacedProcessOrchestratorConfig>,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_rx: broadcast::Receiver<ServiceEvent>,
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    _worker: AbortOnDropHandle<()>,
}

impl NamespacedProcessOrchestrator {
    fn send_command(&self, cmd: WorkerCommand) {
        self.command_tx.send(cmd).expect("worker task not dropped");
    }
}

#[async_trait]
impl NamespacedOrchestrator for NamespacedProcessOrchestrator {
    fn ensure_service(
        &self,
        id: &str,
        config: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error> {
        let service = ProcessService {
            run_dir: self.config.service_run_dir(id),
            scale: config.scale,
        };

        let disk = config.disk_limit != Some(DiskLimit::ZERO);

        let config = EnsureServiceConfig {
            image: config.image,
            args: config.args,
            ports: config.ports,
            memory_limit: config.memory_limit,
            cpu_limit: config.cpu_limit,
            scale: config.scale,
            labels: config.labels,
            disk,
        };

        self.send_command(WorkerCommand::EnsureService {
            id: id.to_string(),
            config,
        });

        Ok(Box::new(service))
    }

    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        self.send_command(WorkerCommand::DropService { id: id.to_string() });
        Ok(())
    }

    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::ListServices { result_tx });

        result_rx.await.expect("worker task not dropped")
    }

    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        let mut initial_events = vec![];
        let mut service_event_rx = {
            let services = self.services.lock().expect("lock poisoned");
            for (service_id, process_states) in &*services {
                for (process_id, process_state) in process_states.iter().enumerate() {
                    initial_events.push(ServiceEvent {
                        service_id: service_id.clone(),
                        process_id: u64::cast_from(process_id),
                        status: process_state.status.into(),
                        time: process_state.status_time,
                    });
                }
            }
            self.service_event_rx.resubscribe()
        };
        Box::pin(stream! {
            for event in initial_events {
                yield Ok(event);
            }
            loop {
                yield service_event_rx.recv().await.err_into();
            }
        })
    }

    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::FetchServiceMetrics {
            id: id.to_string(),
            result_tx,
        });

        result_rx.await.expect("worker task not dropped")
    }

    fn update_scheduling_config(
        &self,
        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
    ) {
        *self.scheduling_config.write().expect("poisoned") = config;
    }
}

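/// A command sent from a [`NamespacedProcessOrchestrator`] to its
/// [`OrchestratorWorker`].
///
/// Commands that produce a result carry a oneshot sender on which the worker
/// replies.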
enum WorkerCommand {
    EnsureService {
        id: String,
        config: EnsureServiceConfig,
    },
    DropService {
        id: String,
    },
    ListServices {
        result_tx: oneshot::Sender<Result<Vec<String>, anyhow::Error>>,
    },
    FetchServiceMetrics {
        id: String,
        result_tx: oneshot::Sender<Result<Vec<ServiceProcessMetrics>, anyhow::Error>>,
    },
}

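/// Describes the desired state of a service, as handled by the worker's
/// `ensure_service`.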
struct EnsureServiceConfig {
    pub image: String,
    pub args: Box<dyn Fn(ServiceAssignments) -> Vec<String> + Send + Sync>,
    pub ports: Vec<ServicePort>,
    pub memory_limit: Option<MemoryLimit>,
    pub cpu_limit: Option<CpuLimit>,
    pub scale: NonZero<u16>,
    pub labels: BTreeMap<String, String>,
    pub disk: bool,
}

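/// A background task that owns process supervision for a single namespace.
///
/// The worker processes [`WorkerCommand`]s received on `command_rx`, spawns
/// and supervises service processes, and publishes status changes on
/// `service_event_tx`.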
struct OrchestratorWorker {
    config: Arc<NamespacedProcessOrchestratorConfig>,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_tx: broadcast::Sender<ServiceEvent>,
    system: System,
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
}

impl OrchestratorWorker {
    fn spawn(self) -> AbortOnDropHandle<()> {
        let name = format!("process-orchestrator:{}", self.config.namespace);
        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
    }

    async fn run(mut self) {
        while let Some(cmd) = self.command_rx.recv().await {
            use WorkerCommand::*;
            let result = match cmd {
                EnsureService { id, config } => self.ensure_service(id, config).await,
                DropService { id } => self.drop_service(&id).await,
                ListServices { result_tx } => {
                    let _ = result_tx.send(self.list_services().await);
                    Ok(())
                }
                FetchServiceMetrics { id, result_tx } => {
                    let _ = result_tx.send(self.fetch_service_metrics(&id));
                    Ok(())
                }
            };

            if let Err(error) = result {
                panic!("process orchestrator worker failed: {error}");
            }
        }
    }

    fn fetch_service_metrics(
        &mut self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let pids: Vec<_> = {
            let services = self.services.lock().expect("lock poisoned");
            let Some(service) = services.get(id) else {
                bail!("unknown service {id}")
            };
            service.iter().map(|p| p.pid()).collect()
        };

        let mut metrics = vec![];
        for pid in pids {
            let (cpu_nano_cores, memory_bytes) = match pid {
                None => (None, None),
                Some(pid) => {
                    self.system
                        .refresh_process_specifics(pid, ProcessRefreshKind::new().with_cpu());
                    match self.system.process(pid) {
                        None => (None, None),
                        Some(process) => {
                            let cpu = u64::try_cast_from(
                                (f64::from(process.cpu_usage()) * 10_000_000.0).trunc(),
                            )
                            .expect("sane value of process.cpu_usage()");
                            let memory = process.memory();
                            (Some(cpu), Some(memory))
                        }
                    }
                }
            };
            metrics.push(ServiceProcessMetrics {
                cpu_nano_cores,
                memory_bytes,
                disk_bytes: None,
                heap_bytes: None,
                heap_limit: None,
            });
        }
        Ok(metrics)
    }

    async fn ensure_service(
        &self,
        id: String,
        EnsureServiceConfig {
            image,
            args,
            ports: ports_in,
            memory_limit,
            cpu_limit,
            scale,
            labels,
            disk,
        }: EnsureServiceConfig,
    ) -> Result<(), anyhow::Error> {
        let full_id = self.config.full_id(&id);

        let run_dir = self.config.service_run_dir(&id);
        fs::create_dir_all(&run_dir)
            .await
            .context("creating run directory")?;
        let scratch_dir = if disk {
            let scratch_dir = self.config.service_scratch_dir(&id);
            fs::create_dir_all(&scratch_dir)
                .await
                .context("creating scratch directory")?;
            Some(fs::canonicalize(&scratch_dir).await?)
        } else {
            None
        };

        let old_scale = {
            let services = self.services.lock().expect("poisoned");
            services.get(&id).map(|states| states.len())
        };
        match old_scale {
            Some(old) if old == usize::cast_from(scale) => return Ok(()),
            Some(_) => self.drop_service(&id).await?,
            None => (),
        }

        let mut peer_addrs = Vec::new();
        for i in 0..scale.into() {
            let addresses = ports_in
                .iter()
                .map(|port| {
                    let addr = socket_path(&run_dir, &port.name, i);
                    (port.name.clone(), addr)
                })
                .collect();
            peer_addrs.push(addresses);
        }

        {
            let mut services = self.services.lock().expect("lock poisoned");

            let mut process_states = vec![];
            for i in 0..usize::cast_from(scale) {
                let listen_addrs = &peer_addrs[i];

                let mut command_wrapper = self.config.command_wrapper.clone();
                if let Some(parts) = command_wrapper.get_mut(1..) {
                    for part in parts {
                        *part = interpolate_command(&part[..], &full_id, listen_addrs);
                    }
                }

                let mut ports = vec![];
                let mut tcp_proxy_addrs = BTreeMap::new();
                for port in &ports_in {
                    let tcp_proxy_listener = match &self.config.tcp_proxy {
                        None => None,
                        Some(tcp_proxy) => {
                            let listener = StdTcpListener::bind((tcp_proxy.listen_addr, 0))
                                .with_context(|| format!("binding to {}", tcp_proxy.listen_addr))?;
                            listener.set_nonblocking(true)?;
                            let listener = TcpListener::from_std(listener)?;
                            let local_addr = listener.local_addr()?;
                            tcp_proxy_addrs.insert(port.name.clone(), local_addr);
                            Some(AddressedTcpListener {
                                listener,
                                local_addr,
                            })
                        }
                    };
                    ports.push(ServiceProcessPort {
                        name: port.name.clone(),
                        listen_addr: listen_addrs[&port.name].clone(),
                        tcp_proxy_listener,
                    });
                }

                let mut args = args(ServiceAssignments {
                    listen_addrs,
                    peer_addrs: &peer_addrs,
                });
                args.push(format!("--process={i}"));
                if disk {
                    if let Some(scratch) = &scratch_dir {
                        args.push(format!("--scratch-directory={}", scratch.display()));
                    } else {
                        panic!(
                            "internal error: service requested disk but no scratch directory was configured"
                        );
                    }
                }

                let handle = mz_ore::task::spawn(
                    || format!("process-orchestrator:{full_id}-{i}"),
                    self.supervise_service_process(ServiceProcessConfig {
                        id: id.to_string(),
                        run_dir: run_dir.clone(),
                        i,
                        image: image.clone(),
                        args,
                        command_wrapper,
                        ports,
                        memory_limit,
                        cpu_limit,
                        launch_spec: self.config.launch_spec,
                    }),
                );

                process_states.push(ProcessState {
                    _handle: handle.abort_on_drop(),
                    status: ProcessStatus::NotReady,
                    status_time: Utc::now(),
                    labels: labels.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
                    tcp_proxy_addrs,
                });
            }

            services.insert(id, process_states);
        }

        self.maybe_write_prometheus_service_discovery_file().await;

        Ok(())
    }

    async fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        let full_id = self.config.full_id(id);
        let run_dir = self.config.service_run_dir(id);
        let scratch_dir = self.config.service_scratch_dir(id);

        {
            let mut supervisors = self.services.lock().expect("lock poisoned");
            supervisors.remove(id);
        }

        if let Ok(mut entries) = fs::read_dir(&run_dir).await {
            while let Some(entry) = entries.next_entry().await? {
                let path = entry.path();
                if path.extension() == Some(OsStr::new("pid")) {
                    let mut system = System::new();
                    let Some(process) = find_process_from_pid_file(&mut system, &path).await else {
                        continue;
                    };
                    let pid = process.pid();
                    info!("terminating orphaned process for {full_id} with PID {pid}");
                    process.kill();
                }
            }
        }

        if let Err(e) = remove_dir_all(run_dir).await {
            if e.kind() != io::ErrorKind::NotFound {
                warn!(
                    "error cleaning up run directory for {full_id}: {}",
                    e.display_with_causes()
                );
            }
        }
        if let Err(e) = remove_dir_all(scratch_dir).await {
            if e.kind() != io::ErrorKind::NotFound {
                warn!(
                    "error cleaning up scratch directory for {full_id}: {}",
                    e.display_with_causes()
                );
            }
        }

        self.maybe_write_prometheus_service_discovery_file().await;
        Ok(())
    }

    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let mut services = vec![];
        let namespace_prefix = format!("{}-", self.config.namespace);
        let mut entries = fs::read_dir(&self.config.metadata_dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let filename = entry
                .file_name()
                .into_string()
                .map_err(|_| anyhow!("unable to convert filename to string"))?;
            if let Some(id) = filename.strip_prefix(&namespace_prefix) {
                services.push(id.to_string());
            }
        }
        Ok(services)
    }

    fn supervise_service_process(
        &self,
        ServiceProcessConfig {
            id,
            run_dir,
            i,
            image,
            args,
            command_wrapper,
            ports,
            memory_limit,
            cpu_limit,
            launch_spec,
        }: ServiceProcessConfig,
    ) -> impl Future<Output = ()> + use<> {
        let suppress_output = self.config.suppress_output;
        let propagate_crashes = self.config.propagate_crashes;
        let image = self.config.image_dir.join(image);
        let pid_file = run_dir.join(format!("{i}.pid"));
        let full_id = self.config.full_id(&id);

        let state_updater = ProcessStateUpdater {
            namespace: self.config.namespace.clone(),
            id,
            i,
            services: Arc::clone(&self.services),
            service_event_tx: self.service_event_tx.clone(),
        };

        async move {
            #[allow(clippy::collection_is_never_read)]
            let mut proxy_handles = vec![];
            for port in ports {
                if let Some(tcp_listener) = port.tcp_proxy_listener {
                    info!(
                        "{full_id}-{i}: {} tcp proxy listening on {}",
                        port.name, tcp_listener.local_addr,
                    );
                    let uds_path = port.listen_addr;
                    let handle = mz_ore::task::spawn(
                        || format!("{full_id}-{i}-proxy-{}", port.name),
                        tcp_proxy(TcpProxyConfig {
                            name: format!("{full_id}-{i}-{}", port.name),
                            tcp_listener,
                            uds_path: uds_path.clone(),
                        }),
                    );
                    proxy_handles.push(handle.abort_on_drop());
                }
            }

            supervise_existing_process(&state_updater, &pid_file).await;

            loop {
                let mut cmd = launch_spec.refine_command(
                    &image,
                    &args,
                    &command_wrapper,
                    memory_limit.as_ref(),
                    cpu_limit.as_ref(),
                );
                info!(
                    "launching {full_id}-{i} via {} {}...",
                    cmd.as_std().get_program().to_string_lossy(),
                    cmd.as_std()
                        .get_args()
                        .map(|arg| arg.to_string_lossy())
                        .join(" ")
                );
                if suppress_output {
                    cmd.stdout(Stdio::null());
                    cmd.stderr(Stdio::null());
                }
                match spawn_process(&state_updater, cmd, &pid_file, !command_wrapper.is_empty())
                    .await
                {
                    Ok(status) => {
                        assert!(
                            !(propagate_crashes && did_process_crash(status)),
                            "{full_id}-{i} crashed; aborting because propagate_crashes is enabled"
                        );
                        error!("{full_id}-{i} exited: {:?}; relaunching in 5s", status);
                    }
                    Err(e) => {
                        error!("{full_id}-{i} failed to spawn: {}; relaunching in 5s", e);
                    }
                };
                state_updater.update_state(ProcessStatus::NotReady);
                time::sleep(Duration::from_secs(5)).await;
            }
        }
    }

    async fn maybe_write_prometheus_service_discovery_file(&self) {
        #[derive(Serialize)]
        struct StaticConfig {
            labels: BTreeMap<String, String>,
            targets: Vec<String>,
        }

        let Some(tcp_proxy) = &self.config.tcp_proxy else {
            return;
        };
        let Some(dir) = &tcp_proxy.prometheus_service_discovery_dir else {
            return;
        };

        let mut static_configs = vec![];
        {
            let services = self.services.lock().expect("lock poisoned");
            for (id, states) in &*services {
                for (i, state) in states.iter().enumerate() {
                    for (name, addr) in &state.tcp_proxy_addrs {
                        let mut labels = btreemap! {
                            "mz_orchestrator_namespace".into() => self.config.namespace.clone(),
                            "mz_orchestrator_service_id".into() => id.clone(),
                            "mz_orchestrator_port".into() => name.clone(),
                            "mz_orchestrator_ordinal".into() => i.to_string(),
                        };
                        for (k, v) in &state.labels {
                            let k = format!("mz_orchestrator_{}", k.replace('-', "_"));
                            labels.insert(k, v.clone());
                        }
                        static_configs.push(StaticConfig {
                            labels,
                            targets: vec![addr.to_string()],
                        })
                    }
                }
            }
        }

        let path = dir.join(Path::new(&self.config.namespace).with_extension("json"));
        let contents = serde_json::to_vec_pretty(&static_configs).expect("valid json");
        if let Err(e) = fs::write(&path, &contents).await {
            warn!(
                "{}: failed to write prometheus service discovery file: {}",
                self.config.namespace,
                e.display_with_causes()
            );
        }
    }
}

struct ServiceProcessConfig {
    id: String,
    run_dir: PathBuf,
    i: usize,
    image: String,
    args: Vec<String>,
    command_wrapper: Vec<String>,
    ports: Vec<ServiceProcessPort>,
    memory_limit: Option<MemoryLimit>,
    cpu_limit: Option<CpuLimit>,
    launch_spec: LaunchSpec,
}

struct ServiceProcessPort {
    name: String,
    listen_addr: String,
    tcp_proxy_listener: Option<AddressedTcpListener>,
}

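/// Supervises a service process that was started by a previous incarnation of
/// the orchestrator, as recorded in `pid_file`.
///
/// The process is marked ready and then polled every five seconds until it
/// exits. When supervision ends, the process is killed, unless it was observed
/// to have already terminated on its own.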
async fn supervise_existing_process(state_updater: &ProcessStateUpdater, pid_file: &Path) {
    let name = format!(
        "{}-{}-{}",
        state_updater.namespace, state_updater.id, state_updater.i
    );

    let mut system = System::new();
    let Some(process) = find_process_from_pid_file(&mut system, pid_file).await else {
        return;
    };
    let pid = process.pid();

    info!(%pid, "discovered existing process for {name}");
    state_updater.update_state(ProcessStatus::Ready { pid });

    let need_kill = AtomicBool::new(true);
    defer! {
        state_updater.update_state(ProcessStatus::NotReady);
        if need_kill.load(Ordering::SeqCst) {
            info!(%pid, "terminating existing process for {name}");
            process.kill();
        }
    }

    let mut system = System::new();
    while system.refresh_process_specifics(pid, ProcessRefreshKind::new()) {
        time::sleep(Duration::from_secs(5)).await;
    }

    warn!(%pid, "process for {name} has crashed; will reboot");
    need_kill.store(false, Ordering::SeqCst)
}

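/// Interpolates service details into one part of the command wrapper: `%N` is
/// replaced with `full_id` and `%P:<port>` with the listen address of the
/// named port.
///
/// For example (illustrative values), with `full_id = "cluster-u1"` and a
/// `controller` port listening at `/tmp/controller.sock`, the part
/// `"%N:%P:controller"` expands to `"cluster-u1:/tmp/controller.sock"`.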
fn interpolate_command(
    command_part: &str,
    full_id: &str,
    ports: &BTreeMap<String, String>,
) -> String {
    let mut command_part = command_part.replace("%N", full_id);
    for (endpoint, port) in ports {
        command_part = command_part.replace(&format!("%P:{endpoint}"), port);
    }
    command_part
}

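/// Spawns the given command, records its PID and start time in `pid_file`,
/// marks the process ready, and waits for it to exit.
///
/// When the child handle is dropped (e.g., because the supervising task is
/// cancelled) and `send_sigterm` is set, the child is sent SIGTERM and given
/// 500ms to shut down before it is forcibly killed.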
async fn spawn_process(
    state_updater: &ProcessStateUpdater,
    mut cmd: Command,
    pid_file: &Path,
    send_sigterm: bool,
) -> Result<ExitStatus, anyhow::Error> {
    struct KillOnDropChild(Child, bool);

    impl Drop for KillOnDropChild {
        fn drop(&mut self) {
            if let (Some(pid), true) = (self.0.id().and_then(|id| i32::try_from(id).ok()), self.1) {
                let _ = nix::sys::signal::kill(
                    nix::unistd::Pid::from_raw(pid),
                    nix::sys::signal::Signal::SIGTERM,
                );
                tokio::task::block_in_place(|| std::thread::sleep(Duration::from_millis(500)));
            }
            let _ = self.0.start_kill();
        }
    }

    let mut child = KillOnDropChild(cmd.spawn()?, send_sigterm);

    let pid = Pid::from_u32(child.0.id().unwrap());
    write_pid_file(pid_file, pid).await?;
    state_updater.update_state(ProcessStatus::Ready { pid });
    Ok(child.0.wait().await?)
}

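/// Reports whether the exit status indicates that the process was killed by a
/// crash signal (SIGABRT, SIGBUS, SIGSEGV, SIGTRAP, or SIGILL).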
fn did_process_crash(status: ExitStatus) -> bool {
    matches!(
        status.signal(),
        Some(SIGABRT | SIGBUS | SIGSEGV | SIGTRAP | SIGILL)
    )
}

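/// Records the process's PID and start time in `pid_file`, so that a later
/// incarnation can distinguish the original process from an unrelated one
/// that happens to reuse the PID.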
async fn write_pid_file(pid_file: &Path, pid: Pid) -> Result<(), anyhow::Error> {
    let mut system = System::new();
    system.refresh_process_specifics(pid, ProcessRefreshKind::new());
    let start_time = system.process(pid).map_or(0, |p| p.start_time());
    fs::write(pid_file, format!("{pid}\n{start_time}\n")).await?;
    Ok(())
}

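/// Looks up the process recorded in `pid_file`, returning it only if a process
/// with that PID still exists and its start time matches the recorded one
/// (i.e., the PID has not been recycled).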
async fn find_process_from_pid_file<'a>(
    system: &'a mut System,
    pid_file: &Path,
) -> Option<&'a Process> {
    let Ok(contents) = fs::read_to_string(pid_file).await else {
        return None;
    };
    let lines = contents.trim().split('\n').collect::<Vec<_>>();
    let [pid, start_time] = lines.as_slice() else {
        return None;
    };
    let Ok(pid) = Pid::from_str(pid) else {
        return None;
    };
    let Ok(start_time) = u64::from_str(start_time) else {
        return None;
    };
    system.refresh_process_specifics(pid, ProcessRefreshKind::new());
    let process = system.process(pid)?;
    if process.start_time() != start_time {
        return None;
    }
    Some(process)
}

struct TcpProxyConfig {
    name: String,
    tcp_listener: AddressedTcpListener,
    uds_path: String,
}

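/// Accepts connections on the TCP listener and proxies each one
/// bidirectionally to the Unix domain socket at `uds_path`.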
async fn tcp_proxy(
    TcpProxyConfig {
        name,
        tcp_listener,
        uds_path,
    }: TcpProxyConfig,
) {
    let mut conns = FuturesUnordered::new();
    loop {
        select! {
            res = tcp_listener.listener.accept() => {
                debug!("{name}: accepting tcp proxy connection");
                let uds_path = uds_path.clone();
                conns.push(Box::pin(async move {
                    let (mut tcp_conn, _) = res.context("accepting tcp connection")?;
                    let mut uds_conn = UnixStream::connect(uds_path)
                        .await
                        .context("making uds connection")?;
                    io::copy_bidirectional(&mut tcp_conn, &mut uds_conn)
                        .await
                        .context("proxying")
                }));
            }
            Some(result) = conns.next() => if let Err(e) = result {
                warn!("{name}: tcp proxy connection failed: {}", e.display_with_causes());
            }
        }
    }
}

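/// Updates the recorded status of a single service process and broadcasts a
/// corresponding [`ServiceEvent`].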
struct ProcessStateUpdater {
    namespace: String,
    id: String,
    i: usize,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_tx: broadcast::Sender<ServiceEvent>,
}

impl ProcessStateUpdater {
    fn update_state(&self, status: ProcessStatus) {
        let mut services = self.services.lock().expect("lock poisoned");
        let Some(process_states) = services.get_mut(&self.id) else {
            return;
        };
        let Some(process_state) = process_states.get_mut(self.i) else {
            return;
        };
        let status_time = Utc::now();
        process_state.status = status;
        process_state.status_time = status_time;
        let _ = self.service_event_tx.send(ServiceEvent {
            service_id: self.id.to_string(),
            process_id: u64::cast_from(self.i),
            status: status.into(),
            time: status_time,
        });
    }
}

#[derive(Debug)]
struct ProcessState {
    _handle: AbortOnDropHandle<()>,
    status: ProcessStatus,
    status_time: DateTime<Utc>,
    labels: BTreeMap<String, String>,
    tcp_proxy_addrs: BTreeMap<String, SocketAddr>,
}

impl ProcessState {
    fn pid(&self) -> Option<Pid> {
        match &self.status {
            ProcessStatus::NotReady => None,
            ProcessStatus::Ready { pid } => Some(*pid),
        }
    }
}

#[derive(Debug, Clone, Copy)]
enum ProcessStatus {
    NotReady,
    Ready { pid: Pid },
}

impl From<ProcessStatus> for ServiceStatus {
    fn from(status: ProcessStatus) -> ServiceStatus {
        match status {
            ProcessStatus::NotReady => ServiceStatus::Offline(None),
            ProcessStatus::Ready { .. } => ServiceStatus::Online,
        }
    }
}

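/// Returns the path of the Unix domain socket for the given port of the given
/// process, rooted in `run_dir`.
///
/// If that path is too long to be a valid socket address, a shorter path in
/// the system temporary directory, derived from the SHA-1 hash of the desired
/// path, is used instead.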
fn socket_path(run_dir: &Path, port: &str, process: u16) -> String {
    let desired = run_dir
        .join(format!("{port}-{process}"))
        .to_string_lossy()
        .into_owned();
    if UnixSocketAddr::from_pathname(&desired).is_err() {
        env::temp_dir()
            .join(hex::encode(Sha1::digest(desired)))
            .display()
            .to_string()
    } else {
        desired
    }
}

struct AddressedTcpListener {
    listener: TcpListener,
    local_addr: SocketAddr,
}

#[derive(Debug)]
struct ProcessService {
    run_dir: PathBuf,
    scale: NonZero<u16>,
}

impl Service for ProcessService {
    fn addresses(&self, port: &str) -> Vec<String> {
        (0..self.scale.get())
            .map(|i| socket_path(&self.run_dir, port, i))
            .collect()
    }
}