1use std::collections::BTreeMap;
11use std::env;
12use std::ffi::OsStr;
13use std::fmt::Debug;
14use std::fs::Permissions;
15use std::future::Future;
16use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener};
17use std::num::NonZero;
18use std::os::unix::fs::PermissionsExt;
19use std::os::unix::process::ExitStatusExt;
20use std::path::{Path, PathBuf};
21use std::process::{ExitStatus, Stdio};
22use std::str::FromStr;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::{Arc, Mutex};
25
26use anyhow::{Context, anyhow, bail};
27use async_stream::stream;
28use async_trait::async_trait;
29use chrono::{DateTime, Utc};
30use futures::StreamExt;
31use futures::stream::{BoxStream, FuturesUnordered};
32use itertools::Itertools;
33use libc::{SIGABRT, SIGBUS, SIGILL, SIGSEGV, SIGTRAP};
34use maplit::btreemap;
35use mz_orchestrator::scheduling_config::ServiceSchedulingConfig;
36use mz_orchestrator::{
37 CpuLimit, DiskLimit, MemoryLimit, NamespacedOrchestrator, Orchestrator, Service,
38 ServiceAssignments, ServiceConfig, ServiceEvent, ServicePort, ServiceProcessMetrics,
39 ServiceStatus,
40};
41use mz_ore::cast::{CastFrom, TryCastFrom};
42use mz_ore::error::ErrorExt;
43use mz_ore::netio::UnixSocketAddr;
44use mz_ore::result::ResultExt;
45use mz_ore::task::AbortOnDropHandle;
46use scopeguard::defer;
47use serde::Serialize;
48use sha1::{Digest, Sha1};
49use sysinfo::{Pid, PidExt, Process, ProcessExt, ProcessRefreshKind, System, SystemExt};
50use tokio::fs::remove_dir_all;
51use tokio::net::{TcpListener, UnixStream};
52use tokio::process::{Child, Command};
53use tokio::sync::{broadcast, mpsc, oneshot};
54use tokio::time::{self, Duration};
55use tokio::{fs, io, select};
56use tracing::{debug, error, info, warn};
57
58pub mod secrets;
59
/// Configuration for a [`ProcessOrchestrator`].
#[derive(Debug, Clone)]
pub struct ProcessOrchestratorConfig {
    /// The directory in which the orchestrator looks up executable images.
    pub image_dir: PathBuf,
    /// Whether to send the stdout/stderr of spawned processes to /dev/null.
    pub suppress_output: bool,
    /// The ID of the environment under orchestration; used to name the
    /// orchestrator's metadata directory in the system temp directory.
    pub environment_id: String,
    /// The directory in which secrets are stored; its permissions are
    /// tightened to 0o700 at startup.
    pub secrets_dir: PathBuf,
    /// An optional command wrapper (program plus arguments) prepended to
    /// every service invocation.
    pub command_wrapper: Vec<String>,
    /// Whether the orchestrator should abort (panic) if one of its spawned
    /// services crashes with a fatal signal.
    pub propagate_crashes: bool,
    /// Optional TCP proxy configuration; when set, each service port is also
    /// exposed via a TCP listener that proxies to the Unix socket.
    pub tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    /// The directory under which services that request disk get a
    /// per-service scratch directory.
    pub scratch_directory: PathBuf,
}
90
/// Configuration for the optional TCP proxy of a [`ProcessOrchestrator`].
#[derive(Debug, Clone)]
pub struct ProcessOrchestratorTcpProxyConfig {
    /// The IP address on which per-port TCP proxy listeners bind (each with
    /// an OS-assigned port).
    pub listen_addr: IpAddr,
    /// If set, a Prometheus file-based service discovery document describing
    /// the proxy listeners is maintained in this directory.
    pub prometheus_service_discovery_dir: Option<PathBuf>,
}
109
/// An orchestrator backed by plain processes on the local machine.
///
/// Construct with [`ProcessOrchestrator::new`]; per-namespace orchestrators
/// are handed out by [`Orchestrator::namespace`].
#[derive(Debug)]
pub struct ProcessOrchestrator {
    // Canonicalized copy of `ProcessOrchestratorConfig::image_dir`.
    image_dir: PathBuf,
    suppress_output: bool,
    // Lazily created namespaced orchestrators, keyed by namespace.
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    // Directory holding per-service run directories (sockets, PID files).
    metadata_dir: PathBuf,
    secrets_dir: PathBuf,
    command_wrapper: Vec<String>,
    propagate_crashes: bool,
    tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    scratch_directory: PathBuf,
    // How service processes are launched (directly or via systemd-run).
    launch_spec: LaunchSpec,
}
128
/// The mechanism used to launch service processes.
#[derive(Debug, Clone, Copy)]
enum LaunchSpec {
    /// Execute the service binary directly.
    Direct,
    /// Wrap the invocation in a transient `systemd-run` scope so that
    /// memory and CPU limits can be applied.
    Systemd,
}
136
137impl LaunchSpec {
138 fn determine_implementation() -> Result<Self, anyhow::Error> {
139 match Path::new("/run/systemd/system/").try_exists()? {
143 true => Ok(Self::Systemd),
144 false => Ok(Self::Direct),
145 }
146 }
147
148 fn refine_command(
149 &self,
150 image: impl AsRef<OsStr>,
151 args: &[impl AsRef<OsStr>],
152 wrapper: &[String],
153 memory_limit: Option<&MemoryLimit>,
154 cpu_limit: Option<&CpuLimit>,
155 ) -> Command {
156 let mut cmd = match self {
157 Self::Direct => {
158 if let Some((program, wrapper_args)) = wrapper.split_first() {
159 let mut cmd = Command::new(program);
160 cmd.args(wrapper_args);
161 cmd.arg(image);
162 cmd
163 } else {
164 Command::new(image)
165 }
166 }
167 Self::Systemd => {
168 let mut cmd = Command::new("systemd-run");
169 cmd.args(["--user", "--scope", "--quiet"]);
170 if let Some(memory_limit) = memory_limit {
171 let memory_limit = memory_limit.0.as_u64();
172 cmd.args(["-p", &format!("MemoryMax={memory_limit}")]);
173 }
175 if let Some(cpu_limit) = cpu_limit {
176 let cpu_limit = (cpu_limit.as_millicpus() + 9) / 10;
177 cmd.args(["-p", &format!("CPUQuota={cpu_limit}%")]);
178 }
179
180 cmd.args(wrapper);
181 cmd.arg(image);
182 cmd
183 }
184 };
185 cmd.args(args);
186 cmd
187 }
188}
189
impl ProcessOrchestrator {
    /// Creates a new process orchestrator from the provided configuration.
    ///
    /// Creates the metadata directory (in the system temp dir, named after
    /// the environment ID), the secrets directory (permissions tightened to
    /// 0o700), and, if configured, the Prometheus service discovery
    /// directory. Also probes the host once to decide how service processes
    /// will be launched (directly or via systemd).
    pub async fn new(
        ProcessOrchestratorConfig {
            image_dir,
            suppress_output,
            environment_id,
            secrets_dir,
            command_wrapper,
            propagate_crashes,
            tcp_proxy,
            scratch_directory,
        }: ProcessOrchestratorConfig,
    ) -> Result<ProcessOrchestrator, anyhow::Error> {
        let metadata_dir = env::temp_dir().join(format!("environmentd-{environment_id}"));
        fs::create_dir_all(&metadata_dir)
            .await
            .context("creating metadata directory")?;
        fs::create_dir_all(&secrets_dir)
            .await
            .context("creating secrets directory")?;
        fs::set_permissions(&secrets_dir, Permissions::from_mode(0o700))
            .await
            .context("setting secrets directory permissions")?;
        if let Some(prometheus_dir) = tcp_proxy
            .as_ref()
            .and_then(|p| p.prometheus_service_discovery_dir.as_ref())
        {
            fs::create_dir_all(&prometheus_dir)
                .await
                .context("creating prometheus directory")?;
        }

        let launch_spec = LaunchSpec::determine_implementation()?;
        info!(driver = ?launch_spec, "Process orchestrator launch spec");

        Ok(ProcessOrchestrator {
            // Canonicalize after the directories exist so later path
            // comparisons and joins are stable.
            image_dir: fs::canonicalize(image_dir).await?,
            suppress_output,
            namespaces: Mutex::new(BTreeMap::new()),
            metadata_dir: fs::canonicalize(metadata_dir).await?,
            secrets_dir: fs::canonicalize(secrets_dir).await?,
            command_wrapper,
            propagate_crashes,
            tcp_proxy,
            scratch_directory,
            launch_spec,
        })
    }
}
240
impl Orchestrator for ProcessOrchestrator {
    /// Returns the namespaced orchestrator for `namespace`, creating it (and
    /// spawning its background worker task) on first use.
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
            let config = Arc::new(NamespacedProcessOrchestratorConfig {
                namespace: namespace.into(),
                image_dir: self.image_dir.clone(),
                suppress_output: self.suppress_output,
                metadata_dir: self.metadata_dir.clone(),
                command_wrapper: self.command_wrapper.clone(),
                propagate_crashes: self.propagate_crashes,
                tcp_proxy: self.tcp_proxy.clone(),
                scratch_directory: self.scratch_directory.clone(),
                launch_spec: self.launch_spec,
            });

            // Process state is shared between the worker (which writes it)
            // and the orchestrator/service handles (which read it).
            let services = Arc::new(Mutex::new(BTreeMap::new()));
            let (service_event_tx, service_event_rx) = broadcast::channel(16384);
            let (command_tx, command_rx) = mpsc::unbounded_channel();

            let worker = OrchestratorWorker {
                config: Arc::clone(&config),
                services: Arc::clone(&services),
                service_event_tx,
                system: System::new(),
                command_rx,
            }
            .spawn();

            Arc::new(NamespacedProcessOrchestrator {
                config,
                services,
                service_event_rx,
                command_tx,
                scheduling_config: Default::default(),
                // Keeping the handle here ties the worker's lifetime to the
                // namespaced orchestrator.
                _worker: worker,
            })
        }))
    }
}
281
/// Configuration shared by a [`NamespacedProcessOrchestrator`] and its
/// [`OrchestratorWorker`]; a per-namespace copy of the orchestrator-level
/// settings plus the namespace name.
#[derive(Debug)]
struct NamespacedProcessOrchestratorConfig {
    namespace: String,
    image_dir: PathBuf,
    suppress_output: bool,
    metadata_dir: PathBuf,
    command_wrapper: Vec<String>,
    propagate_crashes: bool,
    tcp_proxy: Option<ProcessOrchestratorTcpProxyConfig>,
    scratch_directory: PathBuf,
    launch_spec: LaunchSpec,
}
295
296impl NamespacedProcessOrchestratorConfig {
297 fn full_id(&self, id: &str) -> String {
298 format!("{}-{}", self.namespace, id)
299 }
300
301 fn service_run_dir(&self, id: &str) -> PathBuf {
302 self.metadata_dir.join(&self.full_id(id))
303 }
304
305 fn service_scratch_dir(&self, id: &str) -> PathBuf {
306 self.scratch_directory.join(&self.full_id(id))
307 }
308}
309
/// The orchestrator for a single namespace; forwards work to its background
/// [`OrchestratorWorker`] over `command_tx`.
#[derive(Debug)]
struct NamespacedProcessOrchestrator {
    config: Arc<NamespacedProcessOrchestratorConfig>,
    /// Process state shared with the worker and service handles.
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    /// Receiver kept only so `watch_services` can `resubscribe`.
    service_event_rx: broadcast::Receiver<ServiceEvent>,
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    /// Aborts the worker task when this orchestrator is dropped.
    _worker: AbortOnDropHandle<()>,
}
319
impl NamespacedProcessOrchestrator {
    /// Sends a command to the background worker.
    ///
    /// Panics if the worker has gone away, which cannot happen while this
    /// orchestrator exists since `_worker` keeps the task alive.
    fn send_command(&self, cmd: WorkerCommand) {
        self.command_tx.send(cmd).expect("worker task not dropped");
    }
}
325
#[async_trait]
impl NamespacedOrchestrator for NamespacedProcessOrchestrator {
    /// Requests that the service exist with the given configuration and
    /// returns a handle to it. The actual work happens asynchronously in
    /// the worker task.
    fn ensure_service(
        &self,
        id: &str,
        config: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error> {
        let service = ProcessService {
            id: id.to_string(),
            run_dir: self.config.service_run_dir(id),
            scale: config.scale,
            services: Arc::clone(&self.services),
        };

        // A disk limit of exactly zero means "no disk"; anything else
        // (including no limit) gets a scratch directory.
        let disk = config.disk_limit != Some(DiskLimit::ZERO);

        let config = EnsureServiceConfig {
            image: config.image,
            args: config.args,
            ports: config.ports,
            memory_limit: config.memory_limit,
            cpu_limit: config.cpu_limit,
            scale: config.scale,
            labels: config.labels,
            disk,
        };

        self.send_command(WorkerCommand::EnsureService {
            id: id.to_string(),
            config,
        });

        Ok(Box::new(service))
    }

    /// Requests removal of the service; performed asynchronously by the
    /// worker.
    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        self.send_command(WorkerCommand::DropService { id: id.to_string() });
        Ok(())
    }

    /// Lists the IDs of all services known to this namespace.
    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::ListServices { result_tx });

        result_rx.await.expect("worker task not dropped")
    }

    /// Returns a stream of service status events: one event per currently
    /// known process (a snapshot), followed by live updates.
    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        let mut initial_events = vec![];
        let mut service_event_rx = {
            // Snapshot and resubscribe under the services lock; status
            // updates are also sent under this lock, so no event should be
            // missed between the snapshot and the subscription.
            let services = self.services.lock().expect("lock poisoned");
            for (service_id, process_states) in &*services {
                for (process_id, process_state) in process_states.iter().enumerate() {
                    initial_events.push(ServiceEvent {
                        service_id: service_id.clone(),
                        process_id: u64::cast_from(process_id),
                        status: process_state.status.into(),
                        time: process_state.status_time,
                    });
                }
            }
            self.service_event_rx.resubscribe()
        };
        Box::pin(stream! {
            for event in initial_events {
                yield Ok(event);
            }
            loop {
                // A lagged receiver surfaces as an error to the consumer.
                yield service_event_rx.recv().await.err_into();
            }
        })
    }

    /// Fetches CPU/memory metrics for each process of the service.
    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::FetchServiceMetrics {
            id: id.to_string(),
            result_tx,
        });

        result_rx.await.expect("worker task not dropped")
    }

    /// Records a new scheduling configuration. Note that the process
    /// orchestrator does not otherwise read this value here.
    fn update_scheduling_config(
        &self,
        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
    ) {
        *self.scheduling_config.write().expect("poisoned") = config;
    }
}
420
/// Commands sent from a [`NamespacedProcessOrchestrator`] to its
/// [`OrchestratorWorker`].
enum WorkerCommand {
    /// Ensure the service exists with the given configuration, (re)creating
    /// its processes as needed.
    EnsureService {
        id: String,
        config: EnsureServiceConfig,
    },
    /// Stop and remove the service with the given ID.
    DropService {
        id: String,
    },
    /// List all known service IDs; the result is sent on `result_tx`.
    ListServices {
        result_tx: oneshot::Sender<Result<Vec<String>, anyhow::Error>>,
    },
    /// Fetch per-process metrics for the service; the result is sent on
    /// `result_tx`.
    FetchServiceMetrics {
        id: String,
        result_tx: oneshot::Sender<Result<Vec<ServiceProcessMetrics>, anyhow::Error>>,
    },
}
442
/// Describes a service to be created by `OrchestratorWorker::ensure_service`.
struct EnsureServiceConfig {
    /// The image name, resolved relative to the orchestrator's image
    /// directory.
    pub image: String,
    /// Produces the command-line arguments for each process, given that
    /// process's listen addresses and the addresses of all peers.
    pub args: Box<dyn Fn(ServiceAssignments) -> Vec<String> + Send + Sync>,
    /// The ports to expose for each process.
    pub ports: Vec<ServicePort>,
    /// Optional memory limit (enforced only under the systemd launch spec).
    pub memory_limit: Option<MemoryLimit>,
    /// Optional CPU limit (enforced only under the systemd launch spec).
    pub cpu_limit: Option<CpuLimit>,
    /// The number of processes to run.
    pub scale: NonZero<u16>,
    /// Arbitrary key-value labels, surfaced in Prometheus service discovery
    /// output.
    pub labels: BTreeMap<String, String>,
    /// Whether the service gets a scratch directory on disk.
    pub disk: bool,
}
468
/// The background task that performs all process management for one
/// namespace, driven by [`WorkerCommand`]s received on `command_rx`.
struct OrchestratorWorker {
    config: Arc<NamespacedProcessOrchestratorConfig>,
    /// Process state shared with the orchestrator and service handles.
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    /// Sender for status-change events consumed via `watch_services`.
    service_event_tx: broadcast::Sender<ServiceEvent>,
    /// Cached system handle used for metrics collection.
    system: System,
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
}
487
impl OrchestratorWorker {
    /// Spawns the worker onto a background task named after the namespace.
    /// The returned handle aborts the worker when dropped.
    fn spawn(self) -> AbortOnDropHandle<()> {
        let name = format!("process-orchestrator:{}", self.config.namespace);
        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
    }

    /// Runs the command loop until the command channel closes.
    ///
    /// Any error from command handling is treated as fatal: the worker
    /// panics, since it cannot make progress afterwards.
    async fn run(mut self) {
        while let Some(cmd) = self.command_rx.recv().await {
            use WorkerCommand::*;
            let result = match cmd {
                EnsureService { id, config } => self.ensure_service(id, config).await,
                DropService { id } => self.drop_service(&id).await,
                ListServices { result_tx } => {
                    // The requester may have gone away; ignore send errors.
                    let _ = result_tx.send(self.list_services().await);
                    Ok(())
                }
                FetchServiceMetrics { id, result_tx } => {
                    let _ = result_tx.send(self.fetch_service_metrics(&id));
                    Ok(())
                }
            };

            if let Err(error) = result {
                panic!("process orchestrator worker failed: {error}");
            }
        }
    }

    /// Collects CPU and memory metrics for every process of service `id`.
    ///
    /// Processes without a known PID, or that the OS no longer reports,
    /// yield `None` for both values. Errors if the service is unknown.
    fn fetch_service_metrics(
        &mut self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        // Snapshot the PIDs first so the services lock is not held while
        // refreshing process information.
        let pids: Vec<_> = {
            let services = self.services.lock().expect("lock poisoned");
            let Some(service) = services.get(id) else {
                bail!("unknown service {id}")
            };
            service.iter().map(|p| p.pid()).collect()
        };

        let mut metrics = vec![];
        for pid in pids {
            let (cpu_nano_cores, memory_bytes) = match pid {
                None => (None, None),
                Some(pid) => {
                    self.system
                        .refresh_process_specifics(pid, ProcessRefreshKind::new().with_cpu());
                    match self.system.process(pid) {
                        None => (None, None),
                        Some(process) => {
                            // `cpu_usage` is a percentage of one core;
                            // 1% of a core = 10^7 nanocores.
                            let cpu = u64::try_cast_from(
                                (f64::from(process.cpu_usage()) * 10_000_000.0).trunc(),
                            )
                            .expect("sane value of process.cpu_usage()");
                            let memory = process.memory();
                            (Some(cpu), Some(memory))
                        }
                    }
                }
            };
            metrics.push(ServiceProcessMetrics {
                cpu_nano_cores,
                memory_bytes,
                disk_bytes: None,
                heap_bytes: None,
                heap_limit: None,
            });
        }
        Ok(metrics)
    }

    /// Ensures that service `id` exists with the given configuration.
    ///
    /// If the service already exists with the same scale it is left
    /// untouched (note: other configuration changes are not detected in
    /// that case); with a different scale it is dropped and recreated.
    async fn ensure_service(
        &self,
        id: String,
        EnsureServiceConfig {
            image,
            args,
            ports: ports_in,
            memory_limit,
            cpu_limit,
            scale,
            labels,
            disk,
        }: EnsureServiceConfig,
    ) -> Result<(), anyhow::Error> {
        let full_id = self.config.full_id(&id);

        let run_dir = self.config.service_run_dir(&id);
        fs::create_dir_all(&run_dir)
            .await
            .context("creating run directory")?;
        let scratch_dir = if disk {
            let scratch_dir = self.config.service_scratch_dir(&id);
            fs::create_dir_all(&scratch_dir)
                .await
                .context("creating scratch directory")?;
            Some(fs::canonicalize(&scratch_dir).await?)
        } else {
            None
        };

        let old_scale = {
            let services = self.services.lock().expect("poisoned");
            services.get(&id).map(|states| states.len())
        };
        match old_scale {
            // Same scale: assume the service is already as requested.
            Some(old) if old == usize::cast_from(scale) => return Ok(()),
            Some(_) => self.drop_service(&id).await?,
            None => (),
        }

        // Precompute every process's listen addresses so each process can be
        // told about all of its peers.
        let mut peer_addrs = Vec::new();
        for i in 0..scale.into() {
            let addresses = ports_in
                .iter()
                .map(|port| {
                    let addr = socket_path(&run_dir, &port.name, i);
                    (port.name.clone(), addr)
                })
                .collect();
            peer_addrs.push(addresses);
        }

        {
            let mut services = self.services.lock().expect("lock poisoned");

            let mut process_states = vec![];
            for i in 0..usize::cast_from(scale) {
                let listen_addrs = &peer_addrs[i];

                // Interpolate placeholders in the wrapper's arguments (but
                // not in the wrapper program itself).
                let mut command_wrapper = self.config.command_wrapper.clone();
                if let Some(parts) = command_wrapper.get_mut(1..) {
                    for part in parts {
                        *part = interpolate_command(&part[..], &full_id, listen_addrs);
                    }
                }

                let mut ports = vec![];
                let mut tcp_proxy_addrs = BTreeMap::new();
                for port in &ports_in {
                    let tcp_proxy_listener = match &self.config.tcp_proxy {
                        None => None,
                        Some(tcp_proxy) => {
                            // Bind with port 0 so the OS picks a free port.
                            let listener = StdTcpListener::bind((tcp_proxy.listen_addr, 0))
                                .with_context(|| format!("binding to {}", tcp_proxy.listen_addr))?;
                            listener.set_nonblocking(true)?;
                            let listener = TcpListener::from_std(listener)?;
                            let local_addr = listener.local_addr()?;
                            tcp_proxy_addrs.insert(port.name.clone(), local_addr);
                            Some(AddressedTcpListener {
                                listener,
                                local_addr,
                            })
                        }
                    };
                    ports.push(ServiceProcessPort {
                        name: port.name.clone(),
                        listen_addr: listen_addrs[&port.name].clone(),
                        tcp_proxy_listener,
                    });
                }

                let mut args = args(ServiceAssignments {
                    listen_addrs,
                    peer_addrs: &peer_addrs,
                });
                args.push(format!("--process={i}"));
                if disk {
                    if let Some(scratch) = &scratch_dir {
                        args.push(format!("--scratch-directory={}", scratch.display()));
                    } else {
                        // Unreachable: `scratch_dir` is always `Some` when
                        // `disk` is true (see above).
                        panic!(
                            "internal error: service requested disk but no scratch directory was configured"
                        );
                    }
                }

                let handle = mz_ore::task::spawn(
                    || format!("process-orchestrator:{full_id}-{i}"),
                    self.supervise_service_process(ServiceProcessConfig {
                        id: id.to_string(),
                        run_dir: run_dir.clone(),
                        i,
                        image: image.clone(),
                        args,
                        command_wrapper,
                        ports,
                        memory_limit,
                        cpu_limit,
                        launch_spec: self.config.launch_spec,
                    }),
                );

                process_states.push(ProcessState {
                    _handle: handle.abort_on_drop(),
                    status: ProcessStatus::NotReady,
                    status_time: Utc::now(),
                    labels: labels.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
                    tcp_proxy_addrs,
                });
            }

            services.insert(id, process_states);
        }

        self.maybe_write_prometheus_service_discovery_file().await;

        Ok(())
    }

    /// Stops and removes service `id`: aborts its supervisors (by dropping
    /// their handles), kills any orphaned processes recorded in PID files,
    /// and removes the run and scratch directories.
    async fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        let full_id = self.config.full_id(id);
        let run_dir = self.config.service_run_dir(id);
        let scratch_dir = self.config.service_scratch_dir(id);

        {
            // Dropping the `ProcessState`s aborts the supervisor tasks,
            // which in turn kill the supervised processes.
            let mut supervisors = self.services.lock().expect("lock poisoned");
            supervisors.remove(id);
        }

        // Belt and braces: also terminate any process still recorded in a
        // PID file (e.g. left over from a previous incarnation).
        if let Ok(mut entries) = fs::read_dir(&run_dir).await {
            while let Some(entry) = entries.next_entry().await? {
                let path = entry.path();
                if path.extension() == Some(OsStr::new("pid")) {
                    let mut system = System::new();
                    let Some(process) = find_process_from_pid_file(&mut system, &path).await else {
                        continue;
                    };
                    let pid = process.pid();
                    info!("terminating orphaned process for {full_id} with PID {pid}");
                    process.kill();
                }
            }
        }

        // Cleanup failures are logged but not fatal; a missing directory is
        // expected and silently ignored.
        if let Err(e) = remove_dir_all(run_dir).await {
            if e.kind() != io::ErrorKind::NotFound {
                warn!(
                    "error cleaning up run directory for {full_id}: {}",
                    e.display_with_causes()
                );
            }
        }
        if let Err(e) = remove_dir_all(scratch_dir).await {
            if e.kind() != io::ErrorKind::NotFound {
                warn!(
                    "error cleaning up scratch directory for {full_id}: {}",
                    e.display_with_causes()
                );
            }
        }

        self.maybe_write_prometheus_service_discovery_file().await;
        Ok(())
    }

    /// Lists service IDs by enumerating run directories in the metadata
    /// directory that carry this namespace's prefix.
    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let mut services = vec![];
        let namespace_prefix = format!("{}-", self.config.namespace);
        let mut entries = fs::read_dir(&self.config.metadata_dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let filename = entry
                .file_name()
                .into_string()
                .map_err(|_| anyhow!("unable to convert filename to string"))?;
            if let Some(id) = filename.strip_prefix(&namespace_prefix) {
                services.push(id.to_string());
            }
        }
        Ok(services)
    }

    /// Builds the supervision future for one process of a service.
    ///
    /// The future starts TCP proxies for the process's ports, adopts a
    /// still-running process from a previous incarnation if its PID file
    /// matches, and then launches (and relaunches, with a 5s backoff) the
    /// service process forever. It never resolves; it is stopped by
    /// aborting its task.
    fn supervise_service_process(
        &self,
        ServiceProcessConfig {
            id,
            run_dir,
            i,
            image,
            args,
            command_wrapper,
            ports,
            memory_limit,
            cpu_limit,
            launch_spec,
        }: ServiceProcessConfig,
    ) -> impl Future<Output = ()> + use<> {
        let suppress_output = self.config.suppress_output;
        let propagate_crashes = self.config.propagate_crashes;
        let image = self.config.image_dir.join(image);
        let pid_file = run_dir.join(format!("{i}.pid"));
        let full_id = self.config.full_id(&id);

        let state_updater = ProcessStateUpdater {
            namespace: self.config.namespace.clone(),
            id,
            i,
            services: Arc::clone(&self.services),
            service_event_tx: self.service_event_tx.clone(),
        };

        async move {
            // Proxy tasks live as long as this supervision future.
            let mut proxy_handles = vec![];
            for port in ports {
                if let Some(tcp_listener) = port.tcp_proxy_listener {
                    info!(
                        "{full_id}-{i}: {} tcp proxy listening on {}",
                        port.name, tcp_listener.local_addr,
                    );
                    let uds_path = port.listen_addr;
                    let handle = mz_ore::task::spawn(
                        || format!("{full_id}-{i}-proxy-{}", port.name),
                        tcp_proxy(TcpProxyConfig {
                            name: format!("{full_id}-{i}-{}", port.name),
                            tcp_listener,
                            uds_path: uds_path.clone(),
                        }),
                    );
                    proxy_handles.push(handle.abort_on_drop());
                }
            }

            // If a process from a previous incarnation is still alive,
            // supervise it until it exits before spawning a fresh one.
            supervise_existing_process(&state_updater, &pid_file).await;

            loop {
                let mut cmd = launch_spec.refine_command(
                    &image,
                    &args,
                    &command_wrapper,
                    memory_limit.as_ref(),
                    cpu_limit.as_ref(),
                );
                info!(
                    "launching {full_id}-{i} via {} {}...",
                    cmd.as_std().get_program().to_string_lossy(),
                    cmd.as_std()
                        .get_args()
                        .map(|arg| arg.to_string_lossy())
                        .join(" ")
                );
                if suppress_output {
                    cmd.stdout(Stdio::null());
                    cmd.stderr(Stdio::null());
                }
                match spawn_process(&state_updater, cmd, &pid_file, !command_wrapper.is_empty())
                    .await
                {
                    Ok(status) => {
                        if propagate_crashes && did_process_crash(status) {
                            panic!(
                                "{full_id}-{i} crashed; aborting because propagate_crashes is enabled"
                            );
                        }
                        error!("{full_id}-{i} exited: {:?}; relaunching in 5s", status);
                    }
                    Err(e) => {
                        error!("{full_id}-{i} failed to spawn: {}; relaunching in 5s", e);
                    }
                };
                state_updater.update_state(ProcessStatus::NotReady);
                time::sleep(Duration::from_secs(5)).await;
            }
        }
    }

    /// Writes a Prometheus file-based service discovery document describing
    /// all TCP proxy addresses in this namespace, if a discovery directory
    /// is configured. No-op otherwise.
    async fn maybe_write_prometheus_service_discovery_file(&self) {
        #[derive(Serialize)]
        struct StaticConfig {
            labels: BTreeMap<String, String>,
            targets: Vec<String>,
        }

        let Some(tcp_proxy) = &self.config.tcp_proxy else {
            return;
        };
        let Some(dir) = &tcp_proxy.prometheus_service_discovery_dir else {
            return;
        };

        let mut static_configs = vec![];
        {
            let services = self.services.lock().expect("lock poisoned");
            for (id, states) in &*services {
                for (i, state) in states.iter().enumerate() {
                    for (name, addr) in &state.tcp_proxy_addrs {
                        let mut labels = btreemap! {
                            "mz_orchestrator_namespace".into() => self.config.namespace.clone(),
                            "mz_orchestrator_service_id".into() => id.clone(),
                            "mz_orchestrator_port".into() => name.clone(),
                            "mz_orchestrator_ordinal".into() => i.to_string(),
                        };
                        for (k, v) in &state.labels {
                            let k = format!("mz_orchestrator_{}", k.replace('-', "_"));
                            labels.insert(k, v.clone());
                        }
                        static_configs.push(StaticConfig {
                            labels,
                            targets: vec![addr.to_string()],
                        })
                    }
                }
            }
        }

        // One discovery file per namespace.
        let path = dir.join(Path::new(&self.config.namespace).with_extension("json"));
        let contents = serde_json::to_vec_pretty(&static_configs).expect("valid json");
        if let Err(e) = fs::write(&path, &contents).await {
            warn!(
                "{}: failed to write prometheus service discovery file: {}",
                self.config.namespace,
                e.display_with_causes()
            );
        }
    }
}
929
/// Per-process launch parameters handed to
/// `OrchestratorWorker::supervise_service_process`.
struct ServiceProcessConfig {
    /// The service ID (without namespace prefix).
    id: String,
    /// The service's run directory (sockets and PID files).
    run_dir: PathBuf,
    /// The ordinal of this process within the service.
    i: usize,
    /// The image name, relative to the image directory.
    image: String,
    /// Fully rendered command-line arguments for this process.
    args: Vec<String>,
    /// Command wrapper with placeholders already interpolated.
    command_wrapper: Vec<String>,
    /// The process's ports and any associated TCP proxy listeners.
    ports: Vec<ServiceProcessPort>,
    memory_limit: Option<MemoryLimit>,
    cpu_limit: Option<CpuLimit>,
    launch_spec: LaunchSpec,
}
942
/// One port of one process: its Unix-socket listen address and, when TCP
/// proxying is enabled, the pre-bound TCP listener for the proxy.
struct ServiceProcessPort {
    name: String,
    listen_addr: String,
    tcp_proxy_listener: Option<AddressedTcpListener>,
}
948
/// Supervises a process left over from a previous incarnation, if any.
///
/// If `pid_file` identifies a still-running process, marks it ready and
/// polls every 5s until it exits, then marks it not-ready and returns. If
/// this future is dropped before the process exits (supervisor aborted),
/// the `defer!` block kills the process.
async fn supervise_existing_process(state_updater: &ProcessStateUpdater, pid_file: &Path) {
    let name = format!(
        "{}-{}-{}",
        state_updater.namespace, state_updater.id, state_updater.i
    );

    let mut system = System::new();
    let Some(process) = find_process_from_pid_file(&mut system, pid_file).await else {
        return;
    };
    let pid = process.pid();

    info!(%pid, "discovered existing process for {name}");
    state_updater.update_state(ProcessStatus::Ready { pid });

    // Cleared just before normal return so the deferred kill only fires
    // when this future is dropped while the process is still running.
    let need_kill = AtomicBool::new(true);
    defer! {
        state_updater.update_state(ProcessStatus::NotReady);
        if need_kill.load(Ordering::SeqCst) {
            info!(%pid, "terminating existing process for {name}");
            process.kill();
        }
    }

    // A second `System` is required: the first is immutably borrowed by
    // `process` (captured by the defer! above) and cannot be refreshed.
    let mut system = System::new();
    while system.refresh_process_specifics(pid, ProcessRefreshKind::new()) {
        time::sleep(Duration::from_secs(5)).await;
    }

    warn!(%pid, "process for {name} has crashed; will reboot");
    need_kill.store(false, Ordering::SeqCst)
}
986
/// Expands placeholders in a single command-wrapper argument.
///
/// `%N` is replaced with `full_id`, and each `%P:<name>` is replaced with
/// the listen address registered under `<name>` in `ports`. Placeholders
/// with unknown port names are left untouched.
fn interpolate_command(
    command_part: &str,
    full_id: &str,
    ports: &BTreeMap<String, String>,
) -> String {
    let with_id = command_part.replace("%N", full_id);
    ports.iter().fold(with_id, |part, (endpoint, port)| {
        part.replace(&format!("%P:{endpoint}"), port)
    })
}
998
/// Spawns the process described by `cmd`, records its PID (and start time)
/// in `pid_file`, marks it ready via `state_updater`, and waits for it to
/// exit, returning its exit status.
///
/// If the returned future is dropped before the child exits, the child is
/// killed. When `send_sigterm` is true (a command wrapper is in use), the
/// child is first sent SIGTERM and given 500ms to shut down gracefully
/// before the kill.
async fn spawn_process(
    state_updater: &ProcessStateUpdater,
    mut cmd: Command,
    pid_file: &Path,
    send_sigterm: bool,
) -> Result<ExitStatus, anyhow::Error> {
    // Wrapper that kills the child on drop; the bool is `send_sigterm`.
    struct KillOnDropChild(Child, bool);

    impl Drop for KillOnDropChild {
        fn drop(&mut self) {
            if let (Some(pid), true) = (self.0.id().and_then(|id| i32::try_from(id).ok()), self.1) {
                let _ = nix::sys::signal::kill(
                    nix::unistd::Pid::from_raw(pid),
                    nix::sys::signal::Signal::SIGTERM,
                );
                // Give the child a moment to exit gracefully; `block_in_place`
                // keeps the blocking sleep off the async executor threads.
                tokio::task::block_in_place(|| std::thread::sleep(Duration::from_millis(500)));
            }
            let _ = self.0.start_kill();
        }
    }

    let mut child = KillOnDropChild(cmd.spawn()?, send_sigterm);

    // `id()` is `Some` here: the child was just spawned and has not been
    // waited on yet.
    let pid = Pid::from_u32(child.0.id().unwrap());
    write_pid_file(pid_file, pid).await?;
    state_updater.update_state(ProcessStatus::Ready { pid });
    Ok(child.0.wait().await?)
}
1036
1037fn did_process_crash(status: ExitStatus) -> bool {
1038 matches!(
1042 status.signal(),
1043 Some(SIGABRT | SIGBUS | SIGSEGV | SIGTRAP | SIGILL)
1044 )
1045}
1046
/// Writes `pid_file` containing `pid` and the process's start time on two
/// lines (start time 0 if the process cannot be observed).
///
/// The start time lets `find_process_from_pid_file` detect when a PID has
/// been recycled by an unrelated process.
async fn write_pid_file(pid_file: &Path, pid: Pid) -> Result<(), anyhow::Error> {
    let mut system = System::new();
    system.refresh_process_specifics(pid, ProcessRefreshKind::new());
    let start_time = system.process(pid).map_or(0, |p| p.start_time());
    fs::write(pid_file, format!("{pid}\n{start_time}\n")).await?;
    Ok(())
}
1054
/// Reads a PID file written by `write_pid_file` and returns the matching
/// live process, if any.
///
/// Returns `None` when the file is missing or malformed, when no process
/// with the recorded PID exists, or when the PID was recycled by another
/// process (detected via a start-time mismatch).
async fn find_process_from_pid_file<'a>(
    system: &'a mut System,
    pid_file: &Path,
) -> Option<&'a Process> {
    let Ok(contents) = fs::read_to_string(pid_file).await else {
        return None;
    };
    // Expect exactly two lines: PID and start time.
    let lines = contents.trim().split('\n').collect::<Vec<_>>();
    let [pid, start_time] = lines.as_slice() else {
        return None;
    };
    let Ok(pid) = Pid::from_str(pid) else {
        return None;
    };
    let Ok(start_time) = u64::from_str(start_time) else {
        return None;
    };
    system.refresh_process_specifics(pid, ProcessRefreshKind::new());
    let process = system.process(pid)?;
    // A different start time means the PID now belongs to some other
    // process; do not treat it as ours.
    if process.start_time() != start_time {
        return None;
    }
    Some(process)
}
1081
/// Configuration for one [`tcp_proxy`] task.
struct TcpProxyConfig {
    /// Name used in log messages.
    name: String,
    /// The pre-bound TCP listener to accept connections on.
    tcp_listener: AddressedTcpListener,
    /// Path of the Unix domain socket to forward connections to.
    uds_path: String,
}
1087
/// Runs a TCP proxy that forwards connections accepted on `tcp_listener` to
/// the Unix domain socket at `uds_path`.
///
/// Each accepted connection is proxied bidirectionally on its own future;
/// individual connection failures are logged without terminating the proxy.
/// The loop never returns; the task is stopped by aborting it.
async fn tcp_proxy(
    TcpProxyConfig {
        name,
        tcp_listener,
        uds_path,
    }: TcpProxyConfig,
) {
    let mut conns = FuturesUnordered::new();
    loop {
        select! {
            res = tcp_listener.listener.accept() => {
                debug!("{name}: accepting tcp proxy connection");
                let uds_path = uds_path.clone();
                conns.push(Box::pin(async move {
                    let (mut tcp_conn, _) = res.context("accepting tcp connection")?;
                    let mut uds_conn = UnixStream::connect(uds_path)
                        .await
                        .context("making uds connection")?;
                    io::copy_bidirectional(&mut tcp_conn, &mut uds_conn)
                        .await
                        .context("proxying")
                }));
            }
            // Successful completions are discarded; only failures are
            // worth logging.
            Some(Err(e)) = conns.next() => {
                warn!("{name}: tcp proxy connection failed: {}", e.display_with_causes());
            }
        }
    }
}
1117
/// Everything needed to update the recorded status of one process of one
/// service and broadcast the corresponding event.
struct ProcessStateUpdater {
    namespace: String,
    /// The service ID (without namespace prefix).
    id: String,
    /// The ordinal of the process within the service.
    i: usize,
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
    service_event_tx: broadcast::Sender<ServiceEvent>,
}
1125
impl ProcessStateUpdater {
    /// Records `status` for this process and broadcasts a matching
    /// `ServiceEvent`.
    ///
    /// Does nothing if the service or process no longer exists (e.g. it was
    /// dropped concurrently).
    fn update_state(&self, status: ProcessStatus) {
        let mut services = self.services.lock().expect("lock poisoned");
        let Some(process_states) = services.get_mut(&self.id) else {
            return;
        };
        let Some(process_state) = process_states.get_mut(self.i) else {
            return;
        };
        let status_time = Utc::now();
        process_state.status = status;
        process_state.status_time = status_time;
        // A send error only means there are no subscribers; ignore it.
        let _ = self.service_event_tx.send(ServiceEvent {
            service_id: self.id.to_string(),
            process_id: u64::cast_from(self.i),
            status: status.into(),
            time: status_time,
        });
    }
}
1146
/// Supervision state for one process of a service.
#[derive(Debug)]
struct ProcessState {
    /// Handle to the supervision task; aborts the supervisor when dropped.
    _handle: AbortOnDropHandle<()>,
    /// The last recorded status of the process.
    status: ProcessStatus,
    /// When `status` was last updated.
    status_time: DateTime<Utc>,
    /// The service's labels, copied here for Prometheus service discovery
    /// output.
    labels: BTreeMap<String, String>,
    /// TCP proxy listen addresses, keyed by port name.
    tcp_proxy_addrs: BTreeMap<String, SocketAddr>,
}
1155
1156impl ProcessState {
1157 fn pid(&self) -> Option<Pid> {
1158 match &self.status {
1159 ProcessStatus::NotReady => None,
1160 ProcessStatus::Ready { pid } => Some(*pid),
1161 }
1162 }
1163}
1164
/// The status of one supervised process.
#[derive(Debug, Clone, Copy)]
enum ProcessStatus {
    /// The process is not known to be running.
    NotReady,
    /// The process is running with the given PID.
    Ready { pid: Pid },
}
1170
1171impl From<ProcessStatus> for ServiceStatus {
1172 fn from(status: ProcessStatus) -> ServiceStatus {
1173 match status {
1174 ProcessStatus::NotReady => ServiceStatus::Offline(None),
1175 ProcessStatus::Ready { .. } => ServiceStatus::Online,
1176 }
1177 }
1178}
1179
1180fn socket_path(run_dir: &Path, port: &str, process: u16) -> String {
1181 let desired = run_dir
1182 .join(format!("{port}-{process}"))
1183 .to_string_lossy()
1184 .into_owned();
1185 if UnixSocketAddr::from_pathname(&desired).is_err() {
1186 env::temp_dir()
1189 .join(hex::encode(Sha1::digest(desired)))
1190 .display()
1191 .to_string()
1192 } else {
1193 desired
1194 }
1195}
1196
/// A TCP listener bundled with its resolved local address (useful because
/// the port is OS-assigned at bind time).
struct AddressedTcpListener {
    listener: TcpListener,
    local_addr: SocketAddr,
}
1201
/// A handle to a service managed by a [`NamespacedProcessOrchestrator`].
#[derive(Debug)]
struct ProcessService {
    /// The service ID (without namespace prefix).
    id: String,
    /// The service's run directory, which contains its Unix sockets.
    run_dir: PathBuf,
    /// The number of processes in the service.
    scale: NonZero<u16>,
    /// Shared process state, used to look up TCP proxy addresses.
    services: Arc<Mutex<BTreeMap<String, Vec<ProcessState>>>>,
}
1210
1211impl Service for ProcessService {
1212 fn addresses(&self, port: &str) -> Vec<String> {
1213 (0..self.scale.get())
1214 .map(|i| socket_path(&self.run_dir, port, i))
1215 .collect()
1216 }
1217
1218 fn tcp_addresses(&self, port: &str) -> Vec<String> {
1219 let guard = self.services.lock().expect("lock poisoned");
1220 let Some(states) = guard.get(&self.id) else {
1221 return Vec::new();
1223 };
1224
1225 states
1226 .iter()
1227 .filter_map(|state| state.tcp_proxy_addrs.get(port))
1228 .map(|addr| addr.to_string())
1229 .collect()
1230 }
1231}