use std::collections::BTreeMap;
use std::env;
use std::ffi::OsStr;
use std::fmt::Debug;
use std::fs::Permissions;
use std::future::Future;
use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener};
use std::os::unix::fs::PermissionsExt;
use std::os::unix::process::ExitStatusExt;
use std::path::{Path, PathBuf};
use std::process::{ExitStatus, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{Context, anyhow, bail};
use async_stream::stream;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use futures::stream::{BoxStream, FuturesUnordered};
use itertools::Itertools;
use libc::{SIGABRT, SIGBUS, SIGILL, SIGSEGV, SIGTRAP};
use maplit::btreemap;
use mz_orchestrator::scheduling_config::ServiceSchedulingConfig;
use mz_orchestrator::{
    CpuLimit, DiskLimit, MemoryLimit, NamespacedOrchestrator, Orchestrator, Service,
    ServiceAssignments, ServiceConfig, ServiceEvent, ServicePort, ServiceProcessMetrics,
    ServiceStatus,
};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::error::ErrorExt;
use mz_ore::netio::UnixSocketAddr;
use mz_ore::result::ResultExt;
use mz_ore::task::AbortOnDropHandle;
use scopeguard::defer;
use serde::Serialize;
use sha1::{Digest, Sha1};
use sysinfo::{Pid, PidExt, Process, ProcessExt, ProcessRefreshKind, System, SystemExt};
use tokio::fs::remove_dir_all;
use tokio::net::{TcpListener, UnixStream};
use tokio::process::{Child, Command};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::time::{self, Duration};
use tokio::{fs, io, select};
use tracing::{debug, error, info, warn};

pub mod secrets;

/// Configures a [`ProcessOrchestrator`].
#[derive(Debug, Clone)]
pub struct ProcessOrchestratorConfig {
    /// The directory in which the orchestrator looks for executable images.
    pub image_dir: PathBuf,
    /// Whether to suppress the stdout and stderr of spawned subprocesses.
    pub suppress_output: bool,
    /// The ID of the environment under orchestration.
    pub environment_id: String,
    /// The directory in which to store secrets.
    pub secrets_dir: PathBuf,
    /// A command to wrap around the child command invocation. Its arguments
    /// may contain `%N` and `%P:<port>` placeholders; see
    /// [`interpolate_command`].
    pub command_wrapper: Vec<String>,
    /// Whether to crash this process if a child process crashes.
    pub propagate_crashes: bool,
    /// TCP proxy configuration.
    ///
    /// When enabled, for each port of each spawned service process, the
    /// orchestrator binds a TCP listener that proxies incoming connections to
    /// the process's Unix domain socket for that port.
    pub tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    /// The directory in which services may store ephemeral scratch data.
    pub scratch_directory: PathBuf,
}

/// Configures the TCP proxy for a [`ProcessOrchestrator`].
#[derive(Debug, Clone)]
pub struct ProcessOrchestratorTcpProxyConfig {
    /// The IP address on which to bind the TCP listeners.
    pub listen_addr: IpAddr,
    /// If set, the directory into which to write Prometheus service
    /// discovery files describing the bound TCP listeners.
    pub prometheus_service_discovery_dir: Option<PathBuf>,
}

/// An orchestrator backed by processes on the local machine.
#[derive(Debug)]
pub struct ProcessOrchestrator {
    image_dir: PathBuf,
    suppress_output: bool,
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    metadata_dir: PathBuf,
    secrets_dir: PathBuf,
    command_wrapper: Vec<String>,
    propagate_crashes: bool,
    tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    scratch_directory: PathBuf,
    launch_spec: LaunchSpec,
}

/// How service processes are launched and resource-limited.
#[derive(Debug, Clone, Copy)]
enum LaunchSpec {
    /// Launch processes directly.
    Direct,
    /// Launch processes through systemd, via `systemd-run`.
    Systemd,
}

impl LaunchSpec {
    fn determine_implementation() -> Result<Self, anyhow::Error> {
        // The canonical way to determine whether a system was booted with
        // systemd is to check for the existence of `/run/systemd/system/`;
        // see https://www.freedesktop.org/software/systemd/man/sd_booted.html.
        match Path::new("/run/systemd/system/").try_exists()? {
            true => Ok(Self::Systemd),
            false => Ok(Self::Direct),
        }
    }

    /// Builds the command to spawn for a service process, applying the
    /// command wrapper and, where supported, the memory and CPU limits.
    fn refine_command(
        &self,
        image: impl AsRef<OsStr>,
        args: &[impl AsRef<OsStr>],
        wrapper: &[String],
        memory_limit: Option<&MemoryLimit>,
        cpu_limit: Option<&CpuLimit>,
    ) -> Command {
        let mut cmd = match self {
            Self::Direct => {
                if let Some((program, wrapper_args)) = wrapper.split_first() {
                    let mut cmd = Command::new(program);
                    cmd.args(wrapper_args);
                    cmd.arg(image);
                    cmd
                } else {
                    Command::new(image)
                }
            }
            Self::Systemd => {
                let mut cmd = Command::new("systemd-run");
                cmd.args(["--user", "--scope", "--quiet"]);
                if let Some(memory_limit) = memory_limit {
                    let memory_limit = memory_limit.0.as_u64();
                    cmd.args(["-p", &format!("MemoryMax={memory_limit}")]);
                }
                if let Some(cpu_limit) = cpu_limit {
                    // `CPUQuota` is expressed as a percentage of a single CPU
                    // (1000 millicpus = 100%). Round up to the nearest percent.
                    let cpu_limit = (cpu_limit.as_millicpus() + 9) / 10;
                    cmd.args(["-p", &format!("CPUQuota={cpu_limit}%")]);
                }

                cmd.args(wrapper);
                cmd.arg(image);
                cmd
            }
        };
        cmd.args(args);
        cmd
    }
}

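// A minimal sanity check of `LaunchSpec::Direct`'s argument ordering (wrapper
// arguments, then the image, then the service arguments). The wrapper command
// used here is illustrative, not something this crate prescribes.
#[cfg(test)]
mod launch_spec_tests {
    use super::*;

    #[test]
    fn direct_launch_spec_applies_wrapper() {
        let cmd = LaunchSpec::Direct.refine_command(
            "clusterd",
            &["--process=0"],
            &["valgrind".to_string(), "--quiet".to_string()],
            None,
            None,
        );
        let cmd = cmd.as_std();
        assert_eq!(cmd.get_program(), "valgrind");
        let args: Vec<_> = cmd.get_args().collect();
        assert_eq!(args, &["--quiet", "clusterd", "--process=0"]);
    }
}
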
impl ProcessOrchestrator {
    /// Creates a new process orchestrator from the provided configuration.
    pub async fn new(
        ProcessOrchestratorConfig {
            image_dir,
            suppress_output,
            environment_id,
            secrets_dir,
            command_wrapper,
            propagate_crashes,
            tcp_proxy,
            scratch_directory,
        }: ProcessOrchestratorConfig,
    ) -> Result<ProcessOrchestrator, anyhow::Error> {
        let metadata_dir = env::temp_dir().join(format!("environmentd-{environment_id}"));
        fs::create_dir_all(&metadata_dir)
            .await
            .context("creating metadata directory")?;
        fs::create_dir_all(&secrets_dir)
            .await
            .context("creating secrets directory")?;
        fs::set_permissions(&secrets_dir, Permissions::from_mode(0o700))
            .await
            .context("setting secrets directory permissions")?;
        if let Some(prometheus_dir) = tcp_proxy
            .as_ref()
            .and_then(|p| p.prometheus_service_discovery_dir.as_ref())
        {
            fs::create_dir_all(&prometheus_dir)
                .await
                .context("creating prometheus directory")?;
        }

        let launch_spec = LaunchSpec::determine_implementation()?;
        info!(driver = ?launch_spec, "Process orchestrator launch spec");

        Ok(ProcessOrchestrator {
            image_dir: fs::canonicalize(image_dir).await?,
            suppress_output,
            namespaces: Mutex::new(BTreeMap::new()),
            metadata_dir: fs::canonicalize(metadata_dir).await?,
            secrets_dir: fs::canonicalize(secrets_dir).await?,
            command_wrapper,
            propagate_crashes,
            tcp_proxy,
            scratch_directory,
            launch_spec,
        })
    }
}

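// A minimal usage sketch: construct a `ProcessOrchestrator` and scope it to a
// namespace. The paths and environment ID are illustrative, not defaults of
// this crate. Compile-checked under `cfg(test)` but never executed.
#[cfg(test)]
#[allow(dead_code)]
async fn process_orchestrator_usage() -> Result<(), anyhow::Error> {
    let orchestrator = ProcessOrchestrator::new(ProcessOrchestratorConfig {
        image_dir: "/usr/local/bin".into(),
        suppress_output: false,
        environment_id: "local-dev".into(),
        secrets_dir: "/tmp/mz-secrets".into(),
        command_wrapper: vec![],
        propagate_crashes: false,
        tcp_proxy: None,
        scratch_directory: "/tmp/mz-scratch".into(),
    })
    .await?;
    // Services created through this handle get IDs prefixed with "cluster-".
    let _namespaced = orchestrator.namespace("cluster");
    Ok(())
}
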
impl Orchestrator for ProcessOrchestrator {
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
            let config = Arc::new(NamespacedProcessOrchestratorConfig {
                namespace: namespace.into(),
                image_dir: self.image_dir.clone(),
                suppress_output: self.suppress_output,
                metadata_dir: self.metadata_dir.clone(),
                command_wrapper: self.command_wrapper.clone(),
                propagate_crashes: self.propagate_crashes,
                tcp_proxy: self.tcp_proxy.clone(),
                scratch_directory: self.scratch_directory.clone(),
                launch_spec: self.launch_spec,
            });

            let services = Arc::new(Mutex::new(BTreeMap::new()));
            let (service_event_tx, service_event_rx) = broadcast::channel(16384);
            let (command_tx, command_rx) = mpsc::unbounded_channel();

            let worker = OrchestratorWorker {
                config: Arc::clone(&config),
                services: Arc::clone(&services),
                service_event_tx,
                system: System::new(),
                command_rx,
            }
            .spawn();

            Arc::new(NamespacedProcessOrchestrator {
                config,
                services,
                service_event_rx,
                command_tx,
                scheduling_config: Default::default(),
                _worker: worker,
            })
        }))
    }
}

/// Configuration for a [`NamespacedProcessOrchestrator`].
#[derive(Debug)]
struct NamespacedProcessOrchestratorConfig {
    namespace: String,
    image_dir: PathBuf,
    suppress_output: bool,
    metadata_dir: PathBuf,
    command_wrapper: Vec<String>,
    propagate_crashes: bool,
    tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    scratch_directory: PathBuf,
    launch_spec: LaunchSpec,
}

impl NamespacedProcessOrchestratorConfig {
    fn full_id(&self, id: &str) -> String {
        format!("{}-{}", self.namespace, id)
    }

    fn service_run_dir(&self, id: &str) -> PathBuf {
        self.metadata_dir.join(&self.full_id(id))
    }

    fn service_scratch_dir(&self, id: &str) -> PathBuf {
        self.scratch_directory.join(&self.full_id(id))
    }
}

#[derive(Debug)]
struct NamespacedProcessOrchestrator {
    config: Arc<NamespacedProcessOrchestratorConfig>,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_rx: broadcast::Receiver<ServiceEvent>,
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    _worker: AbortOnDropHandle<()>,
}

impl NamespacedProcessOrchestrator {
    fn send_command(&self, cmd: WorkerCommand) {
        self.command_tx.send(cmd).expect("worker task not dropped");
    }
}

#[async_trait]
impl NamespacedOrchestrator for NamespacedProcessOrchestrator {
    fn ensure_service(
        &self,
        id: &str,
        config: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error> {
        let service = ProcessService {
            run_dir: self.config.service_run_dir(id),
            scale: config.scale,
        };

        // Disk is disabled only by an explicit zero disk limit; any other
        // value (including no limit) provides a scratch directory. The limit
        // itself is not enforced.
        let disk = config.disk_limit != Some(DiskLimit::ZERO);

        let config = EnsureServiceConfig {
            image: config.image,
            args: config.args,
            ports: config.ports,
            memory_limit: config.memory_limit,
            cpu_limit: config.cpu_limit,
            scale: config.scale,
            labels: config.labels,
            disk,
        };

        self.send_command(WorkerCommand::EnsureService {
            id: id.to_string(),
            config,
        });

        Ok(Box::new(service))
    }

    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        self.send_command(WorkerCommand::DropService { id: id.to_string() });
        Ok(())
    }

    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::ListServices { result_tx });

        result_rx.await.expect("worker task not dropped")
    }

    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        let mut initial_events = vec![];
        let mut service_event_rx = {
            let services = self.services.lock().expect("lock poisoned");
            for (service_id, process_states) in &*services {
                for (process_id, process_state) in process_states.iter().enumerate() {
                    initial_events.push(ServiceEvent {
                        service_id: service_id.clone(),
                        process_id: u64::cast_from(process_id),
                        status: process_state.status.into(),
                        time: process_state.status_time,
                    });
                }
            }
            self.service_event_rx.resubscribe()
        };
        Box::pin(stream! {
            for event in initial_events {
                yield Ok(event);
            }
            loop {
                yield service_event_rx.recv().await.err_into();
            }
        })
    }

    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::FetchServiceMetrics {
            id: id.to_string(),
            result_tx,
        });

        result_rx.await.expect("worker task not dropped")
    }

    fn update_scheduling_config(
        &self,
        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
    ) {
        *self.scheduling_config.write().expect("poisoned") = config;
    }
}

/// Commands sent from a [`NamespacedProcessOrchestrator`] to its
/// [`OrchestratorWorker`].
///
/// Commands for which the caller expects a result carry a `result_tx` on
/// which the worker delivers that result.
enum WorkerCommand {
    EnsureService {
        id: String,
        config: EnsureServiceConfig,
    },
    DropService {
        id: String,
    },
    ListServices {
        result_tx: oneshot::Sender<Result<Vec<String>, anyhow::Error>>,
    },
    FetchServiceMetrics {
        id: String,
        result_tx: oneshot::Sender<Result<Vec<ServiceProcessMetrics>, anyhow::Error>>,
    },
}

/// Describes the desired state of a service, as requested through
/// [`WorkerCommand::EnsureService`].
struct EnsureServiceConfig {
    /// The name of the executable image to run.
    pub image: String,
    /// A function that generates the arguments for each process of the
    /// service, given the address assignments for the service's processes.
    pub args: Box<dyn Fn(ServiceAssignments) -> Vec<String> + Send + Sync>,
    /// Ports to expose.
    pub ports: Vec<ServicePort>,
    /// An optional limit on the memory that the service can use.
    pub memory_limit: Option<MemoryLimit>,
    /// An optional limit on the CPU that the service can use.
    pub cpu_limit: Option<CpuLimit>,
    /// The number of copies of this service to run.
    pub scale: u16,
    /// Arbitrary key-value pairs to attach to the service.
    pub labels: BTreeMap<String, String>,
    /// Whether scratch disk space should be allocated for the service.
    pub disk: bool,
}

/// A background task that manages the services of one namespace.
///
/// Funneling the actual work through a dedicated worker task keeps the
/// [`NamespacedProcessOrchestrator`] methods themselves non-blocking: they
/// only enqueue [`WorkerCommand`]s for the worker to process.
struct OrchestratorWorker {
    config: Arc<NamespacedProcessOrchestratorConfig>,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_tx: broadcast::Sender<ServiceEvent>,
    system: System,
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
}

impl OrchestratorWorker {
    fn spawn(self) -> AbortOnDropHandle<()> {
        let name = format!("process-orchestrator:{}", self.config.namespace);
        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
    }

    async fn run(mut self) {
        while let Some(cmd) = self.command_rx.recv().await {
            use WorkerCommand::*;
            let result = match cmd {
                EnsureService { id, config } => self.ensure_service(id, config).await,
                DropService { id } => self.drop_service(&id).await,
                ListServices { result_tx } => {
                    let _ = result_tx.send(self.list_services().await);
                    Ok(())
                }
                FetchServiceMetrics { id, result_tx } => {
                    let _ = result_tx.send(self.fetch_service_metrics(&id));
                    Ok(())
                }
            };

            if let Err(error) = result {
                panic!("process orchestrator worker failed: {error}");
            }
        }
    }

    fn fetch_service_metrics(
        &mut self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let pids: Vec<_> = {
            let services = self.services.lock().expect("lock poisoned");
            let Some(service) = services.get(id) else {
                bail!("unknown service {id}")
            };
            service.iter().map(|p| p.pid()).collect()
        };

        let mut metrics = vec![];
        for pid in pids {
            let (cpu_nano_cores, memory_bytes) = match pid {
                None => (None, None),
                Some(pid) => {
                    self.system
                        .refresh_process_specifics(pid, ProcessRefreshKind::new().with_cpu());
                    match self.system.process(pid) {
                        None => (None, None),
                        Some(process) => {
                            // `cpu_usage` reports the percentage of a single
                            // CPU core in use. One core is 1e9 nanocores, so
                            // 1% of a core is 1e7 nanocores; multiplying by
                            // 10_000_000 converts percent to nanocores.
                            let cpu = u64::try_cast_from(
                                (f64::from(process.cpu_usage()) * 10_000_000.0).trunc(),
                            )
                            .expect("sane value of process.cpu_usage()");
                            let memory = process.memory();
                            (Some(cpu), Some(memory))
                        }
                    }
                }
            };
            metrics.push(ServiceProcessMetrics {
                cpu_nano_cores,
                memory_bytes,
                // Disk usage is not tracked by the process orchestrator.
                disk_bytes: None,
                heap_bytes: None,
                heap_limit: None,
            });
        }
        Ok(metrics)
    }

    async fn ensure_service(
        &self,
        id: String,
        EnsureServiceConfig {
            image,
            args,
            ports: ports_in,
            memory_limit,
            cpu_limit,
            scale,
            labels,
            disk,
        }: EnsureServiceConfig,
    ) -> Result<(), anyhow::Error> {
        let full_id = self.config.full_id(&id);

        let run_dir = self.config.service_run_dir(&id);
        fs::create_dir_all(&run_dir)
            .await
            .context("creating run directory")?;
        let scratch_dir = if disk {
            let scratch_dir = self.config.service_scratch_dir(&id);
            fs::create_dir_all(&scratch_dir)
                .await
                .context("creating scratch directory")?;
            Some(fs::canonicalize(&scratch_dir).await?)
        } else {
            None
        };

        // If the service is already running at the desired scale, there is
        // nothing to do; otherwise tear it down and start fresh.
        let old_scale = {
            let services = self.services.lock().expect("poisoned");
            services.get(&id).map(|states| states.len())
        };
        match old_scale {
            Some(old) if old == usize::from(scale) => return Ok(()),
            Some(_) => self.drop_service(&id).await?,
            None => (),
        }

        // Precompute the listen addresses of every process of the service.
        let mut peer_addrs = Vec::new();
        for i in 0..scale.into() {
            let addresses = ports_in
                .iter()
                .map(|port| {
                    let addr = socket_path(&run_dir, &port.name, i);
                    (port.name.clone(), addr)
                })
                .collect();
            peer_addrs.push(addresses);
        }

        {
            let mut services = self.services.lock().expect("lock poisoned");

            // Create the state for each process of the service.
            let mut process_states = vec![];
            for i in 0..scale.into() {
                let listen_addrs = &peer_addrs[i];

                // Interpolate the placeholders in the command wrapper's
                // arguments (but not in the wrapper program itself).
                let mut command_wrapper = self.config.command_wrapper.clone();
                if let Some(parts) = command_wrapper.get_mut(1..) {
                    for part in parts {
                        *part = interpolate_command(&part[..], &full_id, listen_addrs);
                    }
                }

                // If TCP proxying is enabled, bind a TCP listener for each
                // port now, so the assigned addresses are known up front.
                let mut ports = vec![];
                let mut tcp_proxy_addrs = BTreeMap::new();
                for port in &ports_in {
                    let tcp_proxy_listener = match &self.config.tcp_proxy {
                        None => None,
                        Some(tcp_proxy) => {
                            let listener = StdTcpListener::bind((tcp_proxy.listen_addr, 0))
                                .with_context(|| format!("binding to {}", tcp_proxy.listen_addr))?;
                            listener.set_nonblocking(true)?;
                            let listener = TcpListener::from_std(listener)?;
                            let local_addr = listener.local_addr()?;
                            tcp_proxy_addrs.insert(port.name.clone(), local_addr);
                            Some(AddressedTcpListener {
                                listener,
                                local_addr,
                            })
                        }
                    };
                    ports.push(ServiceProcessPort {
                        name: port.name.clone(),
                        listen_addr: listen_addrs[&port.name].clone(),
                        tcp_proxy_listener,
                    });
                }

                let mut args = args(ServiceAssignments {
                    listen_addrs,
                    peer_addrs: &peer_addrs,
                });
                args.push(format!("--process={i}"));
                if disk {
                    if let Some(scratch) = &scratch_dir {
                        args.push(format!("--scratch-directory={}", scratch.display()));
                    } else {
                        panic!(
                            "internal error: service requested disk but no scratch directory was configured"
                        );
                    }
                }

                // Launch a task to supervise the process.
                let handle = mz_ore::task::spawn(
                    || format!("process-orchestrator:{full_id}-{i}"),
                    self.supervise_service_process(ServiceProcessConfig {
                        id: id.to_string(),
                        run_dir: run_dir.clone(),
                        i,
                        image: image.clone(),
                        args,
                        command_wrapper,
                        ports,
                        memory_limit,
                        cpu_limit,
                        launch_spec: self.config.launch_spec,
                    }),
                );

                process_states.push(ProcessState {
                    _handle: handle.abort_on_drop(),
                    status: ProcessStatus::NotReady,
                    status_time: Utc::now(),
                    labels: labels.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
                    tcp_proxy_addrs,
                });
            }

            services.insert(id, process_states);
        }

        self.maybe_write_prometheus_service_discovery_file().await;

        Ok(())
    }

    async fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        let full_id = self.config.full_id(id);
        let run_dir = self.config.service_run_dir(id);
        let scratch_dir = self.config.service_scratch_dir(id);

        // Drop the supervision tasks for the service's processes. Aborting a
        // supervision task also kills the process it supervises.
        {
            let mut supervisors = self.services.lock().expect("lock poisoned");
            supervisors.remove(id);
        }

        // Terminate any processes left over from a previous incarnation of
        // the orchestrator, as identified by their pid files.
        if let Ok(mut entries) = fs::read_dir(&run_dir).await {
            while let Some(entry) = entries.next_entry().await? {
                let path = entry.path();
                if path.extension() == Some(OsStr::new("pid")) {
                    let mut system = System::new();
                    let Some(process) = find_process_from_pid_file(&mut system, &path).await else {
                        continue;
                    };
                    let pid = process.pid();
                    info!("terminating orphaned process for {full_id} with PID {pid}");
                    process.kill();
                }
            }
        }

        // Clean up the service's on-disk state.
        if let Err(e) = remove_dir_all(run_dir).await {
            if e.kind() != io::ErrorKind::NotFound {
                warn!(
                    "error cleaning up run directory for {full_id}: {}",
                    e.display_with_causes()
                );
            }
        }
        if let Err(e) = remove_dir_all(scratch_dir).await {
            if e.kind() != io::ErrorKind::NotFound {
                warn!(
                    "error cleaning up scratch directory for {full_id}: {}",
                    e.display_with_causes()
                );
            }
        }

        self.maybe_write_prometheus_service_discovery_file().await;
        Ok(())
    }

    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let mut services = vec![];
        let namespace_prefix = format!("{}-", self.config.namespace);
        let mut entries = fs::read_dir(&self.config.metadata_dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let filename = entry
                .file_name()
                .into_string()
                .map_err(|_| anyhow!("unable to convert filename to string"))?;
            if let Some(id) = filename.strip_prefix(&namespace_prefix) {
                services.push(id.to_string());
            }
        }
        Ok(services)
    }

    fn supervise_service_process(
        &self,
        ServiceProcessConfig {
            id,
            run_dir,
            i,
            image,
            args,
            command_wrapper,
            ports,
            memory_limit,
            cpu_limit,
            launch_spec,
        }: ServiceProcessConfig,
    ) -> impl Future<Output = ()> + use<> {
        let suppress_output = self.config.suppress_output;
        let propagate_crashes = self.config.propagate_crashes;
        let image = self.config.image_dir.join(image);
        let pid_file = run_dir.join(format!("{i}.pid"));
        let full_id = self.config.full_id(&id);

        let state_updater = ProcessStateUpdater {
            namespace: self.config.namespace.clone(),
            id,
            i,
            services: Arc::clone(&self.services),
            service_event_tx: self.service_event_tx.clone(),
        };

        async move {
            let mut proxy_handles = vec![];
            for port in ports {
                if let Some(tcp_listener) = port.tcp_proxy_listener {
                    info!(
                        "{full_id}-{i}: {} tcp proxy listening on {}",
                        port.name, tcp_listener.local_addr,
                    );
                    let uds_path = port.listen_addr;
                    let handle = mz_ore::task::spawn(
                        || format!("{full_id}-{i}-proxy-{}", port.name),
                        tcp_proxy(TcpProxyConfig {
                            name: format!("{full_id}-{i}-{}", port.name),
                            tcp_listener,
                            uds_path: uds_path.clone(),
                        }),
                    );
                    proxy_handles.push(handle.abort_on_drop());
                }
            }

            supervise_existing_process(&state_updater, &pid_file).await;

            loop {
                let mut cmd = launch_spec.refine_command(
                    &image,
                    &args,
                    &command_wrapper,
                    memory_limit.as_ref(),
                    cpu_limit.as_ref(),
                );
                info!(
                    "launching {full_id}-{i} via {} {}...",
                    cmd.as_std().get_program().to_string_lossy(),
                    cmd.as_std()
                        .get_args()
                        .map(|arg| arg.to_string_lossy())
                        .join(" ")
                );
                if suppress_output {
                    cmd.stdout(Stdio::null());
                    cmd.stderr(Stdio::null());
                }
                match spawn_process(&state_updater, cmd, &pid_file, !command_wrapper.is_empty())
                    .await
                {
                    Ok(status) => {
                        if propagate_crashes && did_process_crash(status) {
                            panic!(
                                "{full_id}-{i} crashed; aborting because propagate_crashes is enabled"
                            );
                        }
                        error!("{full_id}-{i} exited: {:?}; relaunching in 5s", status);
                    }
                    Err(e) => {
                        error!("{full_id}-{i} failed to spawn: {}; relaunching in 5s", e);
                    }
                };
                state_updater.update_state(ProcessStatus::NotReady);
                time::sleep(Duration::from_secs(5)).await;
            }
        }
    }

    async fn maybe_write_prometheus_service_discovery_file(&self) {
        #[derive(Serialize)]
        struct StaticConfig {
            labels: BTreeMap<String, String>,
            targets: Vec<String>,
        }

        let Some(tcp_proxy) = &self.config.tcp_proxy else {
            return;
        };
        let Some(dir) = &tcp_proxy.prometheus_service_discovery_dir else {
            return;
        };

        let mut static_configs = vec![];
        {
            let services = self.services.lock().expect("lock poisoned");
            for (id, states) in &*services {
                for (i, state) in states.iter().enumerate() {
                    for (name, addr) in &state.tcp_proxy_addrs {
                        let mut labels = btreemap! {
                            "mz_orchestrator_namespace".into() => self.config.namespace.clone(),
                            "mz_orchestrator_service_id".into() => id.clone(),
                            "mz_orchestrator_port".into() => name.clone(),
                            "mz_orchestrator_ordinal".into() => i.to_string(),
                        };
                        for (k, v) in &state.labels {
                            let k = format!("mz_orchestrator_{}", k.replace('-', "_"));
                            labels.insert(k, v.clone());
                        }
                        static_configs.push(StaticConfig {
                            labels,
                            targets: vec![addr.to_string()],
                        })
                    }
                }
            }
        }

        let path = dir.join(Path::new(&self.config.namespace).with_extension("json"));
        let contents = serde_json::to_vec_pretty(&static_configs).expect("valid json");
        if let Err(e) = fs::write(&path, &contents).await {
            warn!(
                "{}: failed to write prometheus service discovery file: {}",
                self.config.namespace,
                e.display_with_causes()
            );
        }
    }
}

struct ServiceProcessConfig {
    id: String,
    run_dir: PathBuf,
    i: usize,
    image: String,
    args: Vec<String>,
    command_wrapper: Vec<String>,
    ports: Vec<ServiceProcessPort>,
    memory_limit: Option<MemoryLimit>,
    cpu_limit: Option<CpuLimit>,
    launch_spec: LaunchSpec,
}

struct ServiceProcessPort {
    name: String,
    listen_addr: String,
    tcp_proxy_listener: Option<AddressedTcpListener>,
}

/// Supervises a service process that survives from a previous incarnation of
/// the orchestrator, as identified by its pid file, until it exits or this
/// future is dropped.
async fn supervise_existing_process(state_updater: &ProcessStateUpdater, pid_file: &Path) {
    let name = format!(
        "{}-{}-{}",
        state_updater.namespace, state_updater.id, state_updater.i
    );

    let mut system = System::new();
    let Some(process) = find_process_from_pid_file(&mut system, pid_file).await else {
        return;
    };
    let pid = process.pid();

    info!(%pid, "discovered existing process for {name}");
    state_updater.update_state(ProcessStatus::Ready { pid });

    let need_kill = AtomicBool::new(true);
    // On drop, mark the process as not ready and kill it if still needed.
    defer! {
        state_updater.update_state(ProcessStatus::NotReady);
        if need_kill.load(Ordering::SeqCst) {
            info!(%pid, "terminating existing process for {name}");
            process.kill();
        }
    }

    let mut system = System::new();
    // Wait while the process is still running.
    while system.refresh_process_specifics(pid, ProcessRefreshKind::new()) {
        time::sleep(Duration::from_secs(5)).await;
    }

    // The process is already gone, so the deferred cleanup need not kill it.
    warn!(%pid, "process for {name} has crashed; will reboot");
    need_kill.store(false, Ordering::SeqCst)
}

/// Interpolates the `%N` (full service ID) and `%P:<port>` (listen address of
/// the named port) placeholders in a command wrapper argument.
fn interpolate_command(
    command_part: &str,
    full_id: &str,
    ports: &BTreeMap<String, String>,
) -> String {
    let mut command_part = command_part.replace("%N", full_id);
    for (endpoint, port) in ports {
        command_part = command_part.replace(&format!("%P:{endpoint}"), port);
    }
    command_part
}

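// A small test of `interpolate_command`'s placeholder syntax. The service ID
// and socket path used here are illustrative.
#[cfg(test)]
mod interpolate_tests {
    use super::*;

    #[test]
    fn interpolates_id_and_ports() {
        let ports = btreemap! {
            "http".to_string() => "/tmp/http-0".to_string(),
        };
        assert_eq!(
            interpolate_command("--name=%N --http=%P:http", "default-s1", &ports),
            "--name=default-s1 --http=/tmp/http-0"
        );
    }
}
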
async fn spawn_process(
    state_updater: &ProcessStateUpdater,
    mut cmd: Command,
    pid_file: &Path,
    send_sigterm: bool,
) -> Result<ExitStatus, anyhow::Error> {
    struct KillOnDropChild(Child, bool);

    impl Drop for KillOnDropChild {
        fn drop(&mut self) {
            if let (Some(pid), true) = (self.0.id().and_then(|id| i32::try_from(id).ok()), self.1) {
                let _ = nix::sys::signal::kill(
                    nix::unistd::Pid::from_raw(pid),
                    nix::sys::signal::Signal::SIGTERM,
                );
                // Give the process a bit of time to react to the signal and
                // shut down gracefully before it is forcibly killed.
                tokio::task::block_in_place(|| std::thread::sleep(Duration::from_millis(500)));
            }
            let _ = self.0.start_kill();
        }
    }

    let mut child = KillOnDropChild(cmd.spawn()?, send_sigterm);

    // The child has not been awaited yet, so its PID is still available.
    // Record the PID (and the process start time, to guard against PID reuse)
    // so that a future incarnation of the orchestrator can rediscover and
    // supervise this process.
    let pid = Pid::from_u32(child.0.id().unwrap());
    write_pid_file(pid_file, pid).await?;
    state_updater.update_state(ProcessStatus::Ready { pid });
    Ok(child.0.wait().await?)
}

fn did_process_crash(status: ExitStatus) -> bool {
    // Processes can exit in many ways; only termination by one of these
    // crash-indicating signals is considered a crash. The list is likely not
    // exhaustive.
    matches!(
        status.signal(),
        Some(SIGABRT | SIGBUS | SIGSEGV | SIGTRAP | SIGILL)
    )
}

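// A minimal sketch of `did_process_crash`'s behavior, built from raw wait
// statuses via `ExitStatusExt::from_raw` (for termination by signal, the low
// bits of the wait status hold the signal number).
#[cfg(test)]
mod crash_signal_tests {
    use super::*;

    #[test]
    fn segfault_counts_as_crash() {
        // Terminated by SIGSEGV: a crash.
        assert!(did_process_crash(ExitStatus::from_raw(SIGSEGV)));
        // A normal exit with code 0: not a crash.
        assert!(!did_process_crash(ExitStatus::from_raw(0)));
    }
}
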
/// Writes a pid file recording the process's PID and start time, the pair
/// that [`find_process_from_pid_file`] uses to identify the process while
/// guarding against PID reuse.
async fn write_pid_file(pid_file: &Path, pid: Pid) -> Result<(), anyhow::Error> {
    let mut system = System::new();
    system.refresh_process_specifics(pid, ProcessRefreshKind::new());
    let start_time = system.process(pid).map_or(0, |p| p.start_time());
    fs::write(pid_file, format!("{pid}\n{start_time}\n")).await?;
    Ok(())
}

async fn find_process_from_pid_file<'a>(
    system: &'a mut System,
    pid_file: &Path,
) -> Option<&'a Process> {
    let Ok(contents) = fs::read_to_string(pid_file).await else {
        return None;
    };
    let lines = contents.trim().split('\n').collect::<Vec<_>>();
    let [pid, start_time] = lines.as_slice() else {
        return None;
    };
    let Ok(pid) = Pid::from_str(pid) else {
        return None;
    };
    let Ok(start_time) = u64::from_str(start_time) else {
        return None;
    };
    system.refresh_process_specifics(pid, ProcessRefreshKind::new());
    let process = system.process(pid)?;
    if process.start_time() != start_time {
        // The PID has been reused by a different process since the pid file
        // was written; treat the original process as gone.
        return None;
    }
    Some(process)
}

struct TcpProxyConfig {
    name: String,
    tcp_listener: AddressedTcpListener,
    uds_path: String,
}

/// Proxies incoming TCP connections to a service's Unix domain socket.
async fn tcp_proxy(
    TcpProxyConfig {
        name,
        tcp_listener,
        uds_path,
    }: TcpProxyConfig,
) {
    let mut conns = FuturesUnordered::new();
    loop {
        select! {
            res = tcp_listener.listener.accept() => {
                debug!("{name}: accepting tcp proxy connection");
                let uds_path = uds_path.clone();
                conns.push(Box::pin(async move {
                    let (mut tcp_conn, _) = res.context("accepting tcp connection")?;
                    let mut uds_conn = UnixStream::connect(uds_path)
                        .await
                        .context("making uds connection")?;
                    io::copy_bidirectional(&mut tcp_conn, &mut uds_conn)
                        .await
                        .context("proxying")
                }));
            }
            Some(Err(e)) = conns.next() => {
                warn!("{name}: tcp proxy connection failed: {}", e.display_with_causes());
            }
        }
    }
}

struct ProcessStateUpdater {
    namespace: String,
    id: String,
    i: usize,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_tx: broadcast::Sender<ServiceEvent>,
}

impl ProcessStateUpdater {
    fn update_state(&self, status: ProcessStatus) {
        let mut services = self.services.lock().expect("lock poisoned");
        let Some(process_states) = services.get_mut(&self.id) else {
            return;
        };
        let Some(process_state) = process_states.get_mut(self.i) else {
            return;
        };
        let status_time = Utc::now();
        process_state.status = status;
        process_state.status_time = status_time;
        let _ = self.service_event_tx.send(ServiceEvent {
            service_id: self.id.to_string(),
            process_id: u64::cast_from(self.i),
            status: status.into(),
            time: status_time,
        });
    }
}

#[derive(Debug)]
struct ProcessState {
    _handle: AbortOnDropHandle<()>,
    status: ProcessStatus,
    status_time: DateTime<Utc>,
    labels: BTreeMap<String, String>,
    tcp_proxy_addrs: BTreeMap<String, SocketAddr>,
}

impl ProcessState {
    fn pid(&self) -> Option<Pid> {
        match &self.status {
            ProcessStatus::NotReady => None,
            ProcessStatus::Ready { pid } => Some(*pid),
        }
    }
}

#[derive(Debug, Clone, Copy)]
enum ProcessStatus {
    NotReady,
    Ready { pid: Pid },
}

impl From<ProcessStatus> for ServiceStatus {
    fn from(status: ProcessStatus) -> ServiceStatus {
        match status {
            ProcessStatus::NotReady => ServiceStatus::Offline(None),
            ProcessStatus::Ready { .. } => ServiceStatus::Online,
        }
    }
}

fn socket_path(run_dir: &Path, port: &str, process: usize) -> String {
    let desired = run_dir
        .join(format!("{port}-{process}"))
        .to_string_lossy()
        .into_owned();
    if UnixSocketAddr::from_pathname(&desired).is_err() {
        // Unix socket addresses have a low maximum length (108 bytes on
        // Linux), so if the desired path is too long, fall back to a shorter
        // path in the temporary directory, derived stably from a hash of the
        // desired path.
        env::temp_dir()
            .join(hex::encode(Sha1::digest(desired)))
            .display()
            .to_string()
    } else {
        desired
    }
}

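// A small test of `socket_path`'s fallback. The run directories used here are
// illustrative; the exact hashed fallback path depends on the temporary
// directory, so only the short-path case asserts an exact value.
#[cfg(test)]
mod socket_path_tests {
    use super::*;

    #[test]
    fn short_paths_are_used_verbatim() {
        let run_dir = Path::new("/tmp/mz-run");
        assert_eq!(socket_path(run_dir, "http", 0), "/tmp/mz-run/http-0");
    }

    #[test]
    fn long_paths_fall_back_to_hashed_names() {
        let run_dir = PathBuf::from(format!("/tmp/{}", "x".repeat(200)));
        let path = socket_path(&run_dir, "http", 0);
        // The fallback must itself be a valid Unix socket address.
        assert!(UnixSocketAddr::from_pathname(&path).is_ok());
    }
}
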
/// A TCP listener bundled with its local address.
struct AddressedTcpListener {
    listener: TcpListener,
    local_addr: SocketAddr,
}

#[derive(Debug, Clone)]
struct ProcessService {
    run_dir: PathBuf,
    scale: u16,
}

impl Service for ProcessService {
    fn addresses(&self, port: &str) -> Vec<String> {
        (0..self.scale)
            .map(|i| socket_path(&self.run_dir, port, i.into()))
            .collect()
    }
}